diff --git a/src/constants.ts b/src/constants.ts index f97ae79..caa77ae 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -133,3 +133,4 @@ export const storageLocationTypeArchive = 0; export const storageLocationTypeFile = 3; export const storageLocationTypeFull = 5; export const storageLocationTypeBridge = 7; +export const supportedFeatures = 3; diff --git a/src/messages/handshakeOpen.ts b/src/messages/handshakeOpen.ts new file mode 100644 index 0000000..3a18024 --- /dev/null +++ b/src/messages/handshakeOpen.ts @@ -0,0 +1,33 @@ +import Packer from "#serialization/pack.js"; +import { protocolMethodHandshakeDone, supportedFeatures } from "#constants.js"; +import { S5Node } from "#node.js"; +import { Peer } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + rawData: Uint8Array, +) { + const p = new Packer(); + p.packInt(protocolMethodHandshakeDone); + p.packBinary(data.unpackBinary()); + let peerNetworkId: string | null = null; + try { + peerNetworkId = data.unpackString(); + } catch {} + + if (this.networkId && peerNetworkId !== this.networkId) { + throw `Peer is in different network: ${peerNetworkId}`; + } + + p.packInt(supportedFeatures); + p.packInt(node.services.p2p.selfConnectionUris.length); + for (const uri of this.selfConnectionUris) { + p.packString(uri.toString()); + } + // TODO Protocol version + // p.packInt(protocolVersion); + peer.sendMessage(await this.signMessageSimple(p.takeBytes())); +} diff --git a/src/messages/index.ts b/src/messages/index.ts new file mode 100644 index 0000000..425d71c --- /dev/null +++ b/src/messages/index.ts @@ -0,0 +1,12 @@ +import { P2PMessageHandler } from "#types.js"; +import handshakeOpen from "#messages/handshakeOpen.js"; + +const messages = new Map( + Object.entries({ + protocolMethodHandshakeOpen: handshakeOpen, + }).map(([key, value]) => [Number(key), value]), +); + +Object.freeze(messages); + +export default messages; diff --git a/src/messages/registryEntry.ts b/src/messages/registryEntry.ts new file mode 100644 index 0000000..eb76168 --- /dev/null +++ b/src/messages/registryEntry.ts @@ -0,0 +1,13 @@ +import { S5Node } from "#node.js"; +import { Peer } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + rawData: Uint8Array, +) { + const sre = node.services.registry.deserializeRegistryEntry(rawData); + await node.services.registry.set(sre, false, peer); +} diff --git a/src/messages/registryQuery.ts b/src/messages/registryQuery.ts new file mode 100644 index 0000000..b063256 --- /dev/null +++ b/src/messages/registryQuery.ts @@ -0,0 +1,16 @@ +import { S5Node } from "#node.js"; +import { Peer } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + rawData: Uint8Array, +) { + const pk = data.unpackBinary(); + const sre = await node.services.registry.getFromDB(pk); + if (sre !== null) { + peer.sendMessage(node.services.registry.serializeRegistryEntry(sre)); + } +} diff --git a/src/messages/signedMessage.ts b/src/messages/signedMessage.ts new file mode 100644 index 0000000..fbd4ec3 --- /dev/null +++ b/src/messages/signedMessage.ts @@ -0,0 +1,20 @@ +import { S5Node } from "#node.js"; +import { Peer } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; +import messages from "#messages/signedMessages/index.js"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + rawData: Uint8Array, + verifyId = true, +) { + const sm = await node.services.p2p.unpackAndVerifySignature(data); + const u = Unpacker.fromPacked(sm.message); + const method = data.unpackInt(); + + if (method !== null && messages.has(method)) { + await messages.get(method)?.(node, peer, u, sm, verifyId); + } +} diff --git a/src/messages/signedMessages/announcePeers.ts b/src/messages/signedMessages/announcePeers.ts new file mode 100644 index 0000000..b1a70d4 --- /dev/null +++ b/src/messages/signedMessages/announcePeers.ts @@ -0,0 +1,39 @@ +import { S5Node } from "#node.js"; +import { Peer, SignedMessage } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; +import { equalBytes } from "@noble/curves/abstract/utils"; +import { URL } from "url"; +import NodeId from "#nodeId.js"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + message: SignedMessage, + verifyId: boolean, +) { + const length = data.unpackInt() as number; + for (let i = 0; i < length; i++) { + const peerIdBinary = data.unpackBinary(); + const id = new NodeId(peerIdBinary); + + const isConnected = data.unpackBool() as boolean; + + const connectionUrisCount = data.unpackInt() as number; + + const connectionUris: URL[] = []; + + for (let i = 0; i < connectionUrisCount; i++) { + connectionUris.push(new URL(data.unpackString() as string)); + } + + if (connectionUris.length > 0) { + // TODO Fully support multiple connection uris + const uri = new URL(connectionUris[0].toString()); + uri.username = id.toBase58(); + if (!this.reconnectDelay.has(NodeId.decode(uri.username).toString())) { + node.services.p2p.connectToNode([uri]); + } + } + } +} diff --git a/src/messages/signedMessages/handshakeDone.ts b/src/messages/signedMessages/handshakeDone.ts new file mode 100644 index 0000000..3e124f8 --- /dev/null +++ b/src/messages/signedMessages/handshakeDone.ts @@ -0,0 +1,65 @@ +import { S5Node } from "#node.js"; +import { Peer, SignedMessage } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; +import { equalBytes } from "@noble/curves/abstract/utils"; +import { URL } from "url"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + message: SignedMessage, + verifyId: boolean, +) { + const challenge = data.unpackBinary(); + + if (!equalBytes(peer.challenge, challenge)) { + throw "Invalid challenge"; + } + + const pId = message.nodeId; + + if (!verifyId) { + peer.id = pId; + } else { + if (!peer.id.equals(pId)) { + throw "Invalid transports id on initial list"; + } + } + + peer.isConnected = true; + + const supportedFeatures = data.unpackInt(); + + if (supportedFeatures !== 3) { + throw "Remote node does not support required features"; + } + + node.services.p2p.peers.set(peer.id.toString(), peer); + node.services.p2p.reconnectDelay.set(peer.id.toString(), 1); + + const connectionUrisCount = data.unpackInt() as number; + + peer.connectionUris = []; + for (let i = 0; i < connectionUrisCount; i++) { + peer.connectionUris.push(new URL(data.unpackString() as string)); + } + + this.logger.info( + `[+] ${peer.id.toString()} (${peer.renderLocationUri().toString()})`, + ); + + node.services.p2p.sendPublicPeersToPeer( + peer, + Array.from(node.services.p2p.peers.values()), + ); + for (const p of this._peers.values()) { + if (p.id.equals(peer.id)) { + continue; + } + + if (p.isConnected) { + this.sendPublicPeersToPeer(p, [peer]); + } + } +} diff --git a/src/messages/signedMessages/index.ts b/src/messages/signedMessages/index.ts new file mode 100644 index 0000000..e3a60fc --- /dev/null +++ b/src/messages/signedMessages/index.ts @@ -0,0 +1,12 @@ +import { P2PSignedMessageHandler } from "#types.js"; +import handshakeDone from "#messages/signedMessages/handshakeDone.js"; + +const messages = new Map( + Object.entries({ + protocolMethodHandshakeOpen: handshakeDone, + }).map(([key, value]) => [Number(key), value]), +); + +Object.freeze(messages); + +export default messages; diff --git a/src/messages/storageLocation.ts b/src/messages/storageLocation.ts new file mode 100644 index 0000000..bca6f42 --- /dev/null +++ b/src/messages/storageLocation.ts @@ -0,0 +1,77 @@ +import { S5Node } from "#node.js"; +import { Peer } from "#types.js"; +import Unpacker from "#serialization/unpack.js"; +import { Multihash } from "#multihash.js"; +import { decodeEndian } from "#util.js"; +import { mkeyEd25519 } from "#constants.js"; +import { ed25519 } from "@noble/curves/ed25519"; +import NodeId from "#nodeId.js"; +import StorageLocation from "#storage.js"; + +export default async function ( + node: S5Node, + peer: Peer, + data: Unpacker, + rawData: Uint8Array, +) { + const hash = new Multihash(rawData.subarray(1, 34)); + const type = rawData[34]; + const expiry = decodeEndian(rawData.subarray(35, 39)); + const partCount = rawData[39]; + const parts: string[] = []; + let cursor = 40; + for (let i = 0; i < partCount; i++) { + const length = decodeEndian(rawData.subarray(cursor, cursor + 2)); + cursor += 2; + parts.push( + new TextDecoder().decode(rawData.subarray(cursor, cursor + length)), + ); + cursor += length; + } + cursor++; + + const publicKey = rawData.subarray(cursor, cursor + 33); + const signature = rawData.subarray(cursor + 33); + + if (publicKey[0] !== mkeyEd25519) { + throw `Unsupported public key type ${mkeyEd25519}`; + } + + if ( + !ed25519.verify( + signature, + rawData.subarray(0, cursor), + publicKey.subarray(1), + ) + ) { + return; + } + + const nodeId = new NodeId(publicKey); + await node.addStorageLocation({ + hash, + nodeId, + location: new StorageLocation(type, parts, expiry), + message: rawData, + config: this.node.config, + }); + + const list = this.hashQueryRoutingTable.get(hash) || new Set(); + for (const peerId of list) { + if (peerId.equals(nodeId)) { + continue; + } + if (peerId.equals(peer.id)) { + continue; + } + + if (this._peers.has(peerId.toString())) { + try { + this._peers.get(peerId.toString())?.sendMessage(event); + } catch (e) { + this.logger.catched(e); + } + } + } + this.hashQueryRoutingTable.delete(hash); +} diff --git a/src/node.ts b/src/node.ts index 396aa4e..df1e950 100644 --- a/src/node.ts +++ b/src/node.ts @@ -40,11 +40,29 @@ export interface S5NodeConfig { export class S5Node { private _nodeConfig: S5NodeConfig; - private _config?: S5Config; + constructor(config: S5NodeConfig) { this._nodeConfig = config; } + private _config?: S5Config; + + get config() { + return this._config as S5Config; + } + + get services() { + return this._config?.services as S5Services; + } + + get db() { + return this._config?.db as AbstractLevel; + } + + get logger() { + return this._config?.logger as Logger; + } + public async start() { this._config = { keyPair: this._nodeConfig.keyPair, @@ -62,77 +80,63 @@ export class S5Node { await p2p.start(); } - get services() { - return this._config?.services as S5Services; - } - get config() { - return this._config as S5Config; - } - get db() { - return this._config?.db as AbstractLevel; - } - get logger() { - return this._config?.logger as Logger; - } -} - -export async function readStorageLocationsFromDB({ - hash, - config, -}: { - hash: Multihash; - config: S5Config; -}): Promise>>> { - const map = new Map>>(); - const bytes = await config.db.get(stringifyHash(hash)); - if (bytes === null) { + async readStorageLocationsFromDB( + hash: Multihash, + ): Promise>>> { + const map = new Map>>(); + const bytes = await this.db.get(stringifyHash(hash)); + if (bytes === null) { + return map; + } + const unpacker = Unpacker.fromPacked(bytes); + const mapLength = unpacker.unpackMapLength(); + for (let i = 0; i < mapLength; i++) { + const type = unpacker.unpackInt() as number; + const innerMap = new Map>(); + map.set(type, innerMap); + const innerMapLength = unpacker.unpackMapLength(); + for (let j = 0; j < innerMapLength; j++) { + const nodeId = new NodeId(unpacker.unpackBinary()); + innerMap.set( + nodeId.toString(), + new Map(unpacker.unpackMap() as [number, any][]), + ); + } + } return map; } - const unpacker = Unpacker.fromPacked(bytes); - const mapLength = unpacker.unpackMapLength(); - for (let i = 0; i < mapLength; i++) { - const type = unpacker.unpackInt() as number; - const innerMap = new Map>(); - map.set(type, innerMap); - const innerMapLength = unpacker.unpackMapLength(); - for (let j = 0; j < innerMapLength; j++) { - const nodeId = new NodeId(unpacker.unpackBinary()); - innerMap.set(nodeId, new Map(unpacker.unpackMap() as [number, any][])); - } + + async addStorageLocation({ + hash, + nodeId, + location, + message, + config, + }: { + hash: Multihash; + nodeId: NodeId; + location: StorageLocation; + message?: Uint8Array; + config: S5Config; + }) { + const map = await this.readStorageLocationsFromDB(hash); + const innerMap = + map.get(location.type) || new Map>(); + map.set(location.type, innerMap); + + const locationMap = new Map([ + [1, location.parts], + // [2, location.binaryParts], + [3, location.expiry], + [4, message], + ]); + + innerMap.set(nodeId.toString(), locationMap); + await config.cacheDb.put( + stringifyHash(hash), + new Packer().pack(map).takeBytes(), + ); } - return map; -} - -export async function addStorageLocation({ - hash, - nodeId, - location, - message, - config, -}: { - hash: Multihash; - nodeId: NodeId; - location: StorageLocation; - message?: Uint8Array; - config: S5Config; -}) { - const map = this.readStorageLocationsFromDB(hash); - const innerMap = - map.get(location.type) || new Map>(); - map.set(location.type, innerMap); - - const locationMap = new Map([ - [1, location.parts], - // [2, location.binaryParts], - [3, location.expiry], - [4, message], - ]); - - innerMap.set(nodeId, locationMap); - await config.cacheDb.put( - stringifyHash(hash), - new Packer().pack(map).takeBytes(), - ); } export function stringifyBytes(data: Uint8Array) { diff --git a/src/service/p2p.ts b/src/service/p2p.ts index 7ed03d1..698c871 100644 --- a/src/service/p2p.ts +++ b/src/service/p2p.ts @@ -1,43 +1,37 @@ import { Multihash } from "../multihash.js"; import NodeId from "../nodeId.js"; -import { equalBytes } from "@noble/curves/abstract/utils"; -import { Logger, Peer, S5Config, SignedMessage } from "../types.js"; +import { Logger, Peer, SignedMessage } from "../types.js"; import KeyPairEd25519 from "../ed25519.js"; import * as crypto from "crypto"; import { - mkeyEd25519, protocolMethodAnnouncePeers, - protocolMethodHandshakeDone, protocolMethodHandshakeOpen, protocolMethodHashQuery, protocolMethodSignedMessage, - recordTypeRegistryEntry, recordTypeStorageLocation, storageLocationTypeFull, } from "../constants.js"; import defer from "p-defer"; -import { calculateScore, decodeEndian, encodeEndian } from "#util.js"; +import { calculateScore, encodeEndian } from "#util.js"; import Packer from "#serialization/pack.js"; import Unpacker from "#serialization/unpack.js"; import { ed25519 } from "@noble/curves/ed25519"; import { AbstractLevel, AbstractSublevel } from "abstract-level"; import StorageLocation from "#storage.js"; -import { addStorageLocation, S5Node, stringifyNode } from "#node.js"; +import { S5Node, stringifyNode } from "#node.js"; import { URL } from "url"; import { Buffer } from "buffer"; import { createTransportPeer, createTransportSocket, - isTransport, } from "#transports/index.js"; +import messages from "#messages/index.js"; + export class P2PService { - private node: S5Node; private logger: Logger; private nodeKeyPair: KeyPairEd25519; private localNodeId?: NodeId; private networkId?: string; - private reconnectDelay: Map = new Map(); - private selfConnectionUris: Array = []; private nodesDb?: AbstractSublevel< AbstractLevel, Uint8Array, @@ -47,7 +41,7 @@ export class P2PService { private hashQueryRoutingTable: Map> = new Map(); constructor(node: S5Node) { - this.node = node; + this._node = node; this.networkId = node.config.p2p?.network; this.nodeKeyPair = node.config.keyPair; this.logger = node.logger; @@ -55,6 +49,24 @@ export class P2PService { node.config.services.p2p = this; } + private _node: S5Node; + + get node(): S5Node { + return this._node; + } + + private _reconnectDelay: Map = new Map(); + + get reconnectDelay(): Map { + return this._reconnectDelay; + } + + private _selfConnectionUris: Array = []; + + get selfConnectionUris(): Array { + return this._selfConnectionUris; + } + private _peers: Map = new Map(); get peers(): Map { @@ -63,11 +75,11 @@ export class P2PService { async init(): Promise { this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor - this.nodesDb = this.node.db.sublevel("s5-nodes", {}); + this.nodesDb = this._node.db.sublevel("s5-nodes", {}); } async start(): Promise { - const initialPeers = this.node.config?.p2p?.peers?.initial || []; + const initialPeers = this._node.config?.p2p?.peers?.initial || []; for (const p of initialPeers) { this.connectToNode([new URL(p)]); @@ -86,196 +98,14 @@ export class P2PService { const completer = defer(); - const supportedFeatures = 3; // 0b00000011 - peer.listenForMessages( async (event: Uint8Array) => { let u = Unpacker.fromPacked(event); const method = u.unpackInt(); - if (method === protocolMethodHandshakeOpen) { - const p = new Packer(); - p.packInt(protocolMethodHandshakeDone); - p.packBinary(u.unpackBinary()); - let peerNetworkId: string | null = null; - try { - peerNetworkId = u.unpackString(); - } catch {} - if (this.networkId && peerNetworkId !== this.networkId) { - throw `Peer is in different network: ${peerNetworkId}`; - } - - p.packInt(supportedFeatures); - p.packInt(this.selfConnectionUris.length); - for (const uri of this.selfConnectionUris) { - p.packString(uri.toString()); - } - // TODO Protocol version - // p.packInt(protocolVersion); - peer.sendMessage(await this.signMessageSimple(p.takeBytes())); - return; - } else if (method === recordTypeRegistryEntry) { - const sre = - this.node.services.registry.deserializeRegistryEntry(event); - await this.node.services.registry.set(sre, false, peer); - return; - } else if (method === recordTypeStorageLocation) { - const hash = new Multihash(event.subarray(1, 34)); - const type = event[34]; - const expiry = decodeEndian(event.subarray(35, 39)); - const partCount = event[39]; - const parts: string[] = []; - let cursor = 40; - for (let i = 0; i < partCount; i++) { - const length = decodeEndian(event.subarray(cursor, cursor + 2)); - cursor += 2; - parts.push( - new TextDecoder().decode(event.subarray(cursor, cursor + length)), - ); - cursor += length; - } - cursor++; - - const publicKey = event.subarray(cursor, cursor + 33); - const signature = event.subarray(cursor + 33); - - if (publicKey[0] !== mkeyEd25519) { - throw `Unsupported public key type ${mkeyEd25519}`; - } - - if ( - !ed25519.verify( - signature, - event.subarray(0, cursor), - publicKey.subarray(1), - ) - ) { - return; - } - - const nodeId = new NodeId(publicKey); - await addStorageLocation({ - hash, - nodeId, - location: new StorageLocation(type, parts, expiry), - message: event, - config: this.node.config, - }); - - const list = - this.hashQueryRoutingTable.get(hash) || new Set(); - for (const peerId of list) { - if (peerId.equals(nodeId)) { - continue; - } - if (peerId.equals(peer.id)) { - continue; - } - - if (this._peers.has(peerId.toString())) { - try { - this._peers.get(peerId.toString())?.sendMessage(event); - } catch (e) { - this.logger.catched(e); - } - } - } - this.hashQueryRoutingTable.delete(hash); + if (method !== null && messages.has(method)) { + await messages.get(method)?.(this.node, peer, u, event, verifyId); } - - if (method === protocolMethodSignedMessage) { - const sm = await this.unpackAndVerifySignature(u); - u = Unpacker.fromPacked(sm.message); - const method2 = u.unpackInt(); - - if (method2 === protocolMethodHandshakeDone) { - const challenge = u.unpackBinary(); - - if (!equalBytes(peer.challenge, challenge)) { - throw "Invalid challenge"; - } - - const pId = sm.nodeId; - - if (!verifyId) { - peer.id = pId; - } else { - if (!peer.id.equals(pId)) { - throw "Invalid transports id on initial list"; - } - } - - peer.isConnected = true; - - const supportedFeatures = u.unpackInt(); - - if (supportedFeatures !== 3) { - throw "Remote node does not support required features"; - } - - this._peers.set(peer.id.toString(), peer); - this.reconnectDelay.set(peer.id.toString(), 1); - - const connectionUrisCount = u.unpackInt() as number; - - peer.connectionUris = []; - for (let i = 0; i < connectionUrisCount; i++) { - peer.connectionUris.push(new URL(u.unpackString() as string)); - } - - this.logger.info( - `[+] ${peer.id.toString()} (${peer - .renderLocationUri() - .toString()})`, - ); - - this.sendPublicPeersToPeer(peer, Array.from(this._peers.values())); - for (const p of this._peers.values()) { - if (p.id.equals(peer.id)) continue; - - if (p.isConnected) { - this.sendPublicPeersToPeer(p, [peer]); - } - } - - return; - } else if (method2 === protocolMethodAnnouncePeers) { - const length = u.unpackInt() as number; - for (let i = 0; i < length; i++) { - const peerIdBinary = u.unpackBinary(); - const id = new NodeId(peerIdBinary); - - const isConnected = u.unpackBool() as boolean; - - const connectionUrisCount = u.unpackInt() as number; - - const connectionUris: URL[] = []; - - for (let i = 0; i < connectionUrisCount; i++) { - connectionUris.push(new URL(u.unpackString() as string)); - } - - if (connectionUris.length > 0) { - // TODO Fully support multiple connection uris - const uri = new URL(connectionUris[0].toString()); - uri.username = id.toBase58(); - if ( - !this.reconnectDelay.has( - NodeId.decode(uri.username).toString(), - ) - ) { - this.connectToNode([uri]); - } - } - } - } - } /* else if (method === protocolMethodRegistryQuery) { - const pk = u.unpackBinary(); - const sre = node.registry.getFromDB(pk); - if (sre !== null) { - transports.sendMessage(node.registry.serializeRegistryEntry(sre)); - } - }*/ }, { onDone: async () => { @@ -466,9 +296,9 @@ export class P2PService { const id = NodeId.decode(connectionUri.username); - this.reconnectDelay.set( + this._reconnectDelay.set( id.toString(), - this.reconnectDelay.get(id.toString()) || 1, + this._reconnectDelay.get(id.toString()) || 1, ); if (id.equals(this.localNodeId)) { @@ -491,8 +321,8 @@ export class P2PService { this.logger.catched(e); - const delay = this.reconnectDelay.get(id.toString())!; - this.reconnectDelay.set(id.toString(), delay * 2); + const delay = this._reconnectDelay.get(id.toString())!; + this._reconnectDelay.set(id.toString(), delay * 2); await new Promise((resolve) => setTimeout(resolve, delay * 1000)); await this.connectToNode(connectionUris, retried); diff --git a/src/service/registry.ts b/src/service/registry.ts index 498ff85..94378a9 100644 --- a/src/service/registry.ts +++ b/src/service/registry.ts @@ -68,7 +68,7 @@ export class RegistryService { throw new Error("Data too long"); } - const isValid = verifyRegistryEntry(sre); + const isValid = this.verifyRegistryEntry(sre); if (!isValid) { throw new Error("Invalid signature found"); } @@ -81,7 +81,7 @@ export class RegistryService { if (existingEntry.revision === sre.revision) { return; } else if (existingEntry.revision > sre.revision) { - const updateMessage = serializeRegistryEntry(existingEntry); + const updateMessage = this.serializeRegistryEntry(existingEntry); receivedFrom.sendMessage(updateMessage); return; } @@ -95,7 +95,7 @@ export class RegistryService { const key = new Multihash(sre.pk); this.streams.get(key.toString())?.emit("event", sre); - this.db?.put(stringifyBytes(sre.pk), serializeRegistryEntry(sre)); + this.db?.put(stringifyBytes(sre.pk), this.serializeRegistryEntry(sre)); this.broadcastEntry(sre, receivedFrom); } @@ -104,7 +104,7 @@ export class RegistryService { // TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256) broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void { this.logger.verbose("[registry] broadcastEntry"); - const updateMessage = serializeRegistryEntry(sre); + const updateMessage = this.serializeRegistryEntry(sre); for (const p of Object.values(this.node.services.p2p.peers)) { if (receivedFrom == null || p.id !== receivedFrom.id) { @@ -216,25 +216,25 @@ export class RegistryService { signature: event.slice(43 + dataLength), }; } -} -function verifyRegistryEntry(sre: SignedRegistryEntry): boolean { - const list: Uint8Array = Uint8Array.from([ - recordTypeRegistryEntry, - ...encodeEndian(sre.revision, 8), - sre.data.length, // 1 byte - ...sre.data, - ]); + public verifyRegistryEntry(sre: SignedRegistryEntry): boolean { + const list: Uint8Array = Uint8Array.from([ + recordTypeRegistryEntry, + ...encodeEndian(sre.revision, 8), + sre.data.length, // 1 byte + ...sre.data, + ]); - return ed25519.verify(list, sre.signature, sre.pk.slice(1)); -} -function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array { - return Uint8Array.from([ - recordTypeRegistryEntry, - ...sre.pk, - ...encodeEndian(sre.revision, 8), - sre.data.length, - ...sre.data, - ...sre.signature, - ]); + return ed25519.verify(list, sre.signature, sre.pk.slice(1)); + } + public serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array { + return Uint8Array.from([ + recordTypeRegistryEntry, + ...sre.pk, + ...encodeEndian(sre.revision, 8), + sre.data.length, + ...sre.data, + ...sre.signature, + ]); + } } diff --git a/src/types.ts b/src/types.ts index 47e7815..30ea65a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,6 +3,8 @@ import KeyPairEd25519 from "#ed25519.js"; import { AbstractLevel } from "abstract-level"; import { P2PService } from "./service/p2p.js"; import { RegistryService } from "./service/registry.js"; +import { S5Node } from "#node.js"; +import Unpacker from "#serialization/unpack.js"; export interface Peer { id: NodeId; @@ -63,3 +65,19 @@ export interface SignedMessage { nodeId: NodeId; message: Uint8Array; } + +export type P2PMessageHandler = ( + node: S5Node, + peer: Peer, + data: Unpacker, + rawData: Uint8Array, + verifyId: boolean, +) => Promise; + +export type P2PSignedMessageHandler = ( + node: S5Node, + peer: Peer, + data: Unpacker, + message: SignedMessage, + verifyId: boolean, +) => Promise;