diff --git a/build.js b/build.js index 2076e06..35c7c51 100644 --- a/build.js +++ b/build.js @@ -1,12 +1,12 @@ import esbuild from 'esbuild' esbuild.buildSync({ - entryPoints: ['src/index.ts'], - outfile: 'dist/ipfs.js', - format: 'cjs', - bundle: true, - platform: "node", - define: { - __dirname: '"./plugins/leveldown"' - } -}) + entryPoints: ["src/index.ts"], + outfile: "dist/ipfs.js", + format: "cjs", + bundle: true, + platform: "node", + define: { + __dirname: '"./plugins/leveldown"', + } +}); diff --git a/package.json b/package.json index edd9e46..fb9c72a 100644 --- a/package.json +++ b/package.json @@ -3,19 +3,30 @@ "type": "module", "version": "0.1.0", "scripts": { - "build": "rimraf node_modules/*/node_modules/ipfs-utils node_modules/@achingbrain/nat-port-mapper/node_modules/default-gateway && bash build.sh" + "build": "node build.js" }, "devDependencies": { - "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", - "esbuild": "^0.15.5", + "@libp2p/interface-connection-encrypter": "^3.0.6", + "@libp2p/interface-peer-id": "^2.0.1", + "@libp2p/multistream-select": "^3.1.2", + "@lumeweb/relay-types": "git+https://git.lumeweb.com/LumeWeb/relay-types.git", + "@types/b4a": "^1.6.0", + "@types/streamx": "^2.9.1", + "esbuild": "^0.15.18", + "prettier": "^2.8.4", "rimraf": "^3.0.2" }, "dependencies": { - "@achingbrain/ssdp": "https://github.com/LumeWeb/ssdp.git", - "default-gateway": "https://github.com/LumeWeb/default-gateway.git", - "ipfs-core": "^0.15.4", - "ipfs-http-response": "^3.0.4", - "ipfs-utils": "https://github.com/LumeWeb/js-ipfs-utils.git", - "multiformats": "^9.7.1" + "@chainsafe/libp2p-noise": "^11.0.1", + "@libp2p/interface-connection": "^3.1.0", + "@libp2p/peer-id": "^2.0.2", + "@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git", + "@multiformats/multiaddr": "^11.6.1", + "b4a": "^1.6.2", + "compact-encoding": "^2.11.0", + "debug-stream": "^3.0.1", + "serialize-error": "^11.0.0", + "stream-to-it": "^0.2.4", + "streamx": "^2.13.2" } } diff --git a/pkg/load-ipfs.json b/pkg/load-ipfs.json new file mode 100644 index 0000000..ae2c0da --- /dev/null +++ b/pkg/load-ipfs.json @@ -0,0 +1,5 @@ +{ + "plugins": [ + "ipfs" + ] +} diff --git a/src/index.ts b/src/index.ts index 2c5e76b..18e595b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,224 +1,40 @@ -import type { - Plugin, - PluginAPI, - RPCRequest, - RPCResponse, -} from "@lumeweb/relay-types"; - -import { CID } from "multiformats/cid"; +import type { Plugin, PluginAPI } from "@lumeweb/relay-types"; +import { Peer, Proxy, Socket } from "@lumeweb/libhyperproxy"; // @ts-ignore -import toStream from "it-to-stream"; -import type { StatResult } from "ipfs-core/dist/src/components/files/stat"; -import * as IPFS from "ipfs-http-client"; +import debugStream from "debug-stream"; +// @ts-ignore +import toIterable from "stream-to-it"; +// @ts-ignore +import { fixed32, raw } from "compact-encoding"; +import PeerManager from "./peerManager"; -interface StatFileResponse { - exists: boolean; - contentType: string | null; - error: any; - directory: boolean; - files: StatFileSubfile[]; - timeout: boolean; - size: number; -} +const PROTOCOL = "lumeweb.proxy.ipfs"; -interface StatFileSubfile { - name: string; - size: number; -} - -let client: IPFS.IPFSHTTPClient; - -import { utils } from "ipfs-http-response"; - -const { detectContentType } = utils; - -function normalizeCidPath(path: any) { - if (path instanceof Uint8Array) { - return CID.decode(path).toString(); - } - - path = path.toString(); - - if (path.indexOf("/ipfs/") === 0) { - path = path.substring("/ipfs/".length); - } - - if (path.charAt(path.length - 1) === "/") { - path = path.substring(0, path.length - 1); - } - - return path; -} - -function normalizePath( - hash?: string, - path?: string, - fullPath?: string -): string { - if (!fullPath) { - if (!path) { - path = "/"; - } - - fullPath = `${hash}/${path}`; - } - - fullPath = fullPath.replace(/\/{2,}/, "/"); - return normalizeCidPath(fullPath); -} - -async function fetchFile( - hash?: string, - path?: string, - fullPath?: string -): Promise> { - let data = await fileExists(hash, path, fullPath); - - if (data instanceof Error) { - return data; - } - - if (data?.type === "directory") { - return new Error("ERR_HASH_IS_DIRECTORY"); - } - - return client.cat(data.cid); -} - -async function statFile(hash?: string, path?: string, fullPath?: string) { - let stats: StatFileResponse = { - exists: false, - contentType: null, - error: null, - directory: false, - files: [], - timeout: false, - size: 0, - }; - - let exists = await fileExists(hash, path, fullPath); - fullPath = normalizePath(hash, path, fullPath); - - if (exists instanceof Error) { - stats.error = exists.toString(); - - if (exists.message.includes("aborted")) { - stats.timeout = true; - } - - return stats; - } - stats.exists = true; - - if (exists?.type === "directory") { - stats.directory = true; - for await (const item of client.ls(exists.cid)) { - stats.files.push({ - name: item.name, - size: item.size, - } as StatFileSubfile); - } - return stats; - } - - const { size } = await client.files.stat(`/ipfs/${exists.cid}`); - stats.size = size; - - const { contentType } = await detectContentType( - fullPath, - client.cat(exists.cid) - ); - stats.contentType = contentType ?? null; - - return stats; -} - -async function fileExists( - hash?: string, - path?: string, - fullPath?: string -): Promise { - let ipfsPath = normalizePath(hash, path, fullPath); - try { - const ret = await client.files.stat(`/ipfs/${ipfsPath}`); - return ret; - } catch (err: any) { - return err; - } -} - -async function resolveIpns( - hash: string, - path: string -): Promise { - for await (const result of client.name.resolve(hash)) { - return normalizePath(undefined, undefined, `${result}/${path}`); - } - - return false; +interface PeerInfoResult { + publicKey: Uint8Array; + libp2pPublicKey: Uint8Array; } const plugin: Plugin = { name: "ipfs", async plugin(api: PluginAPI): Promise { - client = await IPFS.create({ host: "127.0.0.1" }); - api.registerMethod("stat_ipfs", { - cacheable: false, - async handler(request: RPCRequest): Promise { - return await statFile(request.data?.hash, request.data?.path); - }, + api.swarm.join(api.util.crypto.createHash(PROTOCOL)); + const proxy = new Proxy({ + swarm: api.swarm, + protocol: PROTOCOL, }); - api.registerMethod("stat_ipns", { - cacheable: false, - async handler(request: RPCRequest): Promise { - let ipfsPath = await resolveIpns( - request.data?.hash, - request.data?.path - ); - if (!ipfsPath) { - throw new Error("ipns lookup failed"); - } - return statFile(undefined, undefined, ipfsPath as string); - }, - }); - api.registerMethod("fetch_ipfs", { - cacheable: false, - async handler( - request: RPCRequest, - sendStream: (stream: AsyncIterable) => void - ): Promise { - const ret = await fetchFile(request.data?.hash, request.data?.path); - if (ret instanceof Error) { - throw ret; - } - - sendStream(ret); - - return null; - }, - }); - api.registerMethod("fetch_ipns", { - cacheable: false, - async handler( - request: RPCRequest, - sendStream: (stream: AsyncIterable) => void - ): Promise { - let ipfsPath = await resolveIpns( - request.data?.hash, - request.data?.path - ); - if (!ipfsPath) { - throw new Error("ipns lookup failed"); - } - const ret = await fetchFile(undefined, undefined, ipfsPath as string); - if (ret instanceof Error) { - throw ret; - } - - sendStream(ret); - - return null; - }, + api.protocols.register(PROTOCOL, (peer: any, muxer: any) => { + proxy.handlePeer({ + peer, + muxer, + createDefaultMessage: false, + onchannel(peer: Peer, channel: any) { + PeerManager.instance(api).handleNewPeerChannel(peer, channel); + }, + onclose(peer: Peer) { + PeerManager.instance(api).handleClosePeer(peer); + }, + }); }); }, }; diff --git a/src/peerManager.ts b/src/peerManager.ts new file mode 100644 index 0000000..51ec2d1 --- /dev/null +++ b/src/peerManager.ts @@ -0,0 +1,250 @@ +import { PluginAPI } from "@lumeweb/relay-types"; +import { Peer, Socket } from "@lumeweb/libhyperproxy"; +import net from "net"; +// @ts-ignore +import { fixed32, json, raw, uint } from "compact-encoding"; +import b4a from "b4a"; +import { + CloseSocketRequest, + ErrorSocketRequest, + PeerEntity, + PeerInfoResult, + SocketRequest, + WriteSocketRequest, +} from "./types"; +import { TCPSocket } from "./socket"; +import { serializeError } from "serialize-error"; + +const socketEncoding = { + preencode(state: any, m: SocketRequest) { + uint.preencode(state, m.id); + uint.preencode(state, m.remoteId); + }, + encode(state: any, m: SocketRequest) { + uint.encode(state, m.id); + uint.encode(state, m.remoteId); + }, + decode(state: any, m: any): SocketRequest { + return { + remoteId: uint.decode(state, m), + id: uint.decode(state, m), + }; + }, +}; + +const writeSocketEncoding = { + preencode(state: any, m: WriteSocketRequest) { + socketEncoding.preencode(state, m); + raw.preencode(state, m.data); + }, + encode(state: any, m: WriteSocketRequest) { + socketEncoding.encode(state, m); + raw.encode(state, m.data); + }, + decode(state: any, m: any): WriteSocketRequest { + const socket = socketEncoding.decode(state, m); + return { + ...socket, + data: raw.decode(state, m), + }; + }, +}; + +const errorSocketEncoding = { + preencode(state: any, m: ErrorSocketRequest) { + socketEncoding.preencode(state, m); + json.preencode(state, serializeError(m.err)); + }, + encode(state: any, m: ErrorSocketRequest) { + socketEncoding.encode(state, m); + json.encode(state, serializeError(m.err)); + }, +}; + +function idFactory(start: number, step = 1, limit = 2 ** 32) { + let id = start; + + return function nextId() { + const nextId = id; + id += step; + if (id >= limit) id = start; + return nextId; + }; +} + +const nextSocketId = idFactory(1); + +export default class PeerManager { + private static _instance: PeerManager; + private _api: PluginAPI; + private _peers: Map = new Map(); + + constructor(api: PluginAPI) { + this._api = api; + } + + private _sockets = new Map(); + + get sockets(): Map { + return this._sockets; + } + + private _socketMap = new Map(); + + get socketMap(): Map { + return this._socketMap; + } + + public static instance(api?: PluginAPI): PeerManager { + if (!PeerManager._instance) { + if (!api) { + throw new Error("api argument required"); + } + PeerManager._instance = new PeerManager(api as PluginAPI); + } + + return PeerManager._instance; + } + + handleNewPeerChannel(peer: Peer, channel: any) { + this._registerOpenSocketMessage(peer, channel); + this._registerWriteSocketMessage(peer, channel); + this._registerCloseSocketMessage(peer, channel); + this._registerTimeoutSocketMessage(peer, channel); + this._registerErrorSocketMessage(peer, channel); + } + + public get(pubkey: Uint8Array): PeerEntity | undefined { + if (this._peers.has(this._toString(pubkey))) { + return this._peers.get(this._toString(pubkey)) as PeerEntity; + } + + return undefined; + } + + public update(pubkey: Uint8Array, data: Partial): void { + const peer = this.get(pubkey) ?? ({} as PeerEntity); + + this._peers.set(this._toString(pubkey), { + ...peer, + ...data, + ...{ + messages: { + ...peer?.messages, + ...data?.messages, + }, + }, + } as PeerEntity); + } + async handleClosePeer(peer: Peer) { + for (const item of this._sockets) { + if (item[1].peer.peer === peer) { + item[1].end(); + } + } + + const pubkey = this._toString(peer.socket.remotePublicKey); + + if (this._peers.has(pubkey)) { + this._peers.delete(pubkey); + } + } + /* + + private _registerKeyExchangeMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: { + preencode(state: any, m: PeerInfoResult) { + fixed32.preencode(state, m.publicKey); + raw.preencode(state, m.libp2pPublicKey); + }, + encode(state: any, m: PeerInfoResult) { + fixed32.encode(state, m.publicKey); + raw.encode(state, m.libp2pPublicKey); + }, + decode(state: any, m: any): Uint8Array { + return fixed32.decode(state, m); + }, + }, + onmessage: async (pubkey: Uint8Array) => { + this.get(peer.socket.remotePublicKey)?.messages.keyExchange?.send({ + publicKey: pubkey, + libp2pPublicKey: (await getIpfsPeerId(self._api)).toBytes(), + }); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { keyExchange: message }, + }); + } +*/ + + private _registerOpenSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: { + ...socketEncoding, + decode: json.decode, + }, + async onmessage(m: any) { + // @ts-ignore + new TCPSocket( + nextSocketId(), + m.id, + self, + self.get(peer.socket.remotePublicKey) as PeerEntity, + m + ).connect(); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { openSocket: message }, + }); + } + + private _registerWriteSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: writeSocketEncoding, + onmessage(m: WriteSocketRequest) { + self._sockets.get(m.id)?.push(m.data); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { writeSocket: message }, + }); + } + private _registerCloseSocketMessage(peer: Peer, channel: any) { + const self = this; + const message = channel.addMessage({ + encoding: socketEncoding, + async onmessage(m: CloseSocketRequest) { + self._sockets.get(m.id)?.end(); + }, + }); + this.update(peer.socket.remotePublicKey, { + messages: { closeSocket: message }, + }); + } + private _registerTimeoutSocketMessage(peer: Peer, channel: any) { + const message = channel.addMessage({ + encoding: socketEncoding, + }); + this.update(peer.socket.remotePublicKey, { + messages: { timeoutSocket: message }, + }); + } + private _registerErrorSocketMessage(peer: Peer, channel: any) { + const message = channel.addMessage({ + encoding: errorSocketEncoding, + }); + this.update(peer.socket.remotePublicKey, { + messages: { errorSocket: message }, + }); + } + + private _toString(pubkey: Uint8Array) { + return b4a.from(pubkey).toString("hex"); + } +} diff --git a/src/socket.ts b/src/socket.ts new file mode 100644 index 0000000..3b22410 --- /dev/null +++ b/src/socket.ts @@ -0,0 +1,101 @@ +import { + Callback, + Duplex, + DuplexEvents, + EventName, + EventListener, +} from "streamx"; +import net, { TcpSocketConnectOpts } from "net"; +import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types"; +import PeerManager from "./peerManager"; +import { Socket } from "net"; + +export class TCPSocket extends Duplex { + private _options; + private _id: number; + private _remoteId: number; + private _manager: PeerManager; + + private _socket?: Socket; + + constructor( + id: number, + remoteId: number, + manager: PeerManager, + peer: PeerEntity, + options: TcpSocketConnectOpts + ) { + super(); + this._remoteId = remoteId; + this._manager = manager; + this._id = id; + this._peer = peer; + this._options = options; + + this._manager.sockets.set(this._id, this); + this._manager.socketMap.set(this._id, this._remoteId); + console.log(options); + } + + private _peer; + + get peer() { + return this._peer; + } + + public _write(data: any, cb: any): void { + this._peer.messages.writeSocket?.send({ + ...this._getSocketRequest(), + data, + } as WriteSocketRequest); + cb(); + } + + public _destroy(cb: Callback) { + this._manager.sockets.delete(this._id); + this._manager.socketMap.delete(this._id); + this._peer.messages.closeSocket?.send(this._getSocketRequest()); + } + + public connect() { + ["timeout", "error", "connect", "end", "destroy", "close"].forEach( + (event) => { + this._socket?.on(event, (...args: any) => + this.emit(event as any, ...args) + ); + } + ); + + this.on("error", (err: Error) => { + this._peer.messages.errorSocket?.send({ + ...this._getSocketRequest(), + err, + }); + }); + + // @ts-ignore + this.on("timeout", () => { + this._peer.messages.timeoutSocket?.send(this._getSocketRequest()); + }); + // @ts-ignore + this.on("connect", () => { + this._peer.messages.openSocket?.send(this._getSocketRequest()); + }); + + if (![4001, 4002].includes(this._options.port)) { + this.emit("error", new Error(`port ${this._options.port} not allowed`)); + return; + } + + this._socket = net.connect(this._options); + this._socket.pipe(this as any); + this.pipe(this._socket); + } + + private _getSocketRequest(): SocketRequest { + return { + id: this._id, + remoteId: this._remoteId, + }; + } +} diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..424ccf0 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,36 @@ +import { Peer } from "@lumeweb/libhyperproxy"; + +export interface PeerInfoResult { + publicKey: Uint8Array; + libp2pPublicKey: Uint8Array; +} + +export type Message = { + send: (pubkey: Uint8Array | any) => void; +}; + +export interface PeerEntityMessages { + keyExchange: Message; + openSocket: Message; + writeSocket: Message; + closeSocket: Message; + timeoutSocket: Message; + errorSocket: Message; +} + +export interface PeerEntity { + messages: PeerEntityMessages | Partial; + peer: Peer; +} +export interface SocketRequest { + remoteId: number; + id: number; +} +export type CloseSocketRequest = SocketRequest; + +export interface WriteSocketRequest extends SocketRequest { + data: Uint8Array; +} +export interface ErrorSocketRequest extends SocketRequest { + err: Error; +}