367 lines
8.8 KiB
TypeScript
367 lines
8.8 KiB
TypeScript
import EventEmitter from "events";
|
|
// @ts-ignore
|
|
import { DiGraph, hasPath } from "jsnetworkx";
|
|
// @ts-ignore
|
|
import orderedJSON from "ordered-json";
|
|
// @ts-ignore
|
|
import crypto from "hypercore-crypto";
|
|
import b4a from "b4a";
|
|
import { Message, State, Type } from "./messages.js";
|
|
// @ts-ignore
|
|
import sodium from "sodium-universal";
|
|
import debug from "debug";
|
|
|
|
/**
 * Snapshot of the peer graph exchanged in bootstrap messages: each key is a
 * peer id (hex string) mapped to that peer's serialized `State`.
 */
type Bootstrap = Record<string, State>;
|
|
|
|
/**
 * Keeps a directed graph of the peers seen on a swarm, together with the most
 * recent signed state each peer has published, and derives from it the list of
 * peers that are currently reachable from this node.
 *
 * Graph nodes are keyed by the hex form of a peer id and carry
 * `{ timestamp, signature, data }`; an edge `a -> b` records that `a` reported
 * (or we observed) a connection to `b`.
 *
 * The base `broadcast` implementation throws, so a subclass is expected to
 * override it and to feed transport events into the protected `onAddPeer`,
 * `onRemovePeer` and `onGetBroadcast` handlers.
 *
 * Events emitted: `peer-add`, `peer-remove`, `peer-data`, `peer-add-seen`,
 * `peer-remove-seen`, `bootstrapped`, `online`.
 */
export default class DHTDataBase extends EventEmitter {
  /**
   * Namespaced logger. `debug` is a factory: it must be called once with a
   * namespace, and the function it returns is what actually logs messages.
   */
  private static readonly debugLog = debug("dht-database");

  protected swarm: any;
  private id: Buffer;
  private bootstrapped: boolean;
  private graph: any;
  private connectedTo: Set<any>;
  private _data: {};
  private _online: string[];

  /**
   * @param id - This node's own id; also used as the public key when other
   *   peers verify the state we publish.
   * @param options - `swarm` must expose `keyPair.secretKey` before `data`
   *   can be assigned, because published state is signed with it.
   * @throws TypeError when `id` is missing.
   */
  constructor(id: Buffer, { swarm }: { swarm?: any } = {}) {
    super();
    if (!id) throw new TypeError("Must provide id for self");

    this.id = id;
    this.bootstrapped = false;
    this.graph = new DiGraph();
    this.connectedTo = new Set();
    this._data = {};
    this._online = [this._maybeHexify(this.id)];
    this.swarm = swarm;
  }

  /** A shallow copy of the state this node last published. */
  get data(): {} {
    return { ...this._data };
  }

  /**
   * Publishes new state for this node: the value is serialized, signed
   * together with the current time, stored in the graph and broadcast.
   *
   * @throws TypeError when the swarm does not provide a secret key.
   */
  set data(value: {}) {
    const secretKey = this.swarm?.keyPair?.secretKey;
    if (!secretKey) {
      throw new TypeError("Cannot sign data without a swarm key pair");
    }

    const timestamp = BigInt(Date.now());
    const rawData = orderedJSON.stringify(value);
    const signature = crypto.sign(
      b4a.from(`${timestamp}${rawData}`),
      secretKey
    );

    // Commit only after signing succeeded, so `_data` never gets ahead of the
    // signature/timestamp stored in the graph and used by `_broadcastData`.
    this._data = value;
    this._setPeer(this.id, value, timestamp, signature);
    this._broadcastData();
  }

  /** Hex ids of the peers reachable from this node as of the last recalculation. */
  get online(): string[] {
    return this._online;
  }

  /**
   * Sends an encoded message to other peers. Not implemented here.
   *
   * @param data - Encoded message to send.
   * @param ttl - Hop limit; callers in this class pass `0` for bootstrap
   *   traffic and omit it otherwise.
   * @throws TypeError always, in this base implementation.
   */
  broadcast(data: any, ttl?: number) {
    throw new TypeError("Broadcast has not been implemented");
  }

  /** Returns the attribute object stored on a peer's graph node, if any. */
  getPeerRaw(id: Buffer | string) {
    return this.graph.node.get(this._maybeHexify(id));
  }

  /** Returns one attribute of a peer's graph node, or `undefined` if the peer is unknown. */
  getPeerField(id: Buffer | string, field: string) {
    return this.getPeerRaw(id)?.[field];
  }

  /** Returns the data stored for a peer. */
  getPeerData(id: Buffer | string) {
    return this.getPeerField(id, "data");
  }

  /** Returns the timestamp stored with a peer's data. */
  getPeerTimestamp(id: Buffer | string) {
    return this.getPeerField(id, "timestamp");
  }

  /** Returns the signature stored with a peer's data. */
  getPeerSignature(id: Buffer | string) {
    return this.getPeerField(id, "signature");
  }

  /**
   * Handles a new direct connection: records the edge, announces it, re-sends
   * our own state, and asks for a bootstrap snapshot if we have none yet.
   * Repeated notifications for the same peer are ignored.
   */
  protected onAddPeer(id: Buffer) {
    const stringId = this._maybeHexify(id);
    if (this.connectedTo.has(stringId)) {
      return; // Already know we're connected here
    }

    this.connectedTo.add(stringId);
    this._addPeerConnection(this.id, id);
    this.emit("peer-add", id);

    this._recalculate();

    this.broadcast(
      Message.toBinary(
        Message.create({
          type: Type.CONNECTED,
          id,
        })
      )
    );
    this._broadcastData();

    if (this.bootstrapped) {
      return;
    }

    // If this is the first person we've met, get their graph
    this.broadcast(
      Message.toBinary(
        Message.create({
          type: Type.BOOTSTRAP_REQUEST,
        })
      ),
      0
    );
  }

  /**
   * Handles the loss of a direct connection: drops the edge, recomputes who is
   * reachable and announces the disconnection.
   */
  protected onRemovePeer(id: Buffer) {
    this.connectedTo.delete(this._maybeHexify(id));
    this._removePeerConnection(this.id, id);
    // Pass the id so listeners can tell which peer left (mirrors "peer-add").
    this.emit("peer-remove", id);

    this._recalculate();

    this.broadcast(
      Message.toBinary(
        Message.create({
          type: Type.DISCONNECTED,
          id,
        })
      )
    );
  }

  /**
   * Handles a broadcast received from the network.
   *
   * Messages that fail to decode are dropped silently. State updates are only
   * applied when their signature verifies against the sender id and their
   * timestamp is newer than what is already stored.
   *
   * @param message - Encoded message.
   * @param id - Id of the peer the message originates from.
   * @throws Error when the decoded message carries no type.
   */
  protected onGetBroadcast(message: Buffer, id: Buffer) {
    let decoded;
    try {
      decoded = Message.fromBinary(message);
    } catch {
      return;
    }

    const { type } = decoded;
    if (!type) {
      throw new Error("Missing Type In Message");
    }

    switch (type) {
      case Type.STATE: {
        const { data: rawData, timestamp, signature } = decoded;
        const peerHex = this._maybeHexify(id);
        const payload = this._bytesToText(rawData);

        if (!this._isValidSignature(id, timestamp, payload, signature)) {
          DHTDataBase.debugLog(
            `Invalid signature received for peer ${peerHex}`
          );
          return;
        }

        if ((timestamp || 0) <= this.getPeerTimestamp(id)) {
          DHTDataBase.debugLog(`Received old data for peer ${peerHex}`);
          return;
        }

        let data = null;
        if (rawData && rawData.length > 0) {
          try {
            data = orderedJSON.parse(payload);
          } catch {
            // Correctly signed but not parseable: drop it like any other
            // malformed message instead of throwing out of the handler.
            DHTDataBase.debugLog(`Invalid data received for peer ${peerHex}`);
            return;
          }
        }

        this._setPeer(id, data, timestamp, signature);
        this.emit("peer-data", data, id);
        this._recalculate();
        return;
      }

      case Type.CONNECTED: {
        const { id: toId } = decoded;
        this._addPeerConnection(id, Buffer.from(toId as Uint8Array));
        this.emit("peer-add-seen", id, toId);
        this._recalculate();
        return;
      }

      case Type.DISCONNECTED: {
        const { id: toId } = decoded;
        this._removePeerConnection(id, Buffer.from(toId as Uint8Array));
        this.emit("peer-remove-seen", id, toId);
        this._recalculate();
        return;
      }

      case Type.BOOTSTRAP_REQUEST: {
        const bootstrap = this._getBootstrapInfo();
        this.broadcast(
          Message.toBinary(
            Message.create({
              type: Type.BOOTSTRAP_RESPONSE,
              bootstrap,
            })
          ),
          0
        );
        return;
      }

      case Type.BOOTSTRAP_RESPONSE: {
        const { bootstrap } = decoded;
        this._bootstrapFrom(bootstrap);
        return;
      }

      default:
        // Unknown message types are ignored.
        return;
    }
  }

  /**
   * Broadcasts this node's current state with the timestamp and signature
   * stored on our own graph node. Does nothing while the state is empty.
   */
  private _broadcastData() {
    const rawData = this._data;
    if (!Object.keys(rawData).length) {
      return;
    }

    const data = orderedJSON.stringify(rawData);
    const { timestamp, signature } = this.getPeerRaw(this.id);
    this.broadcast(
      Message.toBinary(
        Message.create({
          type: Type.STATE,
          data: b4a.from(data),
          signature,
          timestamp,
        })
      )
    );
  }

  /** Whether the graph already has a node for this peer. */
  private _hasSeenPeer(id: Buffer | string) {
    return this.graph.hasNode(this._maybeHexify(id));
  }

  /** Creates or updates the graph node for a peer with the given attributes. */
  private _setPeer(
    id: Buffer | string,
    data: any,
    timestamp?: BigInt,
    signature?: Uint8Array
  ) {
    this.graph.addNode(this._maybeHexify(id), {
      timestamp,
      signature,
      data,
    });
  }

  /** Adds an empty, unsigned node for the peer unless one already exists. */
  private _ensurePeer(id: Buffer | string) {
    const hexId = this._maybeHexify(id);
    if (!this._hasSeenPeer(hexId)) {
      this._setPeer(hexId, {});
    }
  }

  /** Records a directed connection, creating either endpoint if needed. */
  private _addPeerConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ) {
    this._ensurePeer(origin);
    this._ensurePeer(destination);
    this.graph.addEdge(
      this._maybeHexify(origin),
      this._maybeHexify(destination)
    );
  }

  /**
   * Removes a directed connection. Errors named `JSNetworkXError` (for
   * example when the edge does not exist) are ignored; anything else is
   * rethrown.
   */
  private _removePeerConnection(origin: Buffer, destination: Buffer) {
    try {
      this._ensurePeer(origin);
      this._ensurePeer(destination);
      this.graph.removeEdge(
        this._maybeHexify(origin),
        this._maybeHexify(destination)
      );
    } catch (e: unknown) {
      // Checked structurally rather than with `instanceof Error`, since only
      // the `name` of the thrown value is known here.
      const name = (e as { name?: unknown } | null | undefined)?.name;
      if (name !== "JSNetworkXError") throw e;
    }
  }

  /**
   * Merges a bootstrap snapshot received from another peer into the graph.
   *
   * For every entry other than ourselves, correctly signed data is stored
   * unless we already hold data with an equal or newer timestamp; the listed
   * connections are added regardless of the signature check.
   *
   * NOTE(review): `bootstrapped` is never set to `true` anywhere in this
   * class, so the guard below (and the one in `onAddPeer`) is currently
   * inert and every bootstrap response is processed. Left unchanged because
   * setting it would alter when peers re-bootstrap — confirm the intent.
   */
  private _bootstrapFrom(bootstrap: Bootstrap) {
    if (this.bootstrapped) {
      return;
    }

    const selfId = this._maybeHexify(this.id);

    for (const [id, state] of Object.entries(bootstrap ?? {})) {
      if (id === selfId) {
        continue;
      }

      const { data, connectedTo, signature, timestamp } = state;
      const payload = this._bytesToText(data);

      if (
        this._isValidSignature(
          b4a.from(id, "hex"),
          timestamp,
          payload,
          signature
        )
      ) {
        this._storeBootstrapData(
          id,
          data && data.length > 0 ? payload : null,
          timestamp,
          signature
        );
      } else {
        DHTDataBase.debugLog(`Invalid signature received for peer ${id}`);
      }

      for (const connection of connectedTo ?? []) {
        this._addPeerConnection(id, Buffer.from(connection));
      }
    }

    this.emit("bootstrapped");

    this._recalculate();
  }

  /**
   * Stores verified bootstrap data for one peer, layered over whatever data we
   * already hold for it, unless our copy has an equal or newer timestamp.
   *
   * @param payload - Serialized JSON, or `null` when the entry had no data.
   */
  private _storeBootstrapData(
    id: string,
    payload: string | null,
    timestamp: any,
    signature: any
  ) {
    let peerData: any;
    try {
      peerData = (payload === null ? null : orderedJSON.parse(payload)) || {};
    } catch {
      DHTDataBase.debugLog(`Invalid data received for peer ${id}`);
      return;
    }

    // A peer we have never seen, or one we only know through connection
    // edges, has no timestamp yet: anything signed is newer than nothing.
    const knownTimestamp = this.getPeerTimestamp(id);
    if (knownTimestamp != null && (timestamp || 0) <= knownTimestamp) {
      DHTDataBase.debugLog(`Received old data for peer ${id}`);
      return;
    }

    const existingPeerData = this.getPeerData(id);
    this._setPeer(
      id,
      { ...existingPeerData, ...peerData },
      timestamp,
      signature
    );
  }

  /**
   * Builds the snapshot sent in answer to a bootstrap request: for every node
   * in the graph, its serialized data, outgoing connections, timestamp and
   * signature.
   */
  private _getBootstrapInfo() {
    const state: Bootstrap = {};
    for (const [id, attributes] of this.graph.nodes(true)) {
      const connectedTo = this.graph
        .neighbors(id)
        .map((neighbour: string) => Buffer.from(neighbour, "hex"));

      const { timestamp = undefined, signature = undefined } =
        attributes ?? {};
      // Missing data is serialized as JSON `null` so that `b4a.from` always
      // receives a string.
      const data: string = orderedJSON.stringify(attributes?.data ?? null);
      state[id] = { data: b4a.from(data), connectedTo, timestamp, signature };
    }

    return state;
  }

  /**
   * Works out which peers are reachable from this node, removes the
   * unreachable ones from the graph, and emits the reachable ids as `online`.
   */
  private _recalculate() {
    const source = this._maybeHexify(this.id);
    const online: string[] = [];
    const offline: string[] = [];

    // One reachability query per node; nothing is removed until every node
    // has been classified.
    for (const id of this.graph.nodes()) {
      const reachable = hasPath(this.graph, { source, target: id });
      (reachable ? online : offline).push(id);
    }

    for (const id of offline) {
      this.graph.removeNode(id);
    }

    this._online = online;

    this.emit("online", online);
  }

  /**
   * Checks a detached signature over `${timestamp}${payload}`.
   *
   * Input comes from the network, so a missing or empty signature, or any
   * error raised by the verifier, counts as "not valid".
   */
  private _isValidSignature(
    publicKey: Uint8Array,
    timestamp: unknown,
    payload: string,
    signature?: Uint8Array | null
  ): boolean {
    if (!signature || signature.length === 0) {
      return false;
    }

    try {
      return Boolean(
        crypto.verify(b4a.from(`${timestamp}${payload}`), signature, publicKey)
      );
    } catch {
      return false;
    }
  }

  /**
   * Decodes a received byte payload as UTF-8 text. Strings and nullish values
   * are stringified as a template literal would, so the result always matches
   * what the sender signed.
   */
  private _bytesToText(value: Uint8Array | string | null | undefined): string {
    if (value == null || typeof value === "string") {
      return `${value}`;
    }

    // A plain Uint8Array stringifies to "1,2,3"; decode it explicitly.
    return b4a.toString(value);
  }

  /** Returns the hex form of a binary id; strings are returned unchanged. */
  private _maybeHexify(data: Buffer | string): string {
    if (b4a.isBuffer(data)) {
      // `b4a.toString` also handles plain Uint8Array, whose own `toString`
      // ignores the encoding argument.
      return b4a.toString(data, "hex");
    }

    return data;
  }
}
|