import { Multihash } from "../multihash.js";
import NodeId from "../nodeId.js";
import { equalBytes } from "@noble/curves/abstract/utils";
import { Logger, Peer, S5Config, SignedMessage } from "../types.js";
import KeyPairEd25519 from "../ed25519.js";
import * as crypto from "crypto";
import {
  mkeyEd25519,
  protocolMethodAnnouncePeers,
  protocolMethodHandshakeDone,
  protocolMethodHandshakeOpen,
  protocolMethodHashQuery,
  protocolMethodSignedMessage,
  recordTypeStorageLocation,
  storageLocationTypeFull,
} from "../constants.js";
import defer from "p-defer";
import { calculateScore, decodeEndian, encodeEndian } from "#util.js";
import Packer from "#serialization/pack.js";
import Unpacker from "#serialization/unpack.js";
import { ed25519 } from "@noble/curves/ed25519";
import { AbstractLevel, AbstractSublevel } from "abstract-level";
import StorageLocation from "#storage.js";
import { addStorageLocation, stringifyNode } from "#node.js";
import { URL } from "url";
import { Buffer } from "buffer";
import { connect as tcpConnect, TcpPeer } from "../peer/tcp.js";
import { connect as wsConnect, WebSocketPeer } from "../peer/webSocket.js";

/**
 * Peer-to-peer service: performs the challenge/response handshake with
 * peers, relays signed storage-location records, exchanges peer lists,
 * keeps per-node up/down vote counters and dials outgoing connections with
 * an exponential reconnect back-off.
 */
export class P2PService {
  /** Feature bitmask sent in, and required from, every handshake. */
  private static readonly supportedFeatures = 3; // 0b00000011

  /** Peers that have completed the handshake. */
  get peers(): Map<NodeId, Peer> {
    return this._peers;
  }

  private config: S5Config;
  private logger: Logger;
  private nodeKeyPair: KeyPairEd25519;
  private localNodeId?: NodeId;
  private networkId?: string;

  // NOTE(review): keyed by NodeId *object identity*. Lookups only hit when
  // the very same NodeId instance is used (which is the case for `peer.id`
  // throughout this class). Left unchanged because `peers` is public.
  private _peers: Map<NodeId, Peer> = new Map();

  /**
   * Reconnect back-off in seconds, keyed by the base58 form of the node id
   * so that independently decoded ids of the same node share one entry.
   */
  private reconnectDelay: Map<string, number> = new Map();

  private selfConnectionUris: URL[] = [];

  // NOTE(review): type arguments reconstructed from usage — confirm against
  // the declared type of `S5Config.db`.
  private nodesDb?: AbstractSublevel<
    AbstractLevel<Uint8Array, string, Uint8Array>,
    Uint8Array,
    string,
    Uint8Array
  >;

  // NOTE(review): keyed by Multihash object identity and never populated by
  // the code in this file, so the relay lookup in handleStorageLocation
  // currently never finds an entry.
  private hashQueryRoutingTable: Map<Multihash, Set<NodeId>> = new Map();

  /**
   * Stores the configuration and registers this instance as
   * `config.services.p2p`.
   */
  constructor(config: S5Config) {
    this.config = config;
    this.networkId = config?.p2p?.network;
    this.nodeKeyPair = config.keyPair;
    this.logger = config.logger;
    config.services.p2p = this;
  }

  /** Derives the local node id and opens the "s5-nodes" sublevel. */
  async init(): Promise<void> {
    // The local node id is built from the node's public key.
    this.localNodeId = new NodeId(this.nodeKeyPair.publicKey);
    this.nodesDb = this.config.db.sublevel("s5-nodes", {});
  }

  /**
   * Starts dialling every peer listed in `config.p2p.peers.initial`.
   * Connections are established in the background; failures are logged.
   */
  async start(): Promise<void> {
    const initialPeers = this.config?.p2p?.peers?.initial || [];

    for (const p of initialPeers) {
      // Fire-and-forget: connectToNode only settles once it gives up or the
      // URI is unusable, so do not block start() on it.
      this.connectToNode([new URL(p)]).catch((e) => this.logger.catched(e));
    }
  }

  /**
   * Runs the handshake with `peer` and installs the message handler.
   *
   * @param peer     Transport-level peer to talk to.
   * @param verifyId When true, the id proven by the handshake must equal the
   *                 id already set on `peer`; when false, the proven id is
   *                 adopted as `peer.id`.
   * @returns A promise that never resolves; it rejects with the string
   *          "onDone" when the peer's message stream ends.
   */
  async onNewPeer(peer: Peer, verifyId: boolean): Promise<void> {
    peer.challenge = crypto.randomBytes(32);

    const initialAuthPayloadPacker = new Packer();
    initialAuthPayloadPacker.packInt(protocolMethodHandshakeOpen);
    initialAuthPayloadPacker.packBinary(Buffer.from(peer.challenge));
    if (this.networkId) {
      initialAuthPayloadPacker.packString(this.networkId);
    }

    const completer = defer<void>();

    peer.listenForMessages(
      async (event: Uint8Array) => {
        const u = Unpacker.fromPacked(event);
        const method = u.unpackInt();

        if (method === protocolMethodHandshakeOpen) {
          await this.handleHandshakeOpen(peer, u);
          return;
        }

        /* if (method === recordTypeRegistryEntry) {
          const sre = node.registry.deserializeRegistryEntry(event);
          await node.registry.set(sre, { receivedFrom: peer });
          return;
        } */

        if (method === recordTypeStorageLocation) {
          await this.handleStorageLocation(peer, event);
          return;
        }

        if (method === protocolMethodSignedMessage) {
          const sm = await this.unpackAndVerifySignature(u);
          const inner = Unpacker.fromPacked(sm.message);
          const innerMethod = inner.unpackInt();

          if (innerMethod === protocolMethodHandshakeDone) {
            this.handleHandshakeDone(peer, verifyId, sm, inner);
          } else if (innerMethod === protocolMethodAnnouncePeers) {
            this.handleAnnouncePeers(inner);
          }
        }

        /* else if (method === protocolMethodRegistryQuery) {
          const pk = u.unpackBinary();
          const sre = node.registry.getFromDB(pk);
          if (sre !== null) {
            peer.sendMessage(node.registry.serializeRegistryEntry(sre));
          }
        } */
      },
      {
        onDone: async () => {
          try {
            if (this._peers.has(peer.id)) {
              this._peers.delete(peer.id);
              this.logger.info(
                `[-] ${peer.id.toString()} (${peer
                  .renderLocationUri()
                  .toString()})`,
              );
            }
          } catch (_) {
            this.logger.info(`[-] ${peer.renderLocationUri()}`);
          }
          // Kept as a plain string: callers of onNewPeer may match on it.
          completer.reject("onDone");
        },
        onError: (e) => {
          this.logger.warn(`${peer.id}: ${e}`);
        },
        logger: this.logger,
      },
    );

    peer.sendMessage(initialAuthPayloadPacker.takeBytes());

    return completer.promise;
  }

  /**
   * Answers a HandshakeOpen: echoes the remote challenge inside a signed
   * HandshakeDone together with our feature mask and connection URIs.
   *
   * @throws Error when the peer announces a different network id.
   */
  private async handleHandshakeOpen(peer: Peer, u: Unpacker): Promise<void> {
    const p = new Packer();
    p.packInt(protocolMethodHandshakeDone);
    p.packBinary(u.unpackBinary());

    let peerNetworkId: string | null = null;
    try {
      peerNetworkId = u.unpackString();
    } catch {
      // The network id is optional; a missing value means "default network".
    }

    // Normalise "absent" on both sides: the remote value defaults to null
    // while the local one defaults to undefined (and an empty local id is
    // never sent), so a strict comparison would reject every peer when no
    // network is configured.
    if ((peerNetworkId || null) !== (this.networkId || null)) {
      throw new Error(`Peer is in different network: ${peerNetworkId}`);
    }

    p.packInt(P2PService.supportedFeatures);
    p.packInt(this.selfConnectionUris.length);
    for (const uri of this.selfConnectionUris) {
      p.packString(uri.toString());
    }
    // TODO Protocol version
    // p.packInt(protocolVersion);
    peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
  }

  /**
   * Parses and verifies a signed storage-location record, stores it via
   * addStorageLocation and relays it to peers registered in the hash-query
   * routing table. Records with an invalid signature are dropped silently.
   *
   * Layout read here: [0] record type, [1..34) hash, [34] location type,
   * [35..39) expiry, [39] part count, then per part a 2-byte length and the
   * UTF-8 bytes, one separator byte, a 33-byte public key, the signature.
   *
   * @throws Error when the public key is not an Ed25519 key.
   */
  private async handleStorageLocation(
    peer: Peer,
    event: Uint8Array,
  ): Promise<void> {
    const hash = new Multihash(event.subarray(1, 34));
    const type = event[34];
    const expiry = decodeEndian(event.subarray(35, 39));
    const partCount = event[39];

    const parts: string[] = [];
    const decoder = new TextDecoder();
    let cursor = 40;
    for (let i = 0; i < partCount; i++) {
      const length = decodeEndian(event.subarray(cursor, cursor + 2));
      cursor += 2;
      parts.push(decoder.decode(event.subarray(cursor, cursor + length)));
      cursor += length;
    }
    // Skip the separator byte; it is still part of the signed payload.
    cursor++;

    const publicKey = event.subarray(cursor, cursor + 33);
    const signature = event.subarray(cursor + 33);

    if (publicKey[0] !== mkeyEd25519) {
      // Report the byte actually received, not the expected constant.
      throw new Error(`Unsupported public key type ${publicKey[0]}`);
    }

    const signedPayload = event.subarray(0, cursor);
    if (!ed25519.verify(signature, signedPayload, publicKey.subarray(1))) {
      return;
    }

    const nodeId = new NodeId(publicKey);
    await addStorageLocation({
      hash,
      nodeId,
      location: new StorageLocation(type, parts, expiry),
      message: event,
      config: this.config,
    });

    const interested =
      this.hashQueryRoutingTable.get(hash) ?? new Set<NodeId>();
    for (const peerId of interested) {
      // Never echo the record to its author or to the peer that sent it.
      if (peerId.equals(nodeId) || peerId.equals(peer.id)) {
        continue;
      }
      const target = this._peers.get(peerId);
      if (!target) {
        continue;
      }
      try {
        target.sendMessage(event);
      } catch (e) {
        this.logger.catched(e);
      }
    }
    this.hashQueryRoutingTable.delete(hash);
  }

  /**
   * Completes the handshake: checks the echoed challenge, the peer id and
   * the feature mask, registers the peer and exchanges peer lists.
   *
   * @throws Error on a wrong challenge, an unexpected peer id or missing
   *         required features.
   */
  private handleHandshakeDone(
    peer: Peer,
    verifyId: boolean,
    sm: SignedMessage,
    u: Unpacker,
  ): void {
    const challenge = u.unpackBinary();
    if (!equalBytes(peer.challenge, challenge)) {
      throw new Error("Invalid challenge");
    }

    const provenId = sm.nodeId;
    if (!verifyId) {
      peer.id = provenId;
    } else if (!peer.id.equals(provenId)) {
      throw new Error("Invalid peer id on initial list");
    }

    peer.isConnected = true;

    const remoteFeatures = u.unpackInt();
    if (remoteFeatures !== P2PService.supportedFeatures) {
      throw new Error("Remote node does not support required features");
    }

    this._peers.set(peer.id, peer);
    // A successful handshake resets the reconnect back-off for this node.
    this.reconnectDelay.set(peer.id.toBase58(), 1);

    const connectionUrisCount = u.unpackInt() as number;
    peer.connectionUris = [];
    for (let i = 0; i < connectionUrisCount; i++) {
      peer.connectionUris.push(new URL(u.unpackString() as string));
    }

    this.logger.info(
      `[+] ${peer.id.toString()} (${peer.renderLocationUri().toString()})`,
    );

    // Peer-list exchange is best effort: log failures instead of leaving
    // unhandled promise rejections behind.
    const logFailure = (e: unknown): void => this.logger.catched(e);

    this.sendPublicPeersToPeer(peer, Array.from(this._peers.values())).catch(
      logFailure,
    );
    for (const other of this._peers.values()) {
      if (other.id.equals(peer.id)) continue;
      if (other.isConnected) {
        this.sendPublicPeersToPeer(other, [peer]).catch(logFailure);
      }
    }
  }

  /**
   * Handles an AnnouncePeers message: dials every announced peer that has
   * at least one connection URI and is not already tracked in the
   * reconnect table.
   */
  private handleAnnouncePeers(u: Unpacker): void {
    const length = u.unpackInt() as number;
    for (let i = 0; i < length; i++) {
      const id = new NodeId(u.unpackBinary());
      // The "is connected" flag is not used yet but must be consumed to keep
      // the unpacker aligned with the message layout.
      u.unpackBool();

      const connectionUrisCount = u.unpackInt() as number;
      const connectionUris: URL[] = [];
      for (let j = 0; j < connectionUrisCount; j++) {
        connectionUris.push(new URL(u.unpackString() as string));
      }

      if (connectionUris.length === 0) {
        continue;
      }

      // TODO Fully support multiple connection uris
      const uri = new URL(connectionUris[0].toString());
      const key = id.toBase58();
      uri.username = key;
      if (!this.reconnectDelay.has(key)) {
        this.connectToNode([uri]).catch((e) => this.logger.catched(e));
      }
    }
  }

  /**
   * Builds a signed storage-location record for `hash` at `location`.
   *
   * @returns record type, hash bytes, location type, 4-byte expiry, part
   *          count, each part as 2-byte length + UTF-8 bytes, a 0 separator,
   *          followed by this node's public key and the signature over
   *          everything before the key.
   */
  async prepareProvideMessage(
    hash: Multihash,
    location: StorageLocation,
  ): Promise<Uint8Array> {
    const list: number[] = [
      recordTypeStorageLocation,
      ...hash.fullBytes,
      location.type,
      ...encodeEndian(location.expiry, 4),
      location.parts.length,
    ];

    const encoder = new TextEncoder();
    for (const part of location.parts) {
      const bytes = encoder.encode(part);
      list.push(...encodeEndian(bytes.length, 2));
      list.push(...Array.from(bytes));
    }
    list.push(0);

    const signature = ed25519.sign(
      new Uint8Array(list),
      this.nodeKeyPair.extractBytes(),
    );

    return new Uint8Array([
      ...list,
      ...Array.from(this.nodeKeyPair.publicKey),
      ...Array.from(signature),
    ]);
  }

  /**
   * Sends `peer` a signed AnnouncePeers message describing `peersToSend`
   * (id, connected flag and connection URIs of each).
   */
  async sendPublicPeersToPeer(peer: Peer, peersToSend: Peer[]): Promise<void> {
    const p = new Packer();
    p.packInt(protocolMethodAnnouncePeers);

    p.packInt(peersToSend.length);
    for (const pts of peersToSend) {
      p.packBinary(Buffer.from(pts.id.bytes));
      p.packBool(pts.isConnected);
      p.packInt(pts.connectionUris.length);
      for (const uri of pts.connectionUris) {
        p.packString(uri.toString());
      }
    }
    peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
  }

  /**
   * Reads the packed vote record of `nodeId` from the nodes database.
   *
   * @returns The stored bytes, or undefined when there is no record.
   * @throws Error when init() has not been called yet.
   */
  private async readNodeRecord(
    nodeId: NodeId,
  ): Promise<Uint8Array | undefined> {
    if (!this.nodesDb) {
      throw new Error("P2PService.init() must be called before using votes");
    }
    try {
      return await this.nodesDb.get(stringifyNode(nodeId));
    } catch (e: unknown) {
      // abstract-level 1.x rejects with LEVEL_NOT_FOUND for a missing key,
      // newer versions resolve to undefined; treat both as "no record".
      if ((e as { code?: unknown } | null)?.code === "LEVEL_NOT_FOUND") {
        return undefined;
      }
      throw e;
    }
  }

  /**
   * Returns the score of `nodeId`: 1 for the local node, 0.5 for a node
   * without a vote record, otherwise calculateScore(upvotes, downvotes).
   */
  async getNodeScore(nodeId: NodeId): Promise<number> {
    if (nodeId.equals(this.localNodeId)) {
      return 1;
    }

    const node = await this.readNodeRecord(nodeId);
    if (!node) {
      return 0.5;
    }

    // Key 1 holds the upvote count, key 2 the downvote count (see _vote).
    const map = Unpacker.fromPacked(node).unpackMap();
    return calculateScore(map.get(1) ?? 0, map.get(2) ?? 0);
  }

  /** Increments the upvote (key 1) or downvote (key 2) counter of a node. */
  private async _vote(nodeId: NodeId, upvote: boolean): Promise<void> {
    const node = await this.readNodeRecord(nodeId);
    const map = node
      ? Unpacker.fromPacked(node).unpackMap()
      : new Map<number, number>([
          [1, 0],
          [2, 0],
        ]);

    const counterKey = upvote ? 1 : 2;
    map.set(counterKey, (map.get(counterKey) ?? 0) + 1);

    // readNodeRecord has already verified that nodesDb is initialised.
    await this.nodesDb!.put(
      stringifyNode(nodeId),
      new Packer().pack(map).takeBytes(),
    );
  }

  /** Records one upvote for `nodeId`. */
  async upvote(nodeId: NodeId): Promise<void> {
    await this._vote(nodeId, true);
  }

  /** Records one downvote for `nodeId`. */
  async downvote(nodeId: NodeId): Promise<void> {
    await this._vote(nodeId, false);
  }

  // TODO add a bit of randomness with multiple options
  /**
   * Returns `nodes` ordered by descending score. The input array is not
   * modified.
   */
  async sortNodesByScore(nodes: NodeId[]): Promise<NodeId[]> {
    const scored = await Promise.all(
      nodes.map(async (node) => ({
        node,
        score: await this.getNodeScore(node),
      })),
    );

    return scored.sort((a, b) => b.score - a.score).map((entry) => entry.node);
  }

  /**
   * Wraps `message` in a SignedMessage envelope: method id, local node id,
   * Ed25519 signature over `message`, and the message itself.
   *
   * @throws Error when init() has not been called yet.
   */
  async signMessageSimple(message: Uint8Array): Promise<Uint8Array> {
    if (!this.localNodeId) {
      throw new Error("P2PService.init() must be called before signing");
    }

    // noble-curves takes the message first and the private key second.
    const signature = ed25519.sign(message, this.nodeKeyPair.extractBytes());

    const packer = new Packer();
    packer.packInt(protocolMethodSignedMessage);
    packer.packBinary(Buffer.from(this.localNodeId.bytes));
    packer.packBinary(Buffer.from(signature));
    packer.packBinary(Buffer.from(message));

    return packer.takeBytes();
  }

  /**
   * Reads node id, signature and message from `u` and verifies the
   * signature against the node id's key (the id bytes without their first,
   * key-type byte).
   *
   * @throws Error when the signature does not verify.
   */
  async unpackAndVerifySignature(u: Unpacker): Promise<SignedMessage> {
    const nodeId = new NodeId(u.unpackBinary());
    const signature = u.unpackBinary();
    const message = u.unpackBinary();

    const isValid = ed25519.verify(
      signature,
      message,
      nodeId.bytes.subarray(1),
    );

    if (!isValid) {
      throw new Error("Invalid signature found");
    }

    return {
      nodeId: nodeId,
      message: message,
    };
  }

  /**
   * Broadcasts a hash query for `hash` to every registered peer.
   *
   * @param types Storage location types to ask for.
   */
  sendHashRequest(
    hash: Multihash,
    types: number[] = [storageLocationTypeFull],
  ): void {
    const p = new Packer();

    p.packInt(protocolMethodHashQuery);
    p.packBinary(Buffer.from(hash.fullBytes));
    p.pack(types);
    // TODO Maybe add int for hop count (or not because privacy concerns)

    const req = p.takeBytes();

    for (const peer of this._peers.values()) {
      peer.sendMessage(req);
    }
  }

  /**
   * Connects to a node, preferring a WebSocket URI over a TCP one, and
   * keeps reconnecting with a doubling delay whenever the connection fails
   * or ends. The node id is taken from the URI's username component.
   *
   * @throws Error when no URI uses a supported protocol or the chosen URI
   *         carries no node id.
   */
  async connectToNode(connectionUris: URL[]): Promise<void> {
    // URL.protocol always includes the trailing colon.
    const connectionUri =
      connectionUris.find(
        (uri) => uri.protocol === "ws:" || uri.protocol === "wss:",
      ) ?? connectionUris.find((uri) => uri.protocol === "tcp:");

    if (!connectionUri) {
      throw new Error(
        `None of the available connection URIs are supported (${connectionUris})`,
      );
    }

    if (!connectionUri.username) {
      throw new Error("Connection URI does not contain node id");
    }

    const id = NodeId.decode(connectionUri.username);
    // Key the back-off table by the canonical base58 id so retries and
    // announcements of the same node share one entry.
    const delayKey = id.toBase58();

    this.reconnectDelay.set(delayKey, this.reconnectDelay.get(delayKey) || 1);

    if (id.equals(this.localNodeId)) {
      return;
    }

    try {
      this.logger.verbose(`[connect] ${connectionUri}`);
      if (connectionUri.protocol === "tcp:") {
        // `hostname` excludes the port, unlike `host`.
        const ip = connectionUri.hostname;
        const port = parseInt(connectionUri.port, 10);
        const socket = await tcpConnect(port, ip);

        const peer = new TcpPeer(socket, [connectionUri]);
        peer.id = id;

        await this.onNewPeer(peer, true);
      } else {
        // The node id must not be sent as credentials to the remote end.
        const locationUri = new URL(connectionUri.toString());
        locationUri.username = "";
        const channel = await wsConnect(locationUri.toString());

        const peer = new WebSocketPeer(channel, [connectionUri]);
        peer.id = id;

        await this.onNewPeer(peer, true);
      }
    } catch (e) {
      this.logger.catched(e);
      /*
      if (e instanceof SocketException) {
        if (e.message === "Connection refused") {
          this.logger.warn(`[!] ${id}: ${e}`);
        } else {
          this.logger.catched(e);
        }
      } else {
        this.logger.catched(e);
      }*/

      const delay = this.reconnectDelay.get(delayKey) ?? 1;
      this.reconnectDelay.set(delayKey, delay * 2);

      await new Promise<void>((resolve) => setTimeout(resolve, delay * 1000));

      await this.connectToNode(connectionUris);
    }
  }
}