diff --git a/dist/DHTDataBase.d.ts b/dist/DHTDataBase.d.ts
deleted file mode 100644
index a9620d8..0000000
--- a/dist/DHTDataBase.d.ts
+++ /dev/null
@@ -1,38 +0,0 @@
-///
-///
-import EventEmitter from "events";
-export default class DHTDataBase extends EventEmitter {
- protected swarm: any;
- private id;
- private bootstrapped;
- private graph;
- private connectedTo;
- constructor(id: Buffer, { swarm }?: {
- swarm?: any;
- });
- private _data;
- get data(): {};
- set data(value: {});
- private _online;
- get online(): string[];
- broadcast(data: any, ttl?: number): void;
- getPeerRaw(id: Buffer | string): any;
- getPeerField(id: Buffer | string, field: string): any;
- getPeerData(id: Buffer | string): any;
- getPeerTimestamp(id: Buffer | string): any;
- getPeerSignature(id: Buffer | string): any;
- protected onAddPeer(id: Buffer): void;
- protected onRemovePeer(id: Buffer): void;
- protected onGetBroadcast(message: Buffer, id: Buffer): void;
- private _broadcastData;
- private _hasSeenPeer;
- private _setPeer;
- private _ensurePeer;
- private _addPeerConnection;
- private _removePeerConnection;
- private _bootstrapFrom;
- private _getBootstrapInfo;
- private _recalculate;
- private _maybeHexify;
-}
-//# sourceMappingURL=DHTDataBase.d.ts.map
\ No newline at end of file
diff --git a/dist/DHTDataBase.d.ts.map b/dist/DHTDataBase.d.ts.map
deleted file mode 100644
index 08e2d18..0000000
--- a/dist/DHTDataBase.d.ts.map
+++ /dev/null
@@ -1 +0,0 @@
-{"version":3,"file":"DHTDataBase.d.ts","sourceRoot":"","sources":["../src/DHTDataBase.ts"],"names":[],"mappings":";;AAAA,OAAO,YAAY,MAAM,QAAQ,CAAC;AAiBlC,MAAM,CAAC,OAAO,OAAO,WAAY,SAAQ,YAAY;IACnD,SAAS,CAAC,KAAK,EAAE,GAAG,CAAC;IACrB,OAAO,CAAC,EAAE,CAAS;IACnB,OAAO,CAAC,YAAY,CAAU;IAC9B,OAAO,CAAC,KAAK,CAAM;IACnB,OAAO,CAAC,WAAW,CAAW;gBAElB,EAAE,EAAE,MAAM,EAAE,EAAE,KAAK,EAAE,GAAE;QAAE,KAAK,CAAC,EAAE,GAAG,CAAA;KAAO;IAavD,OAAO,CAAC,KAAK,CAAK;IAElB,IAAI,IAAI,IAAI,EAAE,CAEb;IAED,IAAI,IAAI,CAAC,KAAK,EAAE,EAAE,EAcjB;IAED,OAAO,CAAC,OAAO,CAAW;IAE1B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,SAAS,CAAC,IAAI,EAAE,GAAG,EAAE,GAAG,CAAC,EAAE,MAAM;IAIjC,UAAU,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAI9B,YAAY,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM,EAAE,KAAK,EAAE,MAAM;IAI/C,WAAW,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAI/B,gBAAgB,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAIpC,gBAAgB,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAIpC,SAAS,CAAC,SAAS,CAAC,EAAE,EAAE,MAAM;IAqC9B,SAAS,CAAC,YAAY,CAAC,EAAE,EAAE,MAAM;IAiBjC,SAAS,CAAC,cAAc,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE,EAAE,MAAM;IA2DpD,OAAO,CAAC,cAAc;IAmBtB,OAAO,CAAC,YAAY;IAIpB,OAAO,CAAC,QAAQ;IAahB,OAAO,CAAC,WAAW;IAOnB,OAAO,CAAC,kBAAkB;IAY1B,OAAO,CAAC,qBAAqB;IAa7B,OAAO,CAAC,cAAc;IAgDtB,OAAO,CAAC,iBAAiB;IAgBzB,OAAO,CAAC,YAAY;IAwBpB,OAAO,CAAC,YAAY;CAOrB"}
\ No newline at end of file
diff --git a/dist/DHTDataBase.js b/dist/DHTDataBase.js
deleted file mode 100644
index 765c2e9..0000000
--- a/dist/DHTDataBase.js
+++ /dev/null
@@ -1,275 +0,0 @@
-"use strict";
-var __importDefault = (this && this.__importDefault) || function (mod) {
- return (mod && mod.__esModule) ? mod : { "default": mod };
-};
-Object.defineProperty(exports, "__esModule", { value: true });
-const events_1 = __importDefault(require("events"));
-// @ts-ignore
-const jsnetworkx_1 = require("jsnetworkx");
-// @ts-ignore
-const ordered_json_1 = __importDefault(require("ordered-json"));
-// @ts-ignore
-const hypercore_crypto_1 = __importDefault(require("hypercore-crypto"));
-const b4a_1 = __importDefault(require("b4a"));
-const messages_js_1 = require("./messages.js");
-const debug_1 = __importDefault(require("debug"));
-class DHTDataBase extends events_1.default {
- swarm;
- id;
- bootstrapped;
- graph;
- connectedTo;
- constructor(id, { swarm } = {}) {
- super();
- if (!id)
- throw new TypeError("Must provide id for self");
- this.id = id;
- this.bootstrapped = false;
- this.graph = new jsnetworkx_1.DiGraph();
- this.connectedTo = new Set();
- this._data = {};
- this._online = [this._maybeHexify(this.id)];
- this.swarm = swarm;
- }
- _data;
- get data() {
- return { ...this._data };
- }
- set data(value) {
- this._data = value;
- const timestamp = BigInt(Date.now());
- const rawData = ordered_json_1.default.stringify(value);
- const signature = hypercore_crypto_1.default.sign(b4a_1.default.from(`${timestamp}${rawData}`), this.swarm.keyPair.secretKey);
- this._setPeer(this.id, value, timestamp, signature);
- this._broadcastData();
- }
- _online;
- get online() {
- return this._online;
- }
- broadcast(data, ttl) {
- throw new TypeError("Broadcast has not been implemented");
- }
- getPeerRaw(id) {
- return this.graph.node.get(this._maybeHexify(id));
- }
- getPeerField(id, field) {
- return this.getPeerRaw(id)?.[field];
- }
- getPeerData(id) {
- return this.getPeerField(id, "data");
- }
- getPeerTimestamp(id) {
- return this.getPeerField(id, "timestamp");
- }
- getPeerSignature(id) {
- return this.getPeerField(id, "signature");
- }
- onAddPeer(id) {
- const stringId = id.toString("hex");
- if (this.connectedTo.has(stringId)) {
- return;
- } // Already know we're connected here
- this.connectedTo.add(stringId);
- this._addPeerConnection(this.id, id);
- this.emit("peer-add", id);
- this._recalculate();
- this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
- type: messages_js_1.Type.CONNECTED,
- id,
- })));
- this._broadcastData();
- if (this.bootstrapped) {
- return;
- }
- // If this is the first person we've met, get their graph
- this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
- type: messages_js_1.Type.BOOTSTRAP_REQUEST,
- })), 0);
- }
- onRemovePeer(id) {
- this.connectedTo.delete(id.toString("hex"));
- this._removePeerConnection(this.id, id);
- this.emit("peer-remove");
- this._recalculate();
- this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
- type: messages_js_1.Type.DISCONNECTED,
- id,
- })));
- }
- onGetBroadcast(message, id) {
- let decoded;
- try {
- decoded = messages_js_1.Message.fromBinary(message);
- }
- catch {
- return;
- }
- const { type } = decoded;
- if (!type) {
- throw new Error("Missing Type In Message");
- }
- if (type === messages_js_1.Type.STATE) {
- const { data: rawData, timestamp, signature } = decoded;
- if (signature &&
- hypercore_crypto_1.default.verify(b4a_1.default.from(`${timestamp}${rawData}`), signature, id)) {
- if ((timestamp || 0) <= this.getPeerTimestamp(id)) {
- (0, debug_1.default)(`Received old data for peer ${id}`);
- return;
- }
- const data = rawData ? ordered_json_1.default.parse(rawData) : null;
- this._setPeer(id, data, timestamp, signature);
- this.emit("peer-data", data, id);
- this._recalculate();
- return;
- }
- (0, debug_1.default)(`Invalid signature received for peer ${id}`);
- }
- else if (type === messages_js_1.Type.CONNECTED) {
- const { id: toId } = decoded;
- this._addPeerConnection(id, Buffer.from(toId));
- this.emit("peer-add-seen", id, toId);
- this._recalculate();
- }
- else if (type === messages_js_1.Type.DISCONNECTED) {
- const { id: toId } = decoded;
- this._removePeerConnection(id, Buffer.from(toId));
- this.emit("peer-remove-seen", id, toId);
- this._recalculate();
- }
- else if (type === messages_js_1.Type.BOOTSTRAP_REQUEST) {
- const bootstrap = this._getBootstrapInfo();
- this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
- type: messages_js_1.Type.BOOTSTRAP_RESPONSE,
- bootstrap,
- })), 0);
- }
- else if (type === messages_js_1.Type.BOOTSTRAP_RESPONSE) {
- const { bootstrap } = decoded;
- this._bootstrapFrom(bootstrap);
- }
- }
- _broadcastData() {
- const rawData = this._data;
- if (!Object.keys(rawData).length) {
- return;
- }
- const data = ordered_json_1.default.stringify(rawData);
- const { timestamp, signature } = this.getPeerRaw(this.id);
- this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
- type: messages_js_1.Type.STATE,
- data: b4a_1.default.from(data),
- signature,
- timestamp,
- })));
- }
- _hasSeenPeer(id) {
- return this.graph.hasNode(this._maybeHexify(id));
- }
- _setPeer(id, data, timestamp, signature) {
- this.graph.addNode(this._maybeHexify(id), {
- timestamp,
- signature,
- data,
- });
- }
- _ensurePeer(id) {
- id = this._maybeHexify(id);
- if (!this._hasSeenPeer(id)) {
- this._setPeer(id, {});
- }
- }
- _addPeerConnection(origin, destination) {
- this._ensurePeer(origin);
- this._ensurePeer(destination);
- this.graph.addEdge(this._maybeHexify(origin), this._maybeHexify(destination));
- }
- _removePeerConnection(origin, destination) {
- try {
- this._ensurePeer(origin);
- this._ensurePeer(destination);
- this.graph.removeEdge(origin.toString("hex"), destination.toString("hex"));
- }
- catch (e) {
- if (e.name !== "JSNetworkXError")
- throw e;
- }
- }
- _bootstrapFrom(bootstrap) {
- if (this.bootstrapped) {
- return;
- }
- for (const id in bootstrap) {
- const { data, connectedTo, signature, timestamp } = bootstrap[id];
- if (id === this.id.toString("hex")) {
- continue;
- }
- if (signature &&
- hypercore_crypto_1.default.verify(b4a_1.default.from(`${timestamp}${data}`), signature, b4a_1.default.from(id, "hex"))) {
- const parsedData = data ? ordered_json_1.default.parse(data) : null;
- let peerData = parsedData || {};
- // If we're already tracking them
- if (this._hasSeenPeer(id)) {
- // Ensure we don't have old data
- if ((timestamp || 0) > this.getPeerTimestamp(id)) {
- // See what data we already have for them
- // Add their existing data to what we got from the bootstrap
- const existingPeerData = this.getPeerData(id);
- peerData = { ...existingPeerData, ...peerData };
- this._setPeer(id, peerData, timestamp, signature);
- }
- else {
- (0, debug_1.default)(`Received old data for peer ${id}`);
- }
- }
- }
- else {
- (0, debug_1.default)(`Invalid signature received for peer ${id}`);
- }
- for (const connection of connectedTo) {
- this._addPeerConnection(id, Buffer.from(connection));
- }
- }
- this.emit("bootstrapped");
- this._recalculate();
- }
- _getBootstrapInfo() {
- const state = {};
- for (const [id, rawData] of this.graph.nodes(true)) {
- const connectedTo = this.graph
- .neighbors(id)
- .map((id) => Buffer.from(id, "hex"));
- const data = rawData ? ordered_json_1.default.stringify(rawData?.data) : null;
- const { timestamp = undefined, signature = undefined } = rawData;
- state[id] = { data: b4a_1.default.from(data), connectedTo, timestamp, signature };
- }
- return state;
- }
- // Calculate who's online and emit an event
- _recalculate() {
- const online = this.graph.nodes().filter((id) => {
- return (0, jsnetworkx_1.hasPath)(this.graph, {
- source: this._maybeHexify(this.id),
- target: id,
- });
- });
- const offline = this.graph.nodes().filter((id) => {
- return !(0, jsnetworkx_1.hasPath)(this.graph, {
- source: this._maybeHexify(this.id),
- target: id,
- });
- });
- for (const id of offline) {
- this.graph.removeNode(id);
- }
- this._online = online;
- this.emit("online", online);
- }
- _maybeHexify(data) {
- if (b4a_1.default.isBuffer(data)) {
- return data.toString("hex");
- }
- return data;
- }
-}
-exports.default = DHTDataBase;
diff --git a/dist/index.d.ts b/dist/index.d.ts
deleted file mode 100644
index fcf8473..0000000
--- a/dist/index.d.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-///
-import DHTDataBase from "./DHTDataBase.js";
-export default class DHTData extends DHTDataBase {
- private flood;
- constructor(swarm: any, { id, data, ...opts }?: {
- id?: Buffer;
- data?: {};
- [key: string]: any;
- });
- handlePeerAdd(peer: any): void;
- handlePeerRemove(peer: any): void;
- broadcast(message: any, ttl?: number): void;
-}
-//# sourceMappingURL=index.d.ts.map
\ No newline at end of file
diff --git a/dist/index.d.ts.map b/dist/index.d.ts.map
deleted file mode 100644
index b0179e5..0000000
--- a/dist/index.d.ts.map
+++ /dev/null
@@ -1 +0,0 @@
-{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":";AAAA,OAAO,WAAW,MAAM,kBAAkB,CAAC;AAK3C,MAAM,CAAC,OAAO,OAAO,OAAQ,SAAQ,WAAW;IAC9C,OAAO,CAAC,KAAK,CAAW;gBAGtB,KAAK,EAAE,GAAG,EACV,EACE,EAA4B,EAC5B,IAAS,EACT,GAAG,IAAI,EACR,GAAE;QAAE,EAAE,CAAC,EAAE,MAAM,CAAC;QAAC,IAAI,CAAC,EAAE,EAAE,CAAC;QAAC,CAAC,GAAG,EAAE,MAAM,GAAG,GAAG,CAAA;KAAO;IAkBxD,aAAa,CAAC,IAAI,EAAE,GAAG;IAKvB,gBAAgB,CAAC,IAAI,EAAE,GAAG;IAW1B,SAAS,CAAC,OAAO,EAAE,GAAG,EAAE,GAAG,CAAC,EAAE,MAAM;CAGrC"}
\ No newline at end of file
diff --git a/dist/index.js b/dist/index.js
deleted file mode 100644
index c1e04dc..0000000
--- a/dist/index.js
+++ /dev/null
@@ -1,39 +0,0 @@
-"use strict";
-var __importDefault = (this && this.__importDefault) || function (mod) {
- return (mod && mod.__esModule) ? mod : { "default": mod };
-};
-Object.defineProperty(exports, "__esModule", { value: true });
-const DHTDataBase_js_1 = __importDefault(require("./DHTDataBase.js"));
-const dht_flood_1 = __importDefault(require("@lumeweb/dht-flood"));
-const DISCONNECT_SMOOTH = 500;
-class DHTData extends DHTDataBase_js_1.default {
- flood;
- constructor(swarm, { id = swarm.keyPair.publicKey, data = {}, ...opts } = {}) {
- super(id, { swarm, ...opts });
- this.flood = new dht_flood_1.default({ id, swarm, ...opts });
- this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer));
- this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer));
- this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
- this.swarm.on("connection", (peer) => this.flood.send(peer, Buffer.from("hello"), 0));
- this.data = data;
- [...this.swarm.peers.values()].forEach(this.handlePeerAdd.bind(this));
- }
- handlePeerAdd(peer) {
- const id = peer.remotePublicKey;
- this.onAddPeer(id);
- }
- handlePeerRemove(peer) {
- const id = peer.remotePublicKey;
- // Wait for a bit and check if we're still disconnected before removing the peer
- setTimeout(() => {
- if (this.swarm._allConnections.has(id)) {
- return;
- }
- this.onRemovePeer(id);
- }, DISCONNECT_SMOOTH);
- }
- broadcast(message, ttl) {
- this.flood.broadcast(message, ttl);
- }
-}
-exports.default = DHTData;
diff --git a/dist/messages.d.ts b/dist/messages.d.ts
deleted file mode 100644
index 2fd39df..0000000
--- a/dist/messages.d.ts
+++ /dev/null
@@ -1,110 +0,0 @@
-import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
-import type { IBinaryWriter } from "@protobuf-ts/runtime";
-import type { BinaryReadOptions } from "@protobuf-ts/runtime";
-import type { IBinaryReader } from "@protobuf-ts/runtime";
-import type { PartialMessage } from "@protobuf-ts/runtime";
-import { MessageType } from "@protobuf-ts/runtime";
-/**
- * @generated from protobuf message Message
- */
-export interface Message {
- /**
- * @generated from protobuf field: Type type = 1;
- */
- type: Type;
- /**
- * @generated from protobuf field: map bootstrap = 2;
- */
- bootstrap: {
- [key: string]: State;
- };
- /**
- * @generated from protobuf field: optional bytes data = 3;
- */
- data?: Uint8Array;
- /**
- * @generated from protobuf field: optional bytes signature = 4;
- */
- signature?: Uint8Array;
- /**
- * @generated from protobuf field: optional int64 timestamp = 5;
- */
- timestamp?: bigint;
- /**
- * @generated from protobuf field: optional bytes id = 6;
- */
- id?: Uint8Array;
-}
-/**
- * @generated from protobuf message State
- */
-export interface State {
- /**
- * @generated from protobuf field: repeated bytes connectedTo = 1;
- */
- connectedTo: Uint8Array[];
- /**
- * @generated from protobuf field: optional bytes data = 2;
- */
- data?: Uint8Array;
- /**
- * @generated from protobuf field: optional bytes signature = 3;
- */
- signature?: Uint8Array;
- /**
- * @generated from protobuf field: optional int64 timestamp = 4;
- */
- timestamp?: bigint;
-}
-/**
- * @generated from protobuf enum Type
- */
-export declare enum Type {
- /**
- * @generated synthetic value - protobuf-ts requires all enums to have a 0 value
- */
- UNSPECIFIED$ = 0,
- /**
- * @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1;
- */
- BOOTSTRAP_REQUEST = 1,
- /**
- * @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2;
- */
- BOOTSTRAP_RESPONSE = 2,
- /**
- * @generated from protobuf enum value: CONNECTED = 3;
- */
- CONNECTED = 3,
- /**
- * @generated from protobuf enum value: DISCONNECTED = 4;
- */
- DISCONNECTED = 4,
- /**
- * @generated from protobuf enum value: STATE = 5;
- */
- STATE = 5
-}
-declare class Message$Type extends MessageType {
- constructor();
- create(value?: PartialMessage): Message;
- internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message;
- private binaryReadMap2;
- internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter;
-}
-/**
- * @generated MessageType for protobuf message Message
- */
-export declare const Message: Message$Type;
-declare class State$Type extends MessageType {
- constructor();
- create(value?: PartialMessage): State;
- internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: State): State;
- internalBinaryWrite(message: State, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter;
-}
-/**
- * @generated MessageType for protobuf message State
- */
-export declare const State: State$Type;
-export {};
-//# sourceMappingURL=messages.d.ts.map
\ No newline at end of file
diff --git a/dist/messages.d.ts.map b/dist/messages.d.ts.map
deleted file mode 100644
index 4bf0559..0000000
--- a/dist/messages.d.ts.map
+++ /dev/null
@@ -1 +0,0 @@
-{"version":3,"file":"messages.d.ts","sourceRoot":"","sources":["../src/messages.ts"],"names":[],"mappings":"AAGA,OAAO,KAAK,EAAE,kBAAkB,EAAE,MAAM,sBAAsB,CAAC;AAC/D,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,sBAAsB,CAAC;AAE1D,OAAO,KAAK,EAAE,iBAAiB,EAAE,MAAM,sBAAsB,CAAC;AAC9D,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,sBAAsB,CAAC;AAE1D,OAAO,KAAK,EAAE,cAAc,EAAE,MAAM,sBAAsB,CAAC;AAG3D,OAAO,EAAE,WAAW,EAAE,MAAM,sBAAsB,CAAC;AACnD;;GAEG;AACH,MAAM,WAAW,OAAO;IACpB;;OAEG;IACH,IAAI,EAAE,IAAI,CAAC;IACX;;OAEG;IACH,SAAS,EAAE;QACP,CAAC,GAAG,EAAE,MAAM,GAAG,KAAK,CAAC;KACxB,CAAC;IACF;;OAEG;IACH,IAAI,CAAC,EAAE,UAAU,CAAC;IAClB;;OAEG;IACH,SAAS,CAAC,EAAE,UAAU,CAAC;IACvB;;OAEG;IACH,SAAS,CAAC,EAAE,MAAM,CAAC;IACnB;;OAEG;IACH,EAAE,CAAC,EAAE,UAAU,CAAC;CACnB;AACD;;GAEG;AACH,MAAM,WAAW,KAAK;IAClB;;OAEG;IACH,WAAW,EAAE,UAAU,EAAE,CAAC;IAC1B;;OAEG;IACH,IAAI,CAAC,EAAE,UAAU,CAAC;IAClB;;OAEG;IACH,SAAS,CAAC,EAAE,UAAU,CAAC;IACvB;;OAEG;IACH,SAAS,CAAC,EAAE,MAAM,CAAC;CACtB;AACD;;GAEG;AACH,oBAAY,IAAI;IACZ;;OAEG;IACH,YAAY,IAAI;IAChB;;OAEG;IACH,iBAAiB,IAAI;IACrB;;OAEG;IACH,kBAAkB,IAAI;IACtB;;OAEG;IACH,SAAS,IAAI;IACb;;OAEG;IACH,YAAY,IAAI;IAChB;;OAEG;IACH,KAAK,IAAI;CACZ;AAED,cAAM,YAAa,SAAQ,WAAW,CAAC,OAAO,CAAC;;IAW3C,MAAM,CAAC,KAAK,CAAC,EAAE,cAAc,CAAC,OAAO,CAAC,GAAG,OAAO;IAOhD,kBAAkB,CAAC,MAAM,EAAE,aAAa,EAAE,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,MAAM,CAAC,EAAE,OAAO,GAAG,OAAO;IAkChH,OAAO,CAAC,cAAc;IAgBtB,mBAAmB,CAAC,OAAO,EAAE,OAAO,EAAE,MAAM,EAAE,aAAa,EAAE,OAAO,EAAE,kBAAkB,GAAG,aAAa;CA4B3G;AACD;;GAEG;AACH,eAAO,MAAM,OAAO,cAAqB,CAAC;AAE1C,cAAM,UAAW,SAAQ,WAAW,CAAC,KAAK,CAAC;;IASvC,MAAM,CAAC,KAAK,CAAC,EAAE,cAAc,CAAC,KAAK,CAAC,GAAG,KAAK;IAO5C,kBAAkB,CAAC,MAAM,EAAE,aAAa,EAAE,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,MAAM,CAAC,EAAE,KAAK,GAAG,KAAK;IA4B5G,mBAAmB,CAAC,OAAO,EAAE,KAAK,EAAE,MAAM,EAAE,aAAa,EAAE,OAAO,EAAE,kBAAkB,GAAG,aAAa;CAkBzG;AACD;;GAEG;AACH,eAAO,MAAM,KAAK,YAAmB,CAAC"}
\ No newline at end of file
diff --git a/dist/messages.js b/dist/messages.js
deleted file mode 100644
index 61bd989..0000000
--- a/dist/messages.js
+++ /dev/null
@@ -1,208 +0,0 @@
-"use strict";
-Object.defineProperty(exports, "__esModule", { value: true });
-exports.State = exports.Message = exports.Type = void 0;
-const runtime_1 = require("@protobuf-ts/runtime");
-const runtime_2 = require("@protobuf-ts/runtime");
-const runtime_3 = require("@protobuf-ts/runtime");
-const runtime_4 = require("@protobuf-ts/runtime");
-const runtime_5 = require("@protobuf-ts/runtime");
-/**
- * @generated from protobuf enum Type
- */
-var Type;
-(function (Type) {
- /**
- * @generated synthetic value - protobuf-ts requires all enums to have a 0 value
- */
- Type[Type["UNSPECIFIED$"] = 0] = "UNSPECIFIED$";
- /**
- * @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1;
- */
- Type[Type["BOOTSTRAP_REQUEST"] = 1] = "BOOTSTRAP_REQUEST";
- /**
- * @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2;
- */
- Type[Type["BOOTSTRAP_RESPONSE"] = 2] = "BOOTSTRAP_RESPONSE";
- /**
- * @generated from protobuf enum value: CONNECTED = 3;
- */
- Type[Type["CONNECTED"] = 3] = "CONNECTED";
- /**
- * @generated from protobuf enum value: DISCONNECTED = 4;
- */
- Type[Type["DISCONNECTED"] = 4] = "DISCONNECTED";
- /**
- * @generated from protobuf enum value: STATE = 5;
- */
- Type[Type["STATE"] = 5] = "STATE";
-})(Type = exports.Type || (exports.Type = {}));
-// @generated message type with reflection information, may provide speed optimized methods
-class Message$Type extends runtime_5.MessageType {
- constructor() {
- super("Message", [
- { no: 1, name: "type", kind: "enum", T: () => ["Type", Type] },
- { no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => exports.State } },
- { no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 4, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 5, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ },
- { no: 6, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
- ]);
- }
- create(value) {
- const message = { type: 0, bootstrap: {} };
- globalThis.Object.defineProperty(message, runtime_4.MESSAGE_TYPE, { enumerable: false, value: this });
- if (value !== undefined)
- (0, runtime_3.reflectionMergePartial)(this, message, value);
- return message;
- }
- internalBinaryRead(reader, length, options, target) {
- let message = target ?? this.create(), end = reader.pos + length;
- while (reader.pos < end) {
- let [fieldNo, wireType] = reader.tag();
- switch (fieldNo) {
- case /* Type type */ 1:
- message.type = reader.int32();
- break;
- case /* map bootstrap */ 2:
- this.binaryReadMap2(message.bootstrap, reader, options);
- break;
- case /* optional bytes data */ 3:
- message.data = reader.bytes();
- break;
- case /* optional bytes signature */ 4:
- message.signature = reader.bytes();
- break;
- case /* optional int64 timestamp */ 5:
- message.timestamp = reader.int64().toBigInt();
- break;
- case /* optional bytes id */ 6:
- message.id = reader.bytes();
- break;
- default:
- let u = options.readUnknownField;
- if (u === "throw")
- throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
- let d = reader.skip(wireType);
- if (u !== false)
- (u === true ? runtime_2.UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
- }
- }
- return message;
- }
- binaryReadMap2(map, reader, options) {
- let len = reader.uint32(), end = reader.pos + len, key, val;
- while (reader.pos < end) {
- let [fieldNo, wireType] = reader.tag();
- switch (fieldNo) {
- case 1:
- key = reader.string();
- break;
- case 2:
- val = exports.State.internalBinaryRead(reader, reader.uint32(), options);
- break;
- default: throw new globalThis.Error("unknown map entry field for field Message.bootstrap");
- }
- }
- map[key ?? ""] = val ?? exports.State.create();
- }
- internalBinaryWrite(message, writer, options) {
- /* Type type = 1; */
- if (message.type !== 0)
- writer.tag(1, runtime_1.WireType.Varint).int32(message.type);
- /* map bootstrap = 2; */
- for (let k of Object.keys(message.bootstrap)) {
- writer.tag(2, runtime_1.WireType.LengthDelimited).fork().tag(1, runtime_1.WireType.LengthDelimited).string(k);
- writer.tag(2, runtime_1.WireType.LengthDelimited).fork();
- exports.State.internalBinaryWrite(message.bootstrap[k], writer, options);
- writer.join().join();
- }
- /* optional bytes data = 3; */
- if (message.data !== undefined)
- writer.tag(3, runtime_1.WireType.LengthDelimited).bytes(message.data);
- /* optional bytes signature = 4; */
- if (message.signature !== undefined)
- writer.tag(4, runtime_1.WireType.LengthDelimited).bytes(message.signature);
- /* optional int64 timestamp = 5; */
- if (message.timestamp !== undefined)
- writer.tag(5, runtime_1.WireType.Varint).int64(message.timestamp);
- /* optional bytes id = 6; */
- if (message.id !== undefined)
- writer.tag(6, runtime_1.WireType.LengthDelimited).bytes(message.id);
- let u = options.writeUnknownFields;
- if (u !== false)
- (u == true ? runtime_2.UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
- return writer;
- }
-}
-/**
- * @generated MessageType for protobuf message Message
- */
-exports.Message = new Message$Type();
-// @generated message type with reflection information, may provide speed optimized methods
-class State$Type extends runtime_5.MessageType {
- constructor() {
- super("State", [
- { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ },
- { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 3, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 4, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }
- ]);
- }
- create(value) {
- const message = { connectedTo: [] };
- globalThis.Object.defineProperty(message, runtime_4.MESSAGE_TYPE, { enumerable: false, value: this });
- if (value !== undefined)
- (0, runtime_3.reflectionMergePartial)(this, message, value);
- return message;
- }
- internalBinaryRead(reader, length, options, target) {
- let message = target ?? this.create(), end = reader.pos + length;
- while (reader.pos < end) {
- let [fieldNo, wireType] = reader.tag();
- switch (fieldNo) {
- case /* repeated bytes connectedTo */ 1:
- message.connectedTo.push(reader.bytes());
- break;
- case /* optional bytes data */ 2:
- message.data = reader.bytes();
- break;
- case /* optional bytes signature */ 3:
- message.signature = reader.bytes();
- break;
- case /* optional int64 timestamp */ 4:
- message.timestamp = reader.int64().toBigInt();
- break;
- default:
- let u = options.readUnknownField;
- if (u === "throw")
- throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
- let d = reader.skip(wireType);
- if (u !== false)
- (u === true ? runtime_2.UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
- }
- }
- return message;
- }
- internalBinaryWrite(message, writer, options) {
- /* repeated bytes connectedTo = 1; */
- for (let i = 0; i < message.connectedTo.length; i++)
- writer.tag(1, runtime_1.WireType.LengthDelimited).bytes(message.connectedTo[i]);
- /* optional bytes data = 2; */
- if (message.data !== undefined)
- writer.tag(2, runtime_1.WireType.LengthDelimited).bytes(message.data);
- /* optional bytes signature = 3; */
- if (message.signature !== undefined)
- writer.tag(3, runtime_1.WireType.LengthDelimited).bytes(message.signature);
- /* optional int64 timestamp = 4; */
- if (message.timestamp !== undefined)
- writer.tag(4, runtime_1.WireType.Varint).int64(message.timestamp);
- let u = options.writeUnknownFields;
- if (u !== false)
- (u == true ? runtime_2.UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
- return writer;
- }
-}
-/**
- * @generated MessageType for protobuf message State
- */
-exports.State = new State$Type();
diff --git a/messages.proto b/messages.proto
index f3737a1..815754c 100644
--- a/messages.proto
+++ b/messages.proto
@@ -5,7 +5,8 @@ enum Type {
BOOTSTRAP_RESPONSE = 2;
CONNECTED = 3;
DISCONNECTED = 4;
- STATE = 5;
+ ADD_ITEM = 5;
+ REMOVE_ITEM = 6;
}
message Message {
@@ -13,13 +14,9 @@ message Message {
map bootstrap = 2; // For bootstrap events
optional bytes data = 3; // For state event
optional bytes signature = 4;
- optional int64 timestamp = 5;
- optional bytes id = 6; // For connected and disconnected events
+ optional bytes id = 5; // For connected and disconnected events
}
message State {
repeated bytes connectedTo = 1;
- optional bytes data = 2;
- optional bytes signature = 3;
- optional int64 timestamp = 4;
}
diff --git a/package.json b/package.json
index 29d7d04..1f6a041 100644
--- a/package.json
+++ b/package.json
@@ -1,5 +1,5 @@
{
- "name": "@lumeweb/dht-data",
+ "name": "@lumeweb/dht-cache",
"type": "commonjs",
"version": "0.1.0",
"main": "dist/index.js",
diff --git a/src/DHTDataBase.ts b/src/DHTDataBase.ts
deleted file mode 100644
index 43c036a..0000000
--- a/src/DHTDataBase.ts
+++ /dev/null
@@ -1,366 +0,0 @@
-import EventEmitter from "events";
-// @ts-ignore
-import { DiGraph, hasPath } from "jsnetworkx";
-// @ts-ignore
-import orderedJSON from "ordered-json";
-// @ts-ignore
-import crypto from "hypercore-crypto";
-import b4a from "b4a";
-import { Message, State, Type } from "./messages.js";
-// @ts-ignore
-import sodium from "sodium-universal";
-import debug from "debug";
-
-type Bootstrap = {
- [key: string]: State;
-};
-
-export default class DHTDataBase extends EventEmitter {
- protected swarm: any;
- private id: Buffer;
- private bootstrapped: boolean;
- private graph: any;
- private connectedTo: Set;
-
- constructor(id: Buffer, { swarm }: { swarm?: any } = {}) {
- super();
- if (!id) throw new TypeError("Must provide id for self");
-
- this.id = id;
- this.bootstrapped = false;
- this.graph = new DiGraph();
- this.connectedTo = new Set();
- this._data = {};
- this._online = [this._maybeHexify(this.id)];
- this.swarm = swarm;
- }
-
- private _data: {};
-
- get data(): {} {
- return { ...this._data };
- }
-
- set data(value: {}) {
- this._data = value;
-
- const timestamp = BigInt(Date.now());
-
- const rawData = orderedJSON.stringify(value);
-
- const signature = crypto.sign(
- b4a.from(`${timestamp}${rawData}`),
- this.swarm.keyPair.secretKey
- );
-
- this._setPeer(this.id, value, timestamp, signature);
- this._broadcastData();
- }
-
- private _online: string[];
-
- get online(): string[] {
- return this._online;
- }
-
- broadcast(data: any, ttl?: number) {
- throw new TypeError("Broadcast has not been implemented");
- }
-
- getPeerRaw(id: Buffer | string) {
- return this.graph.node.get(this._maybeHexify(id));
- }
-
- getPeerField(id: Buffer | string, field: string) {
- return this.getPeerRaw(id)?.[field];
- }
-
- getPeerData(id: Buffer | string) {
- return this.getPeerField(id, "data");
- }
-
- getPeerTimestamp(id: Buffer | string) {
- return this.getPeerField(id, "timestamp");
- }
-
- getPeerSignature(id: Buffer | string) {
- return this.getPeerField(id, "signature");
- }
-
- protected onAddPeer(id: Buffer) {
- const stringId = id.toString("hex");
- if (this.connectedTo.has(stringId)) {
- return;
- } // Already know we're connected here
-
- this.connectedTo.add(stringId);
- this._addPeerConnection(this.id, id);
- this.emit("peer-add", id);
-
- this._recalculate();
-
- this.broadcast(
- Message.toBinary(
- Message.create({
- type: Type.CONNECTED,
- id,
- })
- )
- );
- this._broadcastData();
-
- if (this.bootstrapped) {
- return;
- }
-
- // If this is the first person we've met, get their graph
- this.broadcast(
- Message.toBinary(
- Message.create({
- type: Type.BOOTSTRAP_REQUEST,
- })
- ),
- 0
- );
- }
-
- protected onRemovePeer(id: Buffer) {
- this.connectedTo.delete(id.toString("hex"));
- this._removePeerConnection(this.id, id);
- this.emit("peer-remove");
-
- this._recalculate();
-
- this.broadcast(
- Message.toBinary(
- Message.create({
- type: Type.DISCONNECTED,
- id,
- })
- )
- );
- }
-
- protected onGetBroadcast(message: Buffer, id: Buffer) {
- let decoded;
- try {
- decoded = Message.fromBinary(message);
- } catch {
- return;
- }
-
- const { type } = decoded;
- if (!type) {
- throw new Error("Missing Type In Message");
- }
- if (type === Type.STATE) {
- const { data: rawData, timestamp, signature } = decoded;
-
- if (
- signature &&
- crypto.verify(b4a.from(`${timestamp}${rawData}`), signature, id)
- ) {
- if ((timestamp || 0) <= this.getPeerTimestamp(id)) {
- debug(`Received old data for peer ${id}`);
- return;
- }
-
- const data = rawData ? orderedJSON.parse(rawData) : null;
- this._setPeer(id, data, timestamp, signature);
- this.emit("peer-data", data, id);
- this._recalculate();
- return;
- }
-
- debug(`Invalid signature received for peer ${id}`);
- } else if (type === Type.CONNECTED) {
- const { id: toId } = decoded;
- this._addPeerConnection(id, Buffer.from(toId as Uint8Array));
- this.emit("peer-add-seen", id, toId);
- this._recalculate();
- } else if (type === Type.DISCONNECTED) {
- const { id: toId } = decoded;
- this._removePeerConnection(id, Buffer.from(toId as Uint8Array));
- this.emit("peer-remove-seen", id, toId);
- this._recalculate();
- } else if (type === Type.BOOTSTRAP_REQUEST) {
- const bootstrap = this._getBootstrapInfo();
- this.broadcast(
- Message.toBinary(
- Message.create({
- type: Type.BOOTSTRAP_RESPONSE,
- bootstrap,
- })
- ),
- 0
- );
- } else if (type === Type.BOOTSTRAP_RESPONSE) {
- const { bootstrap } = decoded;
- this._bootstrapFrom(bootstrap);
- }
- }
-
- private _broadcastData() {
- const rawData = this._data;
- if (!Object.keys(rawData).length) {
- return;
- }
- const data = orderedJSON.stringify(rawData);
- const { timestamp, signature } = this.getPeerRaw(this.id);
- this.broadcast(
- Message.toBinary(
- Message.create({
- type: Type.STATE,
- data: b4a.from(data),
- signature,
- timestamp,
- })
- )
- );
- }
-
- private _hasSeenPeer(id: Buffer | string) {
- return this.graph.hasNode(this._maybeHexify(id));
- }
-
- private _setPeer(
- id: Buffer | string,
- data: any,
- timestamp?: BigInt,
- signature?: Uint8Array
- ) {
- this.graph.addNode(this._maybeHexify(id), {
- timestamp,
- signature,
- data,
- });
- }
-
- private _ensurePeer(id: Buffer | string) {
- id = this._maybeHexify(id);
- if (!this._hasSeenPeer(id)) {
- this._setPeer(id, {});
- }
- }
-
- private _addPeerConnection(
- origin: Buffer | string,
- destination: Buffer | string
- ) {
- this._ensurePeer(origin);
- this._ensurePeer(destination);
- this.graph.addEdge(
- this._maybeHexify(origin),
- this._maybeHexify(destination)
- );
- }
-
- private _removePeerConnection(origin: Buffer, destination: Buffer) {
- try {
- this._ensurePeer(origin);
- this._ensurePeer(destination);
- this.graph.removeEdge(
- origin.toString("hex"),
- destination.toString("hex")
- );
- } catch (e: any) {
- if (e.name !== "JSNetworkXError") throw e;
- }
- }
-
- private _bootstrapFrom(bootstrap: Bootstrap) {
- if (this.bootstrapped) {
- return;
- }
-
- for (const id in bootstrap) {
- const { data, connectedTo, signature, timestamp } = bootstrap[id];
- if (id === this.id.toString("hex")) {
- continue;
- }
-
- if (
- signature &&
- crypto.verify(
- b4a.from(`${timestamp}${data}`),
- signature,
- b4a.from(id, "hex")
- )
- ) {
- const parsedData = data ? orderedJSON.parse(data) : null;
- let peerData = parsedData || {};
-
- // If we're already tracking them
- if (this._hasSeenPeer(id)) {
- // Ensure we don't have old data
- if ((timestamp || 0) > this.getPeerTimestamp(id)) {
- // See what data we already have for them
- // Add their existing data to what we got from the bootstrap
- const existingPeerData = this.getPeerData(id);
- peerData = { ...existingPeerData, ...peerData };
- this._setPeer(id, peerData, timestamp, signature);
- } else {
- debug(`Received old data for peer ${id}`);
- }
- }
- } else {
- debug(`Invalid signature received for peer ${id}`);
- }
- for (const connection of connectedTo) {
- this._addPeerConnection(id, Buffer.from(connection));
- }
- }
-
- this.emit("bootstrapped");
-
- this._recalculate();
- }
-
- private _getBootstrapInfo() {
- const state: Bootstrap = {};
- for (const [id, rawData] of this.graph.nodes(true)) {
- const connectedTo = this.graph
- .neighbors(id)
- .map((id: string) => Buffer.from(id, "hex"));
-
- const data = rawData ? orderedJSON.stringify(rawData?.data) : null;
- const { timestamp = undefined, signature = undefined } = rawData;
- state[id] = { data: b4a.from(data), connectedTo, timestamp, signature };
- }
-
- return state;
- }
-
- // Calculate who's online and emit an event
- private _recalculate() {
- const online = this.graph.nodes().filter((id: string) => {
- return hasPath(this.graph, {
- source: this._maybeHexify(this.id),
- target: id,
- });
- });
-
- const offline = this.graph.nodes().filter((id: string) => {
- return !hasPath(this.graph, {
- source: this._maybeHexify(this.id),
- target: id,
- });
- });
-
- for (const id of offline) {
- this.graph.removeNode(id);
- }
-
- this._online = online;
-
- this.emit("online", online);
- }
-
- private _maybeHexify(data: Buffer | string): string {
- if (b4a.isBuffer(data)) {
- return data.toString("hex");
- }
-
- return data;
- }
-}
diff --git a/src/index.ts b/src/index.ts
index 8408f71..02e2eae 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,52 +1,427 @@
-import DHTDataBase from "./DHTDataBase.js";
+import EventEmitter from "events";
+// @ts-ignore
+import { DiGraph, hasPath } from "jsnetworkx";
+// @ts-ignore
+import orderedJSON from "ordered-json";
+// @ts-ignore
+import crypto from "hypercore-crypto";
+import b4a from "b4a";
+import { Message, State, Type } from "./messages.js";
+// @ts-ignore
+import sodium from "sodium-universal";
+import type { PartialMessage } from "@protobuf-ts/runtime";
import DHTFlood from "@lumeweb/dht-flood";
+type Bootstrap = {
+ [key: string]: State;
+};
+
+const EntityType = {
+ PUBKEY: Symbol.for("PUBKEY"),
+ ITEM: Symbol.for("ITEM"),
+};
+
const DISCONNECT_SMOOTH = 500;
-export default class DHTData extends DHTDataBase {
- private flood: DHTFlood;
+export default class DHTCache extends EventEmitter {
+ protected swarm: any;
+ private id: Buffer;
+ private bootstrapped: boolean;
+ private graph: any;
+ private connectedTo: Set;
+
+ protected flood: DHTFlood;
constructor(
swarm: any,
{
id = swarm.keyPair.publicKey,
- data = {},
...opts
- }: { id?: Buffer; data?: {}; [key: string]: any } = {}
+ }: { id?: Buffer; [key: string]: any } = {}
) {
- super(id, { swarm, ...(opts as any) });
+ super();
+ if (!id) throw new TypeError("Must provide id for self");
+
+ this.id = id;
+ this.bootstrapped = false;
+ this.graph = new DiGraph();
+ this.connectedTo = new Set();
+ this._cache = new Set();
+ this._online = new Set([this._maybeHexify(this.id)]);
+ this.swarm = swarm;
this.flood = new DHTFlood({ id, swarm, ...opts });
- this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer));
- this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer));
+ this.flood.on("peer-open", (peer) => this.addPeerHandler(peer));
+ this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer));
this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
this.swarm.on("connection", (peer: any) =>
this.flood.send(peer, Buffer.from("hello"), 0)
);
- this.data = data;
+ [...this.swarm.peers.values()]
+ .map((item) => {
+ remotePublicKey: item.publicKey;
+ })
+ .filter((item: any) => !!item)
+ .forEach((item) => {
+ this.addPeerHandler(item);
+ });
- [...this.swarm.peers.values()].forEach(this.handlePeerAdd.bind(this));
+ this._ensurePeer(id);
}
- handlePeerAdd(peer: any) {
+ private _cache: Set;
+
+ public get cache(): {} {
+ return [...this._cache].sort();
+ }
+
+ public get all_cache(): string[] {
+ const items = [];
+ for (const id of this.graph.nodesIter()) {
+ const item = this.graph.node.get(id);
+ if (item?.type !== EntityType.ITEM) {
+ continue;
+ }
+ items.push(id);
+ }
+ return items.sort();
+ }
+
+ public addItem(item: string | Buffer) {
+ item = this._maybeHexify(item);
+ this._cache.add(item);
+
+ this._ensureItem(item);
+
+ const broadcast = () => {
+ this._broadcastMessage({
+ type: Type.ADD_ITEM,
+ data: b4a.from(item as string, "hex"),
+ signature: this._signItem(item),
+ });
+ };
+
+ if (!this.bootstrapped) {
+ this.once("bootstrapped", broadcast);
+ } else {
+ broadcast();
+ }
+ }
+
+ private _compileMessage(message: PartialMessage): Uint8Array {
+ return Message.toBinary(Message.create(message));
+ }
+
+ private _broadcastMessage(message: PartialMessage, ttl?: number) {
+ this.broadcast(this._compileMessage(message), ttl);
+ }
+
+ public removeItem(item: string | Buffer): boolean {
+ item = this._maybeHexify(item);
+
+ if (!this._cache.has(item)) {
+ return false;
+ }
+ this._cache.delete(item);
+
+ this._removeEntity(item);
+
+ this._broadcastMessage({
+ type: Type.REMOVE_ITEM,
+ data: b4a.from(item, "hex"),
+ signature: this._signItem(item),
+ });
+
+ return true;
+ }
+
+ private _online: Set;
+
+ public get online(): Set {
+ return this._online;
+ }
+
+ public broadcast(message: any, ttl?: number) {
+ this.flood.broadcast(message, ttl);
+ }
+
+ public send(message: any) {
+ this.flood.send(message, 0);
+ }
+
+ protected addPeerHandler(peer: any) {
const id = peer.remotePublicKey;
- this.onAddPeer(id);
+ const stringId = id.toString("hex");
+ if (this.connectedTo.has(stringId)) {
+ return;
+ } // Already know we're connected here
+
+ this.connectedTo.add(stringId);
+ this._ensurePeer(id);
+ this._addEntityConnection(this.id, id);
+ this.emit("peer-add", id);
+
+ this._recalculate();
+
+ this._broadcastMessage({
+ type: Type.CONNECTED,
+ id,
+ });
+
+ if (this.bootstrapped) {
+ return;
+ }
+
+ // If this is the first person we've met, get their graph
+ this._broadcastMessage({ type: Type.BOOTSTRAP_REQUEST }, 0);
}
- handlePeerRemove(peer: any) {
+ removePeerHandler(peer: any) {
const id = peer.remotePublicKey;
// Wait for a bit and check if we're still disconnected before removing the peer
setTimeout(() => {
if (this.swarm._allConnections.has(id)) {
return;
}
- this.onRemovePeer(id);
+ this.onRemovePeer(peer);
}, DISCONNECT_SMOOTH);
}
- broadcast(message: any, ttl?: number) {
- this.flood.broadcast(message, ttl);
+ protected onRemovePeer(peer: any) {
+ const id = peer.remotePublicKey;
+
+ this.connectedTo.delete(id.toString("hex"));
+
+ this._removeEntityConnection(this.id, id);
+ this.emit("peer-remove");
+
+ this._recalculate();
+
+ this._broadcastMessage(
+ {
+ type: Type.DISCONNECTED,
+ id,
+ },
+ 0
+ );
+ }
+
+ protected onGetBroadcast(message: Buffer, id: Buffer) {
+ let decoded;
+ try {
+ decoded = Message.fromBinary(message);
+ } catch {
+ return;
+ }
+
+ const { type } = decoded;
+ if (!type) {
+ throw new Error("Missing Type In Message");
+ }
+
+ if ([Type.ADD_ITEM, Type.REMOVE_ITEM].includes(type)) {
+ const { data: rawData, signature } = decoded;
+ const bufData = b4a.from(rawData as Uint8Array) as Buffer;
+ if (signature && crypto.verify(bufData, signature, id)) {
+ if (Type.ADD_ITEM === type) {
+ this._ensureItem(bufData);
+ this._addEntityConnection(id, bufData);
+ }
+
+ if (Type.REMOVE_ITEM === type) {
+ this.removeItem(bufData);
+ this._pruneItems();
+ }
+ }
+ } else if (type === Type.CONNECTED) {
+ const { id: toId } = decoded;
+ this._addEntityConnection(id, Buffer.from(toId as Uint8Array));
+ this.emit("peer-add-seen", id, toId);
+ this._recalculate();
+ } else if (type === Type.DISCONNECTED) {
+ const { id: toId } = decoded;
+ this._removeEntityConnection(id, Buffer.from(toId as Uint8Array));
+ this.emit("peer-remove-seen", id, toId);
+ this._recalculate();
+ } else if (type === Type.BOOTSTRAP_REQUEST) {
+ const bootstrap = this._getBootstrapInfo();
+ this.broadcast(
+ Message.toBinary(
+ Message.create({
+ type: Type.BOOTSTRAP_RESPONSE,
+ bootstrap,
+ })
+ ),
+ 0
+ );
+ } else if (type === Type.BOOTSTRAP_RESPONSE) {
+ const { bootstrap } = decoded;
+ this._bootstrapFrom(bootstrap);
+ }
+ }
+
+ private _signItem(item: string | Buffer): Buffer {
+ item = this._maybeHexify(item);
+
+ return crypto.sign(b4a.from(item, "hex"), this.swarm.keyPair.secretKey);
+ }
+
+ private _setEntity(id: Buffer | string, data: any): void {
+ this.graph.addNode(this._maybeHexify(id), data);
+ }
+
+ private _ensureItem(id: Buffer | string) {
+ this._ensureEntity(id, { type: EntityType.ITEM });
+ }
+
+ private _ensurePeer(id: Buffer | string) {
+ this._ensureEntity(id, { type: EntityType.PUBKEY });
+ }
+
+ private _ensureEntity(id: Buffer | string, def = {}) {
+ id = this._maybeHexify(id);
+ if (!this._hasSeenEntity(id)) {
+ this._setEntity(id, def);
+ }
+ }
+
+ private _hasSeenEntity(id: Buffer | string): boolean {
+ return this.graph.hasNode(this._maybeHexify(id));
+ }
+
+ private _addEntityConnection(
+ origin: Buffer | string,
+ destination: Buffer | string
+ ) {
+ this._ensureEntity(origin);
+ this._ensureEntity(destination);
+ this.graph.addEdge(
+ this._maybeHexify(origin),
+ this._maybeHexify(destination)
+ );
+ }
+
+ private _removeEntityConnection(origin: Buffer, destination: Buffer) {
+ try {
+ this._ensureEntity(origin);
+ this._ensureEntity(destination);
+ this.graph.removeEdge(
+ origin.toString("hex"),
+ destination.toString("hex")
+ );
+ } catch (e: any) {
+ if (e.name !== "JSNetworkXError") throw e;
+ }
+ }
+
+ private _removeEntity(id: Buffer | string) {
+ this.graph.removeNode(this._maybeHexify(id));
+ }
+
+ private _bootstrapFrom(bootstrap: Bootstrap) {
+ if (this.bootstrapped) {
+ return;
+ }
+
+ for (const id in bootstrap) {
+ const { connectedTo } = bootstrap[id];
+ if (id === this.id.toString("hex")) {
+ continue;
+ }
+
+ for (const connection of connectedTo) {
+ const peer = b4a.from(connection) as Buffer;
+ this._ensurePeer(peer);
+ this._addEntityConnection(id, peer);
+ }
+ }
+
+ this.emit("bootstrapped");
+
+ this._recalculate();
+ }
+
+ private _getBootstrapInfo() {
+ const state: Bootstrap = {};
+ for (const id of this.graph.nodesIter()) {
+ const item = this.graph.node.get(id);
+ if (item?.type !== EntityType.PUBKEY) {
+ continue;
+ }
+ const connectedTo = this.graph
+ .neighbors(id)
+ .map((id: string) => Buffer.from(id, "hex"));
+ state[id] = { connectedTo };
+ }
+
+ return state;
+ }
+
+ // Calculate who's online and emit an event
+ private _recalculate() {
+ const online = new Set();
+ const offline = new Set();
+
+ for (const id of this.graph.nodesIter()) {
+ const item = this.graph.node.get(id);
+ if (item?.type !== EntityType.PUBKEY) {
+ continue;
+ }
+
+ if (
+ hasPath(this.graph, {
+ source: this._maybeHexify(this.id),
+ target: id,
+ })
+ ) {
+ online.add(id);
+ } else {
+ offline.add(id);
+ }
+ }
+
+ for (const id of offline) {
+ this.graph.removeNode(id);
+ }
+
+ this._online = online;
+
+ this.emit("online", online);
+ }
+
+ private _maybeHexify(data: Buffer | string): string {
+ if (b4a.isBuffer(data)) {
+ return data.toString("hex");
+ }
+
+ return data;
+ }
+
+ private _pruneItems(item?: Buffer) {
+ let items: string[] | Buffer[];
+ if (!item) {
+ items = this.graph.nodesIter();
+ } else {
+ items = [item];
+ }
+
+ (async () => {
+ for (const id of items) {
+ const item = this.graph.node.get(id);
+ if (item?.type !== EntityType.ITEM) {
+ continue;
+ }
+
+ (async () => {
+ try {
+ if (0 === this.graph.neighbors(id)?.length) {
+ this.graph.removeNode(id);
+ }
+ } catch {}
+ })();
+ }
+ })();
}
}
diff --git a/src/messages.ts b/src/messages.ts
index cbc8085..cc1cd5a 100644
--- a/src/messages.ts
+++ b/src/messages.ts
@@ -1,4 +1,4 @@
-// @generated by protobuf-ts 2.8.1
+// @generated by protobuf-ts 2.8.2
// @generated from protobuf file "messages.proto" (syntax proto2)
// tslint:disable
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
@@ -34,11 +34,7 @@ export interface Message {
*/
signature?: Uint8Array;
/**
- * @generated from protobuf field: optional int64 timestamp = 5;
- */
- timestamp?: bigint;
- /**
- * @generated from protobuf field: optional bytes id = 6;
+ * @generated from protobuf field: optional bytes id = 5;
*/
id?: Uint8Array; // For connected and disconnected events
}
@@ -50,18 +46,6 @@ export interface State {
* @generated from protobuf field: repeated bytes connectedTo = 1;
*/
connectedTo: Uint8Array[];
- /**
- * @generated from protobuf field: optional bytes data = 2;
- */
- data?: Uint8Array;
- /**
- * @generated from protobuf field: optional bytes signature = 3;
- */
- signature?: Uint8Array;
- /**
- * @generated from protobuf field: optional int64 timestamp = 4;
- */
- timestamp?: bigint;
}
/**
* @generated from protobuf enum Type
@@ -88,9 +72,13 @@ export enum Type {
*/
DISCONNECTED = 4,
/**
- * @generated from protobuf enum value: STATE = 5;
+ * @generated from protobuf enum value: ADD_ITEM = 5;
*/
- STATE = 5
+ ADD_ITEM = 5,
+ /**
+ * @generated from protobuf enum value: REMOVE_ITEM = 6;
+ */
+ REMOVE_ITEM = 6
}
// @generated message type with reflection information, may provide speed optimized methods
class Message$Type extends MessageType {
@@ -100,8 +88,7 @@ class Message$Type extends MessageType {
{ no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => State } },
{ no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 5, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ },
- { no: 6, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
+ { no: 5, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value?: PartialMessage): Message {
@@ -128,10 +115,7 @@ class Message$Type extends MessageType {
case /* optional bytes signature */ 4:
message.signature = reader.bytes();
break;
- case /* optional int64 timestamp */ 5:
- message.timestamp = reader.int64().toBigInt();
- break;
- case /* optional bytes id */ 6:
+ case /* optional bytes id */ 5:
message.id = reader.bytes();
break;
default:
@@ -178,12 +162,9 @@ class Message$Type extends MessageType {
/* optional bytes signature = 4; */
if (message.signature !== undefined)
writer.tag(4, WireType.LengthDelimited).bytes(message.signature);
- /* optional int64 timestamp = 5; */
- if (message.timestamp !== undefined)
- writer.tag(5, WireType.Varint).int64(message.timestamp);
- /* optional bytes id = 6; */
+ /* optional bytes id = 5; */
if (message.id !== undefined)
- writer.tag(6, WireType.LengthDelimited).bytes(message.id);
+ writer.tag(5, WireType.LengthDelimited).bytes(message.id);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
@@ -198,10 +179,7 @@ export const Message = new Message$Type();
class State$Type extends MessageType {
constructor() {
super("State", [
- { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ },
- { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 3, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
- { no: 4, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }
+ { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value?: PartialMessage): State {
@@ -219,15 +197,6 @@ class State$Type extends MessageType {
case /* repeated bytes connectedTo */ 1:
message.connectedTo.push(reader.bytes());
break;
- case /* optional bytes data */ 2:
- message.data = reader.bytes();
- break;
- case /* optional bytes signature */ 3:
- message.signature = reader.bytes();
- break;
- case /* optional int64 timestamp */ 4:
- message.timestamp = reader.int64().toBigInt();
- break;
default:
let u = options.readUnknownField;
if (u === "throw")
@@ -243,15 +212,6 @@ class State$Type extends MessageType {
/* repeated bytes connectedTo = 1; */
for (let i = 0; i < message.connectedTo.length; i++)
writer.tag(1, WireType.LengthDelimited).bytes(message.connectedTo[i]);
- /* optional bytes data = 2; */
- if (message.data !== undefined)
- writer.tag(2, WireType.LengthDelimited).bytes(message.data);
- /* optional bytes signature = 3; */
- if (message.signature !== undefined)
- writer.tag(3, WireType.LengthDelimited).bytes(message.signature);
- /* optional int64 timestamp = 4; */
- if (message.timestamp !== undefined)
- writer.tag(4, WireType.Varint).int64(message.timestamp);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
diff --git a/test.js b/test.js
index 5368400..82ffede 100644
--- a/test.js
+++ b/test.js
@@ -3,11 +3,14 @@ const test = require("tape");
const Hyperswarm = require("hyperswarm");
const sodium = require("sodium-universal");
const b4a = require("b4a");
-const { default: DHTData } = require("./");
+const { default: DHTCache } = require("./");
const crypto = require("crypto");
const topicName = crypto.randomBytes(10);
const topic = b4a.allocUnsafe(32);
-sodium.crypto_generichash(topic, b4a.from(topicName));
+const item1 = b4a.allocUnsafe(32);
+const item2 = b4a.allocUnsafe(32);
+sodium.crypto_generichash(item1, b4a.from("item1"));
+sodium.crypto_generichash(item2, b4a.from("item2"));
test("Basic presence test / data propagation", (t) => {
t.plan(6);
@@ -19,11 +22,11 @@ test("Basic presence test / data propagation", (t) => {
const peer1 = peers.shift();
const peer2 = peers.shift();
- const p1 = new DHTData(peer1);
- const p2 = new DHTData(peer2);
+ const p1 = new DHTCache(peer1);
+ const p2 = new DHTCache(peer2);
- p1.data = { message: "Hello" };
- p2.data = { message: "World!" };
+ p1.addItem(item1.toString("hex"));
+ p2.addItem(item2.toString("hex"));
t.ok(p1.id, "Generated id 1");
t.ok(p2.id, "Generated id 2");
@@ -36,9 +39,9 @@ test("Basic presence test / data propagation", (t) => {
let hasFinished = false;
function handleOnline(list) {
- if (list.length === 2) {
- const peerData1 = p1.getPeerData(p2.id);
- const peerData2 = p2.getPeerData(p1.id);
+ if (list.size === 2) {
+ const peerData1 = [...p1.cache];
+ const peerData2 = [...p2.cache];
const hasP1 = peerData1 && Object.keys(peerData1).length;
const hasP2 = peerData2 && Object.keys(peerData2).length;
if (!hasP1 || !hasP2) {
@@ -51,8 +54,8 @@ test("Basic presence test / data propagation", (t) => {
setTimeout(() => {
t.pass("Seeing everyone online");
- t.deepEqual(peerData1, p2.data, "Got peer data from peer 2");
- t.deepEqual(peerData2, p1.data, "Got peer data from peer 2");
+ t.equals(true, p2.all_cache.includes(peerData1[0]));
+ t.equals(true, p1.all_cache.includes(peerData2[0]));
hasFinished = true;
peer2._allConnections.get(peer1.keyPair.publicKey).end();