From bae0efa3b04f3996fe29af9b610cdc7457004f8b Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 23 Nov 2022 20:39:16 -0500 Subject: [PATCH] *Major refactor to pivot cache design based on items, but store all items and peers as nodes in graph with a type, and update communication structs for it --- dist/DHTDataBase.d.ts | 38 ---- dist/DHTDataBase.d.ts.map | 1 - dist/DHTDataBase.js | 275 -------------------------- dist/index.d.ts | 14 -- dist/index.d.ts.map | 1 - dist/index.js | 39 ---- dist/messages.d.ts | 110 ----------- dist/messages.d.ts.map | 1 - dist/messages.js | 208 ------------------- messages.proto | 9 +- package.json | 2 +- src/DHTDataBase.ts | 366 ---------------------------------- src/index.ts | 407 ++++++++++++++++++++++++++++++++++++-- src/messages.ts | 66 ++----- test.js | 25 +-- 15 files changed, 422 insertions(+), 1140 deletions(-) delete mode 100644 dist/DHTDataBase.d.ts delete mode 100644 dist/DHTDataBase.d.ts.map delete mode 100644 dist/DHTDataBase.js delete mode 100644 dist/index.d.ts delete mode 100644 dist/index.d.ts.map delete mode 100644 dist/index.js delete mode 100644 dist/messages.d.ts delete mode 100644 dist/messages.d.ts.map delete mode 100644 dist/messages.js delete mode 100644 src/DHTDataBase.ts diff --git a/dist/DHTDataBase.d.ts b/dist/DHTDataBase.d.ts deleted file mode 100644 index a9620d8..0000000 --- a/dist/DHTDataBase.d.ts +++ /dev/null @@ -1,38 +0,0 @@ -/// -/// -import EventEmitter from "events"; -export default class DHTDataBase extends EventEmitter { - protected swarm: any; - private id; - private bootstrapped; - private graph; - private connectedTo; - constructor(id: Buffer, { swarm }?: { - swarm?: any; - }); - private _data; - get data(): {}; - set data(value: {}); - private _online; - get online(): string[]; - broadcast(data: any, ttl?: number): void; - getPeerRaw(id: Buffer | string): any; - getPeerField(id: Buffer | string, field: string): any; - getPeerData(id: Buffer | string): any; - getPeerTimestamp(id: Buffer | string): any; - getPeerSignature(id: Buffer | string): any; - protected onAddPeer(id: Buffer): void; - protected onRemovePeer(id: Buffer): void; - protected onGetBroadcast(message: Buffer, id: Buffer): void; - private _broadcastData; - private _hasSeenPeer; - private _setPeer; - private _ensurePeer; - private _addPeerConnection; - private _removePeerConnection; - private _bootstrapFrom; - private _getBootstrapInfo; - private _recalculate; - private _maybeHexify; -} -//# sourceMappingURL=DHTDataBase.d.ts.map \ No newline at end of file diff --git a/dist/DHTDataBase.d.ts.map b/dist/DHTDataBase.d.ts.map deleted file mode 100644 index 08e2d18..0000000 --- a/dist/DHTDataBase.d.ts.map +++ /dev/null @@ -1 +0,0 @@ -{"version":3,"file":"DHTDataBase.d.ts","sourceRoot":"","sources":["../src/DHTDataBase.ts"],"names":[],"mappings":";;AAAA,OAAO,YAAY,MAAM,QAAQ,CAAC;AAiBlC,MAAM,CAAC,OAAO,OAAO,WAAY,SAAQ,YAAY;IACnD,SAAS,CAAC,KAAK,EAAE,GAAG,CAAC;IACrB,OAAO,CAAC,EAAE,CAAS;IACnB,OAAO,CAAC,YAAY,CAAU;IAC9B,OAAO,CAAC,KAAK,CAAM;IACnB,OAAO,CAAC,WAAW,CAAW;gBAElB,EAAE,EAAE,MAAM,EAAE,EAAE,KAAK,EAAE,GAAE;QAAE,KAAK,CAAC,EAAE,GAAG,CAAA;KAAO;IAavD,OAAO,CAAC,KAAK,CAAK;IAElB,IAAI,IAAI,IAAI,EAAE,CAEb;IAED,IAAI,IAAI,CAAC,KAAK,EAAE,EAAE,EAcjB;IAED,OAAO,CAAC,OAAO,CAAW;IAE1B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,SAAS,CAAC,IAAI,EAAE,GAAG,EAAE,GAAG,CAAC,EAAE,MAAM;IAIjC,UAAU,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAI9B,YAAY,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM,EAAE,KAAK,EAAE,MAAM;IAI/C,WAAW,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAI/B,gBAAgB,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAIpC,gBAAgB,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAIpC,SAAS,CAAC,SAAS,CAAC,EAAE,EAAE,MAAM;IAqC9B,SAAS,CAAC,YAAY,CAAC,EAAE,EAAE,MAAM;IAiBjC,SAAS,CAAC,cAAc,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE,EAAE,MAAM;IA2DpD,OAAO,CAAC,cAAc;IAmBtB,OAAO,CAAC,YAAY;IAIpB,OAAO,CAAC,QAAQ;IAahB,OAAO,CAAC,WAAW;IAOnB,OAAO,CAAC,kBAAkB;IAY1B,OAAO,CAAC,qBAAqB;IAa7B,OAAO,CAAC,cAAc;IAgDtB,OAAO,CAAC,iBAAiB;IAgBzB,OAAO,CAAC,YAAY;IAwBpB,OAAO,CAAC,YAAY;CAOrB"} \ No newline at end of file diff --git a/dist/DHTDataBase.js b/dist/DHTDataBase.js deleted file mode 100644 index 765c2e9..0000000 --- a/dist/DHTDataBase.js +++ /dev/null @@ -1,275 +0,0 @@ -"use strict"; -var __importDefault = (this && this.__importDefault) || function (mod) { - return (mod && mod.__esModule) ? mod : { "default": mod }; -}; -Object.defineProperty(exports, "__esModule", { value: true }); -const events_1 = __importDefault(require("events")); -// @ts-ignore -const jsnetworkx_1 = require("jsnetworkx"); -// @ts-ignore -const ordered_json_1 = __importDefault(require("ordered-json")); -// @ts-ignore -const hypercore_crypto_1 = __importDefault(require("hypercore-crypto")); -const b4a_1 = __importDefault(require("b4a")); -const messages_js_1 = require("./messages.js"); -const debug_1 = __importDefault(require("debug")); -class DHTDataBase extends events_1.default { - swarm; - id; - bootstrapped; - graph; - connectedTo; - constructor(id, { swarm } = {}) { - super(); - if (!id) - throw new TypeError("Must provide id for self"); - this.id = id; - this.bootstrapped = false; - this.graph = new jsnetworkx_1.DiGraph(); - this.connectedTo = new Set(); - this._data = {}; - this._online = [this._maybeHexify(this.id)]; - this.swarm = swarm; - } - _data; - get data() { - return { ...this._data }; - } - set data(value) { - this._data = value; - const timestamp = BigInt(Date.now()); - const rawData = ordered_json_1.default.stringify(value); - const signature = hypercore_crypto_1.default.sign(b4a_1.default.from(`${timestamp}${rawData}`), this.swarm.keyPair.secretKey); - this._setPeer(this.id, value, timestamp, signature); - this._broadcastData(); - } - _online; - get online() { - return this._online; - } - broadcast(data, ttl) { - throw new TypeError("Broadcast has not been implemented"); - } - getPeerRaw(id) { - return this.graph.node.get(this._maybeHexify(id)); - } - getPeerField(id, field) { - return this.getPeerRaw(id)?.[field]; - } - getPeerData(id) { - return this.getPeerField(id, "data"); - } - getPeerTimestamp(id) { - return this.getPeerField(id, "timestamp"); - } - getPeerSignature(id) { - return this.getPeerField(id, "signature"); - } - onAddPeer(id) { - const stringId = id.toString("hex"); - if (this.connectedTo.has(stringId)) { - return; - } // Already know we're connected here - this.connectedTo.add(stringId); - this._addPeerConnection(this.id, id); - this.emit("peer-add", id); - this._recalculate(); - this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({ - type: messages_js_1.Type.CONNECTED, - id, - }))); - this._broadcastData(); - if (this.bootstrapped) { - return; - } - // If this is the first person we've met, get their graph - this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({ - type: messages_js_1.Type.BOOTSTRAP_REQUEST, - })), 0); - } - onRemovePeer(id) { - this.connectedTo.delete(id.toString("hex")); - this._removePeerConnection(this.id, id); - this.emit("peer-remove"); - this._recalculate(); - this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({ - type: messages_js_1.Type.DISCONNECTED, - id, - }))); - } - onGetBroadcast(message, id) { - let decoded; - try { - decoded = messages_js_1.Message.fromBinary(message); - } - catch { - return; - } - const { type } = decoded; - if (!type) { - throw new Error("Missing Type In Message"); - } - if (type === messages_js_1.Type.STATE) { - const { data: rawData, timestamp, signature } = decoded; - if (signature && - hypercore_crypto_1.default.verify(b4a_1.default.from(`${timestamp}${rawData}`), signature, id)) { - if ((timestamp || 0) <= this.getPeerTimestamp(id)) { - (0, debug_1.default)(`Received old data for peer ${id}`); - return; - } - const data = rawData ? ordered_json_1.default.parse(rawData) : null; - this._setPeer(id, data, timestamp, signature); - this.emit("peer-data", data, id); - this._recalculate(); - return; - } - (0, debug_1.default)(`Invalid signature received for peer ${id}`); - } - else if (type === messages_js_1.Type.CONNECTED) { - const { id: toId } = decoded; - this._addPeerConnection(id, Buffer.from(toId)); - this.emit("peer-add-seen", id, toId); - this._recalculate(); - } - else if (type === messages_js_1.Type.DISCONNECTED) { - const { id: toId } = decoded; - this._removePeerConnection(id, Buffer.from(toId)); - this.emit("peer-remove-seen", id, toId); - this._recalculate(); - } - else if (type === messages_js_1.Type.BOOTSTRAP_REQUEST) { - const bootstrap = this._getBootstrapInfo(); - this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({ - type: messages_js_1.Type.BOOTSTRAP_RESPONSE, - bootstrap, - })), 0); - } - else if (type === messages_js_1.Type.BOOTSTRAP_RESPONSE) { - const { bootstrap } = decoded; - this._bootstrapFrom(bootstrap); - } - } - _broadcastData() { - const rawData = this._data; - if (!Object.keys(rawData).length) { - return; - } - const data = ordered_json_1.default.stringify(rawData); - const { timestamp, signature } = this.getPeerRaw(this.id); - this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({ - type: messages_js_1.Type.STATE, - data: b4a_1.default.from(data), - signature, - timestamp, - }))); - } - _hasSeenPeer(id) { - return this.graph.hasNode(this._maybeHexify(id)); - } - _setPeer(id, data, timestamp, signature) { - this.graph.addNode(this._maybeHexify(id), { - timestamp, - signature, - data, - }); - } - _ensurePeer(id) { - id = this._maybeHexify(id); - if (!this._hasSeenPeer(id)) { - this._setPeer(id, {}); - } - } - _addPeerConnection(origin, destination) { - this._ensurePeer(origin); - this._ensurePeer(destination); - this.graph.addEdge(this._maybeHexify(origin), this._maybeHexify(destination)); - } - _removePeerConnection(origin, destination) { - try { - this._ensurePeer(origin); - this._ensurePeer(destination); - this.graph.removeEdge(origin.toString("hex"), destination.toString("hex")); - } - catch (e) { - if (e.name !== "JSNetworkXError") - throw e; - } - } - _bootstrapFrom(bootstrap) { - if (this.bootstrapped) { - return; - } - for (const id in bootstrap) { - const { data, connectedTo, signature, timestamp } = bootstrap[id]; - if (id === this.id.toString("hex")) { - continue; - } - if (signature && - hypercore_crypto_1.default.verify(b4a_1.default.from(`${timestamp}${data}`), signature, b4a_1.default.from(id, "hex"))) { - const parsedData = data ? ordered_json_1.default.parse(data) : null; - let peerData = parsedData || {}; - // If we're already tracking them - if (this._hasSeenPeer(id)) { - // Ensure we don't have old data - if ((timestamp || 0) > this.getPeerTimestamp(id)) { - // See what data we already have for them - // Add their existing data to what we got from the bootstrap - const existingPeerData = this.getPeerData(id); - peerData = { ...existingPeerData, ...peerData }; - this._setPeer(id, peerData, timestamp, signature); - } - else { - (0, debug_1.default)(`Received old data for peer ${id}`); - } - } - } - else { - (0, debug_1.default)(`Invalid signature received for peer ${id}`); - } - for (const connection of connectedTo) { - this._addPeerConnection(id, Buffer.from(connection)); - } - } - this.emit("bootstrapped"); - this._recalculate(); - } - _getBootstrapInfo() { - const state = {}; - for (const [id, rawData] of this.graph.nodes(true)) { - const connectedTo = this.graph - .neighbors(id) - .map((id) => Buffer.from(id, "hex")); - const data = rawData ? ordered_json_1.default.stringify(rawData?.data) : null; - const { timestamp = undefined, signature = undefined } = rawData; - state[id] = { data: b4a_1.default.from(data), connectedTo, timestamp, signature }; - } - return state; - } - // Calculate who's online and emit an event - _recalculate() { - const online = this.graph.nodes().filter((id) => { - return (0, jsnetworkx_1.hasPath)(this.graph, { - source: this._maybeHexify(this.id), - target: id, - }); - }); - const offline = this.graph.nodes().filter((id) => { - return !(0, jsnetworkx_1.hasPath)(this.graph, { - source: this._maybeHexify(this.id), - target: id, - }); - }); - for (const id of offline) { - this.graph.removeNode(id); - } - this._online = online; - this.emit("online", online); - } - _maybeHexify(data) { - if (b4a_1.default.isBuffer(data)) { - return data.toString("hex"); - } - return data; - } -} -exports.default = DHTDataBase; diff --git a/dist/index.d.ts b/dist/index.d.ts deleted file mode 100644 index fcf8473..0000000 --- a/dist/index.d.ts +++ /dev/null @@ -1,14 +0,0 @@ -/// -import DHTDataBase from "./DHTDataBase.js"; -export default class DHTData extends DHTDataBase { - private flood; - constructor(swarm: any, { id, data, ...opts }?: { - id?: Buffer; - data?: {}; - [key: string]: any; - }); - handlePeerAdd(peer: any): void; - handlePeerRemove(peer: any): void; - broadcast(message: any, ttl?: number): void; -} -//# sourceMappingURL=index.d.ts.map \ No newline at end of file diff --git a/dist/index.d.ts.map b/dist/index.d.ts.map deleted file mode 100644 index b0179e5..0000000 --- a/dist/index.d.ts.map +++ /dev/null @@ -1 +0,0 @@ -{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":";AAAA,OAAO,WAAW,MAAM,kBAAkB,CAAC;AAK3C,MAAM,CAAC,OAAO,OAAO,OAAQ,SAAQ,WAAW;IAC9C,OAAO,CAAC,KAAK,CAAW;gBAGtB,KAAK,EAAE,GAAG,EACV,EACE,EAA4B,EAC5B,IAAS,EACT,GAAG,IAAI,EACR,GAAE;QAAE,EAAE,CAAC,EAAE,MAAM,CAAC;QAAC,IAAI,CAAC,EAAE,EAAE,CAAC;QAAC,CAAC,GAAG,EAAE,MAAM,GAAG,GAAG,CAAA;KAAO;IAkBxD,aAAa,CAAC,IAAI,EAAE,GAAG;IAKvB,gBAAgB,CAAC,IAAI,EAAE,GAAG;IAW1B,SAAS,CAAC,OAAO,EAAE,GAAG,EAAE,GAAG,CAAC,EAAE,MAAM;CAGrC"} \ No newline at end of file diff --git a/dist/index.js b/dist/index.js deleted file mode 100644 index c1e04dc..0000000 --- a/dist/index.js +++ /dev/null @@ -1,39 +0,0 @@ -"use strict"; -var __importDefault = (this && this.__importDefault) || function (mod) { - return (mod && mod.__esModule) ? mod : { "default": mod }; -}; -Object.defineProperty(exports, "__esModule", { value: true }); -const DHTDataBase_js_1 = __importDefault(require("./DHTDataBase.js")); -const dht_flood_1 = __importDefault(require("@lumeweb/dht-flood")); -const DISCONNECT_SMOOTH = 500; -class DHTData extends DHTDataBase_js_1.default { - flood; - constructor(swarm, { id = swarm.keyPair.publicKey, data = {}, ...opts } = {}) { - super(id, { swarm, ...opts }); - this.flood = new dht_flood_1.default({ id, swarm, ...opts }); - this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer)); - this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer)); - this.flood.on("message", (message, id) => this.onGetBroadcast(message, id)); - this.swarm.on("connection", (peer) => this.flood.send(peer, Buffer.from("hello"), 0)); - this.data = data; - [...this.swarm.peers.values()].forEach(this.handlePeerAdd.bind(this)); - } - handlePeerAdd(peer) { - const id = peer.remotePublicKey; - this.onAddPeer(id); - } - handlePeerRemove(peer) { - const id = peer.remotePublicKey; - // Wait for a bit and check if we're still disconnected before removing the peer - setTimeout(() => { - if (this.swarm._allConnections.has(id)) { - return; - } - this.onRemovePeer(id); - }, DISCONNECT_SMOOTH); - } - broadcast(message, ttl) { - this.flood.broadcast(message, ttl); - } -} -exports.default = DHTData; diff --git a/dist/messages.d.ts b/dist/messages.d.ts deleted file mode 100644 index 2fd39df..0000000 --- a/dist/messages.d.ts +++ /dev/null @@ -1,110 +0,0 @@ -import type { BinaryWriteOptions } from "@protobuf-ts/runtime"; -import type { IBinaryWriter } from "@protobuf-ts/runtime"; -import type { BinaryReadOptions } from "@protobuf-ts/runtime"; -import type { IBinaryReader } from "@protobuf-ts/runtime"; -import type { PartialMessage } from "@protobuf-ts/runtime"; -import { MessageType } from "@protobuf-ts/runtime"; -/** - * @generated from protobuf message Message - */ -export interface Message { - /** - * @generated from protobuf field: Type type = 1; - */ - type: Type; - /** - * @generated from protobuf field: map bootstrap = 2; - */ - bootstrap: { - [key: string]: State; - }; - /** - * @generated from protobuf field: optional bytes data = 3; - */ - data?: Uint8Array; - /** - * @generated from protobuf field: optional bytes signature = 4; - */ - signature?: Uint8Array; - /** - * @generated from protobuf field: optional int64 timestamp = 5; - */ - timestamp?: bigint; - /** - * @generated from protobuf field: optional bytes id = 6; - */ - id?: Uint8Array; -} -/** - * @generated from protobuf message State - */ -export interface State { - /** - * @generated from protobuf field: repeated bytes connectedTo = 1; - */ - connectedTo: Uint8Array[]; - /** - * @generated from protobuf field: optional bytes data = 2; - */ - data?: Uint8Array; - /** - * @generated from protobuf field: optional bytes signature = 3; - */ - signature?: Uint8Array; - /** - * @generated from protobuf field: optional int64 timestamp = 4; - */ - timestamp?: bigint; -} -/** - * @generated from protobuf enum Type - */ -export declare enum Type { - /** - * @generated synthetic value - protobuf-ts requires all enums to have a 0 value - */ - UNSPECIFIED$ = 0, - /** - * @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1; - */ - BOOTSTRAP_REQUEST = 1, - /** - * @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2; - */ - BOOTSTRAP_RESPONSE = 2, - /** - * @generated from protobuf enum value: CONNECTED = 3; - */ - CONNECTED = 3, - /** - * @generated from protobuf enum value: DISCONNECTED = 4; - */ - DISCONNECTED = 4, - /** - * @generated from protobuf enum value: STATE = 5; - */ - STATE = 5 -} -declare class Message$Type extends MessageType { - constructor(); - create(value?: PartialMessage): Message; - internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message; - private binaryReadMap2; - internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter; -} -/** - * @generated MessageType for protobuf message Message - */ -export declare const Message: Message$Type; -declare class State$Type extends MessageType { - constructor(); - create(value?: PartialMessage): State; - internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: State): State; - internalBinaryWrite(message: State, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter; -} -/** - * @generated MessageType for protobuf message State - */ -export declare const State: State$Type; -export {}; -//# sourceMappingURL=messages.d.ts.map \ No newline at end of file diff --git a/dist/messages.d.ts.map b/dist/messages.d.ts.map deleted file mode 100644 index 4bf0559..0000000 --- a/dist/messages.d.ts.map +++ /dev/null @@ -1 +0,0 @@ -{"version":3,"file":"messages.d.ts","sourceRoot":"","sources":["../src/messages.ts"],"names":[],"mappings":"AAGA,OAAO,KAAK,EAAE,kBAAkB,EAAE,MAAM,sBAAsB,CAAC;AAC/D,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,sBAAsB,CAAC;AAE1D,OAAO,KAAK,EAAE,iBAAiB,EAAE,MAAM,sBAAsB,CAAC;AAC9D,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,sBAAsB,CAAC;AAE1D,OAAO,KAAK,EAAE,cAAc,EAAE,MAAM,sBAAsB,CAAC;AAG3D,OAAO,EAAE,WAAW,EAAE,MAAM,sBAAsB,CAAC;AACnD;;GAEG;AACH,MAAM,WAAW,OAAO;IACpB;;OAEG;IACH,IAAI,EAAE,IAAI,CAAC;IACX;;OAEG;IACH,SAAS,EAAE;QACP,CAAC,GAAG,EAAE,MAAM,GAAG,KAAK,CAAC;KACxB,CAAC;IACF;;OAEG;IACH,IAAI,CAAC,EAAE,UAAU,CAAC;IAClB;;OAEG;IACH,SAAS,CAAC,EAAE,UAAU,CAAC;IACvB;;OAEG;IACH,SAAS,CAAC,EAAE,MAAM,CAAC;IACnB;;OAEG;IACH,EAAE,CAAC,EAAE,UAAU,CAAC;CACnB;AACD;;GAEG;AACH,MAAM,WAAW,KAAK;IAClB;;OAEG;IACH,WAAW,EAAE,UAAU,EAAE,CAAC;IAC1B;;OAEG;IACH,IAAI,CAAC,EAAE,UAAU,CAAC;IAClB;;OAEG;IACH,SAAS,CAAC,EAAE,UAAU,CAAC;IACvB;;OAEG;IACH,SAAS,CAAC,EAAE,MAAM,CAAC;CACtB;AACD;;GAEG;AACH,oBAAY,IAAI;IACZ;;OAEG;IACH,YAAY,IAAI;IAChB;;OAEG;IACH,iBAAiB,IAAI;IACrB;;OAEG;IACH,kBAAkB,IAAI;IACtB;;OAEG;IACH,SAAS,IAAI;IACb;;OAEG;IACH,YAAY,IAAI;IAChB;;OAEG;IACH,KAAK,IAAI;CACZ;AAED,cAAM,YAAa,SAAQ,WAAW,CAAC,OAAO,CAAC;;IAW3C,MAAM,CAAC,KAAK,CAAC,EAAE,cAAc,CAAC,OAAO,CAAC,GAAG,OAAO;IAOhD,kBAAkB,CAAC,MAAM,EAAE,aAAa,EAAE,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,MAAM,CAAC,EAAE,OAAO,GAAG,OAAO;IAkChH,OAAO,CAAC,cAAc;IAgBtB,mBAAmB,CAAC,OAAO,EAAE,OAAO,EAAE,MAAM,EAAE,aAAa,EAAE,OAAO,EAAE,kBAAkB,GAAG,aAAa;CA4B3G;AACD;;GAEG;AACH,eAAO,MAAM,OAAO,cAAqB,CAAC;AAE1C,cAAM,UAAW,SAAQ,WAAW,CAAC,KAAK,CAAC;;IASvC,MAAM,CAAC,KAAK,CAAC,EAAE,cAAc,CAAC,KAAK,CAAC,GAAG,KAAK;IAO5C,kBAAkB,CAAC,MAAM,EAAE,aAAa,EAAE,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,MAAM,CAAC,EAAE,KAAK,GAAG,KAAK;IA4B5G,mBAAmB,CAAC,OAAO,EAAE,KAAK,EAAE,MAAM,EAAE,aAAa,EAAE,OAAO,EAAE,kBAAkB,GAAG,aAAa;CAkBzG;AACD;;GAEG;AACH,eAAO,MAAM,KAAK,YAAmB,CAAC"} \ No newline at end of file diff --git a/dist/messages.js b/dist/messages.js deleted file mode 100644 index 61bd989..0000000 --- a/dist/messages.js +++ /dev/null @@ -1,208 +0,0 @@ -"use strict"; -Object.defineProperty(exports, "__esModule", { value: true }); -exports.State = exports.Message = exports.Type = void 0; -const runtime_1 = require("@protobuf-ts/runtime"); -const runtime_2 = require("@protobuf-ts/runtime"); -const runtime_3 = require("@protobuf-ts/runtime"); -const runtime_4 = require("@protobuf-ts/runtime"); -const runtime_5 = require("@protobuf-ts/runtime"); -/** - * @generated from protobuf enum Type - */ -var Type; -(function (Type) { - /** - * @generated synthetic value - protobuf-ts requires all enums to have a 0 value - */ - Type[Type["UNSPECIFIED$"] = 0] = "UNSPECIFIED$"; - /** - * @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1; - */ - Type[Type["BOOTSTRAP_REQUEST"] = 1] = "BOOTSTRAP_REQUEST"; - /** - * @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2; - */ - Type[Type["BOOTSTRAP_RESPONSE"] = 2] = "BOOTSTRAP_RESPONSE"; - /** - * @generated from protobuf enum value: CONNECTED = 3; - */ - Type[Type["CONNECTED"] = 3] = "CONNECTED"; - /** - * @generated from protobuf enum value: DISCONNECTED = 4; - */ - Type[Type["DISCONNECTED"] = 4] = "DISCONNECTED"; - /** - * @generated from protobuf enum value: STATE = 5; - */ - Type[Type["STATE"] = 5] = "STATE"; -})(Type = exports.Type || (exports.Type = {})); -// @generated message type with reflection information, may provide speed optimized methods -class Message$Type extends runtime_5.MessageType { - constructor() { - super("Message", [ - { no: 1, name: "type", kind: "enum", T: () => ["Type", Type] }, - { no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => exports.State } }, - { no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 4, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 5, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }, - { no: 6, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } - ]); - } - create(value) { - const message = { type: 0, bootstrap: {} }; - globalThis.Object.defineProperty(message, runtime_4.MESSAGE_TYPE, { enumerable: false, value: this }); - if (value !== undefined) - (0, runtime_3.reflectionMergePartial)(this, message, value); - return message; - } - internalBinaryRead(reader, length, options, target) { - let message = target ?? this.create(), end = reader.pos + length; - while (reader.pos < end) { - let [fieldNo, wireType] = reader.tag(); - switch (fieldNo) { - case /* Type type */ 1: - message.type = reader.int32(); - break; - case /* map bootstrap */ 2: - this.binaryReadMap2(message.bootstrap, reader, options); - break; - case /* optional bytes data */ 3: - message.data = reader.bytes(); - break; - case /* optional bytes signature */ 4: - message.signature = reader.bytes(); - break; - case /* optional int64 timestamp */ 5: - message.timestamp = reader.int64().toBigInt(); - break; - case /* optional bytes id */ 6: - message.id = reader.bytes(); - break; - default: - let u = options.readUnknownField; - if (u === "throw") - throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); - let d = reader.skip(wireType); - if (u !== false) - (u === true ? runtime_2.UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); - } - } - return message; - } - binaryReadMap2(map, reader, options) { - let len = reader.uint32(), end = reader.pos + len, key, val; - while (reader.pos < end) { - let [fieldNo, wireType] = reader.tag(); - switch (fieldNo) { - case 1: - key = reader.string(); - break; - case 2: - val = exports.State.internalBinaryRead(reader, reader.uint32(), options); - break; - default: throw new globalThis.Error("unknown map entry field for field Message.bootstrap"); - } - } - map[key ?? ""] = val ?? exports.State.create(); - } - internalBinaryWrite(message, writer, options) { - /* Type type = 1; */ - if (message.type !== 0) - writer.tag(1, runtime_1.WireType.Varint).int32(message.type); - /* map bootstrap = 2; */ - for (let k of Object.keys(message.bootstrap)) { - writer.tag(2, runtime_1.WireType.LengthDelimited).fork().tag(1, runtime_1.WireType.LengthDelimited).string(k); - writer.tag(2, runtime_1.WireType.LengthDelimited).fork(); - exports.State.internalBinaryWrite(message.bootstrap[k], writer, options); - writer.join().join(); - } - /* optional bytes data = 3; */ - if (message.data !== undefined) - writer.tag(3, runtime_1.WireType.LengthDelimited).bytes(message.data); - /* optional bytes signature = 4; */ - if (message.signature !== undefined) - writer.tag(4, runtime_1.WireType.LengthDelimited).bytes(message.signature); - /* optional int64 timestamp = 5; */ - if (message.timestamp !== undefined) - writer.tag(5, runtime_1.WireType.Varint).int64(message.timestamp); - /* optional bytes id = 6; */ - if (message.id !== undefined) - writer.tag(6, runtime_1.WireType.LengthDelimited).bytes(message.id); - let u = options.writeUnknownFields; - if (u !== false) - (u == true ? runtime_2.UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); - return writer; - } -} -/** - * @generated MessageType for protobuf message Message - */ -exports.Message = new Message$Type(); -// @generated message type with reflection information, may provide speed optimized methods -class State$Type extends runtime_5.MessageType { - constructor() { - super("State", [ - { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ }, - { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 3, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 4, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ } - ]); - } - create(value) { - const message = { connectedTo: [] }; - globalThis.Object.defineProperty(message, runtime_4.MESSAGE_TYPE, { enumerable: false, value: this }); - if (value !== undefined) - (0, runtime_3.reflectionMergePartial)(this, message, value); - return message; - } - internalBinaryRead(reader, length, options, target) { - let message = target ?? this.create(), end = reader.pos + length; - while (reader.pos < end) { - let [fieldNo, wireType] = reader.tag(); - switch (fieldNo) { - case /* repeated bytes connectedTo */ 1: - message.connectedTo.push(reader.bytes()); - break; - case /* optional bytes data */ 2: - message.data = reader.bytes(); - break; - case /* optional bytes signature */ 3: - message.signature = reader.bytes(); - break; - case /* optional int64 timestamp */ 4: - message.timestamp = reader.int64().toBigInt(); - break; - default: - let u = options.readUnknownField; - if (u === "throw") - throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); - let d = reader.skip(wireType); - if (u !== false) - (u === true ? runtime_2.UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); - } - } - return message; - } - internalBinaryWrite(message, writer, options) { - /* repeated bytes connectedTo = 1; */ - for (let i = 0; i < message.connectedTo.length; i++) - writer.tag(1, runtime_1.WireType.LengthDelimited).bytes(message.connectedTo[i]); - /* optional bytes data = 2; */ - if (message.data !== undefined) - writer.tag(2, runtime_1.WireType.LengthDelimited).bytes(message.data); - /* optional bytes signature = 3; */ - if (message.signature !== undefined) - writer.tag(3, runtime_1.WireType.LengthDelimited).bytes(message.signature); - /* optional int64 timestamp = 4; */ - if (message.timestamp !== undefined) - writer.tag(4, runtime_1.WireType.Varint).int64(message.timestamp); - let u = options.writeUnknownFields; - if (u !== false) - (u == true ? runtime_2.UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); - return writer; - } -} -/** - * @generated MessageType for protobuf message State - */ -exports.State = new State$Type(); diff --git a/messages.proto b/messages.proto index f3737a1..815754c 100644 --- a/messages.proto +++ b/messages.proto @@ -5,7 +5,8 @@ enum Type { BOOTSTRAP_RESPONSE = 2; CONNECTED = 3; DISCONNECTED = 4; - STATE = 5; + ADD_ITEM = 5; + REMOVE_ITEM = 6; } message Message { @@ -13,13 +14,9 @@ message Message { map bootstrap = 2; // For bootstrap events optional bytes data = 3; // For state event optional bytes signature = 4; - optional int64 timestamp = 5; - optional bytes id = 6; // For connected and disconnected events + optional bytes id = 5; // For connected and disconnected events } message State { repeated bytes connectedTo = 1; - optional bytes data = 2; - optional bytes signature = 3; - optional int64 timestamp = 4; } diff --git a/package.json b/package.json index 29d7d04..1f6a041 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "@lumeweb/dht-data", + "name": "@lumeweb/dht-cache", "type": "commonjs", "version": "0.1.0", "main": "dist/index.js", diff --git a/src/DHTDataBase.ts b/src/DHTDataBase.ts deleted file mode 100644 index 43c036a..0000000 --- a/src/DHTDataBase.ts +++ /dev/null @@ -1,366 +0,0 @@ -import EventEmitter from "events"; -// @ts-ignore -import { DiGraph, hasPath } from "jsnetworkx"; -// @ts-ignore -import orderedJSON from "ordered-json"; -// @ts-ignore -import crypto from "hypercore-crypto"; -import b4a from "b4a"; -import { Message, State, Type } from "./messages.js"; -// @ts-ignore -import sodium from "sodium-universal"; -import debug from "debug"; - -type Bootstrap = { - [key: string]: State; -}; - -export default class DHTDataBase extends EventEmitter { - protected swarm: any; - private id: Buffer; - private bootstrapped: boolean; - private graph: any; - private connectedTo: Set; - - constructor(id: Buffer, { swarm }: { swarm?: any } = {}) { - super(); - if (!id) throw new TypeError("Must provide id for self"); - - this.id = id; - this.bootstrapped = false; - this.graph = new DiGraph(); - this.connectedTo = new Set(); - this._data = {}; - this._online = [this._maybeHexify(this.id)]; - this.swarm = swarm; - } - - private _data: {}; - - get data(): {} { - return { ...this._data }; - } - - set data(value: {}) { - this._data = value; - - const timestamp = BigInt(Date.now()); - - const rawData = orderedJSON.stringify(value); - - const signature = crypto.sign( - b4a.from(`${timestamp}${rawData}`), - this.swarm.keyPair.secretKey - ); - - this._setPeer(this.id, value, timestamp, signature); - this._broadcastData(); - } - - private _online: string[]; - - get online(): string[] { - return this._online; - } - - broadcast(data: any, ttl?: number) { - throw new TypeError("Broadcast has not been implemented"); - } - - getPeerRaw(id: Buffer | string) { - return this.graph.node.get(this._maybeHexify(id)); - } - - getPeerField(id: Buffer | string, field: string) { - return this.getPeerRaw(id)?.[field]; - } - - getPeerData(id: Buffer | string) { - return this.getPeerField(id, "data"); - } - - getPeerTimestamp(id: Buffer | string) { - return this.getPeerField(id, "timestamp"); - } - - getPeerSignature(id: Buffer | string) { - return this.getPeerField(id, "signature"); - } - - protected onAddPeer(id: Buffer) { - const stringId = id.toString("hex"); - if (this.connectedTo.has(stringId)) { - return; - } // Already know we're connected here - - this.connectedTo.add(stringId); - this._addPeerConnection(this.id, id); - this.emit("peer-add", id); - - this._recalculate(); - - this.broadcast( - Message.toBinary( - Message.create({ - type: Type.CONNECTED, - id, - }) - ) - ); - this._broadcastData(); - - if (this.bootstrapped) { - return; - } - - // If this is the first person we've met, get their graph - this.broadcast( - Message.toBinary( - Message.create({ - type: Type.BOOTSTRAP_REQUEST, - }) - ), - 0 - ); - } - - protected onRemovePeer(id: Buffer) { - this.connectedTo.delete(id.toString("hex")); - this._removePeerConnection(this.id, id); - this.emit("peer-remove"); - - this._recalculate(); - - this.broadcast( - Message.toBinary( - Message.create({ - type: Type.DISCONNECTED, - id, - }) - ) - ); - } - - protected onGetBroadcast(message: Buffer, id: Buffer) { - let decoded; - try { - decoded = Message.fromBinary(message); - } catch { - return; - } - - const { type } = decoded; - if (!type) { - throw new Error("Missing Type In Message"); - } - if (type === Type.STATE) { - const { data: rawData, timestamp, signature } = decoded; - - if ( - signature && - crypto.verify(b4a.from(`${timestamp}${rawData}`), signature, id) - ) { - if ((timestamp || 0) <= this.getPeerTimestamp(id)) { - debug(`Received old data for peer ${id}`); - return; - } - - const data = rawData ? orderedJSON.parse(rawData) : null; - this._setPeer(id, data, timestamp, signature); - this.emit("peer-data", data, id); - this._recalculate(); - return; - } - - debug(`Invalid signature received for peer ${id}`); - } else if (type === Type.CONNECTED) { - const { id: toId } = decoded; - this._addPeerConnection(id, Buffer.from(toId as Uint8Array)); - this.emit("peer-add-seen", id, toId); - this._recalculate(); - } else if (type === Type.DISCONNECTED) { - const { id: toId } = decoded; - this._removePeerConnection(id, Buffer.from(toId as Uint8Array)); - this.emit("peer-remove-seen", id, toId); - this._recalculate(); - } else if (type === Type.BOOTSTRAP_REQUEST) { - const bootstrap = this._getBootstrapInfo(); - this.broadcast( - Message.toBinary( - Message.create({ - type: Type.BOOTSTRAP_RESPONSE, - bootstrap, - }) - ), - 0 - ); - } else if (type === Type.BOOTSTRAP_RESPONSE) { - const { bootstrap } = decoded; - this._bootstrapFrom(bootstrap); - } - } - - private _broadcastData() { - const rawData = this._data; - if (!Object.keys(rawData).length) { - return; - } - const data = orderedJSON.stringify(rawData); - const { timestamp, signature } = this.getPeerRaw(this.id); - this.broadcast( - Message.toBinary( - Message.create({ - type: Type.STATE, - data: b4a.from(data), - signature, - timestamp, - }) - ) - ); - } - - private _hasSeenPeer(id: Buffer | string) { - return this.graph.hasNode(this._maybeHexify(id)); - } - - private _setPeer( - id: Buffer | string, - data: any, - timestamp?: BigInt, - signature?: Uint8Array - ) { - this.graph.addNode(this._maybeHexify(id), { - timestamp, - signature, - data, - }); - } - - private _ensurePeer(id: Buffer | string) { - id = this._maybeHexify(id); - if (!this._hasSeenPeer(id)) { - this._setPeer(id, {}); - } - } - - private _addPeerConnection( - origin: Buffer | string, - destination: Buffer | string - ) { - this._ensurePeer(origin); - this._ensurePeer(destination); - this.graph.addEdge( - this._maybeHexify(origin), - this._maybeHexify(destination) - ); - } - - private _removePeerConnection(origin: Buffer, destination: Buffer) { - try { - this._ensurePeer(origin); - this._ensurePeer(destination); - this.graph.removeEdge( - origin.toString("hex"), - destination.toString("hex") - ); - } catch (e: any) { - if (e.name !== "JSNetworkXError") throw e; - } - } - - private _bootstrapFrom(bootstrap: Bootstrap) { - if (this.bootstrapped) { - return; - } - - for (const id in bootstrap) { - const { data, connectedTo, signature, timestamp } = bootstrap[id]; - if (id === this.id.toString("hex")) { - continue; - } - - if ( - signature && - crypto.verify( - b4a.from(`${timestamp}${data}`), - signature, - b4a.from(id, "hex") - ) - ) { - const parsedData = data ? orderedJSON.parse(data) : null; - let peerData = parsedData || {}; - - // If we're already tracking them - if (this._hasSeenPeer(id)) { - // Ensure we don't have old data - if ((timestamp || 0) > this.getPeerTimestamp(id)) { - // See what data we already have for them - // Add their existing data to what we got from the bootstrap - const existingPeerData = this.getPeerData(id); - peerData = { ...existingPeerData, ...peerData }; - this._setPeer(id, peerData, timestamp, signature); - } else { - debug(`Received old data for peer ${id}`); - } - } - } else { - debug(`Invalid signature received for peer ${id}`); - } - for (const connection of connectedTo) { - this._addPeerConnection(id, Buffer.from(connection)); - } - } - - this.emit("bootstrapped"); - - this._recalculate(); - } - - private _getBootstrapInfo() { - const state: Bootstrap = {}; - for (const [id, rawData] of this.graph.nodes(true)) { - const connectedTo = this.graph - .neighbors(id) - .map((id: string) => Buffer.from(id, "hex")); - - const data = rawData ? orderedJSON.stringify(rawData?.data) : null; - const { timestamp = undefined, signature = undefined } = rawData; - state[id] = { data: b4a.from(data), connectedTo, timestamp, signature }; - } - - return state; - } - - // Calculate who's online and emit an event - private _recalculate() { - const online = this.graph.nodes().filter((id: string) => { - return hasPath(this.graph, { - source: this._maybeHexify(this.id), - target: id, - }); - }); - - const offline = this.graph.nodes().filter((id: string) => { - return !hasPath(this.graph, { - source: this._maybeHexify(this.id), - target: id, - }); - }); - - for (const id of offline) { - this.graph.removeNode(id); - } - - this._online = online; - - this.emit("online", online); - } - - private _maybeHexify(data: Buffer | string): string { - if (b4a.isBuffer(data)) { - return data.toString("hex"); - } - - return data; - } -} diff --git a/src/index.ts b/src/index.ts index 8408f71..02e2eae 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,52 +1,427 @@ -import DHTDataBase from "./DHTDataBase.js"; +import EventEmitter from "events"; +// @ts-ignore +import { DiGraph, hasPath } from "jsnetworkx"; +// @ts-ignore +import orderedJSON from "ordered-json"; +// @ts-ignore +import crypto from "hypercore-crypto"; +import b4a from "b4a"; +import { Message, State, Type } from "./messages.js"; +// @ts-ignore +import sodium from "sodium-universal"; +import type { PartialMessage } from "@protobuf-ts/runtime"; import DHTFlood from "@lumeweb/dht-flood"; +type Bootstrap = { + [key: string]: State; +}; + +const EntityType = { + PUBKEY: Symbol.for("PUBKEY"), + ITEM: Symbol.for("ITEM"), +}; + const DISCONNECT_SMOOTH = 500; -export default class DHTData extends DHTDataBase { - private flood: DHTFlood; +export default class DHTCache extends EventEmitter { + protected swarm: any; + private id: Buffer; + private bootstrapped: boolean; + private graph: any; + private connectedTo: Set; + + protected flood: DHTFlood; constructor( swarm: any, { id = swarm.keyPair.publicKey, - data = {}, ...opts - }: { id?: Buffer; data?: {}; [key: string]: any } = {} + }: { id?: Buffer; [key: string]: any } = {} ) { - super(id, { swarm, ...(opts as any) }); + super(); + if (!id) throw new TypeError("Must provide id for self"); + + this.id = id; + this.bootstrapped = false; + this.graph = new DiGraph(); + this.connectedTo = new Set(); + this._cache = new Set(); + this._online = new Set([this._maybeHexify(this.id)]); + this.swarm = swarm; this.flood = new DHTFlood({ id, swarm, ...opts }); - this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer)); - this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer)); + this.flood.on("peer-open", (peer) => this.addPeerHandler(peer)); + this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer)); this.flood.on("message", (message, id) => this.onGetBroadcast(message, id)); this.swarm.on("connection", (peer: any) => this.flood.send(peer, Buffer.from("hello"), 0) ); - this.data = data; + [...this.swarm.peers.values()] + .map((item) => { + remotePublicKey: item.publicKey; + }) + .filter((item: any) => !!item) + .forEach((item) => { + this.addPeerHandler(item); + }); - [...this.swarm.peers.values()].forEach(this.handlePeerAdd.bind(this)); + this._ensurePeer(id); } - handlePeerAdd(peer: any) { + private _cache: Set; + + public get cache(): {} { + return [...this._cache].sort(); + } + + public get all_cache(): string[] { + const items = []; + for (const id of this.graph.nodesIter()) { + const item = this.graph.node.get(id); + if (item?.type !== EntityType.ITEM) { + continue; + } + items.push(id); + } + return items.sort(); + } + + public addItem(item: string | Buffer) { + item = this._maybeHexify(item); + this._cache.add(item); + + this._ensureItem(item); + + const broadcast = () => { + this._broadcastMessage({ + type: Type.ADD_ITEM, + data: b4a.from(item as string, "hex"), + signature: this._signItem(item), + }); + }; + + if (!this.bootstrapped) { + this.once("bootstrapped", broadcast); + } else { + broadcast(); + } + } + + private _compileMessage(message: PartialMessage): Uint8Array { + return Message.toBinary(Message.create(message)); + } + + private _broadcastMessage(message: PartialMessage, ttl?: number) { + this.broadcast(this._compileMessage(message), ttl); + } + + public removeItem(item: string | Buffer): boolean { + item = this._maybeHexify(item); + + if (!this._cache.has(item)) { + return false; + } + this._cache.delete(item); + + this._removeEntity(item); + + this._broadcastMessage({ + type: Type.REMOVE_ITEM, + data: b4a.from(item, "hex"), + signature: this._signItem(item), + }); + + return true; + } + + private _online: Set; + + public get online(): Set { + return this._online; + } + + public broadcast(message: any, ttl?: number) { + this.flood.broadcast(message, ttl); + } + + public send(message: any) { + this.flood.send(message, 0); + } + + protected addPeerHandler(peer: any) { const id = peer.remotePublicKey; - this.onAddPeer(id); + const stringId = id.toString("hex"); + if (this.connectedTo.has(stringId)) { + return; + } // Already know we're connected here + + this.connectedTo.add(stringId); + this._ensurePeer(id); + this._addEntityConnection(this.id, id); + this.emit("peer-add", id); + + this._recalculate(); + + this._broadcastMessage({ + type: Type.CONNECTED, + id, + }); + + if (this.bootstrapped) { + return; + } + + // If this is the first person we've met, get their graph + this._broadcastMessage({ type: Type.BOOTSTRAP_REQUEST }, 0); } - handlePeerRemove(peer: any) { + removePeerHandler(peer: any) { const id = peer.remotePublicKey; // Wait for a bit and check if we're still disconnected before removing the peer setTimeout(() => { if (this.swarm._allConnections.has(id)) { return; } - this.onRemovePeer(id); + this.onRemovePeer(peer); }, DISCONNECT_SMOOTH); } - broadcast(message: any, ttl?: number) { - this.flood.broadcast(message, ttl); + protected onRemovePeer(peer: any) { + const id = peer.remotePublicKey; + + this.connectedTo.delete(id.toString("hex")); + + this._removeEntityConnection(this.id, id); + this.emit("peer-remove"); + + this._recalculate(); + + this._broadcastMessage( + { + type: Type.DISCONNECTED, + id, + }, + 0 + ); + } + + protected onGetBroadcast(message: Buffer, id: Buffer) { + let decoded; + try { + decoded = Message.fromBinary(message); + } catch { + return; + } + + const { type } = decoded; + if (!type) { + throw new Error("Missing Type In Message"); + } + + if ([Type.ADD_ITEM, Type.REMOVE_ITEM].includes(type)) { + const { data: rawData, signature } = decoded; + const bufData = b4a.from(rawData as Uint8Array) as Buffer; + if (signature && crypto.verify(bufData, signature, id)) { + if (Type.ADD_ITEM === type) { + this._ensureItem(bufData); + this._addEntityConnection(id, bufData); + } + + if (Type.REMOVE_ITEM === type) { + this.removeItem(bufData); + this._pruneItems(); + } + } + } else if (type === Type.CONNECTED) { + const { id: toId } = decoded; + this._addEntityConnection(id, Buffer.from(toId as Uint8Array)); + this.emit("peer-add-seen", id, toId); + this._recalculate(); + } else if (type === Type.DISCONNECTED) { + const { id: toId } = decoded; + this._removeEntityConnection(id, Buffer.from(toId as Uint8Array)); + this.emit("peer-remove-seen", id, toId); + this._recalculate(); + } else if (type === Type.BOOTSTRAP_REQUEST) { + const bootstrap = this._getBootstrapInfo(); + this.broadcast( + Message.toBinary( + Message.create({ + type: Type.BOOTSTRAP_RESPONSE, + bootstrap, + }) + ), + 0 + ); + } else if (type === Type.BOOTSTRAP_RESPONSE) { + const { bootstrap } = decoded; + this._bootstrapFrom(bootstrap); + } + } + + private _signItem(item: string | Buffer): Buffer { + item = this._maybeHexify(item); + + return crypto.sign(b4a.from(item, "hex"), this.swarm.keyPair.secretKey); + } + + private _setEntity(id: Buffer | string, data: any): void { + this.graph.addNode(this._maybeHexify(id), data); + } + + private _ensureItem(id: Buffer | string) { + this._ensureEntity(id, { type: EntityType.ITEM }); + } + + private _ensurePeer(id: Buffer | string) { + this._ensureEntity(id, { type: EntityType.PUBKEY }); + } + + private _ensureEntity(id: Buffer | string, def = {}) { + id = this._maybeHexify(id); + if (!this._hasSeenEntity(id)) { + this._setEntity(id, def); + } + } + + private _hasSeenEntity(id: Buffer | string): boolean { + return this.graph.hasNode(this._maybeHexify(id)); + } + + private _addEntityConnection( + origin: Buffer | string, + destination: Buffer | string + ) { + this._ensureEntity(origin); + this._ensureEntity(destination); + this.graph.addEdge( + this._maybeHexify(origin), + this._maybeHexify(destination) + ); + } + + private _removeEntityConnection(origin: Buffer, destination: Buffer) { + try { + this._ensureEntity(origin); + this._ensureEntity(destination); + this.graph.removeEdge( + origin.toString("hex"), + destination.toString("hex") + ); + } catch (e: any) { + if (e.name !== "JSNetworkXError") throw e; + } + } + + private _removeEntity(id: Buffer | string) { + this.graph.removeNode(this._maybeHexify(id)); + } + + private _bootstrapFrom(bootstrap: Bootstrap) { + if (this.bootstrapped) { + return; + } + + for (const id in bootstrap) { + const { connectedTo } = bootstrap[id]; + if (id === this.id.toString("hex")) { + continue; + } + + for (const connection of connectedTo) { + const peer = b4a.from(connection) as Buffer; + this._ensurePeer(peer); + this._addEntityConnection(id, peer); + } + } + + this.emit("bootstrapped"); + + this._recalculate(); + } + + private _getBootstrapInfo() { + const state: Bootstrap = {}; + for (const id of this.graph.nodesIter()) { + const item = this.graph.node.get(id); + if (item?.type !== EntityType.PUBKEY) { + continue; + } + const connectedTo = this.graph + .neighbors(id) + .map((id: string) => Buffer.from(id, "hex")); + state[id] = { connectedTo }; + } + + return state; + } + + // Calculate who's online and emit an event + private _recalculate() { + const online = new Set(); + const offline = new Set(); + + for (const id of this.graph.nodesIter()) { + const item = this.graph.node.get(id); + if (item?.type !== EntityType.PUBKEY) { + continue; + } + + if ( + hasPath(this.graph, { + source: this._maybeHexify(this.id), + target: id, + }) + ) { + online.add(id); + } else { + offline.add(id); + } + } + + for (const id of offline) { + this.graph.removeNode(id); + } + + this._online = online; + + this.emit("online", online); + } + + private _maybeHexify(data: Buffer | string): string { + if (b4a.isBuffer(data)) { + return data.toString("hex"); + } + + return data; + } + + private _pruneItems(item?: Buffer) { + let items: string[] | Buffer[]; + if (!item) { + items = this.graph.nodesIter(); + } else { + items = [item]; + } + + (async () => { + for (const id of items) { + const item = this.graph.node.get(id); + if (item?.type !== EntityType.ITEM) { + continue; + } + + (async () => { + try { + if (0 === this.graph.neighbors(id)?.length) { + this.graph.removeNode(id); + } + } catch {} + })(); + } + })(); } } diff --git a/src/messages.ts b/src/messages.ts index cbc8085..cc1cd5a 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -1,4 +1,4 @@ -// @generated by protobuf-ts 2.8.1 +// @generated by protobuf-ts 2.8.2 // @generated from protobuf file "messages.proto" (syntax proto2) // tslint:disable import type { BinaryWriteOptions } from "@protobuf-ts/runtime"; @@ -34,11 +34,7 @@ export interface Message { */ signature?: Uint8Array; /** - * @generated from protobuf field: optional int64 timestamp = 5; - */ - timestamp?: bigint; - /** - * @generated from protobuf field: optional bytes id = 6; + * @generated from protobuf field: optional bytes id = 5; */ id?: Uint8Array; // For connected and disconnected events } @@ -50,18 +46,6 @@ export interface State { * @generated from protobuf field: repeated bytes connectedTo = 1; */ connectedTo: Uint8Array[]; - /** - * @generated from protobuf field: optional bytes data = 2; - */ - data?: Uint8Array; - /** - * @generated from protobuf field: optional bytes signature = 3; - */ - signature?: Uint8Array; - /** - * @generated from protobuf field: optional int64 timestamp = 4; - */ - timestamp?: bigint; } /** * @generated from protobuf enum Type @@ -88,9 +72,13 @@ export enum Type { */ DISCONNECTED = 4, /** - * @generated from protobuf enum value: STATE = 5; + * @generated from protobuf enum value: ADD_ITEM = 5; */ - STATE = 5 + ADD_ITEM = 5, + /** + * @generated from protobuf enum value: REMOVE_ITEM = 6; + */ + REMOVE_ITEM = 6 } // @generated message type with reflection information, may provide speed optimized methods class Message$Type extends MessageType { @@ -100,8 +88,7 @@ class Message$Type extends MessageType { { no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => State } }, { no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, { no: 4, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 5, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }, - { no: 6, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } + { no: 5, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } ]); } create(value?: PartialMessage): Message { @@ -128,10 +115,7 @@ class Message$Type extends MessageType { case /* optional bytes signature */ 4: message.signature = reader.bytes(); break; - case /* optional int64 timestamp */ 5: - message.timestamp = reader.int64().toBigInt(); - break; - case /* optional bytes id */ 6: + case /* optional bytes id */ 5: message.id = reader.bytes(); break; default: @@ -178,12 +162,9 @@ class Message$Type extends MessageType { /* optional bytes signature = 4; */ if (message.signature !== undefined) writer.tag(4, WireType.LengthDelimited).bytes(message.signature); - /* optional int64 timestamp = 5; */ - if (message.timestamp !== undefined) - writer.tag(5, WireType.Varint).int64(message.timestamp); - /* optional bytes id = 6; */ + /* optional bytes id = 5; */ if (message.id !== undefined) - writer.tag(6, WireType.LengthDelimited).bytes(message.id); + writer.tag(5, WireType.LengthDelimited).bytes(message.id); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -198,10 +179,7 @@ export const Message = new Message$Type(); class State$Type extends MessageType { constructor() { super("State", [ - { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ }, - { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 3, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 4, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ } + { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ } ]); } create(value?: PartialMessage): State { @@ -219,15 +197,6 @@ class State$Type extends MessageType { case /* repeated bytes connectedTo */ 1: message.connectedTo.push(reader.bytes()); break; - case /* optional bytes data */ 2: - message.data = reader.bytes(); - break; - case /* optional bytes signature */ 3: - message.signature = reader.bytes(); - break; - case /* optional int64 timestamp */ 4: - message.timestamp = reader.int64().toBigInt(); - break; default: let u = options.readUnknownField; if (u === "throw") @@ -243,15 +212,6 @@ class State$Type extends MessageType { /* repeated bytes connectedTo = 1; */ for (let i = 0; i < message.connectedTo.length; i++) writer.tag(1, WireType.LengthDelimited).bytes(message.connectedTo[i]); - /* optional bytes data = 2; */ - if (message.data !== undefined) - writer.tag(2, WireType.LengthDelimited).bytes(message.data); - /* optional bytes signature = 3; */ - if (message.signature !== undefined) - writer.tag(3, WireType.LengthDelimited).bytes(message.signature); - /* optional int64 timestamp = 4; */ - if (message.timestamp !== undefined) - writer.tag(4, WireType.Varint).int64(message.timestamp); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); diff --git a/test.js b/test.js index 5368400..82ffede 100644 --- a/test.js +++ b/test.js @@ -3,11 +3,14 @@ const test = require("tape"); const Hyperswarm = require("hyperswarm"); const sodium = require("sodium-universal"); const b4a = require("b4a"); -const { default: DHTData } = require("./"); +const { default: DHTCache } = require("./"); const crypto = require("crypto"); const topicName = crypto.randomBytes(10); const topic = b4a.allocUnsafe(32); -sodium.crypto_generichash(topic, b4a.from(topicName)); +const item1 = b4a.allocUnsafe(32); +const item2 = b4a.allocUnsafe(32); +sodium.crypto_generichash(item1, b4a.from("item1")); +sodium.crypto_generichash(item2, b4a.from("item2")); test("Basic presence test / data propagation", (t) => { t.plan(6); @@ -19,11 +22,11 @@ test("Basic presence test / data propagation", (t) => { const peer1 = peers.shift(); const peer2 = peers.shift(); - const p1 = new DHTData(peer1); - const p2 = new DHTData(peer2); + const p1 = new DHTCache(peer1); + const p2 = new DHTCache(peer2); - p1.data = { message: "Hello" }; - p2.data = { message: "World!" }; + p1.addItem(item1.toString("hex")); + p2.addItem(item2.toString("hex")); t.ok(p1.id, "Generated id 1"); t.ok(p2.id, "Generated id 2"); @@ -36,9 +39,9 @@ test("Basic presence test / data propagation", (t) => { let hasFinished = false; function handleOnline(list) { - if (list.length === 2) { - const peerData1 = p1.getPeerData(p2.id); - const peerData2 = p2.getPeerData(p1.id); + if (list.size === 2) { + const peerData1 = [...p1.cache]; + const peerData2 = [...p2.cache]; const hasP1 = peerData1 && Object.keys(peerData1).length; const hasP2 = peerData2 && Object.keys(peerData2).length; if (!hasP1 || !hasP2) { @@ -51,8 +54,8 @@ test("Basic presence test / data propagation", (t) => { setTimeout(() => { t.pass("Seeing everyone online"); - t.deepEqual(peerData1, p2.data, "Got peer data from peer 2"); - t.deepEqual(peerData2, p1.data, "Got peer data from peer 2"); + t.equals(true, p2.all_cache.includes(peerData1[0])); + t.equals(true, p1.all_cache.includes(peerData2[0])); hasFinished = true; peer2._allConnections.get(peer1.keyPair.publicKey).end();