*Refactor to use message signing and track with timestamps

This commit is contained in:
Derrick Hammer 2022-11-19 19:44:55 -05:00
parent 579bc2bf5d
commit 3bad936cd4
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
5 changed files with 156 additions and 51 deletions

View File

@ -1,21 +1,25 @@
syntax = "proto2"; syntax = "proto2";
enum Type { enum Type {
BOOTSTRAP_REQUEST = 1; BOOTSTRAP_REQUEST = 1;
BOOTSTRAP_RESPONSE = 2; BOOTSTRAP_RESPONSE = 2;
CONNECTED = 3; CONNECTED = 3;
DISCONNECTED = 4; DISCONNECTED = 4;
STATE = 5; STATE = 5;
} }
message Message { message Message {
required Type type = 1; required Type type = 1;
map<string, State> bootstrap = 2; // For bootstrap events map<string, State> bootstrap = 2; // For bootstrap events
optional bytes data = 3; // For state event optional bytes data = 3; // For state event
optional bytes id = 4; // For connected and disconnected events optional bytes signature = 4;
optional int64 timestamp = 5;
optional bytes id = 6; // For connected and disconnected events
} }
message State { message State {
repeated bytes connectedTo = 1; repeated bytes connectedTo = 1;
optional bytes data = 2; optional bytes data = 2;
optional bytes signature = 3;
optional int64 timestamp = 4;
} }

View File

@ -6,11 +6,12 @@
"dependencies": { "dependencies": {
"@lumeweb/dht-flood": "https://git.lumeweb.com/LumeWeb/dht-flood.git", "@lumeweb/dht-flood": "https://git.lumeweb.com/LumeWeb/dht-flood.git",
"@protobuf-ts/plugin": "^2.8.1", "@protobuf-ts/plugin": "^2.8.1",
"@types/codecs": "^2.2.3", "b4a": "^1.6.1",
"codecs": "^3.0.0",
"compact-encoding": "^2.11.0", "compact-encoding": "^2.11.0",
"hypercore-crypto": "^3.3.0",
"jsnetworkx": "^0.3.4", "jsnetworkx": "^0.3.4",
"lru": "^3.1.0", "lru": "^3.1.0",
"ordered-json": "^0.1.1",
"protocol-buffers-encodings": "^1.2.0", "protocol-buffers-encodings": "^1.2.0",
"protomux-rpc": "^1.3.0" "protomux-rpc": "^1.3.0"
}, },

View File

@ -1,8 +1,15 @@
import EventEmitter from "events"; import EventEmitter from "events";
// @ts-ignore // @ts-ignore
import { DiGraph, hasPath } from "jsnetworkx"; import { DiGraph, hasPath } from "jsnetworkx";
import codecs from "codecs"; // @ts-ignore
import orderedJSON from "ordered-json";
// @ts-ignore
import crypto from "hypercore-crypto";
import b4a from "b4a";
import { Message, State, Type } from "./messages.js"; import { Message, State, Type } from "./messages.js";
// @ts-ignore
import sodium from "sodium-universal";
import debug from "debug";
const DEFAULT_ENCODING = "json"; const DEFAULT_ENCODING = "json";
@ -15,9 +22,9 @@ export default class DHTOnlineBase extends EventEmitter {
private bootstrapped: boolean; private bootstrapped: boolean;
private graph: any; private graph: any;
private connectedTo: Set<any>; private connectedTo: Set<any>;
private encoding: codecs.Codec<any>; protected swarm: any;
constructor(id: Buffer, { encoding = DEFAULT_ENCODING } = {}) { constructor(id: Buffer, { swarm }: { swarm?: any } = {}) {
super(); super();
if (!id) throw new TypeError("Must provide id for self"); if (!id) throw new TypeError("Must provide id for self");
@ -26,8 +33,8 @@ export default class DHTOnlineBase extends EventEmitter {
this.graph = new DiGraph(); this.graph = new DiGraph();
this.connectedTo = new Set(); this.connectedTo = new Set();
this._data = {}; this._data = {};
this.encoding = codecs(encoding || DEFAULT_ENCODING);
this._online = [this._maybeHexify(this.id)]; this._online = [this._maybeHexify(this.id)];
this.swarm = swarm;
} }
private _data: {}; private _data: {};
@ -39,8 +46,16 @@ export default class DHTOnlineBase extends EventEmitter {
set data(value: {}) { set data(value: {}) {
this._data = value; this._data = value;
this._setPeer(this.id, value); const timestamp = BigInt(Date.now());
const rawData = orderedJSON.stringify(value);
const signature = crypto.sign(
b4a.from(`${timestamp}${rawData}`),
this.swarm.keyPair.secretKey
);
this._setPeer(this.id, value, timestamp, signature);
this._broadcastData(); this._broadcastData();
} }
@ -55,7 +70,13 @@ export default class DHTOnlineBase extends EventEmitter {
} }
getPeerData(id: Buffer | string) { getPeerData(id: Buffer | string) {
return this.graph.node.get(this._maybeHexify(id)); let data = this.graph.node.get(this._maybeHexify(id));
if (!data) {
return false;
}
return data?.data;
} }
protected onAddPeer(id: Buffer) { protected onAddPeer(id: Buffer) {
@ -125,11 +146,19 @@ export default class DHTOnlineBase extends EventEmitter {
throw new Error("Missing Type In Message"); throw new Error("Missing Type In Message");
} }
if (type === Type.STATE) { if (type === Type.STATE) {
const { data: rawData } = decoded; const { data: rawData, timestamp, signature } = decoded;
const data = this.encoding.decode(rawData);
this._setPeer(id, data); if (
this.emit("peer-data", data, id); signature &&
this._recalculate(); crypto.verify(b4a.from(`${timestamp}${rawData}`), signature, id)
) {
const data = rawData ? orderedJSON.parse(rawData) : null;
this._setPeer(id, data, timestamp, signature);
this.emit("peer-data", data, id);
this._recalculate();
return;
}
debug(`Invalid signature received for peer ${id}`);
} else if (type === Type.CONNECTED) { } else if (type === Type.CONNECTED) {
const { id: toId } = decoded; const { id: toId } = decoded;
this._addPeerConnection(id, Buffer.from(toId as Uint8Array)); this._addPeerConnection(id, Buffer.from(toId as Uint8Array));
@ -162,12 +191,17 @@ export default class DHTOnlineBase extends EventEmitter {
if (!Object.keys(rawData).length) { if (!Object.keys(rawData).length) {
return; return;
} }
const data = this.encoding.encode(rawData); const data = orderedJSON.stringify(rawData);
const { timestamp, signature } = this.graph.node.get(
this._maybeHexify(this.id)
);
this.broadcast( this.broadcast(
Message.toBinary( Message.toBinary(
Message.create({ Message.create({
type: Type.STATE, type: Type.STATE,
data, data: b4a.from(data),
signature,
timestamp,
}) })
) )
); );
@ -177,9 +211,19 @@ export default class DHTOnlineBase extends EventEmitter {
return this.graph.hasNode(this._maybeHexify(id)); return this.graph.hasNode(this._maybeHexify(id));
} }
private _setPeer(id: Buffer | string, data: any) { private _setPeer(
this.graph.addNode(this._maybeHexify(id), data); id: Buffer | string,
data: any,
timestamp?: BigInt,
signature?: Uint8Array
) {
this.graph.addNode(this._maybeHexify(id), {
timestamp,
signature,
data,
});
} }
private _ensurePeer(id: Buffer | string) { private _ensurePeer(id: Buffer | string) {
id = this._maybeHexify(id); id = this._maybeHexify(id);
if (!this._hasSeenPeer(id)) { if (!this._hasSeenPeer(id)) {
@ -218,18 +262,29 @@ export default class DHTOnlineBase extends EventEmitter {
} }
for (const id in bootstrap) { for (const id in bootstrap) {
const { data, connectedTo } = bootstrap[id]; const { data, connectedTo, signature, timestamp } = bootstrap[id];
const parsedData = data ? this.encoding.decode(data) : null; if (id === this.id.toString("hex")) {
let peerData = parsedData || {}; continue;
if (id === this.id.toString("hex")) continue; }
// If we're already tracking them
if (this._hasSeenPeer(id)) { if (
// See what data we already have for them signature &&
// Add their existing data to what we got from the bootstrap crypto.verify(b4a.from(`${timestamp}${data}`), signature, b4a.from(id))
const existingPeerData = this.getPeerData(id); ) {
peerData = { ...existingPeerData, ...peerData }; const parsedData = data ? orderedJSON.parse(data) : null;
let peerData = parsedData || {};
// If we're already tracking them
if (this._hasSeenPeer(id)) {
// See what data we already have for them
// Add their existing data to what we got from the bootstrap
const existingPeerData = this.getPeerData(id);
peerData = { ...existingPeerData, ...peerData };
this._setPeer(id, peerData, timestamp, signature);
}
} else {
debug(`Invalid signature received for peer ${id}`);
} }
this._setPeer(id, peerData);
for (const connection of connectedTo) { for (const connection of connectedTo) {
this._addPeerConnection(id, Buffer.from(connection)); this._addPeerConnection(id, Buffer.from(connection));
} }
@ -239,14 +294,17 @@ export default class DHTOnlineBase extends EventEmitter {
this._recalculate(); this._recalculate();
} }
private _getBootstrapInfo() { private _getBootstrapInfo() {
const state: Bootstrap = {}; const state: Bootstrap = {};
for (const [id, rawData] of this.graph.nodes(true)) { for (const [id, rawData] of this.graph.nodes(true)) {
const connectedTo = this.graph const connectedTo = this.graph
.neighbors(id) .neighbors(id)
.map((id: string) => Buffer.from(id, "hex")); .map((id: string) => Buffer.from(id, "hex"));
const data = rawData ? this.encoding.encode(rawData) : null;
state[id] = { data, connectedTo }; const data = rawData ? orderedJSON.stringify(rawData?.data) : null;
const { timestamp = undefined, signature = undefined } = rawData;
state[id] = { data: b4a.from(data), connectedTo, timestamp, signature };
} }
return state; return state;
@ -278,7 +336,7 @@ export default class DHTOnlineBase extends EventEmitter {
} }
private _maybeHexify(data: Buffer | string): string { private _maybeHexify(data: Buffer | string): string {
if (Buffer.isBuffer(data)) { if (b4a.isBuffer(data)) {
return data.toString("hex"); return data.toString("hex");
} }

View File

@ -5,7 +5,6 @@ const DISCONNECT_SMOOTH = 500;
export default class DHTOnline extends DHTOnlineBase { export default class DHTOnline extends DHTOnlineBase {
private flood: DHTFlood; private flood: DHTFlood;
private swarm: any;
constructor( constructor(
swarm: any, swarm: any,
@ -15,9 +14,8 @@ export default class DHTOnline extends DHTOnlineBase {
...opts ...opts
}: { id?: Buffer; data?: {}; [key: string]: any } = {} }: { id?: Buffer; data?: {}; [key: string]: any } = {}
) { ) {
super(id, opts as any); super(id, { swarm, ...(opts as any) });
this.flood = new DHTFlood({ id, swarm, ...opts }); this.flood = new DHTFlood({ id, swarm, ...opts });
this.swarm = swarm;
this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer)); this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer));
this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer)); this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer));

View File

@ -30,7 +30,15 @@ export interface Message {
*/ */
data?: Uint8Array; // For state event data?: Uint8Array; // For state event
/** /**
* @generated from protobuf field: optional bytes id = 4; * @generated from protobuf field: optional bytes signature = 4;
*/
signature?: Uint8Array;
/**
* @generated from protobuf field: optional int64 timestamp = 5;
*/
timestamp?: bigint;
/**
* @generated from protobuf field: optional bytes id = 6;
*/ */
id?: Uint8Array; // For connected and disconnected events id?: Uint8Array; // For connected and disconnected events
} }
@ -46,6 +54,14 @@ export interface State {
* @generated from protobuf field: optional bytes data = 2; * @generated from protobuf field: optional bytes data = 2;
*/ */
data?: Uint8Array; data?: Uint8Array;
/**
* @generated from protobuf field: optional bytes signature = 3;
*/
signature?: Uint8Array;
/**
* @generated from protobuf field: optional int64 timestamp = 4;
*/
timestamp?: bigint;
} }
/** /**
* @generated from protobuf enum Type * @generated from protobuf enum Type
@ -83,7 +99,9 @@ class Message$Type extends MessageType<Message> {
{ no: 1, name: "type", kind: "enum", T: () => ["Type", Type] }, { no: 1, name: "type", kind: "enum", T: () => ["Type", Type] },
{ no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => State } }, { no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => State } },
{ no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }, { no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } { no: 4, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 5, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ },
{ no: 6, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]); ]);
} }
create(value?: PartialMessage<Message>): Message { create(value?: PartialMessage<Message>): Message {
@ -107,7 +125,13 @@ class Message$Type extends MessageType<Message> {
case /* optional bytes data */ 3: case /* optional bytes data */ 3:
message.data = reader.bytes(); message.data = reader.bytes();
break; break;
case /* optional bytes id */ 4: case /* optional bytes signature */ 4:
message.signature = reader.bytes();
break;
case /* optional int64 timestamp */ 5:
message.timestamp = reader.int64().toBigInt();
break;
case /* optional bytes id */ 6:
message.id = reader.bytes(); message.id = reader.bytes();
break; break;
default: default:
@ -151,9 +175,15 @@ class Message$Type extends MessageType<Message> {
/* optional bytes data = 3; */ /* optional bytes data = 3; */
if (message.data !== undefined) if (message.data !== undefined)
writer.tag(3, WireType.LengthDelimited).bytes(message.data); writer.tag(3, WireType.LengthDelimited).bytes(message.data);
/* optional bytes id = 4; */ /* optional bytes signature = 4; */
if (message.signature !== undefined)
writer.tag(4, WireType.LengthDelimited).bytes(message.signature);
/* optional int64 timestamp = 5; */
if (message.timestamp !== undefined)
writer.tag(5, WireType.Varint).int64(message.timestamp);
/* optional bytes id = 6; */
if (message.id !== undefined) if (message.id !== undefined)
writer.tag(4, WireType.LengthDelimited).bytes(message.id); writer.tag(6, WireType.LengthDelimited).bytes(message.id);
let u = options.writeUnknownFields; let u = options.writeUnknownFields;
if (u !== false) if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
@ -169,7 +199,9 @@ class State$Type extends MessageType<State> {
constructor() { constructor() {
super("State", [ super("State", [
{ no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ }, { no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ },
{ no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ } { no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 3, name: "signature", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "timestamp", kind: "scalar", opt: true, T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }
]); ]);
} }
create(value?: PartialMessage<State>): State { create(value?: PartialMessage<State>): State {
@ -190,6 +222,12 @@ class State$Type extends MessageType<State> {
case /* optional bytes data */ 2: case /* optional bytes data */ 2:
message.data = reader.bytes(); message.data = reader.bytes();
break; break;
case /* optional bytes signature */ 3:
message.signature = reader.bytes();
break;
case /* optional int64 timestamp */ 4:
message.timestamp = reader.int64().toBigInt();
break;
default: default:
let u = options.readUnknownField; let u = options.readUnknownField;
if (u === "throw") if (u === "throw")
@ -208,6 +246,12 @@ class State$Type extends MessageType<State> {
/* optional bytes data = 2; */ /* optional bytes data = 2; */
if (message.data !== undefined) if (message.data !== undefined)
writer.tag(2, WireType.LengthDelimited).bytes(message.data); writer.tag(2, WireType.LengthDelimited).bytes(message.data);
/* optional bytes signature = 3; */
if (message.signature !== undefined)
writer.tag(3, WireType.LengthDelimited).bytes(message.signature);
/* optional int64 timestamp = 4; */
if (message.timestamp !== undefined)
writer.tag(4, WireType.Varint).int64(message.timestamp);
let u = options.writeUnknownFields; let u = options.writeUnknownFields;
if (u !== false) if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);