dht-data/src/DHTDataBase.ts

351 lines
8.4 KiB
TypeScript

import EventEmitter from "events";
// @ts-ignore
import { DiGraph, hasPath } from "jsnetworkx";
// @ts-ignore
import orderedJSON from "ordered-json";
// @ts-ignore
import crypto from "hypercore-crypto";
import b4a from "b4a";
import { Message, State, Type } from "./messages.js";
// @ts-ignore
import sodium from "sodium-universal";
import debug from "debug";
/**
 * Bootstrap payload: a dictionary from a peer id string to the `State`
 * entry describing that peer.
 */
type Bootstrap = Record<string, State>;
/**
 * Tracks a directed graph of peers (who is connected to whom) together with a
 * signed data blob per peer, and keeps it in sync by exchanging `Message`s
 * through `broadcast`.
 *
 * Subclasses supply the transport: they must override `broadcast` and feed
 * transport events into `onAddPeer`, `onRemovePeer` and `onGetBroadcast`.
 *
 * Events emitted by this class:
 * - `"peer-add"` (id) / `"peer-remove"` (id): a direct connection changed.
 * - `"peer-add-seen"` (from, to) / `"peer-remove-seen"` (from, to): a remote
 *   peer reported a connection change.
 * - `"peer-data"` (data, id): a peer published verified data.
 * - `"bootstrapped"`: a bootstrap response was merged into the graph.
 * - `"online"` (ids): the list of node ids reachable from this node.
 */
export default class DHTDataBase extends EventEmitter {
  /**
   * Namespaced logger. `debug` is a factory: it must be called once with a
   * namespace, and the returned function is what actually logs.
   */
  private static readonly _debugLog = debug("dht-data");

  protected swarm: any;
  private id: Buffer;
  // NOTE(review): nothing in this class ever assigns `true` to this flag, so
  // the guards that read it never short-circuit. Confirm whether
  // `_bootstrapFrom` is meant to set it before changing that behaviour.
  private bootstrapped: boolean;
  private graph: any;
  private connectedTo: Set<string>;
  private _data: {};
  private _online: string[];

  /**
   * @param id - This node's own id; required.
   * @param options - `swarm` is stored as-is; the `data` setter reads
   *   `swarm.keyPair.secretKey` from it to sign published data.
   * @throws TypeError when `id` is missing.
   */
  constructor(id: Buffer, { swarm }: { swarm?: any } = {}) {
    super();
    if (!id) throw new TypeError("Must provide id for self");
    this.id = id;
    this.bootstrapped = false;
    this.graph = new DiGraph();
    this.connectedTo = new Set();
    this._data = {};
    this._online = [this._maybeHexify(this.id)];
    this.swarm = swarm;
  }

  /** Shallow copy of the data this node last published. */
  get data(): {} {
    return { ...this._data };
  }

  /**
   * Publishes new data for this node: signs `timestamp + orderedJSON(value)`
   * with the swarm's secret key, stores it on this node's graph entry and
   * broadcasts it.
   *
   * Throws if no usable `swarm` was provided to the constructor. Signing
   * happens before any state is touched so a failure leaves the previous
   * data intact.
   */
  set data(value: {}) {
    const timestamp = BigInt(Date.now());
    const rawData = orderedJSON.stringify(value);
    const signature = crypto.sign(
      b4a.from(`${timestamp}${rawData}`),
      this.swarm.keyPair.secretKey
    );
    this._data = value;
    this._setPeer(this.id, value, timestamp, signature);
    this._broadcastData();
  }

  /** Node ids (hex strings) found reachable by the last recalculation. */
  get online(): string[] {
    return this._online;
  }

  /**
   * Transport hook; this base implementation always throws.
   * Subclasses are expected to override it.
   */
  broadcast(data: any, ttl?: number) {
    throw new TypeError("Broadcast has not been implemented");
  }

  /** Returns the raw graph attributes stored for `id`, if any. */
  getPeerRaw(id: Buffer | string) {
    return this.graph.node.get(this._maybeHexify(id));
  }

  /** Returns one attribute of a peer, or `undefined` if the peer is unknown. */
  getPeerField(id: Buffer | string, field: string) {
    return this.getPeerRaw(id)?.[field];
  }

  getPeerData(id: Buffer | string) {
    return this.getPeerField(id, "data");
  }

  getPeerTimestamp(id: Buffer | string) {
    return this.getPeerField(id, "timestamp");
  }

  getPeerSignature(id: Buffer | string) {
    return this.getPeerField(id, "signature");
  }

  /**
   * Records a new direct connection, announces it, re-sends our own data and
   * asks for a bootstrap graph.
   */
  protected onAddPeer(id: Buffer) {
    const stringId = id.toString("hex");
    // Already know we're connected here
    if (this.connectedTo.has(stringId)) {
      return;
    }
    this.connectedTo.add(stringId);
    this._addPeerConnection(this.id, id);
    this.emit("peer-add", id);
    this._recalculate();
    this.broadcast(
      Message.toBinary(Message.create({ type: Type.CONNECTED, id }))
    );
    this._broadcastData();
    if (this.bootstrapped) {
      return;
    }
    // If this is the first person we've met, get their graph
    this.broadcast(
      Message.toBinary(Message.create({ type: Type.BOOTSTRAP_REQUEST })),
      0
    );
  }

  /** Records the loss of a direct connection and announces it. */
  protected onRemovePeer(id: Buffer) {
    this.connectedTo.delete(id.toString("hex"));
    this._removePeerConnection(this.id, id);
    // Pass the id along, matching the payload of "peer-add".
    this.emit("peer-remove", id);
    this._recalculate();
    this.broadcast(
      Message.toBinary(Message.create({ type: Type.DISCONNECTED, id }))
    );
  }

  /**
   * Handles a broadcast received from peer `id`.
   *
   * Messages that fail to decode are ignored.
   *
   * @throws Error when the decoded message has no (truthy) `type`.
   */
  protected onGetBroadcast(message: Buffer, id: Buffer) {
    let decoded;
    try {
      decoded = Message.fromBinary(message);
    } catch {
      return;
    }
    const { type } = decoded;
    if (!type) {
      throw new Error("Missing Type In Message");
    }
    switch (type) {
      case Type.STATE: {
        const { data: rawData, timestamp, signature } = decoded;
        if (!this._verifySignature(timestamp, rawData, signature, id)) {
          DHTDataBase._debugLog(
            "Invalid signature received for peer %s",
            this._maybeHexify(id)
          );
          return;
        }
        const data = rawData ? orderedJSON.parse(rawData) : null;
        this._setPeer(id, data, timestamp, signature);
        this.emit("peer-data", data, id);
        this._recalculate();
        return;
      }
      case Type.CONNECTED: {
        const { id: toId } = decoded;
        this._addPeerConnection(id, Buffer.from(toId as Uint8Array));
        this.emit("peer-add-seen", id, toId);
        this._recalculate();
        return;
      }
      case Type.DISCONNECTED: {
        const { id: toId } = decoded;
        this._removePeerConnection(id, Buffer.from(toId as Uint8Array));
        this.emit("peer-remove-seen", id, toId);
        this._recalculate();
        return;
      }
      case Type.BOOTSTRAP_REQUEST: {
        const bootstrap = this._getBootstrapInfo();
        this.broadcast(
          Message.toBinary(
            Message.create({ type: Type.BOOTSTRAP_RESPONSE, bootstrap })
          ),
          0
        );
        return;
      }
      case Type.BOOTSTRAP_RESPONSE: {
        const { bootstrap } = decoded;
        this._bootstrapFrom(bootstrap);
        return;
      }
      default:
        // Unknown message types are ignored.
        return;
    }
  }

  /**
   * Checks that `signature` covers `timestamp + data` for `publicKey`.
   *
   * Returns false for a missing signature. Any error thrown by the verify
   * call is also treated as "invalid", because the inputs come from the
   * network (presumably the crypto binding rejects malformed buffers by
   * throwing — confirm against the binding in use).
   */
  private _verifySignature(
    timestamp: unknown,
    data: unknown,
    signature: Uint8Array | undefined,
    publicKey: Uint8Array
  ): boolean {
    if (!signature) {
      return false;
    }
    try {
      return Boolean(
        crypto.verify(b4a.from(`${timestamp}${data}`), signature, publicKey)
      );
    } catch {
      return false;
    }
  }

  /** Broadcasts this node's current data; does nothing while it is empty. */
  private _broadcastData() {
    const rawData = this._data;
    if (!Object.keys(rawData).length) {
      return;
    }
    const data = orderedJSON.stringify(rawData);
    const { timestamp, signature } = this.getPeerRaw(this.id);
    this.broadcast(
      Message.toBinary(
        Message.create({
          type: Type.STATE,
          data: b4a.from(data),
          signature,
          timestamp,
        })
      )
    );
  }

  private _hasSeenPeer(id: Buffer | string) {
    return this.graph.hasNode(this._maybeHexify(id));
  }

  /** Adds or updates the graph node for `id` with the given attributes. */
  private _setPeer(
    id: Buffer | string,
    data: any,
    timestamp?: bigint,
    signature?: Uint8Array
  ) {
    this.graph.addNode(this._maybeHexify(id), {
      timestamp,
      signature,
      data,
    });
  }

  /** Creates an empty node for `id` unless the graph already has one. */
  private _ensurePeer(id: Buffer | string) {
    id = this._maybeHexify(id);
    if (!this._hasSeenPeer(id)) {
      this._setPeer(id, {});
    }
  }

  private _addPeerConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ) {
    this._ensurePeer(origin);
    this._ensurePeer(destination);
    this.graph.addEdge(
      this._maybeHexify(origin),
      this._maybeHexify(destination)
    );
  }

  /**
   * Removes the edge origin -> destination. Errors named "JSNetworkXError"
   * are swallowed; anything else is rethrown.
   */
  private _removePeerConnection(origin: Buffer, destination: Buffer) {
    try {
      this._ensurePeer(origin);
      this._ensurePeer(destination);
      this.graph.removeEdge(
        origin.toString("hex"),
        destination.toString("hex")
      );
    } catch (e: any) {
      if (e.name !== "JSNetworkXError") throw e;
    }
  }

  /**
   * Merges a bootstrap payload into the local graph, then emits
   * `"bootstrapped"` and recalculates who is online.
   *
   * Data is stored only for entries whose signature verifies; connections
   * are recorded for every entry. Our own entry is skipped.
   */
  private _bootstrapFrom(bootstrap: Bootstrap) {
    if (this.bootstrapped) {
      return;
    }
    const selfId = this._maybeHexify(this.id);
    for (const id in bootstrap) {
      if (id === selfId) {
        continue;
      }
      const { data, connectedTo, signature, timestamp } = bootstrap[id];
      // Bootstrap keys are hex strings, so the public key must be decoded as
      // hex; decoding as UTF-8 yields the wrong bytes and length.
      const publicKey = b4a.from(id, "hex");
      if (this._verifySignature(timestamp, data, signature, publicKey)) {
        const parsedData = data ? orderedJSON.parse(data) : null;
        let peerData = parsedData || {};
        // If we're already tracking them, layer the bootstrap data on top of
        // what we already have.
        if (this._hasSeenPeer(id)) {
          peerData = { ...this.getPeerData(id), ...peerData };
        }
        // Store verified data for new peers as well as known ones.
        this._setPeer(id, peerData, timestamp, signature);
      } else {
        DHTDataBase._debugLog("Invalid signature received for peer %s", id);
      }
      for (const connection of connectedTo ?? []) {
        this._addPeerConnection(id, Buffer.from(connection));
      }
    }
    this.emit("bootstrapped");
    this._recalculate();
  }

  /**
   * Builds the bootstrap payload: for every graph node, its serialized data,
   * its neighbours (as buffers), and the stored timestamp and signature.
   */
  private _getBootstrapInfo() {
    const state: Bootstrap = {};
    for (const [id, attrs] of this.graph.nodes(true)) {
      const connectedTo = this.graph
        .neighbors(id)
        .map((neighborId: string) => Buffer.from(neighborId, "hex"));
      // Default missing attributes so a bare node serializes instead of
      // throwing.
      const {
        data = null,
        timestamp = undefined,
        signature = undefined,
      } = attrs ?? {};
      state[id] = {
        data: b4a.from(orderedJSON.stringify(data)),
        connectedTo,
        timestamp,
        signature,
      };
    }
    return state;
  }

  /**
   * Calculates who is online, drops every node that is not reachable from
   * this node, and emits `"online"` with the reachable ids.
   */
  private _recalculate() {
    const source = this._maybeHexify(this.id);
    const online: string[] = [];
    const offline: string[] = [];
    // Partition in one pass so each reachability query runs once per node.
    for (const id of this.graph.nodes() as string[]) {
      const reachable = hasPath(this.graph, { source, target: id });
      (reachable ? online : offline).push(id);
    }
    for (const id of offline) {
      this.graph.removeNode(id);
    }
    this._online = online;
    this.emit("online", online);
  }

  /** Returns buffers as hex strings; strings pass through untouched. */
  private _maybeHexify(data: Buffer | string): string {
    if (b4a.isBuffer(data)) {
      return data.toString("hex");
    }
    return data;
  }
}