"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
    return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = __importDefault(require("events"));
// @ts-ignore
const jsnetworkx_1 = require("jsnetworkx");
// @ts-ignore
const hypercore_crypto_1 = __importDefault(require("hypercore-crypto"));
const b4a_1 = __importDefault(require("b4a"));
const messages_js_1 = require("./messages.js");
const dht_flood_1 = __importDefault(require("@lumeweb/dht-flood"));
const loglevel_1 = require("loglevel");

/** Node `type` attribute values stored on graph nodes. */
const EntityType = {
    PUBKEY: Symbol.for("PUBKEY"),
    ITEM: Symbol.for("ITEM"),
};

/** Grace period (ms) before a "peer-remove" from the flood layer is treated as a real disconnect. */
const DISCONNECT_SMOOTH = 500;

/** How often (ms) the liveness of directly connected peers is re-evaluated. */
const HEARTBEAT_CHECK_INTERVAL = 5 * 1000;

/**
 * Tracks, in a directed graph, which peers are connected to which peers and
 * which peers announce which items. Edges always point from a peer to the
 * thing it is connected to / holds (`peer -> peer`, `peer -> item`).
 *
 * Node ids in the graph are hex strings; Buffers are hexified on the way in.
 *
 * Events emitted: "peer-add", "peer-remove", "peer-add-seen",
 * "peer-remove-seen", "item-added", "item-removed", "bootstrapped", "online".
 */
class DHTCache extends events_1.default {
    /**
     * @param swarm Swarm instance; must expose `keyPair`, `peers`, `on()` and `_allConnections`.
     * @param options
     * @param options.id Identifier of this node; defaults to the swarm public key.
     * @param options.heartBeatInterval Seconds between heartbeats; also the max age of a heartbeat
     *   before a peer is considered offline.
     * @param options.opts Remaining options are forwarded to the flood layer.
     * @throws {TypeError} When no id can be determined.
     */
    constructor(swarm, { id = swarm.keyPair.publicKey, heartBeatInterval = 60, ...opts } = {}) {
        super();
        if (!id) {
            throw new TypeError("Must provide id for self");
        }
        this.id = b4a_1.default.from(id);
        this.bootstrapped = false;
        this.graph = new jsnetworkx_1.DiGraph();
        this.connectedTo = new Set();
        this.heartBeatInterval = heartBeatInterval;
        this._cache = new Set();
        this._online = new Set([this._maybeHexify(this.id)]);
        this.swarm = swarm;
        this.flood = new dht_flood_1.default({ id, swarm, ...opts });
        this.log = loglevel_1.getLogger("dht-cache");

        this.flood.on("peer-open", (peer) => this.addPeerHandler(peer));
        this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer));
        this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
        this.swarm.on("connection", (peer) => this.send(peer, b4a_1.default.from("hello")));

        // Register ourselves first so the self node carries the PUBKEY type
        // before any edge originating from it is created.
        this._ensurePeer(this.id);

        // Pick up peers the swarm is already connected to. The previous
        // `.map((item) => { remotePublicKey: item.publicKey; })` was a block
        // body with a label statement, so it always produced `undefined` and
        // no existing peer was ever registered.
        for (const info of this.swarm.peers.values()) {
            const remotePublicKey = info?.publicKey;
            if (!remotePublicKey || !this.swarm._allConnections.has(remotePublicKey)) {
                continue;
            }
            this.addPeerHandler({ remotePublicKey });
        }

        // Keep the handles so destroy() can stop the timers.
        this._heartbeatCheckTimer = setInterval(() => this._heartbeatCheck(), HEARTBEAT_CHECK_INTERVAL);
        this._heartbeatEmitTimer = setInterval(() => this._emitHeartbeat(), this.heartBeatInterval * 1000);
    }

    /** Stops the periodic heartbeat timers started by the constructor. Safe to call more than once. */
    destroy() {
        clearInterval(this._heartbeatCheckTimer);
        clearInterval(this._heartbeatEmitTimer);
    }

    /** Sorted hex ids of the items added locally through addItem(). */
    get cache() {
        return [...this._cache].sort();
    }

    /** Sorted hex ids of every ITEM node currently in the graph. */
    get allCache() {
        const items = [];
        for (const id of this.graph.nodesIter()) {
            if (this.graph.node.get(id)?.type === EntityType.ITEM) {
                items.push(id);
            }
        }
        return items.sort();
    }

    /**
     * @param peer Peer id (Buffer or hex string).
     * @param item Item id (Buffer or hex string).
     * @returns Whether the graph has an edge `peer -> item`.
     */
    peerHasItem(peer, item) {
        return this.graph.hasSuccessor(this._maybeHexify(peer), this._maybeHexify(item));
    }

    /**
     * Records an item as held by this node and announces it to the network
     * (immediately if bootstrapped, otherwise once bootstrapping completes).
     * @param item Item id (Buffer or hex string).
     */
    addItem(item) {
        const hexItem = this._maybeHexify(item);
        this._cache.add(hexItem);
        this._ensureItem(hexItem);
        this._addEntityConnection(this.id, hexItem);
        this._runWhenBootstrapped(() => {
            this._broadcastMessage({
                type: messages_js_1.Type.ADD_ITEM,
                data: b4a_1.default.from(hexItem, "hex"),
                signature: this._signItem(hexItem),
            });
        });
    }

    /** Encodes a plain message object into its binary wire form. */
    _compileMessage(message) {
        return messages_js_1.Message.toBinary(messages_js_1.Message.create(message));
    }

    /** Encodes and floods a message. `ttl` is passed through to the flood layer. */
    _broadcastMessage(message, ttl) {
        this.broadcast(this._compileMessage(message), ttl);
    }

    /** Runs `action` now when bootstrapped, otherwise once on the next "bootstrapped" event. */
    _runWhenBootstrapped(action) {
        if (this.bootstrapped) {
            action();
        } else {
            this.once("bootstrapped", action);
        }
    }

    /**
     * Stops announcing an item held by this node.
     *
     * Only this node's `self -> item` edge is removed; the item node itself
     * is dropped only when no other peer still points at it, so knowledge
     * about other holders is preserved.
     *
     * @param item Item id (Buffer or hex string).
     * @returns `false` when the item was not in the local cache, `true` otherwise.
     */
    removeItem(item) {
        const hexItem = this._maybeHexify(item);
        if (!this._cache.has(hexItem)) {
            return false;
        }
        this._cache.delete(hexItem);
        this._removeEntityConnection(this.id, hexItem);
        this._pruneItems(hexItem);
        this._runWhenBootstrapped(() => {
            this._broadcastMessage({
                type: messages_js_1.Type.REMOVE_ITEM,
                data: b4a_1.default.from(hexItem, "hex"),
                signature: this._signItem(hexItem),
            });
        });
        return true;
    }

    /** Set of hex peer ids computed by the last _recalculate() run (the live Set, not a copy). */
    get online() {
        return this._online;
    }

    /** Floods an already-encoded message. */
    broadcast(message, ttl) {
        this.flood.broadcast(message, ttl);
    }

    /** Sends an already-encoded message to a single peer with a ttl of 0. */
    send(peer, message) {
        this.flood.send(peer, message, 0);
    }

    /**
     * Registers a directly connected peer. For a peer not yet in the graph
     * this adds the node and the `self -> peer` edge, announces the
     * connection and sends a heartbeat. While not bootstrapped, every call
     * also broadcasts a bootstrap request.
     * @param peer Object exposing `remotePublicKey`.
     */
    addPeerHandler(peer) {
        const id = peer.remotePublicKey;
        const stringId = id.toString("hex");
        this.connectedTo.add(stringId);

        if (!this._hasSeenEntity(id)) {
            this._ensurePeer(id);
            this._addEntityConnection(this.id, id);
            this.emit("peer-add", id);
            this._recalculate();
            this._broadcastMessage({
                type: messages_js_1.Type.CONNECTED,
                id,
            });
            this._emitHeartbeat(peer);
            this.log.debug(`Relay peer connected: ${stringId}`);
        }

        if (this.bootstrapped) {
            return;
        }
        this.log.debug(`Broadcasting bootstrap request`);
        // If this is the first person we've met, get their graph
        this._broadcastMessage({ type: messages_js_1.Type.BOOTSTRAP_REQUEST }, 0);
    }

    /**
     * Handles a possible disconnect reported by the flood layer. The peer is
     * only removed if it is still absent from the swarm's connections after
     * DISCONNECT_SMOOTH milliseconds.
     * @param peer Object exposing `remotePublicKey`.
     */
    removePeerHandler(peer) {
        const id = peer.remotePublicKey;
        // Wait for a bit and check if we're still disconnected before removing the peer
        setTimeout(() => {
            if (this.swarm._allConnections.has(id)) {
                return;
            }
            this.onRemovePeer(peer);
        }, DISCONNECT_SMOOTH);
        this.log.debug(`Relay peer might have disconnected: ${id.toString("hex")}`);
    }

    /**
     * Removes a confirmed-disconnected peer: drops the `self -> peer` edge,
     * recalculates the online set and announces the disconnect.
     * @param peer Object exposing `remotePublicKey`.
     */
    onRemovePeer(peer) {
        const id = peer.remotePublicKey;
        const stringId = id.toString("hex");
        this.connectedTo.delete(stringId);
        this._removeEntityConnection(this.id, id);
        // Pass the id along, mirroring the payload of "peer-add".
        this.emit("peer-remove", id);
        this._recalculate();
        this._broadcastMessage({
            type: messages_js_1.Type.DISCONNECTED,
            id,
        }, 0);
        this.log.debug(`Relay peer confirmed disconnected: ${stringId}`);
    }

    /**
     * Handles a message received from the flood layer. Payloads that fail to
     * decode are dropped silently.
     * @param message Binary encoded message.
     * @param id Id of the sending peer as reported by the flood layer.
     * @throws {Error} When the decoded message has no type.
     */
    onGetBroadcast(message, id) {
        let decoded;
        try {
            decoded = messages_js_1.Message.fromBinary(message);
        } catch {
            return;
        }
        const { type } = decoded;
        if (!type) {
            // NOTE(review): this throws from an event listener on remote input,
            // whereas undecodable payloads above are dropped — confirm intended.
            throw new Error("Missing Type In Message");
        }

        switch (type) {
            case messages_js_1.Type.ADD_ITEM:
            case messages_js_1.Type.REMOVE_ITEM: {
                const { data: rawData, signature } = decoded;
                const bufData = b4a_1.default.from(rawData);
                // The item id must be signed by the sender.
                if (!(signature && hypercore_crypto_1.default.verify(bufData, signature, id))) {
                    break;
                }
                if (type === messages_js_1.Type.ADD_ITEM) {
                    this._ensureItem(bufData);
                    this._addEntityConnection(id, bufData);
                    this.emit("item-added", id, bufData);
                    this.log.debug(`New item added: ${bufData.toString("hex")} from ${id.toString("hex")}`);
                } else {
                    // Only the sender stopped holding the item: drop its edge
                    // and prune the item if nobody else holds it. Our own copy
                    // (if any) stays untouched and nothing is re-broadcast.
                    if (this.peerHasItem(id, bufData)) {
                        this._removeEntityConnection(id, bufData);
                        this._pruneItems(bufData);
                    }
                    this.emit("item-removed", id, bufData);
                    this.log.debug(`Item removed: ${bufData.toString("hex")} from ${id.toString("hex")}`);
                }
                break;
            }
            case messages_js_1.Type.CONNECTED: {
                const bufId = b4a_1.default.from(decoded.id);
                this._addEntityConnection(id, bufId);
                this.emit("peer-add-seen", id, bufId);
                this._recalculate();
                this.log.debug(`Network peer connected: ${bufId.toString("hex")}`);
                break;
            }
            case messages_js_1.Type.DISCONNECTED: {
                const bufId = b4a_1.default.from(decoded.id);
                this._removeEntityConnection(id, bufId);
                this.emit("peer-remove-seen", id, bufId);
                this._recalculate();
                this._pruneItems();
                this.log.debug(`Network peer disconnected: ${bufId.toString("hex")}`);
                break;
            }
            case messages_js_1.Type.BOOTSTRAP_REQUEST: {
                this._broadcastMessage({
                    type: messages_js_1.Type.BOOTSTRAP_RESPONSE,
                    bootstrap: this._getBootstrapInfo(),
                }, 0);
                this.log.debug(`Bootstrap request received`);
                break;
            }
            case messages_js_1.Type.BOOTSTRAP_RESPONSE: {
                this._bootstrapFrom(decoded.bootstrap);
                this.log.debug(`Bootstrap response received`);
                break;
            }
            case messages_js_1.Type.HEARTBEAT: {
                const { id: fromId, signature, data: bufData } = decoded;
                const bufId = b4a_1.default.from(fromId);
                // The heartbeat payload must be signed by the id it claims to come from.
                if (!(signature && hypercore_crypto_1.default.verify(bufData, signature, bufId))) {
                    return;
                }
                this.addPeerHandler({ remotePublicKey: bufId });
                this._setEntity(bufId, { heartbeat: Date.now() });
                this.log.debug(`Heartbeat received from ${bufId.toString("hex")}`);
                break;
            }
            default:
                break;
        }
    }

    /** Signs the raw bytes of an item id with the swarm secret key. */
    _signItem(item) {
        const hexItem = this._maybeHexify(item);
        return hypercore_crypto_1.default.sign(b4a_1.default.from(hexItem, "hex"), this.swarm.keyPair.secretKey);
    }

    /** Adds the node if missing and applies `data` as node attributes. */
    _setEntity(id, data) {
        this.graph.addNode(this._maybeHexify(id), data);
    }

    /** Creates the node as an ITEM if it is not in the graph yet. */
    _ensureItem(id) {
        this._ensureEntity(id, { type: EntityType.ITEM });
    }

    /** Creates the node as a PUBKEY if it is not in the graph yet. */
    _ensurePeer(id) {
        this._ensureEntity(id, { type: EntityType.PUBKEY });
    }

    /** Creates the node with attributes `def` only when it does not exist; existing nodes are left untouched. */
    _ensureEntity(id, def = {}) {
        const hexId = this._maybeHexify(id);
        if (!this._hasSeenEntity(hexId)) {
            this._setEntity(hexId, def);
        }
    }

    /** @returns Whether a node with this id exists in the graph. */
    _hasSeenEntity(id) {
        return this.graph.hasNode(this._maybeHexify(id));
    }

    /** Adds the edge `origin -> destination`, creating (untyped) nodes for unknown ids. */
    _addEntityConnection(origin, destination) {
        this._ensureEntity(origin);
        this._ensureEntity(destination);
        this.graph.addEdge(this._maybeHexify(origin), this._maybeHexify(destination));
    }

    /**
     * Removes the edge `origin -> destination`. A JSNetworkXError (e.g. the
     * edge does not exist) is ignored; any other error is rethrown.
     */
    _removeEntityConnection(origin, destination) {
        try {
            this._ensureEntity(origin);
            this._ensureEntity(destination);
            this.graph.removeEdge(this._maybeHexify(origin), this._maybeHexify(destination));
        } catch (e) {
            if (e.name !== "JSNetworkXError") {
                throw e;
            }
        }
    }

    /** Removes the node (and, with it, its edges) from the graph. */
    _removeEntity(id) {
        this.graph.removeNode(this._maybeHexify(id));
    }

    /**
     * Seeds the graph from another peer's bootstrap info. Only the first
     * response is applied; later ones are ignored.
     * @param bootstrap Map of hex peer id -> `{ connectedTo: bytes[] }`.
     */
    _bootstrapFrom(bootstrap) {
        if (this.bootstrapped) {
            return;
        }
        const selfId = this.id.toString("hex");
        for (const id of Object.keys(bootstrap ?? {})) {
            if (id === selfId) {
                continue;
            }
            // NOTE(review): `id` itself is only created as an untyped node by
            // _addEntityConnection; confirm whether it should be a PUBKEY.
            for (const connection of bootstrap[id].connectedTo) {
                const peer = b4a_1.default.from(connection);
                this._ensurePeer(peer);
                this._addEntityConnection(id, peer);
            }
        }
        this.bootstrapped = true;
        this.emit("bootstrapped");
        this._recalculate();
    }

    /**
     * Builds the bootstrap payload: for every PUBKEY node, the PUBKEY nodes
     * it has edges to, as raw bytes.
     */
    _getBootstrapInfo() {
        const isPeer = (nodeId) => this.graph.node.get(nodeId)?.type === EntityType.PUBKEY;
        const state = {};
        for (const id of this.graph.nodesIter()) {
            if (!isPeer(id)) {
                continue;
            }
            const connectedTo = this.graph
                .neighbors(id)
                .filter(isPeer)
                .map((peerId) => b4a_1.default.from(peerId, "hex"));
            state[id] = { connectedTo };
        }
        return state;
    }

    // Calculate who's online and emit an event
    _recalculate() {
        const selfId = this._maybeHexify(this.id);
        const online = new Set();
        const offline = new Set();

        // A peer is reachable when a path leads from us to it.
        for (const id of this.graph.nodesIter()) {
            if (this.graph.node.get(id)?.type !== EntityType.PUBKEY) {
                continue;
            }
            const reachable = jsnetworkx_1.hasPath(this.graph, { source: selfId, target: id });
            (reachable ? online : offline).add(id);
        }

        // Unreachable peers are dropped from the graph altogether.
        for (const id of offline) {
            this.graph.removeNode(id);
        }

        // Reachable peers other than ourselves also need a live `online` flag
        // (maintained by _heartbeatCheck) to count as online.
        for (const id of online) {
            if (b4a_1.default.equals(b4a_1.default.from(id, "hex"), this.id)) {
                continue;
            }
            if (!this.graph.node.get(id)?.online) {
                online.delete(id);
            }
        }

        this._online = online;
        this.emit("online", online);
        this.log.debug(`Online list updated: ${online.size} network peers online, ${offline.size} network peers offline and removed from DAG`);
    }

    /** Converts Buffers to hex strings; any other value is returned unchanged. */
    _maybeHexify(data) {
        if (b4a_1.default.isBuffer(data)) {
            return data.toString("hex");
        }
        return data;
    }

    /**
     * Removes ITEM nodes that no peer points at any more.
     *
     * Edges run peer -> item, so holders of an item are its predecessors.
     * (The former check on `neighbors()`, i.e. successors, was always empty
     * for items and therefore pruned every item.)
     *
     * @param item Optional single item (Buffer or hex string) to check;
     *   when omitted every node in the graph is checked.
     */
    _pruneItems(item) {
        // Snapshot the ids first: nodes are removed while we walk them.
        const candidates = item ? [this._maybeHexify(item)] : [...this.graph.nodesIter()];
        for (const id of candidates) {
            if (this.graph.node.get(id)?.type !== EntityType.ITEM) {
                continue;
            }
            try {
                if (this.graph.predecessors(id).length === 0) {
                    this.graph.removeNode(id);
                }
            } catch (e) {
                // Pruning is best effort; record the failure instead of hiding it.
                this.log.debug(`Failed to prune item ${id}: ${e}`);
            }
        }
    }

    /**
     * Refreshes the `online` flag of every directly connected peer. A peer is
     * online when it has an open connection and its last heartbeat is no
     * older than `heartBeatInterval` seconds. Triggers _recalculate() when
     * any flag flipped.
     */
    _heartbeatCheck() {
        const maxAge = this.heartBeatInterval * 1000;
        let changed = false;
        for (const peer of this.connectedTo) {
            const node = this.graph.node.get(peer);
            const heartbeat = node?.heartbeat;
            const conn = this.swarm._allConnections.get(b4a_1.default.from(peer, "hex"));
            // Always a strict boolean, never `undefined` or the connection object.
            const online = Boolean(conn) && heartbeat > 0 && Date.now() - heartbeat <= maxAge;
            if (Boolean(node?.online) !== online) {
                changed = true;
            }
            this._setEntity(peer, { online });
        }
        if (changed) {
            this._recalculate();
        }
    }

    /**
     * Sends a signed heartbeat to one peer, or to every directly connected
     * peer when `peer` is omitted. Peers without an open connection are skipped.
     * @param peer Optional object exposing `remotePublicKey`.
     */
    _emitHeartbeat(peer) {
        const targets = peer
            // @ts-ignore
            ? [b4a_1.default.from(peer.remotePublicKey).toString("hex")]
            : [...this.connectedTo];

        let payload;
        for (const target of targets) {
            const conn = this.swarm._allConnections.get(b4a_1.default.from(target, "hex"));
            if (!conn) {
                continue;
            }
            // Built lazily so nothing is signed when there is nobody to send to;
            // the message is identical for every recipient.
            if (payload === undefined) {
                const data = b4a_1.default.from(Uint8Array.from([1]));
                payload = this._compileMessage({
                    type: messages_js_1.Type.HEARTBEAT,
                    id: this.id,
                    data,
                    signature: hypercore_crypto_1.default.sign(data, this.swarm.keyPair.secretKey),
                });
            }
            this.send(conn, payload);
        }
    }
}
exports.default = DHTCache;