628 lines
16 KiB
TypeScript
628 lines
16 KiB
TypeScript
import EventEmitter from "events";
|
|
// @ts-ignore
|
|
import { DiGraph, hasPath } from "jsnetworkx";
|
|
// @ts-ignore
|
|
import orderedJSON from "ordered-json";
|
|
// @ts-ignore
|
|
import crypto from "hypercore-crypto";
|
|
import b4a from "b4a";
|
|
import { Message, State, Type } from "./messages.js";
|
|
// @ts-ignore
|
|
import sodium from "sodium-universal";
|
|
import type { PartialMessage } from "@protobuf-ts/runtime";
|
|
import DHTFlood from "@lumeweb/dht-flood";
|
|
import type { Logger } from "pino";
|
|
|
|
/**
 * Snapshot of the peer graph exchanged in bootstrap messages: each key is a
 * hex-encoded peer public key and the value is that peer's `State`.
 */
type Bootstrap = Record<string, State>;
|
|
|
|
/**
 * Tags stored in the `type` attribute of graph nodes to tell peer nodes
 * (public keys) apart from item nodes. Declared `as const` so the table
 * cannot be reassigned by accident.
 */
const EntityType = {
  PUBKEY: Symbol.for("PUBKEY"),
  ITEM: Symbol.for("ITEM"),
} as const;
|
|
|
|
// Delay, in milliseconds, that removePeerHandler waits before re-checking the
// swarm's connections and treating a peer as really disconnected.
const DISCONNECT_SMOOTH = 500;
|
|
|
|
/**
 * Tracks which peers are online and which items each peer holds, using
 * messages flooded over a swarm.
 *
 * State is kept in a directed graph whose node ids are hex strings. Peer nodes
 * are tagged `EntityType.PUBKEY`, item nodes `EntityType.ITEM`. Edges run
 * peer -> peer (a connection) and peer -> item (the peer holds the item).
 *
 * Events emitted: "peer-add", "peer-remove", "peer-add-seen",
 * "peer-remove-seen", "item-added", "item-removed", "bootstrapped", "online".
 */
export default class DHTCache extends EventEmitter {
  protected swarm: any;
  protected flood: DHTFlood;

  private id: Buffer;
  private bootstrapped: boolean;
  private graph: any;
  // Hex ids of peers registered through addPeerHandler.
  private connectedTo: Set<string>;
  // Heartbeat period in seconds.
  private heartBeatInterval: number;
  private log: Logger;
  // Hex ids of the items held locally.
  private _cache: Set<string>;
  private _online: Set<string>;

  /**
   * @param swarm Swarm instance; `keyPair`, `peers`, `_allConnections`,
   *   `joinPeer`, `leavePeer` and the "connection" event are used.
   * @param options `id` defaults to the swarm's public key, `logger` is
   *   required, `heartBeatInterval` is in seconds (default 60); every other
   *   option is forwarded to `DHTFlood`.
   * @throws TypeError when no id or no logger is available.
   */
  constructor(
    swarm: any,
    {
      id = swarm.keyPair.publicKey,
      logger,
      heartBeatInterval = 60,
      ...opts
    }: { id?: Buffer; logger?: Logger; [key: string]: any } = {}
  ) {
    super();
    if (!id) {
      throw new TypeError("Must provide id for self");
    }
    if (!logger) {
      throw new TypeError("Must provide logger for self");
    }

    this.id = b4a.from(id) as Buffer;
    this.bootstrapped = false;
    this.graph = new DiGraph();
    this.connectedTo = new Set();
    this.heartBeatInterval = heartBeatInterval;
    this._cache = new Set();
    this._online = new Set([this._maybeHexify(this.id)]);
    this.swarm = swarm;
    this.flood = new DHTFlood({ id, swarm, ...opts });
    this.log = logger;

    this.flood.on("peer-open", (peer) => this.addPeerHandler(peer));
    this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer));
    this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));

    this.swarm.on("connection", this._hello.bind(this));

    // Create our own node first so it carries the PUBKEY tag before any edge
    // originating from it is added.
    this._ensurePeer(this.id);

    // Register peers the swarm already knows about. Only peers with a live
    // connection are registered, because addPeerHandler sends to the peer.
    for (const info of this.swarm.peers.values()) {
      const conn = info?.publicKey
        ? this.swarm._allConnections.get(info.publicKey)
        : undefined;
      if (conn) {
        this.addPeerHandler(conn);
      }
    }

    // NOTE(review): these timers are never cleared, so an instance keeps the
    // event loop alive for the lifetime of the process.
    setInterval(
      () => this._heartbeatCheck(),
      (this.heartBeatInterval / 4) * 1000
    );
    setInterval(() => this._emitHeartbeat(), this.heartBeatInterval * 1000);
  }

  /** Sorted copy of the hex ids of the items held locally. */
  public get cache(): string[] {
    return [...this._cache].sort();
  }

  /** Sorted hex ids of every item node currently in the graph. */
  public get allCache(): string[] {
    const items: string[] = [];
    for (const id of this.graph.nodesIter()) {
      if (this.graph.node.get(id)?.type === EntityType.ITEM) {
        items.push(id);
      }
    }
    return items.sort();
  }

  /** Set of hex peer ids computed by the last online recalculation. */
  public get online(): Set<string> {
    return this._online;
  }

  /** Whether the graph records an edge from `peer` to `item`. */
  public peerHasItem(peer: string | Buffer, item: string | Buffer): boolean {
    return this.graph.hasSuccessor(
      this._maybeHexify(peer),
      this._maybeHexify(item)
    );
  }

  /**
   * Records `item` as held locally and announces it. The announcement is
   * deferred until the "bootstrapped" event when bootstrapping is pending.
   */
  public addItem(item: string | Buffer) {
    const hexItem = this._maybeHexify(item);
    this._cache.add(hexItem);

    this._ensureItem(hexItem);
    this._addEntityConnection(this.id, hexItem);

    const announce = () => {
      this._broadcastMessage(this._createAddItemRequest(hexItem));
    };

    if (this.bootstrapped) {
      announce();
    } else {
      this.once("bootstrapped", announce);
    }
  }

  /**
   * Stops holding `item` locally and announces the removal.
   *
   * Only our own edge to the item is removed; the item node itself is dropped
   * only when no other peer is recorded as holding it.
   *
   * @returns false when the item was not held locally, true otherwise.
   */
  public removeItem(item: string | Buffer): boolean {
    const hexItem = this._maybeHexify(item);

    // Set#delete reports whether the entry existed.
    if (!this._cache.delete(hexItem)) {
      return false;
    }

    this._removeEntityConnection(this.id, hexItem);
    this._pruneItems(hexItem);

    const announce = () => {
      this._broadcastMessage({
        type: Type.REMOVE_ITEM,
        data: b4a.from(hexItem, "hex"),
        signature: this._signItem(hexItem),
      });
    };

    if (this.bootstrapped) {
      announce();
    } else {
      this.once("bootstrapped", announce);
    }

    return true;
  }

  /** Floods an already-encoded message through the network. */
  public broadcast(message: any, ttl?: number) {
    this.flood.broadcast(message, ttl);
  }

  /** Sends an already-encoded message to a single peer with a ttl of 0. */
  public send(peer: any, message: any) {
    this.flood.send(peer, message, 0);
  }

  /**
   * Registers a peer. A peer that is new to the graph is linked to us,
   * announced with a CONNECTED broadcast, asked for its bootstrap data and
   * sent a heartbeat. Once bootstrapped we push our items to the peer;
   * otherwise a bootstrap request is broadcast.
   */
  protected addPeerHandler(peer: any) {
    const id = peer.remotePublicKey;
    const stringId = this._maybeHexify(id);
    this.connectedTo.add(stringId);

    if (!this._hasSeenEntity(id)) {
      this._ensurePeer(id);
      this._addEntityConnection(this.id, id);
      this.emit("peer-add", id);

      this._recalculate();

      this._broadcastMessage({
        type: Type.CONNECTED,
        id,
      });

      this.send(peer, this._compileMessage({ type: Type.BOOTSTRAP_REQUEST }));

      this._emitHeartbeat(peer);

      this.log.debug(`Relay peer connected: ${stringId}`);
    }

    if (this.bootstrapped) {
      this._sendItemsToPeer(peer);
      return;
    }

    this.log.debug(`Broadcasting bootstrap request`);

    // If this is the first person we've met, get their graph
    this._broadcastMessage({ type: Type.BOOTSTRAP_REQUEST }, 0);
  }

  /**
   * Handles a possible disconnect: waits DISCONNECT_SMOOTH ms and only calls
   * onRemovePeer if the swarm still has no connection to the peer.
   */
  public removePeerHandler(peer: any) {
    const id = peer.remotePublicKey;

    // Wait for a bit and check if we're still disconnected before removing the peer
    setTimeout(() => {
      if (this.swarm._allConnections.has(id)) {
        return;
      }
      this.onRemovePeer(peer);
    }, DISCONNECT_SMOOTH);

    this.log.debug(
      `Relay peer might have disconnected: ${this._maybeHexify(id)}`
    );
  }

  /** Drops our edge to a disconnected peer and broadcasts DISCONNECTED. */
  protected onRemovePeer(peer: any) {
    const id = peer.remotePublicKey;
    const stringId = this._maybeHexify(id);

    this.connectedTo.delete(stringId);

    this._removeEntityConnection(this.id, id);
    // Pass the id along, mirroring the "peer-add" event.
    this.emit("peer-remove", id);

    this._recalculate();

    this._broadcastMessage(
      {
        type: Type.DISCONNECTED,
        id,
      },
      0
    );

    this.log.debug(`Relay peer confirmed disconnected: ${stringId}`);
  }

  /**
   * Handles a message delivered by the flood layer.
   *
   * The payload comes from the network, so anything that fails to decode,
   * lacks required fields or fails signature verification is ignored rather
   * than allowed to throw out of the event handler.
   *
   * @param message Encoded `Message`.
   * @param id Public key the flood layer reports as the message origin.
   */
  protected onGetBroadcast(message: Buffer, id: Buffer) {
    let decoded;
    try {
      decoded = Message.fromBinary(message);
    } catch {
      return;
    }

    const { type } = decoded;
    if (!type) {
      this.log.debug(
        `Ignoring message without a type from ${this._maybeHexify(id)}`
      );
      return;
    }

    switch (type) {
      case Type.ADD_ITEM:
      case Type.REMOVE_ITEM: {
        const { data: rawData, signature } = decoded;
        if (!rawData) {
          return;
        }
        const bufData = b4a.from(rawData as Uint8Array) as Buffer;
        if (!this._verifySignature(bufData, signature, id)) {
          return;
        }

        if (type === Type.ADD_ITEM) {
          this._ensureItem(bufData);
          this._addEntityConnection(id, bufData);
          this.emit("item-added", id, bufData);
          this.log.debug(
            `New item added: ${this._maybeHexify(
              bufData
            )} from ${this._maybeHexify(id)}`
          );
        } else {
          // Forget only the sender's claim on the item; our own copy and the
          // claims of other peers are unaffected.
          this._removeEntityConnection(id, bufData);
          this._pruneItems();
          this.emit("item-removed", id, bufData);
          this.log.debug(
            `Item removed: ${this._maybeHexify(
              bufData
            )} from ${this._maybeHexify(id)}`
          );
        }
        break;
      }

      case Type.CONNECTED: {
        const { id: toId } = decoded;
        if (!toId) {
          return;
        }
        const bufId = b4a.from(toId as Uint8Array) as Buffer;

        this._ensurePeer(id);
        this._ensurePeer(bufId);
        this._addEntityConnection(id, bufId);
        this.emit("peer-add-seen", id, bufId);
        this._recalculate();
        this.log.debug(`Network peer connected: ${this._maybeHexify(bufId)}`);
        break;
      }

      case Type.DISCONNECTED: {
        const { id: toId } = decoded;
        if (!toId) {
          return;
        }
        const bufId = b4a.from(toId as Uint8Array) as Buffer;

        this._removeEntityConnection(id, bufId);
        this.emit("peer-remove-seen", id, bufId);
        this._recalculate();
        this._pruneItems();

        this.log.debug(
          `Network peer disconnected: ${this._maybeHexify(bufId)}`
        );
        break;
      }

      case Type.BOOTSTRAP_REQUEST: {
        const bootstrap = this._getBootstrapInfo();
        this.broadcast(
          this._compileMessage({
            type: Type.BOOTSTRAP_RESPONSE,
            bootstrap,
          }),
          0
        );
        this.log.debug(`Bootstrap request received`);
        break;
      }

      case Type.BOOTSTRAP_RESPONSE: {
        const { bootstrap } = decoded;
        this._bootstrapFrom(bootstrap);

        if (this.swarm._allConnections.has(id)) {
          this._sendItemsToPeer(this.swarm._allConnections.get(id));
        }

        this.log.debug(`Bootstrap response received`);
        break;
      }

      case Type.HEARTBEAT: {
        const { id: toId, signature, data: bufData } = decoded;
        if (!toId) {
          return;
        }
        const bufId = b4a.from(toId as Uint8Array) as Buffer;

        // The heartbeat is verified against the key carried in the message.
        if (!this._verifySignature(bufData, signature, bufId)) {
          return;
        }

        this.addPeerHandler({
          remotePublicKey: bufId,
        });

        this._setEntity(bufId, { heartbeat: Date.now() });
        this._heartbeatCheck();

        this.log.debug(
          `Heartbeat received from ${this._maybeHexify(bufId)}`
        );
        break;
      }

      default:
        // Unknown message types are ignored.
        break;
    }
  }

  /** Encodes a partial message to its binary form. */
  private _compileMessage(message: PartialMessage<Message>): Uint8Array {
    return Message.toBinary(Message.create(message));
  }

  /** Encodes `message` and floods it. */
  private _broadcastMessage(message: PartialMessage<Message>, ttl?: number) {
    this.broadcast(this._compileMessage(message), ttl);
  }

  /**
   * Signature check that never throws: a missing signature, or any error
   * raised by the verifier (e.g. malformed input), counts as a failure.
   */
  private _verifySignature(
    data: Uint8Array | undefined,
    signature: Uint8Array | undefined,
    publicKey: Uint8Array
  ): boolean {
    if (!signature) {
      return false;
    }
    try {
      return Boolean(crypto.verify(data, signature, publicKey));
    } catch {
      return false;
    }
  }

  /** Signs the raw bytes of `item` with the swarm's secret key. */
  private _signItem(item: string | Buffer): Buffer {
    const hexItem = this._maybeHexify(item);

    return crypto.sign(b4a.from(hexItem, "hex"), this.swarm.keyPair.secretKey);
  }

  /** Adds the node `id` to the graph, or updates it, with the given attributes. */
  private _setEntity(id: Buffer | string, data: any): void {
    this.graph.addNode(this._maybeHexify(id), data);
  }

  /** Creates `id` as an item node unless a node with that id already exists. */
  private _ensureItem(id: Buffer | string) {
    this._ensureEntity(id, { type: EntityType.ITEM });
  }

  /** Creates `id` as a peer node unless a node with that id already exists. */
  private _ensurePeer(id: Buffer | string) {
    this._ensureEntity(id, { type: EntityType.PUBKEY });
  }

  /** Creates node `id` with attributes `def` only when it is not yet known. */
  private _ensureEntity(id: Buffer | string, def = {}) {
    const hexId = this._maybeHexify(id);
    if (!this._hasSeenEntity(hexId)) {
      this._setEntity(hexId, def);
    }
  }

  /** Whether the graph contains a node for `id`. */
  private _hasSeenEntity(id: Buffer | string): boolean {
    return this.graph.hasNode(this._maybeHexify(id));
  }

  /** Adds the edge origin -> destination, creating missing nodes untyped. */
  private _addEntityConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ) {
    const from = this._maybeHexify(origin);
    const to = this._maybeHexify(destination);

    this._ensureEntity(from);
    this._ensureEntity(to);
    this.graph.addEdge(from, to);
  }

  /**
   * Removes the edge origin -> destination if present. A missing edge or node
   * is not an error, and no node is created as a side effect.
   */
  private _removeEntityConnection(
    origin: Buffer | string,
    destination: Buffer | string
  ) {
    try {
      this.graph.removeEdge(
        this._maybeHexify(origin),
        this._maybeHexify(destination)
      );
    } catch (e) {
      // The graph library reports an absent edge with a JSNetworkXError.
      if ((e as Error).name !== "JSNetworkXError") throw e;
    }
  }

  /** Removes node `id` from the graph. */
  private _removeEntity(id: Buffer | string) {
    this.graph.removeNode(this._maybeHexify(id));
  }

  /**
   * Merges a bootstrap snapshot into the graph, joins peers we are not yet
   * connected to, and emits "bootstrapped" the first time it runs.
   */
  private _bootstrapFrom(bootstrap: Bootstrap) {
    const selfId = this._maybeHexify(this.id);

    for (const [id, state] of Object.entries(bootstrap)) {
      if (id === selfId) {
        continue;
      }
      const { connectedTo } = state;

      // Bootstrap keys are peers on the sender's side; tag them as such so
      // they take part in the online calculation.
      this._ensurePeer(id);

      if (!this.connectedTo.has(id)) {
        this.swarm.joinPeer(b4a.from(id, "hex"));
      }

      for (const connection of connectedTo) {
        const peer = b4a.from(connection) as Buffer;

        if (b4a.equals(peer, this.id)) {
          continue;
        }

        this._ensurePeer(peer);
        this._addEntityConnection(id, peer);

        if (!this.connectedTo.has(this._maybeHexify(peer))) {
          this.swarm.joinPeer(peer);
        }
      }
    }

    if (!this.bootstrapped) {
      this.bootstrapped = true;
      this.emit("bootstrapped");
    }

    this._recalculate();
  }

  /** Builds the snapshot sent in bootstrap responses: peer id -> peer neighbours. */
  private _getBootstrapInfo() {
    const state: Bootstrap = {};
    const isPeer = (nodeId: string) =>
      this.graph.node.get(nodeId)?.type === EntityType.PUBKEY;

    for (const id of this.graph.nodesIter()) {
      if (!isPeer(id)) {
        continue;
      }
      const connectedTo = this.graph
        .neighbors(id)
        .filter(isPeer)
        .map((neighbour: string) => b4a.from(neighbour, "hex"));
      state[id] = { connectedTo };
    }

    return state;
  }

  // Calculate who's online and emit an event
  private _recalculate() {
    const online = new Set<string>();
    const offline = new Set<string>();
    const selfId = this._maybeHexify(this.id);

    // Peers with a path from us are candidates for "online"; the rest are
    // offline.
    for (const id of this.graph.nodesIter()) {
      if (this.graph.node.get(id)?.type !== EntityType.PUBKEY) {
        continue;
      }

      if (hasPath(this.graph, { source: selfId, target: id })) {
        online.add(id);
      } else {
        offline.add(id);
      }
    }

    for (const id of offline) {
      this._removeEntity(id);
    }

    // Apart from ourselves, a reachable peer only counts as online when its
    // node carries a truthy `online` flag (set by the heartbeat check).
    for (const id of online) {
      if (b4a.equals(b4a.from(id, "hex"), this.id)) {
        continue;
      }

      if (!this.graph.node.get(id)?.online) {
        online.delete(id);
      }
    }

    this._online = online;

    this.emit("online", online);

    this.log.debug(
      `Online list updated: ${online.size - 1} network peers online, ${
        offline.size
      } network peers offline and removed from DAG`
    );
  }

  /** Returns `data` as a hex string; strings are passed through unchanged. */
  private _maybeHexify(data: Buffer | string): string {
    return typeof data === "string" ? data : b4a.toString(data, "hex");
  }

  /**
   * Removes item nodes that no peer points to any more.
   *
   * Items only have incoming edges (peer -> item), so the holders of an item
   * are its predecessors.
   *
   * @param item Restrict the check to this item; all nodes are checked when
   *   omitted.
   */
  private _pruneItems(item?: Buffer | string) {
    // Snapshot the ids first: nodes are removed while walking the list.
    const candidates: string[] = item
      ? [this._maybeHexify(item)]
      : Array.from(this.graph.nodesIter());

    for (const id of candidates) {
      if (this.graph.node.get(id)?.type !== EntityType.ITEM) {
        continue;
      }

      try {
        if (this.graph.predecessors(id).length === 0) {
          this._removeEntity(id);
        }
      } catch (e) {
        if ((e as Error).name !== "JSNetworkXError") throw e;
      }
    }
  }

  /**
   * Refreshes the `online` flag of every registered peer and recalculates
   * the online set if any flag changed. A peer is flagged online when the
   * swarm has a connection to it and its last heartbeat is no older than one
   * heartbeat interval.
   */
  private _heartbeatCheck() {
    let changed = false;

    for (const peer of this.connectedTo) {
      const pubkey = b4a.from(peer, "hex");
      const node = this.graph.node.get(peer);
      const heartbeat = node?.heartbeat;

      const conn = this.swarm._allConnections.get(pubkey);
      const online =
        conn &&
        heartbeat > 0 &&
        Date.now() - heartbeat <= this.heartBeatInterval * 1000;

      if (node?.online !== online) {
        changed = true;
      }

      this._setEntity(peer, { online });
    }

    if (changed) {
      this._recalculate();
    }
  }

  /**
   * Sends a signed heartbeat to `peer`, or to every registered peer when no
   * peer is given. Peers without a live connection are skipped.
   */
  private _emitHeartbeat(peer?: any) {
    const targets: string[] = peer
      ? [this._maybeHexify(peer.remotePublicKey)]
      : [...this.connectedTo];

    for (const target of targets) {
      const conn = this.swarm._allConnections.get(b4a.from(target, "hex"));
      if (!conn) {
        continue;
      }
      const data = b4a.from(Uint8Array.from([1]));

      this.send(
        conn,
        this._compileMessage({
          type: Type.HEARTBEAT,
          id: this.id,
          data,
          signature: crypto.sign(data, this.swarm.keyPair.secretKey),
        })
      );
    }
  }

  /** Greets a new swarm connection, then asks the swarm to leave that peer. */
  private _hello(peer: any) {
    this.send(peer, b4a.from("hello"));
    this.swarm.leavePeer(peer.remotePublicKey);
  }

  /** Sends one ADD_ITEM message per locally held item to `peer`. */
  private _sendItemsToPeer(peer: any) {
    for (const item of this.cache) {
      this.send(peer, this._createAddItemRequestMessage(item));
    }
  }

  /** Encoded ADD_ITEM message for `item`. */
  private _createAddItemRequestMessage(item: string | Buffer): Uint8Array {
    return this._compileMessage(this._createAddItemRequest(item));
  }

  /** ADD_ITEM message for `item`, signed with the swarm's secret key. */
  private _createAddItemRequest(
    item: string | Buffer
  ): PartialMessage<Message> {
    const hexItem = this._maybeHexify(item);
    return {
      type: Type.ADD_ITEM,
      data: b4a.from(hexItem, "hex"),
      signature: this._signItem(hexItem),
    };
  }
}
|