// dht-data/src/dhtOnlineBase.ts
//
// 293 lines
// 7.0 KiB
// TypeScript

import EventEmitter from "events";
// @ts-ignore
import { DiGraph, hasPath } from "jsnetworkx";
import codecs from "codecs";
import { Message, State, Type } from "./messages.js";
/** Encoding name handed to `codecs` when the caller does not pick one. */
const DEFAULT_ENCODING = "json";

/** Bootstrap payload: one {@link State} snapshot per peer, keyed by the peer's id string. */
type Bootstrap = Record<string, State>;
/**
 * Base class that tracks which peers are currently reachable, based on
 * connection and state announcements exchanged through broadcast messages.
 *
 * Every peer is a node in a directed graph and every "origin is connected to
 * destination" announcement is an edge. On each recalculation a peer is
 * reported as online when a path exists from this node to it; nodes without
 * such a path are removed from the graph.
 *
 * Subclasses provide the transport: they override {@link broadcast} and feed
 * connection changes and incoming payloads into {@link onAddPeer},
 * {@link onRemovePeer} and {@link onGetBroadcast}.
 *
 * Events emitted:
 * - `peer-add` (id) / `peer-remove` (id): a direct connection of this node changed.
 * - `peer-data` (data, id): a peer announced its state.
 * - `peer-add-seen` / `peer-remove-seen` (id, toId): a peer reported a change
 *   in its own connections.
 * - `bootstrapped`: a bootstrap response has been merged into the graph.
 * - `online` (ids): the hex ids found reachable by the latest recalculation.
 */
export default class DHTOnlineBase extends EventEmitter {
  /** This node's own id. */
  private readonly id: Buffer;
  // NOTE(review): this flag is initialised to false and never set to true
  // anywhere in this class, so the guards that read it currently never fire:
  // a bootstrap request is sent for every newly added peer and every
  // bootstrap response is merged. Confirm whether that is intended before
  // changing it; flipping it after the first response would stop later
  // peers' graphs from being requested.
  private bootstrapped: boolean;
  // Typed `any` because the jsnetworkx import carries no type information.
  private readonly graph: any;
  /** Hex ids of the peers this node is directly connected to. */
  private readonly connectedTo: Set<string>;
  /** This node's own state, as last passed to {@link setData}. */
  private data: Record<string, unknown>;
  private readonly encoding: codecs.Codec<any>;
  /** Result of the most recent reachability calculation (hex ids). */
  private _online: string[];

  /**
   * @param id - This node's id.
   * @param options - `encoding` is the codec name (or codec) handed to
   *   `codecs` for peer state payloads; defaults to `"json"`.
   * @throws TypeError when `id` is missing.
   */
  constructor(id: Buffer, { encoding = DEFAULT_ENCODING } = {}) {
    super();
    if (!id) throw new TypeError("Must provide id for self");
    this.id = id;
    this.bootstrapped = false;
    this.graph = new DiGraph();
    this.connectedTo = new Set();
    this.data = {};
    // `||` (not just the default parameter) so an explicit null/empty
    // encoding also falls back to the default.
    this.encoding = codecs(encoding || DEFAULT_ENCODING);
    // Until the first recalculation only this node itself is listed.
    this._online = [this._maybeHexify(this.id)];
  }

  /** Hex ids produced by the most recent reachability calculation. */
  get online(): string[] {
    return this._online;
  }

  /**
   * Sends an encoded message to other peers. The base implementation always
   * throws; subclasses must override it.
   *
   * @param data - Encoded message bytes.
   * @param ttl - Passed through unchanged by this class; its meaning is
   *   defined by the overriding transport.
   * @throws TypeError always, in the base class.
   */
  broadcast(data: any, ttl?: number): void {
    throw new TypeError("Broadcast has not been implemented");
  }

  /**
   * Returns the data stored on the graph node for `id`.
   *
   * @param id - Peer id as a Buffer or hex string.
   */
  getPeerData(id: Buffer | string) {
    return this.graph.node.get(this._maybeHexify(id));
  }

  /**
   * Replaces this node's own state, stores it on its graph node and
   * announces it to peers.
   */
  protected setData(data: any): void {
    this.data = data;
    this._setPeer(this.id, data);
    this._broadcastData();
  }

  /** Builds a message from `fields`, serialises it and hands it to {@link broadcast}. */
  private _send(
    fields: Parameters<typeof Message.create>[0],
    ttl?: number
  ): void {
    this.broadcast(Message.toBinary(Message.create(fields)), ttl);
  }

  /** Broadcasts this node's state; does nothing while the state has no keys. */
  private _broadcastData(): void {
    const rawData = this.data;
    if (!Object.keys(rawData).length) {
      return;
    }
    this._send({ type: Type.STATE, data: this.encoding.encode(rawData) });
  }

  /**
   * Records a new direct connection to `id`, announces it, re-announces this
   * node's state and asks peers for their view of the graph.
   *
   * Emits `peer-add` with the id. Calling it again for a peer that is already
   * recorded as connected is a no-op.
   */
  protected onAddPeer(id: Buffer): void {
    const stringId = id.toString("hex");
    // Already know we're connected here
    if (this.connectedTo.has(stringId)) {
      return;
    }
    this.connectedTo.add(stringId);
    this._addPeerConnection(this.id, id);
    this.emit("peer-add", id);
    this._recalculate();
    this._send({ type: Type.CONNECTED, id });
    this._broadcastData();
    if (this.bootstrapped) {
      return;
    }
    // If this is the first person we've met, get their graph
    this._send({ type: Type.BOOTSTRAP_REQUEST }, 0);
  }

  /**
   * Records that the direct connection to `id` is gone and announces it.
   *
   * Emits `peer-remove` with the id, mirroring `peer-add`.
   */
  protected onRemovePeer(id: Buffer): void {
    this.connectedTo.delete(id.toString("hex"));
    this._removePeerConnection(this.id, id);
    // Pass the id so listeners can tell which peer left.
    this.emit("peer-remove", id);
    this._recalculate();
    this._send({ type: Type.DISCONNECTED, id });
  }

  /**
   * Handles a message received from peer `id`.
   *
   * Payloads that fail to parse as a message are dropped silently.
   *
   * @param message - Raw message bytes.
   * @param id - Id of the peer the message is attributed to.
   * @throws Error when the parsed message has a falsy `type`.
   */
  protected onGetBroadcast(message: Buffer, id: Buffer): void {
    let decoded;
    try {
      decoded = Message.fromBinary(message);
    } catch {
      return;
    }
    const { type } = decoded;
    if (!type) {
      throw new Error("Missing Type In Message");
    }
    switch (type) {
      case Type.STATE: {
        const { data: rawData } = decoded;
        const data = this.encoding.decode(rawData);
        this._setPeer(id, data);
        this.emit("peer-data", data, id);
        this._recalculate();
        break;
      }
      case Type.CONNECTED: {
        const { id: toId } = decoded;
        this._addPeerConnection(id, Buffer.from(toId as Uint8Array));
        this.emit("peer-add-seen", id, toId);
        this._recalculate();
        break;
      }
      case Type.DISCONNECTED: {
        const { id: toId } = decoded;
        this._removePeerConnection(id, Buffer.from(toId as Uint8Array));
        this.emit("peer-remove-seen", id, toId);
        this._recalculate();
        break;
      }
      case Type.BOOTSTRAP_REQUEST: {
        // Answer with everything this node currently knows about the graph.
        this._send(
          { type: Type.BOOTSTRAP_RESPONSE, bootstrap: this._getBootstrapInfo() },
          0
        );
        break;
      }
      case Type.BOOTSTRAP_RESPONSE: {
        const { bootstrap } = decoded;
        this._bootstrapFrom(bootstrap);
        break;
      }
    }
  }

  /** Whether the graph already has a node for `id`. */
  private _hasSeenPeer(id: Buffer | string) {
    return this.graph.hasNode(this._maybeHexify(id));
  }

  /** Adds the node for `id` to the graph with `data` attached. */
  private _setPeer(id: Buffer | string, data: any): void {
    this.graph.addNode(this._maybeHexify(id), data);
  }

  /** Removes the node for `id` from the graph. */
  private _removePeer(id: Buffer | string): void {
    this.graph.removeNode(this._maybeHexify(id));
  }

  /** Creates an empty-data node for `id` if the graph has none yet. */
  private _ensurePeer(id: Buffer | string): void {
    const hexId = this._maybeHexify(id);
    if (!this._hasSeenPeer(hexId)) {
      this._setPeer(hexId, {});
    }
  }

  /** Adds the directed edge origin -> destination, creating missing nodes. */
  private _addPeerConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ): void {
    this._ensurePeer(origin);
    this._ensurePeer(destination);
    this.graph.addEdge(
      this._maybeHexify(origin),
      this._maybeHexify(destination)
    );
  }

  /**
   * Removes the directed edge origin -> destination.
   *
   * Errors named `JSNetworkXError` (presumably raised when the edge does not
   * exist — confirm against jsnetworkx) are ignored; anything else is
   * rethrown.
   */
  private _removePeerConnection(origin: Buffer, destination: Buffer): void {
    try {
      this._ensurePeer(origin);
      this._ensurePeer(destination);
      this.graph.removeEdge(
        origin.toString("hex"),
        destination.toString("hex")
      );
    } catch (e: unknown) {
      // Compare by `name` only: the thrown value is not assumed to be an
      // `Error` subclass.
      const name =
        typeof e === "object" && e !== null
          ? (e as { name?: unknown }).name
          : undefined;
      if (name !== "JSNetworkXError") throw e;
    }
  }

  /**
   * Merges a bootstrap response into the local graph, then emits
   * `bootstrapped` and recalculates who is online.
   *
   * The entry for this node itself is skipped. For peers that are already
   * tracked, the bootstrap data is layered on top of the existing data.
   */
  private _bootstrapFrom(bootstrap: Bootstrap): void {
    if (this.bootstrapped) {
      return;
    }
    const selfId = this._maybeHexify(this.id);
    for (const [id, { data, connectedTo }] of Object.entries(bootstrap)) {
      // Our own entry is never used, so skip it before decoding anything.
      if (id === selfId) continue;
      const parsedData = data ? this.encoding.decode(data) : null;
      let peerData = parsedData || {};
      // If we're already tracking them, keep what we have and let the
      // bootstrap values win on conflicting keys.
      if (this._hasSeenPeer(id)) {
        peerData = { ...this.getPeerData(id), ...peerData };
      }
      this._setPeer(id, peerData);
      for (const connection of connectedTo) {
        this._addPeerConnection(id, Buffer.from(connection));
      }
    }
    this.emit("bootstrapped");
    this._recalculate();
  }

  /** Returns the graph's successors of node `id`. */
  private _getPeerConnectedTo(id: string) {
    return this.graph.successors(id);
  }

  /**
   * Snapshots the whole graph as a bootstrap payload: for every node, its
   * encoded data (or null when it has none) and its neighbours as Buffers
   * decoded from their hex ids.
   */
  private _getBootstrapInfo(): Bootstrap {
    const state: Bootstrap = {};
    for (const [id, rawData] of this.graph.nodes(true)) {
      const connectedTo = this.graph
        .neighbors(id)
        .map((neighborId: string) => Buffer.from(neighborId, "hex"));
      const data = rawData ? this.encoding.encode(rawData) : null;
      state[id] = { data, connectedTo };
    }
    return state;
  }

  /**
   * Calculates who is online, prunes everyone else and emits `online`.
   *
   * A node is online when `hasPath` finds a path from this node to it. Nodes
   * without such a path are removed from the graph.
   */
  private _recalculate(): void {
    const selfId = this._maybeHexify(this.id);
    const online: string[] = [];
    const offline: string[] = [];
    // Classify every node with a single path query each; removal happens
    // afterwards so the graph is not mutated while it is being queried.
    for (const id of this.graph.nodes() as string[]) {
      const reachable = hasPath(this.graph, { source: selfId, target: id });
      (reachable ? online : offline).push(id);
    }
    for (const id of offline) {
      this.graph.removeNode(id);
    }
    this._online = online;
    this.emit("online", online);
  }

  /** Returns `data` as a hex string when it is a Buffer, otherwise unchanged. */
  private _maybeHexify(data: Buffer | string): string {
    if (Buffer.isBuffer(data)) {
      return data.toString("hex");
    }
    return data;
  }
}