From 8345026736ac255cc31169a73a9c4394f3981fbe Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 17 Mar 2023 05:36:02 -0400 Subject: [PATCH] *Rewrite to use new IPFS proxy protocol over relay network --- build.js | 3 +- package.json | 89 ++++++-- src-build/build.ts | 434 +++++++++++++++------------------------- src/constants.ts | 8 + src/index.ts | 416 +++++++++++++++----------------------- src/libp2p/transport.ts | 211 +++++++++++++++++++ src/net.ts | 3 + src/peerManager.ts | 315 +++++++++++++++++++++++++++++ src/socket.ts | 158 +++++++++++++++ src/types.ts | 40 ++++ timers.js | 8 + 11 files changed, 1138 insertions(+), 547 deletions(-) create mode 100644 src/constants.ts create mode 100644 src/libp2p/transport.ts create mode 100644 src/net.ts create mode 100644 src/peerManager.ts create mode 100644 src/socket.ts create mode 100644 src/types.ts create mode 100644 timers.js diff --git a/build.js b/build.js index e2353b2..d0f4f54 100644 --- a/build.js +++ b/build.js @@ -9,5 +9,6 @@ esbuild.buildSync({ // minify: true define: { global: "self", - } + }, + inject:['timers.js'] }); diff --git a/package.json b/package.json index 5733aee..771ffb6 100644 --- a/package.json +++ b/package.json @@ -9,30 +9,89 @@ "build": "npm run compile && node ./dist-build/build.mjs dev" }, "dependencies": { - "@lumeweb/kernel-dht-client": "https://github.com/LumeWeb/kernel-dht-client.git", - "@lumeweb/kernel-rpc-client": "https://github.com/LumeWeb/kernel-rpc-client.git", - "@lumeweb/kernel-utils": "https://github.com/LumeWeb/kernel-utils.git", + "@chainsafe/libp2p-noise": "^11.0.1", + "@chainsafe/libp2p-yamux": "^3.0.7", + "@dao-xyz/libp2p-noise": "^11.1.3", + "@helia/unixfs": "^1.1.0", + "@libp2p/crypto": "^1.0.14", + "@libp2p/delegated-content-routing": "^4.0.1", + "@libp2p/delegated-peer-routing": "^4.0.1", + "@libp2p/interface-metrics": "^4.0.5", + "@libp2p/interfaces": "^3.3.1", + "@libp2p/logger": "^2.0.6", + "@libp2p/mplex": "^7.1.1", + "@libp2p/tcp": "^6.1.2", + "@libp2p/utils": "^3.0.4", + "@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git", + "@multiformats/mafmt": "^11.1.0", + "b4a": "^1.6.2", + "blockstore-core": "^3.0.0", "buffer": "^6.0.3", - "is-ipfs": "^6.0.2", - "libkmodule": "^0.2.12", - "libskynet": "^0.0.62", - "p-queue": "^7.3.0", + "compact-encoding": "^2.11.0", + "datastore-core": "^8.0.4", + "helia": "^0.0.1", + "ipfs-http-client": "^60.0.0", + "ipfs-in-memory-repo": "^0.1.0", + "libkmodule": "^0.2.53", + "libp2p": "^0.42.2", + "multiformats": "^11.0.2", + "p-queue": "^7.3.4", + "private-ip": "^3.0.0", + "rewire": "^6.0.0", + "serialize-error": "^11.0.0", + "sodium-universal": "^4.0.0", + "streamx": "^2.13.2", "timers-browserify": "^2.0.12" }, "devDependencies": { - "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", + "@helia/interface": "^0.0.0", + "@libp2p/interface-connection": "^3.1.0", + "@libp2p/interface-peer-info": "^1.0.8", + "@libp2p/interface-transport": "^2.1.1", + "@libp2p/kad-dht": "^7.0.3", + "@libp2p/peer-id": "^2.0.2", + "@libp2p/peer-id-factory": "^2.0.2", + "@libp2p/websockets": "^5.0.5", + "@lumeweb/kernel-dht-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git", + "@lumeweb/kernel-rpc-client": "git+https://git.lumeweb.com/LumeWeb/kernel-rpc-client.git", + "@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git", + "@multiformats/multiaddr": "^11.6.1", + "@scure/bip39": "^1.1.1", + "@skynetlabs/skynet-nodejs": "^2.9.0", + "@types/b4a": "^1.6.0", "@types/events": 
"^3.0.0", - "@types/node": "^18.0.3", + "@types/node": "^18.15.3", "@types/read": "^0.0.29", - "esbuild": "^0.14.49", - "libskynetnode": "^0.1.3", - "prettier": "^2.7.1", + "@types/rewire": "^2.5.28", + "@types/streamx": "^2.9.1", + "@types/ws": "^8.5.4", + "cli-progress": "^3.12.0", + "esbuild": "^0.14.54", + "http-browserify": "^1.7.0", + "hyperswarm": "^4.3.7", + "interface-store": "^3.0.4", + "it-all": "^2.0.1", + "it-ws": "^5.0.6", + "libskynet": "^0.1.9", + "libskynetnode": "^0.1.4", + "net-browserify": "^0.2.4", + "os-browserify": "^0.3.0", + "p-timeout": "^6.1.1", + "path-browserify": "^1.0.1", + "prettier": "^2.8.4", + "prom-client": "^14.2.0", "read": "^1.0.7", - "tslib": "^2.4.0", - "typescript": "^4.7.4" + "stream-browserify": "^3.0.0", + "tslib": "^2.5.0", + "typescript": "^4.9.5", + "url-browser": "^0.1.7", + "util-browser": "^0.0.2" }, "browser": { "libkernel": false, - "timers": "timers-browserify" + "timers": "timers-browserify", + "path": "path-browserify", + "os": "os-browserify", + "net": "./src/net.ts" } } diff --git a/src-build/build.ts b/src-build/build.ts index 1ca3cad..c0833e3 100644 --- a/src-build/build.ts +++ b/src-build/build.ts @@ -1,100 +1,50 @@ // This is the standard build script for a kernel module. -import * as fs from "fs" -import { - addContextToErr, - b64ToBuf, - bufToHex, - deriveRegistryEntryID, - entryIDToSkylink, - generateSeedPhraseDeterministic, - seedPhraseToSeed, - sha512, - taggedRegistryEntryKeys, -} from "libskynet" -import { generateSeedPhraseRandom, overwriteRegistryEntry, upload } from "libskynetnode" -import read from "read" +import * as fs from "fs"; +import read from "read"; +import * as bip39 from "@scure/bip39"; +import { wordlist } from "@scure/bip39/wordlists/english.js"; +//@ts-ignore +import { SkynetClient } from "@skynetlabs/skynet-nodejs"; // Helper variables to make it easier to return empty values alongside errors. -const nu8 = new Uint8Array(0) +const nu8 = new Uint8Array(0); const nkp = { - publicKey: nu8, - secretKey: nu8, -} + publicKey: nu8, + secretKey: nu8, +}; // readFile is a wrapper for fs.readFileSync that handles the try-catch for the // caller. function readFile(fileName: string): [string, string | null] { - try { - let data = fs.readFileSync(fileName, "utf8") - return [data, null] - } catch (err) { - return ["", "unable to read file: " + JSON.stringify(err)] - } + try { + let data = fs.readFileSync(fileName, "utf8"); + return [data, null]; + } catch (err) { + return ["", "unable to read file: " + JSON.stringify(err)]; + } } // readFileBinary is a wrapper for fs.readFileSync that handles the try-catch // for the caller. function readFileBinary(fileName: string): [Uint8Array, string | null] { - try { - let data = fs.readFileSync(fileName, null) - return [data, null] - } catch (err) { - return [nu8, "unable to read file: " + JSON.stringify(err)] - } + try { + let data = fs.readFileSync(fileName, null); + return [data, null]; + } catch (err) { + return [nu8, "unable to read file: " + JSON.stringify(err)]; + } } // writeFile is a wrapper for fs.writeFileSync which handles the try-catch in a // non-exception way. function writeFile(fileName: string, fileData: string): string | null { - try { - fs.writeFileSync(fileName, fileData) - return null - } catch (err) { - return "unable to write file: " + JSON.stringify(err) - } -} - -// hardenedSeedPhrase will take a password, harden it with 100,000 iterations -// of hashing, and then turn it into a seed phrase. 
-function hardenedSeedPhrase(password: string): [string, string | null] { - let pw = password - // Add some hashing iterations to the password to make it stronger. - for (let i = 0; i < 1000000; i++) { - let passU8 = new TextEncoder().encode(password) - let hashIter = sha512(passU8) - password = bufToHex(hashIter) - } - return generateSeedPhraseDeterministic(password) -} - -// seedPhraseToRegistryKeys will convert a seed phrase to the set of registry -// keys that govern the registry entry where the module is published. -function seedPhraseToRegistryKeys(seedPhrase: string): [any, Uint8Array, string | null] { - let [seed, errVSP] = seedPhraseToSeed(seedPhrase) - if (errVSP !== null) { - return [nkp, nu8, addContextToErr(errVSP, "unable to compute seed phrase")] - } - let [keypair, datakey, errTREK] = taggedRegistryEntryKeys(seed, "module-build", "module-key") - if (errTREK !== null) { - return [nkp, nu8, addContextToErr(errTREK, "unable to compute registry entry keys")] - } - return [keypair, datakey, null] -} - -// seedPhraseToRegistryLink will take a seedPhrase as input and convert it to -// the registry link for the module. -function seedPhraseToRegistryLink(seedPhrase: string): [string, string | null] { - let [keypair, datakey, errSPTRK] = seedPhraseToRegistryKeys(seedPhrase) - if (errSPTRK !== null) { - return ["", addContextToErr(errSPTRK, "unable to compute registry keys")] - } - let [entryID, errDREID] = deriveRegistryEntryID(keypair.publicKey, datakey) - if (errDREID !== null) { - return ["", addContextToErr(errDREID, "unable to compute registry entry id")] - } - let registryLink = entryIDToSkylink(entryID) - return [registryLink, null] + try { + fs.writeFileSync(fileName, fileData); + return null; + } catch (err) { + return "unable to write file: " + JSON.stringify(err); + } } // handlePass handles all portions of the script that occur after the password @@ -103,238 +53,166 @@ function seedPhraseToRegistryLink(seedPhrase: string): [string, string | null] { // password reader is async and we can only access the password when using a // callback. function handlePass(password: string) { - try { - // If we are running prod and the seed file does not exist, we - // need to confirm the password and also warn the user to use a - // secure password. - if (!fs.existsSync(seedFile) && process.argv[2] === "prod") { - // The file does not exist, we need to confirm the - // password. - console.log() - console.log("No production entry found for module. Creating new production module...") - console.log("If someone can guess the password, they can push arbitrary changes to your module.") - console.log("Please use a secure password.") - console.log() - read({ prompt: "Confirm Password: ", silent: true }, function (err: any, confirmPassword: string) { - if (err) { - console.error("unable to fetch password:", err) - process.exit(1) - } - if (password !== confirmPassword) { - console.error("passwords do not match") - process.exit(1) - } - password = password + moduleSalt - handlePassConfirm(password) - }) - } else { - // If the seed file does exist, or if we are using dev, - // there's no need to confirm the password but we do - // need to pass the logic off to the handlePassConfirm - // callback. 
- password = password + moduleSalt - handlePassConfirm(password) - } - } catch (err) { - console.error("Unable to read seedFile:", err) - process.exit(1) - } + try { + // If we are running prod and the seed file does not exist, we + // need to confirm the password and also warn the user to use a + // secure password. + if (!fs.existsSync(seedFile) && process.argv[2] === "prod") { + // The file does not exist, we need to confirm the + // password. + console.log(); + console.log( + "No production entry found for module. Creating new production module..." + ); + console.log( + "If someone can guess the password, they can push arbitrary changes to your module." + ); + console.log("Please use a secure password."); + console.log(); + read( + { prompt: "Confirm Password: ", silent: true }, + function (err: any, confirmPassword: string) { + if (err) { + console.error("unable to fetch password:", err); + process.exit(1); + } + if (password !== confirmPassword) { + console.error("passwords do not match"); + process.exit(1); + } + handlePassConfirm(moduleSalt, password); + } + ); + } else { + // If the seed file does exist, or if we are using dev, + // there's no need to confirm the password but we do + // need to pass the logic off to the handlePassConfirm + // callback. + handlePassConfirm(moduleSalt, password); + } + } catch (err) { + console.error("Unable to read seedFile:", err); + process.exit(1); + } } // handlePassConfirm handles the full script after the confirmation password // has been provided. If not confirmation password is needed, this function // will be called anyway using the unconfirmed password as input. -function handlePassConfirm(password: string) { - // Create the seedFile if it does not exist. For dev we just save the - // seed to disk outright, because this is a dev build and therefore not - // security sensitive. Also the dev seed does not get pushed to the - // github repo. - // - // For prod, we use the seed to create a new seed (called the shield) - // which allows us to verify that the developer has provided the right - // password when deploying the module. The shield does get pushed to - // the github repo so that the production module is the same on all - // devices. - if (!fs.existsSync(seedFile) && process.argv[2] !== "prod") { - // Generate the seed phrase and write it to the file. - let [seedPhrase, errGSP] = generateSeedPhraseRandom() - if (errGSP !== null) { - console.error("Unable to generate seed phrase:", errGSP) - process.exit(1) - } - let errWF = writeFile(seedFile, seedPhrase) - if (errWF !== null) { - console.error("unable to write file:", errWF) - process.exit(1) - } - } else if (!fs.existsSync(seedFile) && process.argv[2] === "prod") { - // Generate the seed phrase. - let [seedPhrase, errGSP] = hardenedSeedPhrase(password) - if (errGSP !== null) { - console.error("Unable to generate seed phrase:", errGSP) - process.exit(1) - } - let [registryLink, errSPTRL] = seedPhraseToRegistryLink(seedPhrase) - if (errSPTRL !== null) { - console.error("Unable to generate registry link:", errSPTRL) - process.exit(1) - } +function handlePassConfirm(seed: string, password: string) { + // Create the seedFile if it does not exist. For dev we just save the + // seed to disk outright, because this is a dev build and therefore not + // security sensitive. Also the dev seed does not get pushed to the + // github repo. 
+ // + // For prod, we use the seed to create a new seed (called the shield) + // which allows us to verify that the developer has provided the right + // password when deploying the module. The shield does get pushed to + // the github repo so that the production module is the same on all + // devices. + if (!fs.existsSync(seedFile) && process.argv[2] !== "prod") { + // Generate the seed phrase and write it to the file. + let seedPhrase = bip39.generateMnemonic(wordlist); + let errWF = writeFile(seedFile, seedPhrase); + if (errWF !== null) { + console.error("unable to write file:", errWF); + process.exit(1); + } + } else if (!fs.existsSync(seedFile) && process.argv[2] === "prod") { + // Generate the seed phrase. + let seedPhrase = bip39.generateMnemonic(wordlist); + // Write the registry link to the file. + } - // Write the registry link to the file. - let errWF = writeFile(seedFile, registryLink) - if (errWF !== null) { - console.error("unable to write registry link file:", errWF) - process.exit(1) - } - } + // Load or verify the seed. If this is prod, the password is used to + // create and verify the seed. If this is dev, we just load the seed + // with no password. + let seedPhrase: string; + let registryLink: string; + if (process.argv[2] === "prod") { + // Generate the seed phrase from the password. + seedPhrase = bip39.generateMnemonic(wordlist); + } else { + let [sp, errRF] = readFile(seedFile); + if (errRF !== null) { + console.error("unable to read seed phrase for dev command from disk"); + process.exit(1); + } + seedPhrase = sp; + } - // Load or verify the seed. If this is prod, the password is used to - // create and verify the seed. If this is dev, we just load the seed - // with no password. - let seedPhrase: string - let registryLink: string - if (process.argv[2] === "prod") { - // Generate the seed phrase from the password. - let [sp, errGSP] = hardenedSeedPhrase(password) - if (errGSP !== null) { - console.error("Unable to generate seed phrase: ", errGSP) - process.exit(1) - } - let [rl, errSPTRL] = seedPhraseToRegistryLink(sp) - registryLink = rl - if (errSPTRL !== null) { - console.error("Unable to generate registry link:", errSPTRL) - process.exit(1) - } - let [registryLinkVerify, errRF] = readFile(seedFile) - if (errRF !== null) { - console.error("unable to read seedFile") - process.exit(1) - } - registryLinkVerify = registryLinkVerify.replace(/\n$/, "") - if (registryLink !== registryLinkVerify) { - console.error("Incorrect password") - process.exit(1) - } - seedPhrase = sp - } else { - let [sp, errRF] = readFile(seedFile) - if (errRF !== null) { - console.error("unable to read seed phrase for dev command from disk") - process.exit(1) - } - let [rl, errSPTRL] = seedPhraseToRegistryLink(sp) - registryLink = rl - if (errSPTRL !== null) { - console.error("Unable to generate registry link:", errSPTRL) - process.exit(1) - } - // Write the registry link to the module skylink dev file. - let errWF = writeFile("build/module-skylink-dev", registryLink) - if (errWF !== null) { - console.error("unable to write registry link file:", errWF) - process.exit(1) - } - seedPhrase = sp - } - - // Upload the module to Skynet. 
- let [distFile, errRF] = readFileBinary("dist/index.js") - if (errRF !== null) { - console.error("unable to read dist file for module") - process.exit(1) - } - let metadata = { - Filename: "index.js", - } - console.log("Uploading module...") - upload(distFile, metadata) - .then((result) => { - console.log("Updating module's registry entry...") - // Update the v2 skylink. - let [keypair, datakey, errSPTRK] = seedPhraseToRegistryKeys(seedPhrase) - if (errSPTRK !== null) { - return ["", addContextToErr(errSPTRK, "unable to compute registry keys")] - } - let [bufLink, errBTB] = b64ToBuf(result) - if (errBTB !== null) { - return ["", addContextToErr(errBTB, "unable to decode skylink")] - } - overwriteRegistryEntry(keypair, datakey, bufLink) - .then(() => { - console.log("registry entry is updated") - console.log("Immutable Link for Module:", result) - console.log("Resolver Link for Module:", registryLink) - }) - .catch((err: any) => { - console.log("unable to update registry entry:", err) - }) - }) - .catch((err) => { - console.error("unable to upload file", err) - process.exit(1) - }) + let metadata = { + Filename: "index.js", + }; + const client = new SkynetClient("https://web3portal.com"); + client + .uploadFile("dist/index.js") + .then((result: any) => { + console.log("Immutable Link for module:", result); + }) + .catch((err: any) => { + console.error("unable to upload file", err); + process.exit(1); + }); } // Add a newline for readability. -console.log() +console.log(); // Check for a 'dev' or 'prod' input to the script. if (process.argv.length !== 3) { - console.error("need to provide either 'dev' or 'prod' as an input") - process.exit(1) + console.error("need to provide either 'dev' or 'prod' as an input"); + process.exit(1); } // Create the build folder if it does not exist. if (!fs.existsSync("build")) { - fs.mkdirSync("build") + fs.mkdirSync("build"); } // Determine the seed file. -let seedFile: string +let seedFile: string; if (process.argv[2] === "prod") { - seedFile = "module-skylink" + seedFile = "module-skylink"; } else if (process.argv[2] === "dev") { - seedFile = "build/dev-seed" + seedFile = "build/dev-seed"; } else { - console.error("need to provide either 'dev' or 'prod' as an input") - process.exit(1) + console.error("need to provide either 'dev' or 'prod' as an input"); + process.exit(1); } // If doing a prod deployment, check whether the salt file exists. If it does // not, create it. -let moduleSalt: string +let moduleSalt: string; if (!fs.existsSync(".module-salt")) { - let [ms, errGSPR] = generateSeedPhraseRandom() - if (errGSPR !== null) { - console.error("unable to generate module salt:", errGSPR) - process.exit(1) - } - moduleSalt = ms - let errWF = writeFile(".module-salt", moduleSalt) - if (errWF !== null) { - console.error("unable to write module salt file:", errWF) - process.exit(1) - } + moduleSalt = bip39.generateMnemonic(wordlist); + let errWF = writeFile(".module-salt", moduleSalt); + if (errWF !== null) { + console.error("unable to write module salt file:", errWF); + process.exit(1); + } } else { - let [ms, errRF] = readFile(".module-salt") - if (errRF !== null) { - console.error("unable to read moduleSalt") - process.exit(1) - } - ms = ms.replace(/\n$/, "") - moduleSalt = ms + let [ms, errRF] = readFile(".module-salt"); + if (errRF !== null) { + console.error("unable to read moduleSalt"); + process.exit(1); + } + ms = ms.replace(/\n$/, ""); + moduleSalt = ms; } // Need to get a password if this is a prod build. 
if (process.argv[2] === "prod") { - read({ prompt: "Password: ", silent: true }, function (err: any, password: string) { - if (err) { - console.error("unable to fetch password:", err) - process.exit(1) - } - handlePass(password) - }) + read( + { prompt: "Password: ", silent: true }, + function (err: any, password: string) { + if (err) { + console.error("unable to fetch password:", err); + process.exit(1); + } + handlePass(password); + } + ); } else { - handlePass("") + handlePass(""); } diff --git a/src/constants.ts b/src/constants.ts new file mode 100644 index 0000000..bb814ac --- /dev/null +++ b/src/constants.ts @@ -0,0 +1,8 @@ +export const PROTOCOL = "lumeweb.proxy.ipfs"; + +export const DELEGATE_LIST = [ + "/dns4/node0.delegate.ipfs.io/tcp/443/https", + "/dns4/node1.delegate.ipfs.io/tcp/443/https", + "/dns4/node2.delegate.ipfs.io/tcp/443/https", + "/dns4/node3.delegate.ipfs.io/tcp/443/https", +]; diff --git a/src/index.ts b/src/index.ts index 3d002c3..e5e3dc7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,278 +1,188 @@ -import { addHandler, handleMessage } from "libkmodule"; -import type { ActiveQuery } from "libkmodule"; -import PQueue from "p-queue"; -import { ipfsPath, ipnsPath } from "is-ipfs"; -import { DataFn } from "libskynet"; -import { - RpcNetwork, - SimpleRpcQuery, - StreamingRpcQuery, -} from "@lumeweb/kernel-rpc-client"; -import { RPCResponse } from "@lumeweb/relay-types"; - -interface StatFileResponse { - exists: boolean; - contentType: string | null; - error: any; - directory: boolean; - files: string[]; -} -interface PingRPCResponse extends RPCResponse { - data?: "pong"; -} - -interface MethodsRPCResponse extends RPCResponse { - data?: string[]; -} +import { createLibp2p } from "libp2p"; +import { MemoryDatastore } from "datastore-core"; +import { MemoryBlockstore } from "blockstore-core"; +import { createHelia } from "helia"; +import { yamux } from "@chainsafe/libp2p-yamux"; +// @ts-ignore +import Hyperswarm from "hyperswarm"; +import { Peer, Proxy } from "@lumeweb/libhyperproxy"; +// @ts-ignore +import sodium from "sodium-universal"; +// @ts-ignore +import { CustomEvent } from "@libp2p/interfaces/events"; +// @ts-ignore +import { fixed32, raw } from "compact-encoding"; +import { mplex } from "@libp2p/mplex"; +import PeerManager from "./peerManager.js"; +import { hypercoreTransport } from "./libp2p/transport.js"; +import { UnixFS, unixfs } from "@helia/unixfs"; +// @ts-ignore +import { delegatedPeerRouting } from "@libp2p/delegated-peer-routing"; +import { noise } from "@chainsafe/libp2p-noise"; +import { create as createIpfsHttpClient } from "ipfs-http-client"; +import { delegatedContentRouting } from "@libp2p/delegated-content-routing"; +import type { Options } from "ipfs-core"; +import { multiaddr } from "@multiformats/multiaddr"; +import { DELEGATE_LIST, PROTOCOL } from "./constants.js"; +import { ActiveQuery, addHandler, handleMessage } from "libkmodule"; +import { createClient } from "@lumeweb/kernel-swarm-client"; onmessage = handleMessage; -let blockingGatewayUpdate = Promise.resolve(); - -let activeRelays: string | any[] = []; -let relays = [ - "25c2a0a833782d64213c08879b95dd5a60af244b44a058f3a7a70d6722f4bda7", -]; - -let network: RpcNetwork; - -addHandler("presentSeed", handlePresentSeed); -addHandler("refreshGatewayList", handleRefreshGatewayList); -addHandler("statIpfs", handleStatIpfs); -addHandler("fetchIpfs", handleFetchIpfs, { receiveUpdates: true }); -addHandler("statIpns", handleStatIpns); -addHandler("fetchIpns", handleFetchIpns, { 
receiveUpdates: true }); - -let readyPromiseResolve: any; -let readyPromise = new Promise((resolve) => { - readyPromiseResolve = resolve; +let moduleLoadedResolve: Function; +let moduleLoaded: Promise = new Promise((resolve) => { + moduleLoadedResolve = resolve; }); -async function handlePresentSeed() { - network = new RpcNetwork(false); - for (const relay of relays) { - await network.addRelay(relay); - } +let swarm; +let proxy: Proxy; +let fs: UnixFS; - refreshGatewayList(); - readyPromiseResolve(); +// @ts-ignore +BigInt.prototype.toJSON = function () { + return this.toString(); +}; + +addHandler("presentSeed", handlePresentSeed); +addHandler("stat", handleStat); +addHandler("ls", handleLs, { receiveUpdates: true }); +addHandler("cat", handleCat, { receiveUpdates: true }); + +async function handlePresentSeed() { + swarm = createClient(); + + const client = createIpfsHttpClient(getDelegateConfig()); + + PeerManager.instance.ipfs = await createHelia({ + blockstore: new MemoryBlockstore(), + datastore: new MemoryDatastore(), + libp2p: await createLibp2p({ + transports: [hypercoreTransport({ peerManager: PeerManager.instance })], + connectionEncryption: [noise()], + streamMuxers: [yamux(), mplex()], + start: false, + contentRouters: [delegatedContentRouting(client)], + peerRouters: [delegatedPeerRouting(client)], + relay: { + enabled: false, + }, + }), + }); + + proxy = new Proxy({ + swarm, + listen: true, + protocol: PROTOCOL, + autostart: true, + emulateWebsocket: true, + createDefaultMessage: false, + onchannel(peer: Peer, channel: any) { + PeerManager.instance.handleNewPeerChannel(peer, channel); + }, + onopen() { + PeerManager.instance.handleNewPeer(); + }, + onclose(peer: Peer) { + PeerManager.instance.handleClosePeer(peer); + }, + }); + + swarm.join(PROTOCOL); + await swarm.start(); + // @ts-ignore + fs = unixfs(PeerManager.instance.ipfs); + moduleLoadedResolve(); } -async function handleRefreshGatewayList(aq: ActiveQuery) { - await readyPromise; - await blockingGatewayUpdate; - await refreshGatewayList(); +async function handleStat(aq: ActiveQuery) { + await moduleLoaded; + + if (!("cid" in aq.callerInput)) { + aq.reject("cid required"); + return; + } + + let aborted = false; + + aq.setReceiveUpdate?.(() => { + aborted = true; + }); + + try { + aq.respond( + JSON.parse( + JSON.stringify( + await fs.stat(aq.callerInput.cid, aq.callerInput.options ?? {}) + ) + ) + ); + } catch (e) { + aq.reject((e as Error).message); + } +} + +async function handleLs(aq: ActiveQuery) { + await moduleLoaded; + if (!("cid" in aq.callerInput)) { + aq.reject("cid required"); + return; + } + + let aborted = false; + + aq.setReceiveUpdate?.(() => { + aborted = true; + }); + + const iterable = fs.ls(aq.callerInput.cid, aq.callerInput.options ?? 
{}); + + for await (const item of iterable) { + if (aborted) { + break; + } + aq.sendUpdate(JSON.parse(JSON.stringify(item))); + } + aq.respond(); } -async function handleStatIpfs(aq: ActiveQuery) { - return handleStat(aq, "stat_ipfs", "ipfs"); -} +async function handleCat(aq: ActiveQuery) { + await moduleLoaded; -async function handleFetchIpfs(aq: ActiveQuery) { - return handleFetch(aq, "fetch_ipfs", "ipfs"); -} - -async function handleStatIpns(aq: ActiveQuery) { - return handleStat(aq, "stat_ipns", "ipns"); -} - -async function handleFetchIpns(aq: ActiveQuery) { - return handleFetch(aq, "fetch_ipns", "ipns"); -} - -async function validateInputs(aq: ActiveQuery, type: "ipns" | "ipfs") { - const { hash = null } = aq.callerInput; - const { path = "" } = aq.callerInput; - if (!hash) { - aq.reject("hash missing"); + if (!("cid" in aq.callerInput)) { + aq.reject("cid required"); return; } - if (type === "ipfs" && !ipfsPath(`/ipfs/${hash}`)) { - aq.reject("ipfs hash is invalid"); - return; - } + let aborted = false; - if (type === "ipns" && !ipnsPath(`/ipns/${hash}`)) { - aq.reject("ipns hash is invalid"); - return; - } - await readyPromise; - await blockingGatewayUpdate; - - return { hash, path }; -} - -async function handleStat( - aq: ActiveQuery, - method: string, - type: "ipns" | "ipfs" -): Promise { - const valid = await validateInputs(aq, type); - if (!valid) { - return; - } - const { hash, path } = valid; - try { - let resp = (await fetchFromRelays(hash, path, method)) as StatFileResponse; - aq.respond(resp); - } catch (e: any) { - aq.reject(e); - } -} - -async function handleFetch( - aq: ActiveQuery, - method: string, - type: "ipns" | "ipfs" -): Promise { - const valid = await validateInputs(aq, type); - if (!valid) { - return; - } - const { hash, path } = valid; - - try { - await fetchFromRelays( - hash, - path, - method, - aq.sendUpdate, - aq.setReceiveUpdate - ); - aq.respond(); - } catch (e: any) { - aq.reject(e); - } -} - -async function fetchFromRelays( - hash: string, - path: string, - method: string, - stream: DataFn | undefined = undefined, - receiveUpdate: ((receiveUpdate: DataFn) => void) | undefined = undefined -) { - let error = new Error("NOT_FOUND"); - if (0 == activeRelays.length) { - await refreshGatewayList(); - } - for (const relay of activeRelays) { - let query: any; - if (stream) { - query = network.streamingQuery( - relay, - method, - "ipfs", - stream, - { - hash, - path, - }, - { queryTimeout: 30, relayTimeout: 30 } - ); - receiveUpdate?.((message) => { - if (message && message.cancel) { - query.cancel(); - } - }); - } else { - query = network.simpleQuery( - relay, - method, - "ipfs", - { - hash, - path, - }, - { queryTimeout: 30, relayTimeout: 30 } - ); - } - let resp = await query.result; - if (resp.error) { - throw new Error(resp.error); - } - - return !!stream ? 
null : resp.data; - } - - throw error; -} - -async function relayHasMethods( - methodList: string[], - relay: string -): Promise { - let methods: string[] = []; - let query = network.simpleQuery(relay, "get_methods", "core"); - - let resp = (await query.result) as MethodsRPCResponse; - - if (resp.data) { - methods = resp.data; - } - - let has = true; - - methodList.forEach((item) => { - if (!methods.includes(item)) { - has = false; - } + aq.setReceiveUpdate?.(() => { + aborted = true; }); - return has; -} -async function refreshGatewayList() { - let processResolve: any; - blockingGatewayUpdate = new Promise((resolve) => { - processResolve = resolve; - }); - const queue = new PQueue({ concurrency: 10 }); + const iterable = fs.cat(aq.callerInput.cid, aq.callerInput.options ?? {}); - let latencies: any[] = []; - - relays.forEach((item) => { - queue.add(checkRelayLatency(item, latencies)); - }); - - await queue.onIdle(); - - activeRelays = latencies - .sort((a: any[], b: any[]) => { - return a[0] - b[0]; - }) - .map((item: any[]) => item[1]); - processResolve(); -} -function checkRelayLatency(relay: string, list: any[]) { - return async () => { - const start = Date.now(); - - let query = network.simpleQuery(relay, "ping", "core"); - - let resp = (await query.result) as PingRPCResponse; - - if (resp?.data !== "pong") { - return; + for await (const chunk of iterable) { + if (aborted) { + break; } - const end = Date.now() - start; + aq.sendUpdate(chunk); + } - if ( - !(await relayHasMethods( - [ - "ipfs.stat_ipfs", - "ipfs.stat_ipns", - "ipfs.fetch_ipfs", - "ipfs.fetch_ipns", - ], - relay - )) - ) { - return; - } + aq.respond(); +} - list.push([end, relay]); +function getDelegateConfig(): Options { + const delegateString = + DELEGATE_LIST[Math.floor(Math.random() * DELEGATE_LIST.length)]; + const delegateAddr = multiaddr(delegateString).toOptions(); + + return { + // @ts-ignore + host: delegateAddr.host, + // @ts-ignore + protocol: parseInt(delegateAddr.port) === 443 ? 
"https" : "http", + port: delegateAddr.port, }; } diff --git a/src/libp2p/transport.ts b/src/libp2p/transport.ts new file mode 100644 index 0000000..7a66a33 --- /dev/null +++ b/src/libp2p/transport.ts @@ -0,0 +1,211 @@ +import { symbol } from "@libp2p/interface-transport"; +// @ts-ignore +import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp"; +import PeerManager from "../peerManager.js"; +import { Multiaddr } from "@multiformats/multiaddr"; +import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net"; +import { logger } from "@libp2p/logger"; +import { AbortError, CodeError } from "@libp2p/interfaces/errors"; +// @ts-ignore +import { multiaddrToNetConfig } from "@libp2p/tcp/utils"; +import { Socket } from "../socket.js"; +import { Connection } from "@libp2p/interface-connection"; +// @ts-ignore +import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn"; +import * as mafmt from "@multiformats/mafmt"; + +const log = logger("libp2p:hypercore"); + +import isPrivateIp from "private-ip"; + +const CODE_P2P = 421; +const CODE_CIRCUIT = 290; +const CODE_UNIX = 400; + +export interface HypercoreOptions extends TCPOptions { + peerManager?: PeerManager; +} +class HypercoreTransport extends TCP { + private readonly opts?: HypercoreOptions; + private metrics: any; + constructor(components: TCPComponents, options: HypercoreOptions = {}) { + super(components, options); + this.opts = options; + if (!options.peerManager) { + throw new Error("options.peerManager is required"); + } + this.opts?.peerManager; + } + + get [symbol](): true { + return true; + } + + get [Symbol.toStringTag](): string { + return "@libp2p/hypercore"; + } + + async dial(ma: Multiaddr, options: TCPDialOptions): Promise { + options.keepAlive = options.keepAlive ?? true; + + // options.signal destroys the socket before 'connect' event + const socket = await this._connect(ma, options); + + // Avoid uncaught errors caused by unstable connections + socket.on("error", (err) => { + log("socket error", err); + }); + + const maConn = toMultiaddrConnection(socket as any, { + remoteAddr: ma, + socketInactivityTimeout: this.opts?.outboundSocketInactivityTimeout, + socketCloseTimeout: this.opts?.socketCloseTimeout, + metrics: this.metrics?.dialerEvents, + }); + + const onAbort = (): void => { + maConn.close().catch((err: any) => { + log.error("Error closing maConn after abort", err); + }); + }; + options.signal?.addEventListener("abort", onAbort, { once: true }); + + log("new outbound connection %s", maConn.remoteAddr); + const conn = await options.upgrader.upgradeOutbound(maConn); + log("outbound connection %s upgraded", maConn.remoteAddr); + + options.signal?.removeEventListener("abort", onAbort); + + if (options.signal?.aborted === true) { + conn.close().catch((err) => { + log.error("Error closing conn after abort", err); + }); + + throw new AbortError(); + } + + return conn; + } + + async _connect(ma: Multiaddr, options: TCPDialOptions): Promise { + if (options.signal?.aborted === true) { + throw new AbortError(); + } + + return await new Promise(async (resolve, reject) => { + const start = Date.now(); + const cOpts = multiaddrToNetConfig(ma) as IpcSocketConnectOpts & + TcpSocketConnectOpts; + const cOptsStr = cOpts.path ?? `${cOpts.host ?? 
""}:${cOpts.port}`; + + log("dialing %j", cOpts); + + let rawSocket: Socket; + + const onError = (err: Error): void => { + err.message = `connection error ${cOptsStr}: ${err.message}`; + this.metrics?.dialerEvents.increment({ error: true }); + + done(err); + }; + + const onTimeout = (): void => { + log("connection timeout %s", cOptsStr); + this.metrics?.dialerEvents.increment({ timeout: true }); + + const err = new CodeError( + `connection timeout after ${Date.now() - start}ms`, + "ERR_CONNECT_TIMEOUT" + ); + // Note: this will result in onError() being called + rawSocket?.emit("error", err); + }; + + const onConnect = (): void => { + log("connection opened %j", cOpts); + this.metrics?.dialerEvents.increment({ connect: true }); + done(); + }; + + const onAbort = (): void => { + log("connection aborted %j", cOpts); + this.metrics?.dialerEvents.increment({ abort: true }); + rawSocket?.destroy(); + done(new AbortError()); + }; + + const done = (err?: any): void => { + rawSocket?.removeListener("error", onError); + // @ts-ignore + rawSocket?.removeListener("timeout", onTimeout); + // @ts-ignore + rawSocket?.removeListener("connect", onConnect); + + if (options.signal != null) { + options.signal.removeEventListener("abort", onAbort); + } + + if (err != null) { + reject(err); + return; + } + + resolve(rawSocket as Socket); + }; + + try { + rawSocket = (await this.opts?.peerManager?.createSocket( + cOpts + )) as Socket; + } catch (e: any) { + onError(e); + } + + // @ts-ignore + rawSocket = rawSocket as Socket; + + // @ts-ignore + rawSocket?.on("error", onError); + // @ts-ignore + rawSocket?.on("timeout", onTimeout); + // @ts-ignore + rawSocket?.on("connect", onConnect); + + if (options.signal != null) { + options.signal.addEventListener("abort", onAbort); + } + + rawSocket?.connect(); + }); + } + + filter(multiaddrs: Multiaddr[]): Multiaddr[] { + multiaddrs = Array.isArray(multiaddrs) ? 
multiaddrs : [multiaddrs]; + + return multiaddrs.filter((ma) => { + if (ma.protoCodes().includes(CODE_CIRCUIT)) { + return false; + } + + if (ma.protoCodes().includes(CODE_UNIX)) { + return true; + } + + const addr = ma.nodeAddress(); + + if (isPrivateIp(addr.address)) { + return false; + } + + return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P)); + }); + } +} + +export function hypercoreTransport( + init: HypercoreOptions = {} +): (components?: TCPComponents) => HypercoreTransport { + return (components: TCPComponents = {}) => { + return new HypercoreTransport(components, init); + }; +} diff --git a/src/net.ts b/src/net.ts new file mode 100644 index 0000000..0304802 --- /dev/null +++ b/src/net.ts @@ -0,0 +1,3 @@ +export default { + +} diff --git a/src/peerManager.ts b/src/peerManager.ts new file mode 100644 index 0000000..f938f73 --- /dev/null +++ b/src/peerManager.ts @@ -0,0 +1,315 @@ +import { Peer } from "@lumeweb/libhyperproxy"; +import b4a from "b4a"; +// @ts-ignore +import { fixed32, json, raw, uint } from "compact-encoding"; +import { TcpSocketConnectOpts } from "net"; +import { Helia } from "@helia/interface"; +import { deserializeError } from "serialize-error"; +import { + CloseSocketRequest, + ErrorSocketRequest, + PeerEntity, + PeerInfoResult, + SocketRequest, + WriteSocketRequest, +} from "./types.js"; +import { Socket } from "./socket.js"; + +function idFactory(start: number, step = 1, limit = 2 ** 32) { + let id = start; + + return function nextId() { + const nextId = id; + id += step; + if (id >= limit) id = start; + return nextId; + }; +} + +const nextSocketId = idFactory(1); + +function roundRobinFactory(list: Map) { + let index = 0; + + return (): PeerEntity => { + const keys = [...list.keys()].sort(); + if (index >= keys.length) { + index = 0; + } + + return list.get(keys[index++]); + }; +} + +const socketEncoding = { + preencode(state: any, m: SocketRequest) { + uint.preencode(state, m.id); + uint.preencode(state, m.remoteId); + }, + encode(state: any, m: SocketRequest) { + uint.encode(state, m.id); + uint.encode(state, m.remoteId); + }, + decode(state: any, m: any): SocketRequest { + return { + remoteId: uint.decode(state, m), + id: uint.decode(state, m), + }; + }, +}; + +const writeSocketEncoding = { + preencode(state: any, m: WriteSocketRequest) { + socketEncoding.preencode(state, m); + raw.preencode(state, m.data); + }, + encode(state: any, m: WriteSocketRequest) { + socketEncoding.encode(state, m); + raw.encode(state, m.data); + }, + decode(state: any, m: any): WriteSocketRequest { + const socket = socketEncoding.decode(state, m); + return { + ...socket, + data: raw.decode(state, m), + }; + }, +}; + +const errorSocketEncoding = { + decode(state: any, m: any): ErrorSocketRequest { + const socket = socketEncoding.decode(state, m); + return { + ...socket, + err: deserializeError(json.decode(state, m)), + }; + }, +}; + +export default class PeerManager { + private static _instance: PeerManager; + + public static get instance(): PeerManager { + if (!PeerManager._instance) { + PeerManager._instance = new PeerManager(); + } + + return PeerManager._instance; + } + + private _sockets = new Map(); + + get sockets(): Map { + return this._sockets; + } + + private _socketMap = new Map(); + + get socketMap(): Map { + return this._socketMap; + } + + private _peers: Map = new Map(); + + private _nextPeer = roundRobinFactory(this._peers); + + get peers(): Map { + return this._peers; + } + + private _ipfs?: Helia; + + get ipfs(): Helia { + return this._ipfs as Helia; + } + + set 
ipfs(value: Helia) { + this._ipfs = value as Helia; + } + + private _ipfsReady?: Promise; + private _ipfsResolve?: () => void; + + get ipfsReady(): Promise { + if (!this._ipfsReady) { + this._ipfsReady = new Promise((resolve) => { + this._ipfsResolve = resolve; + }); + } + + return this._ipfsReady as Promise; + } + + handleNewPeerChannel(peer: Peer, channel: any) { + this.update(peer.socket.remotePublicKey, { peer }); + + this._registerOpenSocketMessage(peer, channel); + this._registerWriteSocketMessage(peer, channel); + this._registerCloseSocketMessage(peer, channel); + this._registerTimeoutSocketMessage(peer, channel); + this._registerErrorSocketMessage(peer, channel); + } + + async handleNewPeer() { + if (!this.ipfs.libp2p.isStarted()) { + await this.ipfs.libp2p.start(); + this._ipfsResolve?.(); + } + } + + async handleClosePeer(peer: Peer) { + for (const item of this._sockets) { + if (item[1].peer.peer === peer) { + item[1].end(); + } + } + + const pubkey = this._toString(peer.socket.remotePublicKey); + + if (this._peers.has(pubkey)) { + this._peers.delete(pubkey); + } + } + + public get(pubkey: Uint8Array): PeerEntity | undefined { + if (this._peers.has(this._toString(pubkey))) { + return this._peers.get(this._toString(pubkey)) as PeerEntity; + } + + return undefined; + } + + public update(pubkey: Uint8Array, data: Partial): void { + const peer = this.get(pubkey) ?? ({} as PeerEntity); + + this._peers.set(this._toString(pubkey), { + ...peer, + ...data, + ...{ + messages: { + ...peer?.messages, + ...data?.messages, + }, + }, + } as PeerEntity); + } + + public async createSocket(options: TcpSocketConnectOpts): Promise { + if (!this.peers.size) { + throw new Error("no peers found"); + } + + const peer = this._nextPeer(); + const socketId = nextSocketId(); + const socket = new Socket(socketId, this, peer, options); + this._sockets.set(socketId, socket); + + return socket; + } + + /* private _registerKeyExchangeMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: { + preencode(state: any, m: any) { + fixed32.preencode(state, m); + }, + encode(state: any, m: any) { + fixed32.encode(state, m); + }, + decode(state: any, m: any): PeerInfoResult { + return { + publicKey: fixed32.decode(state, m), + libp2pPublicKey: raw.decode(state, m), + }; + }, + }, + async onmessage(m: PeerInfoResult) { + self.get(m.publicKey)?.submitKeyExchange(m.libp2pPublicKey); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { keyExchange: message }, + }); + }*/ + + private _registerOpenSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: { + preencode: json.preencode, + encode: json.encode, + decode: socketEncoding.decode, + }, + async onmessage(m: SocketRequest) { + const socket = self._sockets.get(m.id); + if (socket) { + socket.remoteId = m.remoteId; + // @ts-ignore + socket.emit("connect"); + } + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { openSocket: message }, + }); + } + + private _registerWriteSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: writeSocketEncoding, + onmessage(m: WriteSocketRequest) { + self._sockets.get(m.id)?.push(m.data); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { writeSocket: message }, + }); + } + + private _registerCloseSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: 
socketEncoding, + onmessage(m: CloseSocketRequest) { + self._sockets.get(m.id)?.end(); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { closeSocket: message }, + }); + } + + private _registerTimeoutSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: socketEncoding, + onmessage(m: SocketRequest) { + // @ts-ignore + self._sockets.get(m.id)?.emit("timeout"); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { timeoutSocket: message }, + }); + } + + private _registerErrorSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: errorSocketEncoding, + onmessage(m: ErrorSocketRequest) { + // @ts-ignore + self._sockets.get(m.id)?.emit("error", m.err); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { errorSocket: message }, + }); + } + + private _toString(pubkey: Uint8Array) { + return b4a.from(pubkey).toString("hex"); + } +} diff --git a/src/socket.ts b/src/socket.ts new file mode 100644 index 0000000..20962b7 --- /dev/null +++ b/src/socket.ts @@ -0,0 +1,158 @@ +import { Callback, Duplex } from "streamx"; +import { TcpSocketConnectOpts } from "net"; +import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js"; +import PeerManager from "./peerManager.js"; +import { clearTimeout } from "timers"; + +const asyncIterator = Symbol.asyncIterator || Symbol("asyncIterator"); + +const STREAM_DESTROYED = new Error("Stream was destroyed"); +const PREMATURE_CLOSE = new Error("Premature close"); +const READ_DONE = 0b0010000000000 << 4; +const DESTROYED = 0b1000; + +export class Socket extends Duplex { + private _options: TcpSocketConnectOpts; + private _id: number; + private _manager: PeerManager; + + private _connectTimeout?: number; + + constructor( + id: number, + manager: PeerManager, + peer: PeerEntity, + options: TcpSocketConnectOpts + ) { + super(); + this._id = id; + this._manager = manager; + this._peer = peer; + this._options = options; + + // @ts-ignore + this.on("timeout", () => { + if (this._connectTimeout) { + clearTimeout(this._connectTimeout); + } + }); + } + + private _remoteId = 0; + + set remoteId(value: number) { + this._remoteId = value; + this._manager.socketMap.set(this._id, value); + } + + private _peer; + + get peer() { + return this._peer; + } + + public _write(data: any, cb: any): void { + this._peer.messages.writeSocket?.send({ + id: this._id, + remoteId: this._remoteId, + data, + } as WriteSocketRequest); + cb(); + } + + public _destroy(cb: Callback) { + this._peer.messages.closeSocket?.send({ + id: this._id, + remoteId: this._remoteId, + } as SocketRequest); + this._manager.socketMap.delete(this._id); + this._manager.sockets.delete(this._id); + } + + public connect() { + this._peer.messages.openSocket?.send({ + ...this._options, + id: this._id, + }); + } + + public setTimeout(ms: number, cb: Function) { + if (this._connectTimeout) { + clearTimeout(this._connectTimeout); + } + + this._connectTimeout = setTimeout(() => { + cb && cb(); + }, ms) as any; + } + + [asyncIterator]() { + const stream = this; + + let error: Error | null = null; + let promiseResolve: ((arg0: { value: any; done: boolean }) => void) | null = + null; + let promiseReject: ((arg0: Error) => void) | null = null; + + this.on("error", (err) => { + error = err; + }); + this.on("data", ondata); + this.on("close", onclose); + + return { + [asyncIterator]() { + return this; + }, + next() { + return new Promise(function (resolve, 
reject) { + promiseResolve = resolve; + promiseReject = reject; + const data = stream.read(); + if (data !== null) ondata(data); + else { + // @ts-ignore + if ((stream._duplexState & DESTROYED) !== 0) ondata(null); + } + }); + }, + return() { + return destroy(null); + }, + throw(err: any) { + return destroy(err); + }, + }; + + function onreadable() { + if (promiseResolve !== null) ondata(stream.read()); + } + + function onclose() { + if (promiseResolve !== null) ondata(null); + } + + function ondata(data: any) { + if (promiseReject === null) return; + if (error) promiseReject(error); + // @ts-ignore + else if (data === null && (stream._duplexState & READ_DONE) === 0) + promiseReject(STREAM_DESTROYED); + else promiseResolve?.({ value: data, done: data === null }); + promiseReject = promiseResolve = null; + } + + function destroy(err: any) { + stream.destroy(err); + return new Promise((resolve, reject) => { + // @ts-ignore + if (stream._duplexState & DESTROYED) + return resolve({ value: undefined, done: true }); + stream.once("close", function () { + if (err) reject(err); + else resolve({ value: undefined, done: true }); + }); + }); + } + } +} diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..f1d8a2b --- /dev/null +++ b/src/types.ts @@ -0,0 +1,40 @@ +import { Peer } from "@lumeweb/libhyperproxy"; + +type Message = { + send: (pubkey: Uint8Array | any) => void; +}; + +export interface PeerEntityMessages { + keyExchange: Message; + openSocket: Message; + writeSocket: Message; + closeSocket: Message; + timeoutSocket: Message; + errorSocket: Message; +} + +export interface PeerEntity { + messages: PeerEntityMessages | Partial; + submitKeyExchange: (pubkey: Uint8Array) => void; + peer: Peer; +} + +export interface PeerInfoResult { + publicKey: Uint8Array; + libp2pPublicKey: Uint8Array; +} + +export interface SocketRequest { + remoteId: number; + id: number; +} + +export type CloseSocketRequest = SocketRequest; + +export interface WriteSocketRequest extends SocketRequest { + data: Uint8Array; +} + +export interface ErrorSocketRequest extends SocketRequest { + err: Error; +} diff --git a/timers.js b/timers.js new file mode 100644 index 0000000..14c81c4 --- /dev/null +++ b/timers.js @@ -0,0 +1,8 @@ +import { setTimeout, setInterval, clearTimeout, clearInterval } from 'timers-browserify' + +var scope = typeof self !== "undefined" && self || typeof self !== "undefined" && self || window; + +scope.setTimeout = setTimeout; +scope.setInterval = setInterval; +scope.clearTimeout = clearTimeout; +scope.clearInterval = clearInterval;
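
For reference, a minimal consumer sketch (not part of the patch): the module now exposes CID-keyed stat, ls, and cat handlers in place of the old statIpfs/fetchIpfs relay queries. The callModule/connectModule helpers are assumed to follow the usual libkernel conventions ([data, err] tuples, [sendUpdate, response] for streaming handlers) and the module skylink is a placeholder; neither is defined in this diff.

import { callModule, connectModule } from "libkernel";

// Placeholder: the resolver skylink printed by the build script for this module.
const IPFS_MODULE = "<module resolver skylink>";

// One-shot query against handleStat: UnixFS stat for a CID.
async function statCid(cid: string) {
  const [resp, err] = await callModule(IPFS_MODULE, "stat", { cid });
  if (err !== null) {
    throw new Error(String(err));
  }
  return resp; // JSON-serialized UnixFS stat object
}

// Streaming query against handleCat: file chunks arrive via aq.sendUpdate.
async function catCid(cid: string): Promise<Uint8Array> {
  const chunks: Uint8Array[] = [];
  const [, response] = connectModule(
    IPFS_MODULE,
    "cat",
    { cid },
    (chunk: Uint8Array) => {
      chunks.push(chunk);
    }
  );
  const [, err] = await response;
  if (err !== null) {
    throw new Error(String(err));
  }
  // Concatenate the streamed chunks into a single buffer.
  const out = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0));
  let offset = 0;
  for (const c of chunks) {
    out.set(c, offset);
    offset += c.length;
  }
  return out;
}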
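
A short worked example (again a sketch, not part of the diff) of what getDelegateConfig() produces for one DELEGATE_LIST entry, since the multiaddr-to-HTTP mapping is easy to misread:

import { multiaddr } from "@multiformats/multiaddr";

// "/dns4/node0.delegate.ipfs.io/tcp/443/https" from DELEGATE_LIST.
const addr = multiaddr("/dns4/node0.delegate.ipfs.io/tcp/443/https").toOptions();
// addr.host === "node0.delegate.ipfs.io"; addr.port is 443 (a string in some
// multiaddr releases, hence the parseInt in getDelegateConfig).
const delegateConfig = {
  host: addr.host,
  protocol: parseInt(`${addr.port}`) === 443 ? "https" : "http",
  port: addr.port,
};
// createIpfsHttpClient(delegateConfig) then talks to the delegate node that
// backs delegatedContentRouting and delegatedPeerRouting in handlePresentSeed.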