From 3bad936cd45f66b198bdff44b50fd606fcb6ec31 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 19 Nov 2022 19:44:55 -0500 Subject: [PATCH] *Refactor to use message signing and track with timestamps --- messages.proto | 26 ++++++---- package.json | 5 +- src/dhtOnlineBase.ts | 116 ++++++++++++++++++++++++++++++++----------- src/index.ts | 4 +- src/messages.ts | 56 ++++++++++++++++++--- 5 files changed, 156 insertions(+), 51 deletions(-) diff --git a/messages.proto b/messages.proto index 03da559..f3737a1 100644 --- a/messages.proto +++ b/messages.proto @@ -1,21 +1,25 @@ syntax = "proto2"; enum Type { - BOOTSTRAP_REQUEST = 1; - BOOTSTRAP_RESPONSE = 2; - CONNECTED = 3; - DISCONNECTED = 4; - STATE = 5; + BOOTSTRAP_REQUEST = 1; + BOOTSTRAP_RESPONSE = 2; + CONNECTED = 3; + DISCONNECTED = 4; + STATE = 5; } message Message { - required Type type = 1; - map bootstrap = 2; // For bootstrap events - optional bytes data = 3; // For state event - optional bytes id = 4; // For connected and disconnected events + required Type type = 1; + map bootstrap = 2; // For bootstrap events + optional bytes data = 3; // For state event + optional bytes signature = 4; + optional int64 timestamp = 5; + optional bytes id = 6; // For connected and disconnected events } message State { - repeated bytes connectedTo = 1; - optional bytes data = 2; + repeated bytes connectedTo = 1; + optional bytes data = 2; + optional bytes signature = 3; + optional int64 timestamp = 4; } diff --git a/package.json b/package.json index 33be596..d93d20e 100644 --- a/package.json +++ b/package.json @@ -6,11 +6,12 @@ "dependencies": { "@lumeweb/dht-flood": "https://git.lumeweb.com/LumeWeb/dht-flood.git", "@protobuf-ts/plugin": "^2.8.1", - "@types/codecs": "^2.2.3", - "codecs": "^3.0.0", + "b4a": "^1.6.1", "compact-encoding": "^2.11.0", + "hypercore-crypto": "^3.3.0", "jsnetworkx": "^0.3.4", "lru": "^3.1.0", + "ordered-json": "^0.1.1", "protocol-buffers-encodings": "^1.2.0", "protomux-rpc": "^1.3.0" }, diff --git a/src/dhtOnlineBase.ts b/src/dhtOnlineBase.ts index 1782521..fea4b32 100644 --- a/src/dhtOnlineBase.ts +++ b/src/dhtOnlineBase.ts @@ -1,8 +1,15 @@ import EventEmitter from "events"; // @ts-ignore import { DiGraph, hasPath } from "jsnetworkx"; -import codecs from "codecs"; +// @ts-ignore +import orderedJSON from "ordered-json"; +// @ts-ignore +import crypto from "hypercore-crypto"; +import b4a from "b4a"; import { Message, State, Type } from "./messages.js"; +// @ts-ignore +import sodium from "sodium-universal"; +import debug from "debug"; const DEFAULT_ENCODING = "json"; @@ -15,9 +22,9 @@ export default class DHTOnlineBase extends EventEmitter { private bootstrapped: boolean; private graph: any; private connectedTo: Set; - private encoding: codecs.Codec; + protected swarm: any; - constructor(id: Buffer, { encoding = DEFAULT_ENCODING } = {}) { + constructor(id: Buffer, { swarm }: { swarm?: any } = {}) { super(); if (!id) throw new TypeError("Must provide id for self"); @@ -26,8 +33,8 @@ export default class DHTOnlineBase extends EventEmitter { this.graph = new DiGraph(); this.connectedTo = new Set(); this._data = {}; - this.encoding = codecs(encoding || DEFAULT_ENCODING); this._online = [this._maybeHexify(this.id)]; + this.swarm = swarm; } private _data: {}; @@ -39,8 +46,16 @@ export default class DHTOnlineBase extends EventEmitter { set data(value: {}) { this._data = value; - this._setPeer(this.id, value); + const timestamp = BigInt(Date.now()); + const rawData = orderedJSON.stringify(value); + + const signature = crypto.sign( + b4a.from(`${timestamp}${rawData}`), + this.swarm.keyPair.secretKey + ); + + this._setPeer(this.id, value, timestamp, signature); this._broadcastData(); } @@ -55,7 +70,13 @@ export default class DHTOnlineBase extends EventEmitter { } getPeerData(id: Buffer | string) { - return this.graph.node.get(this._maybeHexify(id)); + let data = this.graph.node.get(this._maybeHexify(id)); + + if (!data) { + return false; + } + + return data?.data; } protected onAddPeer(id: Buffer) { @@ -125,11 +146,19 @@ export default class DHTOnlineBase extends EventEmitter { throw new Error("Missing Type In Message"); } if (type === Type.STATE) { - const { data: rawData } = decoded; - const data = this.encoding.decode(rawData); - this._setPeer(id, data); - this.emit("peer-data", data, id); - this._recalculate(); + const { data: rawData, timestamp, signature } = decoded; + + if ( + signature && + crypto.verify(b4a.from(`${timestamp}${rawData}`), signature, id) + ) { + const data = rawData ? orderedJSON.parse(rawData) : null; + this._setPeer(id, data, timestamp, signature); + this.emit("peer-data", data, id); + this._recalculate(); + return; + } + debug(`Invalid signature received for peer ${id}`); } else if (type === Type.CONNECTED) { const { id: toId } = decoded; this._addPeerConnection(id, Buffer.from(toId as Uint8Array)); @@ -162,12 +191,17 @@ export default class DHTOnlineBase extends EventEmitter { if (!Object.keys(rawData).length) { return; } - const data = this.encoding.encode(rawData); + const data = orderedJSON.stringify(rawData); + const { timestamp, signature } = this.graph.node.get( + this._maybeHexify(this.id) + ); this.broadcast( Message.toBinary( Message.create({ type: Type.STATE, - data, + data: b4a.from(data), + signature, + timestamp, }) ) ); @@ -177,9 +211,19 @@ export default class DHTOnlineBase extends EventEmitter { return this.graph.hasNode(this._maybeHexify(id)); } - private _setPeer(id: Buffer | string, data: any) { - this.graph.addNode(this._maybeHexify(id), data); + private _setPeer( + id: Buffer | string, + data: any, + timestamp?: BigInt, + signature?: Uint8Array + ) { + this.graph.addNode(this._maybeHexify(id), { + timestamp, + signature, + data, + }); } + private _ensurePeer(id: Buffer | string) { id = this._maybeHexify(id); if (!this._hasSeenPeer(id)) { @@ -218,18 +262,29 @@ export default class DHTOnlineBase extends EventEmitter { } for (const id in bootstrap) { - const { data, connectedTo } = bootstrap[id]; - const parsedData = data ? this.encoding.decode(data) : null; - let peerData = parsedData || {}; - if (id === this.id.toString("hex")) continue; - // If we're already tracking them - if (this._hasSeenPeer(id)) { - // See what data we already have for them - // Add their existing data to what we got from the bootstrap - const existingPeerData = this.getPeerData(id); - peerData = { ...existingPeerData, ...peerData }; + const { data, connectedTo, signature, timestamp } = bootstrap[id]; + if (id === this.id.toString("hex")) { + continue; + } + + if ( + signature && + crypto.verify(b4a.from(`${timestamp}${data}`), signature, b4a.from(id)) + ) { + const parsedData = data ? orderedJSON.parse(data) : null; + let peerData = parsedData || {}; + + // If we're already tracking them + if (this._hasSeenPeer(id)) { + // See what data we already have for them + // Add their existing data to what we got from the bootstrap + const existingPeerData = this.getPeerData(id); + peerData = { ...existingPeerData, ...peerData }; + this._setPeer(id, peerData, timestamp, signature); + } + } else { + debug(`Invalid signature received for peer ${id}`); } - this._setPeer(id, peerData); for (const connection of connectedTo) { this._addPeerConnection(id, Buffer.from(connection)); } @@ -239,14 +294,17 @@ export default class DHTOnlineBase extends EventEmitter { this._recalculate(); } + private _getBootstrapInfo() { const state: Bootstrap = {}; for (const [id, rawData] of this.graph.nodes(true)) { const connectedTo = this.graph .neighbors(id) .map((id: string) => Buffer.from(id, "hex")); - const data = rawData ? this.encoding.encode(rawData) : null; - state[id] = { data, connectedTo }; + + const data = rawData ? orderedJSON.stringify(rawData?.data) : null; + const { timestamp = undefined, signature = undefined } = rawData; + state[id] = { data: b4a.from(data), connectedTo, timestamp, signature }; } return state; @@ -278,7 +336,7 @@ export default class DHTOnlineBase extends EventEmitter { } private _maybeHexify(data: Buffer | string): string { - if (Buffer.isBuffer(data)) { + if (b4a.isBuffer(data)) { return data.toString("hex"); } diff --git a/src/index.ts b/src/index.ts index 1a387a8..c46c7a5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,7 +5,6 @@ const DISCONNECT_SMOOTH = 500; export default class DHTOnline extends DHTOnlineBase { private flood: DHTFlood; - private swarm: any; constructor( swarm: any, @@ -15,9 +14,8 @@ export default class DHTOnline extends DHTOnlineBase { ...opts }: { id?: Buffer; data?: {}; [key: string]: any } = {} ) { - super(id, opts as any); + super(id, { swarm, ...(opts as any) }); this.flood = new DHTFlood({ id, swarm, ...opts }); - this.swarm = swarm; this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer)); this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer)); diff --git a/src/messages.ts b/src/messages.ts index 82ee8dc..cbc8085 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -30,7 +30,15 @@ export interface Message { */ data?: Uint8Array; // For state event /** - * @generated from protobuf field: optional bytes id = 4; + * @generated from protobuf field: optional bytes signature = 4; + */ + signature?: Uint8Array; + /** + * @generated from protobuf field: optional int64 timestamp = 5; + */ + timestamp?: bigint; + /** + * @generated from protobuf field: optional bytes id = 6; */ id?: Uint8Array; // For connected and disconnected events } @@ -46,6 +54,14 @@ export interface State { * @generated from protobuf field: optional bytes data = 2; */ data?: Uint8Array; + /** + * @generated from protobuf field: optional bytes signature = 3; + */ + signature?: Uint8Array; + /** + * @generated from protobuf field: optional int64 timestamp = 4; + */ + timestamp?: bigint; } /** * @generated from protobuf enum Type @@ -83,7 +99,9 @@ class Message$Type extends MessageType { { no: 1, name: "type", kind: "enum", T: () => ["Type", Type] }, { no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => State } }, { no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, - { no: 4, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } + { no: 4, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, + { no: 5, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }, + { no: 6, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } ]); } create(value?: PartialMessage): Message { @@ -107,7 +125,13 @@ class Message$Type extends MessageType { case /* optional bytes data */ 3: message.data = reader.bytes(); break; - case /* optional bytes id */ 4: + case /* optional bytes signature */ 4: + message.signature = reader.bytes(); + break; + case /* optional int64 timestamp */ 5: + message.timestamp = reader.int64().toBigInt(); + break; + case /* optional bytes id */ 6: message.id = reader.bytes(); break; default: @@ -151,9 +175,15 @@ class Message$Type extends MessageType { /* optional bytes data = 3; */ if (message.data !== undefined) writer.tag(3, WireType.LengthDelimited).bytes(message.data); - /* optional bytes id = 4; */ + /* optional bytes signature = 4; */ + if (message.signature !== undefined) + writer.tag(4, WireType.LengthDelimited).bytes(message.signature); + /* optional int64 timestamp = 5; */ + if (message.timestamp !== undefined) + writer.tag(5, WireType.Varint).int64(message.timestamp); + /* optional bytes id = 6; */ if (message.id !== undefined) - writer.tag(4, WireType.LengthDelimited).bytes(message.id); + writer.tag(6, WireType.LengthDelimited).bytes(message.id); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -169,7 +199,9 @@ class State$Type extends MessageType { constructor() { super("State", [ { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ }, - { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } + { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, + { no: 3, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, + { no: 4, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ } ]); } create(value?: PartialMessage): State { @@ -190,6 +222,12 @@ class State$Type extends MessageType { case /* optional bytes data */ 2: message.data = reader.bytes(); break; + case /* optional bytes signature */ 3: + message.signature = reader.bytes(); + break; + case /* optional int64 timestamp */ 4: + message.timestamp = reader.int64().toBigInt(); + break; default: let u = options.readUnknownField; if (u === "throw") @@ -208,6 +246,12 @@ class State$Type extends MessageType { /* optional bytes data = 2; */ if (message.data !== undefined) writer.tag(2, WireType.LengthDelimited).bytes(message.data); + /* optional bytes signature = 3; */ + if (message.signature !== undefined) + writer.tag(3, WireType.LengthDelimited).bytes(message.signature); + /* optional int64 timestamp = 4; */ + if (message.timestamp !== undefined) + writer.tag(4, WireType.Varint).int64(message.timestamp); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);