import EventEmitter from "events";
// @ts-ignore
import { DiGraph, hasPath } from "jsnetworkx";
// @ts-ignore
import orderedJSON from "ordered-json";
// @ts-ignore
import crypto from "hypercore-crypto";
import b4a from "b4a";
import { Message, State, Type } from "./messages.js";
// @ts-ignore
import sodium from "sodium-universal";
import type { PartialMessage } from "@protobuf-ts/runtime";
import DHTFlood from "@lumeweb/dht-flood";
import type { Logger } from "pino";

/** Bootstrap payload: hex peer id -> the peers that peer reports being connected to. */
type Bootstrap = {
  [key: string]: State;
};

/** Tags stored on graph nodes to tell peer public keys apart from cached items. */
const EntityType = {
  PUBKEY: Symbol.for("PUBKEY"),
  ITEM: Symbol.for("ITEM"),
};

/** Milliseconds to wait before treating a "peer-remove" as a real disconnect. */
const DISCONNECT_SMOOTH = 500;

/**
 * Keeps a directed graph of peers and the items they announce, synchronised
 * with other instances through messages sent over a `DHTFlood` channel.
 *
 * Graph nodes are hex strings. Edges point from a peer to the peers it is
 * connected to and to the items it holds.
 *
 * Events emitted by this class: `bootstrapped`, `peer-add`, `peer-remove`,
 * `peer-add-seen`, `peer-remove-seen`, `item-added`, `item-removed`, `online`.
 */
export default class DHTCache extends EventEmitter {
  protected swarm: any;
  private id: Buffer;
  private bootstrapped: boolean;
  private graph: any;
  private connectedTo: Set<string>;
  private heartBeatInterval: number;
  protected flood: DHTFlood;
  private log: Logger;
  private _cache: Set<string>;
  private _online: Set<string>;

  /**
   * @param swarm - Swarm instance; this class reads `keyPair`, `peers` and
   *   `_allConnections` from it and calls `joinPeer`/`leavePeer`/`on`.
   * @param options - `id` defaults to `swarm.keyPair.publicKey`; `logger` is
   *   required; `heartBeatInterval` is in seconds (default 60). Remaining
   *   options are forwarded to `DHTFlood`.
   * @throws TypeError when `id` or `logger` is missing.
   */
  constructor(
    swarm: any,
    {
      id = swarm.keyPair.publicKey,
      logger,
      heartBeatInterval = 60,
      ...opts
    }: { id?: Buffer; logger?: Logger; [key: string]: any } = {}
  ) {
    super();
    if (!id) {
      throw new TypeError("Must provide id for self");
    }
    if (!logger) {
      throw new TypeError("Must provide logger for self");
    }

    this.id = b4a.from(id) as Buffer;
    this.bootstrapped = false;
    this.graph = new DiGraph();
    this.connectedTo = new Set();
    this.heartBeatInterval = heartBeatInterval;
    this._cache = new Set();
    this._online = new Set([this._maybeHexify(this.id)]);
    this.swarm = swarm;
    this.flood = new DHTFlood({ id, swarm, ...opts });
    this.log = logger;

    this.flood.on("peer-open", (peer) => this.addPeerHandler(peer));
    this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer));
    this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
    this.swarm.on("connection", this._hello.bind(this));

    // Register ourselves first so our node carries the PUBKEY type; the edge
    // helpers would otherwise create it implicitly without one.
    this._ensurePeer(this.id);

    // Register peers the swarm already knows. Only peers with a live
    // connection are handed over, because addPeerHandler sends to the peer.
    for (const peerInfo of this.swarm.peers.values()) {
      const conn = this.swarm._allConnections.get(peerInfo.publicKey);
      if (conn) {
        this.addPeerHandler(conn);
      }
    }

    setInterval(
      () => this._heartbeatCheck(),
      (this.heartBeatInterval / 4) * 1000
    );
    setInterval(() => this._emitHeartbeat(), this.heartBeatInterval * 1000);
  }

  /** Sorted hex ids of the items added locally through {@link addItem}. */
  public get cache(): string[] {
    return [...this._cache].sort();
  }

  /** Sorted ids of every ITEM node currently in the graph. */
  public get allCache(): string[] {
    const items: string[] = [];

    for (const id of this.graph.nodesIter()) {
      const item = this.graph.node.get(id);
      if (item?.type !== EntityType.ITEM) {
        continue;
      }
      items.push(id);
    }

    return items.sort();
  }

  /** The set assigned by the most recent online recalculation. */
  public get online(): Set<string> {
    return this._online;
  }

  /** Whether the graph has an edge from `peer` to `item`. */
  public peerHasItem(peer: string | Buffer, item: string | Buffer): boolean {
    peer = this._maybeHexify(peer);
    item = this._maybeHexify(item);
    return this.graph.hasSuccessor(peer, item);
  }

  /**
   * Adds an item to the local cache and graph, then broadcasts a signed
   * ADD_ITEM message (deferred until `bootstrapped` if not yet bootstrapped).
   */
  public addItem(item: string | Buffer) {
    const hex = this._maybeHexify(item);

    this._cache.add(hex);
    this._ensureItem(hex);
    this._addEntityConnection(this.id, hex);

    const broadcast = () => {
      this._broadcastMessage(this._createAddItemRequest(hex));
    };

    if (!this.bootstrapped) {
      this.once("bootstrapped", broadcast);
    } else {
      broadcast();
    }
  }

  /**
   * Removes an item from the local cache and broadcasts a signed REMOVE_ITEM
   * message (deferred until `bootstrapped` if not yet bootstrapped).
   *
   * Only our own edge to the item is dropped; the item node is removed from
   * the graph once no peer points at it any more.
   *
   * @returns `false` when the item was not in the local cache, else `true`.
   */
  public removeItem(item: string | Buffer): boolean {
    const hex = this._maybeHexify(item);

    if (!this._cache.has(hex)) {
      return false;
    }

    this._cache.delete(hex);
    this._removeEntityConnection(this.id, hex);
    this._pruneItems(hex);

    const broadcast = () => {
      this._broadcastMessage({
        type: Type.REMOVE_ITEM,
        data: b4a.from(hex, "hex"),
        signature: this._signItem(hex),
      });
    };

    if (!this.bootstrapped) {
      this.once("bootstrapped", broadcast);
    } else {
      broadcast();
    }

    return true;
  }

  /** Forwards an already-encoded message to `flood.broadcast`. */
  public broadcast(message: any, ttl?: number) {
    this.flood.broadcast(message, ttl);
  }

  /** Forwards an already-encoded message to `flood.send` with a ttl of 0. */
  public send(peer: any, message: any) {
    this.flood.send(peer, message, 0);
  }

  /**
   * Handles a newly opened peer: records it, announces the connection and
   * either sends it our items (when bootstrapped) or asks for a bootstrap.
   */
  protected addPeerHandler(peer: any) {
    const id = peer.remotePublicKey;
    const stringId = id.toString("hex");

    this.connectedTo.add(stringId);

    if (!this._hasSeenEntity(id)) {
      this._ensurePeer(id);
      this._addEntityConnection(this.id, id);
      this.emit("peer-add", id);
      this._recalculate();
      this._broadcastMessage({
        type: Type.CONNECTED,
        id,
      });
      this.send(peer, this._compileMessage({ type: Type.BOOTSTRAP_REQUEST }));
      this._emitHeartbeat(peer);
      this.log.debug(`Relay peer connected: ${stringId}`);
    }

    if (this.bootstrapped) {
      this._sendItemsToPeer(peer);
      return;
    }

    this.log.debug(`Broadcasting bootstrap request`);
    // If this is the first person we've met, get their graph
    this._broadcastMessage({ type: Type.BOOTSTRAP_REQUEST }, 0);
  }

  /**
   * Handles a flood "peer-remove": waits `DISCONNECT_SMOOTH` ms and only
   * treats the peer as gone if the swarm still has no connection to it.
   */
  removePeerHandler(peer: any) {
    const id = peer.remotePublicKey;

    // Wait for a bit and check if we're still disconnected before removing the peer
    setTimeout(() => {
      if (this.swarm._allConnections.has(id)) {
        return;
      }
      this.onRemovePeer(peer);
    }, DISCONNECT_SMOOTH);

    this.log.debug(`Relay peer might have disconnected: ${id.toString("hex")}`);
  }

  /** Drops our edge to a disconnected peer and announces the disconnect. */
  protected onRemovePeer(peer: any) {
    const id = peer.remotePublicKey;

    this.connectedTo.delete(id.toString("hex"));
    this._removeEntityConnection(this.id, id);
    // Pass the id so listeners can tell which peer left, as "peer-add" does.
    this.emit("peer-remove", id);
    this._recalculate();
    this._broadcastMessage(
      {
        type: Type.DISCONNECTED,
        id,
      },
      0
    );

    this.log.debug(`Relay peer confirmed disconnected: ${id.toString("hex")}`);
  }

  /**
   * Handles a message received through the flood channel.
   *
   * Messages come from remote peers, so anything that fails to decode, lacks
   * a type or required field, or fails signature verification is dropped
   * instead of throwing.
   *
   * @param message - Encoded `Message`.
   * @param id - Origin id supplied by the flood "message" event.
   */
  protected onGetBroadcast(message: Buffer, id: Buffer) {
    let decoded;
    try {
      decoded = Message.fromBinary(message);
    } catch {
      return;
    }

    const { type } = decoded;
    if (!type) {
      this.log.debug(
        `Dropping message without a type from ${id.toString("hex")}`
      );
      return;
    }

    switch (type) {
      case Type.ADD_ITEM:
      case Type.REMOVE_ITEM: {
        const { data: rawData, signature } = decoded;
        // The sender must have signed the item id with its own key.
        if (!rawData || !this._verifySignature(rawData, signature, id)) {
          return;
        }
        const bufData = b4a.from(rawData) as Buffer;

        if (type === Type.ADD_ITEM) {
          this._ensureItem(bufData);
          this._addEntityConnection(id, bufData);
          this.emit("item-added", id, bufData);
          this.log.debug(
            `New item added: ${bufData.toString("hex")} from ${id.toString(
              "hex"
            )}`
          );
        } else {
          // Only the sender stopped holding the item: drop the sender's edge
          // and leave our own cache untouched.
          this._removeEntityConnection(id, bufData);
          this._pruneItems();
          this.emit("item-removed", id, bufData);
          this.log.debug(
            `Item removed: ${bufData.toString("hex")} from ${id.toString(
              "hex"
            )}`
          );
        }
        break;
      }

      case Type.CONNECTED: {
        const { id: toId } = decoded;
        if (!toId) {
          return;
        }
        const bufId = b4a.from(toId) as Buffer;

        this._ensurePeer(id);
        this._ensurePeer(bufId);
        this._addEntityConnection(id, bufId);
        this.emit("peer-add-seen", id, bufId);
        this._recalculate();
        this.log.debug(`Network peer connected: ${bufId.toString("hex")}`);
        break;
      }

      case Type.DISCONNECTED: {
        const { id: toId } = decoded;
        if (!toId) {
          return;
        }
        const bufId = b4a.from(toId) as Buffer;

        this._removeEntityConnection(id, bufId);
        this.emit("peer-remove-seen", id, bufId);
        this._recalculate();
        this._pruneItems();
        this.log.debug(`Network peer disconnected: ${bufId.toString("hex")}`);
        break;
      }

      case Type.BOOTSTRAP_REQUEST: {
        const bootstrap = this._getBootstrapInfo();
        this.broadcast(
          Message.toBinary(
            Message.create({
              type: Type.BOOTSTRAP_RESPONSE,
              bootstrap,
            })
          ),
          0
        );
        this.log.debug(`Bootstrap request received`);
        break;
      }

      case Type.BOOTSTRAP_RESPONSE: {
        const { bootstrap } = decoded;
        this._bootstrapFrom(bootstrap);

        if (this.swarm._allConnections.has(id)) {
          this._sendItemsToPeer(this.swarm._allConnections.get(id));
        }
        this.log.debug(`Bootstrap response received`);
        break;
      }

      case Type.HEARTBEAT: {
        const { id: toId, signature, data } = decoded;
        // The heartbeat must be signed by the key it claims to come from.
        if (!toId || !this._verifySignature(data, signature, toId)) {
          return;
        }
        const bufId = b4a.from(toId) as Buffer;

        this.addPeerHandler({
          remotePublicKey: bufId,
        });
        this._setEntity(bufId, { heartbeat: Date.now() });
        this._heartbeatCheck();
        this.log.debug(`Heartbeat received from ${bufId.toString("hex")}`);
        break;
      }
    }
  }

  /**
   * Verifies a signature, returning `false` when any input is missing or
   * when the verifier throws on the supplied buffers.
   */
  private _verifySignature(
    data: Uint8Array | undefined,
    signature: Uint8Array | undefined,
    publicKey: Uint8Array | undefined
  ): boolean {
    if (!data || !signature || !publicKey) {
      return false;
    }
    try {
      return Boolean(crypto.verify(data, signature, publicKey));
    } catch {
      return false;
    }
  }

  /** Encodes a partial message to its binary form. */
  private _compileMessage(message: PartialMessage<Message>): Uint8Array {
    return Message.toBinary(Message.create(message));
  }

  /** Encodes a partial message and broadcasts it. */
  private _broadcastMessage(message: PartialMessage<Message>, ttl?: number) {
    this.broadcast(this._compileMessage(message), ttl);
  }

  /** Signs the item's bytes with the swarm key pair's secret key. */
  private _signItem(item: string | Buffer): Buffer {
    item = this._maybeHexify(item);
    return crypto.sign(b4a.from(item, "hex"), this.swarm.keyPair.secretKey);
  }

  /** Adds the node, or updates its attributes, via `graph.addNode`. */
  private _setEntity(id: Buffer | string, data: any): void {
    this.graph.addNode(this._maybeHexify(id), data);
  }

  /** Adds the node as an ITEM if it is not in the graph yet. */
  private _ensureItem(id: Buffer | string) {
    this._ensureEntity(id, { type: EntityType.ITEM });
  }

  /** Adds the node as a PUBKEY if it is not in the graph yet. */
  private _ensurePeer(id: Buffer | string) {
    this._ensureEntity(id, { type: EntityType.PUBKEY });
  }

  /** Adds the node with `def` as its attributes unless it already exists. */
  private _ensureEntity(id: Buffer | string, def = {}) {
    id = this._maybeHexify(id);
    if (!this._hasSeenEntity(id)) {
      this._setEntity(id, def);
    }
  }

  /** Whether the graph already contains the node. */
  private _hasSeenEntity(id: Buffer | string): boolean {
    return this.graph.hasNode(this._maybeHexify(id));
  }

  /** Adds an edge origin -> destination, creating missing nodes untyped. */
  private _addEntityConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ) {
    this._ensureEntity(origin);
    this._ensureEntity(destination);
    this.graph.addEdge(
      this._maybeHexify(origin),
      this._maybeHexify(destination)
    );
  }

  /**
   * Removes the edge origin -> destination if present. Unknown endpoints are
   * ignored rather than created, and a missing edge is not an error.
   */
  private _removeEntityConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ) {
    const from = this._maybeHexify(origin);
    const to = this._maybeHexify(destination);

    if (!this.graph.hasNode(from) || !this.graph.hasNode(to)) {
      return;
    }

    try {
      this.graph.removeEdge(from, to);
    } catch (e) {
      if ((e as Error).name !== "JSNetworkXError") throw e;
    }
  }

  /** Removes the node from the graph. */
  private _removeEntity(id: Buffer | string) {
    this.graph.removeNode(this._maybeHexify(id));
  }

  /**
   * Merges a bootstrap payload into the graph, asks the swarm to join peers
   * we are not connected to, and marks this instance as bootstrapped.
   */
  private _bootstrapFrom(bootstrap: Bootstrap) {
    for (const id in bootstrap) {
      const { connectedTo } = bootstrap[id];

      if (id === this.id.toString("hex")) {
        continue;
      }

      // Type the reporting peer as PUBKEY so it takes part in online and
      // bootstrap calculations instead of being created untyped below.
      this._ensurePeer(id);

      if (!this.connectedTo.has(id)) {
        this.swarm.joinPeer(b4a.from(id, "hex"));
      }

      for (const connection of connectedTo) {
        const peer = b4a.from(connection) as Buffer;
        if (b4a.equals(peer, this.id)) {
          continue;
        }

        this._ensurePeer(peer);
        this._addEntityConnection(id, peer);

        if (!this.connectedTo.has(peer.toString("hex"))) {
          this.swarm.joinPeer(peer);
        }
      }
    }

    if (!this.bootstrapped) {
      this.bootstrapped = true;
      this.emit("bootstrapped");
    }

    this._recalculate();
  }

  /** Builds the bootstrap payload: each PUBKEY node and its PUBKEY neighbours. */
  private _getBootstrapInfo() {
    const state: Bootstrap = {};

    for (const id of this.graph.nodesIter()) {
      const item = this.graph.node.get(id);
      if (item?.type !== EntityType.PUBKEY) {
        continue;
      }

      const connectedTo = this.graph
        .neighbors(id)
        .filter(
          (item: any) => this.graph.node.get(item)?.type === EntityType.PUBKEY
        )
        .map((id: string) => b4a.from(id, "hex"));

      state[id] = { connectedTo };
    }

    return state;
  }

  // Calculate who's online and emit an event
  private _recalculate() {
    const online = new Set<string>();
    const offline = new Set<string>();

    for (const id of this.graph.nodesIter()) {
      const item = this.graph.node.get(id);
      if (item?.type !== EntityType.PUBKEY) {
        continue;
      }

      if (
        hasPath(this.graph, {
          source: this._maybeHexify(this.id),
          target: id,
        })
      ) {
        online.add(id);
      } else {
        offline.add(id);
      }
    }

    // Peers we can no longer reach are dropped from the graph entirely.
    for (const id of offline) {
      this.graph.removeNode(id);
    }

    // Reachable peers only count as online once their node is flagged online.
    for (const id of online) {
      if (b4a.equals(b4a.from(id, "hex"), this.id)) {
        continue;
      }

      const item = this.graph.node.get(id);
      if (!item?.online) {
        online.delete(id);
      }
    }

    this._online = online;
    this.emit("online", online);

    this.log.debug(
      `Online list updated: ${online.size - 1} network peers online, ${
        offline.size
      } network peers offline and removed from DAG`
    );
  }

  /** Returns the hex string for a buffer; strings are returned unchanged. */
  private _maybeHexify(data: Buffer | string): string {
    if (b4a.isBuffer(data)) {
      return data.toString("hex");
    }
    return data;
  }

  /**
   * Removes ITEM nodes that no peer points at any more.
   *
   * Edges run from a peer to the item, so holders are the item's
   * predecessors; an item never has successors of its own.
   *
   * @param item - When given, only this item is checked; otherwise all nodes.
   */
  private _pruneItems(item?: Buffer | string) {
    // Snapshot the ids so nodes can be removed while looping.
    const candidates: string[] = item
      ? [this._maybeHexify(item)]
      : Array.from(this.graph.nodesIter() as Iterable<string>);

    for (const id of candidates) {
      if (this.graph.node.get(id)?.type !== EntityType.ITEM) {
        continue;
      }

      try {
        if (this.graph.predecessors(id).length === 0) {
          this.graph.removeNode(id);
        }
      } catch (e) {
        // Pruning is best-effort; record the failure and carry on.
        this.log.debug(`Failed to prune item ${id}: ${String(e)}`);
      }
    }
  }

  /**
   * Updates the `online` flag of every directly connected peer from its
   * connection state and last heartbeat, recalculating if any flag changed.
   */
  private _heartbeatCheck() {
    let changed = false;

    for (const peer of this.connectedTo) {
      const pubkey = b4a.from(peer, "hex");
      const node = this.graph.node.get(peer);
      const heartbeat = node?.heartbeat;
      const conn = this.swarm._allConnections.get(pubkey);

      // Online means: still connected and a heartbeat arrived within one interval.
      const online =
        conn &&
        heartbeat > 0 &&
        Date.now() - heartbeat <= this.heartBeatInterval * 1000;

      if (node?.online !== online) {
        changed = true;
      }

      this._setEntity(peer, { online });
    }

    if (changed) {
      this._recalculate();
    }
  }

  /**
   * Sends a signed HEARTBEAT to one peer, or to every directly connected
   * peer when `peer` is omitted. Peers without a live connection are skipped.
   */
  private _emitHeartbeat(peer?: any) {
    let peers = [...this.connectedTo];

    if (peer) {
      peers = [(b4a.from(peer.remotePublicKey) as Buffer).toString("hex")];
    }

    for (const peer of peers) {
      const pubkey = b4a.from(peer, "hex");
      const conn = this.swarm._allConnections.get(pubkey);
      if (!conn) {
        continue;
      }

      const data = b4a.from(Uint8Array.from([1]));
      this.send(
        conn,
        this._compileMessage({
          type: Type.HEARTBEAT,
          id: this.id,
          data,
          signature: crypto.sign(data, this.swarm.keyPair.secretKey),
        })
      );
    }
  }

  /** Swarm "connection" handler: sends "hello" and calls `swarm.leavePeer`. */
  private _hello(peer: any) {
    this.send(peer, b4a.from("hello"));
    this.swarm.leavePeer(peer.remotePublicKey);
  }

  /** Sends one ADD_ITEM message per locally cached item to the peer. */
  private _sendItemsToPeer(peer: any) {
    for (const item of this.cache) {
      this.send(peer, this._createAddItemRequestMessage(item));
    }
  }

  /** Builds and encodes a signed ADD_ITEM message for the item. */
  private _createAddItemRequestMessage(item: string | Buffer): Uint8Array {
    item = this._maybeHexify(item);
    return Message.toBinary(Message.create(this._createAddItemRequest(item)));
  }

  /** Builds a signed ADD_ITEM partial message for the item. */
  private _createAddItemRequest(
    item: string | Buffer
  ): PartialMessage<Message> {
    const hex = this._maybeHexify(item);
    return {
      type: Type.ADD_ITEM,
      data: b4a.from(hex, "hex"),
      signature: this._signItem(hex),
    };
  }
}