dht-cache/src/index.ts

509 lines
12 KiB
TypeScript

import EventEmitter from "events";
// @ts-ignore
import { DiGraph, hasPath } from "jsnetworkx";
// @ts-ignore
import orderedJSON from "ordered-json";
// @ts-ignore
import crypto from "hypercore-crypto";
import b4a from "b4a";
import { Message, State, Type } from "./messages.js";
// @ts-ignore
import sodium from "sodium-universal";
import type { PartialMessage } from "@protobuf-ts/runtime";
import DHTFlood from "@lumeweb/dht-flood";
type Bootstrap = {
[key: string]: State;
};
const EntityType = {
PUBKEY: Symbol.for("PUBKEY"),
ITEM: Symbol.for("ITEM"),
};
const DISCONNECT_SMOOTH = 500;
export default class DHTCache extends EventEmitter {
protected swarm: any;
private id: Buffer;
private bootstrapped: boolean;
private graph: any;
private connectedTo: Set<any>;
protected flood: DHTFlood;
constructor(
swarm: any,
{
id = swarm.keyPair.publicKey,
...opts
}: { id?: Buffer; [key: string]: any } = {}
) {
super();
if (!id) throw new TypeError("Must provide id for self");
this.id = b4a.from(id) as Buffer;
this.bootstrapped = false;
this.graph = new DiGraph();
this.connectedTo = new Set();
this._cache = new Set();
this._online = new Set([this._maybeHexify(this.id)]);
this.swarm = swarm;
this.flood = new DHTFlood({ id, swarm, ...opts });
this.flood.on("peer-open", (peer) => this.addPeerHandler(peer));
this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer));
this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
this.swarm.on("connection", (peer: any) =>
this.send(peer, b4a.from("hello"))
);
[...this.swarm.peers.values()]
.map((item) => {
remotePublicKey: item.publicKey;
})
.filter((item: any) => !!item)
.forEach((item) => {
this.addPeerHandler(item);
});
this._ensurePeer(this.id);
setInterval(() => this._heartbeatCheck(), 5 * 1000);
setInterval(() => this._emitHeartbeat(), 60 * 1000);
}
private _cache: Set<string>;
public get cache(): string[] {
return [...this._cache].sort();
}
public get allCache(): string[] {
const items = [];
for (const id of this.graph.nodesIter()) {
const item = this.graph.node.get(id);
if (item?.type !== EntityType.ITEM) {
continue;
}
items.push(id);
}
return items.sort();
}
public peerHasItem(peer: string | Buffer, item: string | Buffer): boolean {
peer = this._maybeHexify(peer);
item = this._maybeHexify(item);
return this.graph.hasSuccessor(peer, item);
}
public addItem(item: string | Buffer) {
item = this._maybeHexify(item);
this._cache.add(item);
this._ensureItem(item);
this._addEntityConnection(this.id, item);
const broadcast = () => {
this._broadcastMessage({
type: Type.ADD_ITEM,
data: b4a.from(item as string, "hex"),
signature: this._signItem(item),
});
};
if (!this.bootstrapped) {
this.once("bootstrapped", broadcast);
} else {
broadcast();
}
}
private _compileMessage(message: PartialMessage<Message>): Uint8Array {
return Message.toBinary(Message.create(message));
}
private _broadcastMessage(message: PartialMessage<Message>, ttl?: number) {
this.broadcast(this._compileMessage(message), ttl);
}
public removeItem(item: string | Buffer): boolean {
item = this._maybeHexify(item);
if (!this._cache.has(item)) {
return false;
}
this._cache.delete(item);
this._removeEntity(item);
const broadcast = () => {
this._broadcastMessage({
type: Type.REMOVE_ITEM,
data: b4a.from(item as string, "hex"),
signature: this._signItem(item),
});
};
if (!this.bootstrapped) {
this.once("bootstrapped", broadcast);
} else {
broadcast();
}
return true;
}
private _online: Set<string>;
public get online(): Set<string> {
return this._online;
}
public broadcast(message: any, ttl?: number) {
this.flood.broadcast(message, ttl);
}
public send(peer: any, message: any) {
this.flood.send(peer, message, 0);
}
protected addPeerHandler(peer: any) {
const id = peer.remotePublicKey;
const stringId = id.toString("hex");
if (this.connectedTo.has(stringId)) {
return;
} // Already know we're connected here
this.connectedTo.add(stringId);
this._ensurePeer(id);
this._addEntityConnection(this.id, id);
this.emit("peer-add", id);
this._recalculate();
this._broadcastMessage({
type: Type.CONNECTED,
id,
});
this._emitHeartbeat(peer);
if (this.bootstrapped) {
return;
}
// If this is the first person we've met, get their graph
this._broadcastMessage({ type: Type.BOOTSTRAP_REQUEST }, 0);
}
removePeerHandler(peer: any) {
const id = peer.remotePublicKey;
// Wait for a bit and check if we're still disconnected before removing the peer
setTimeout(() => {
if (this.swarm._allConnections.has(id)) {
return;
}
this.onRemovePeer(peer);
}, DISCONNECT_SMOOTH);
}
protected onRemovePeer(peer: any) {
const id = peer.remotePublicKey;
this.connectedTo.delete(id.toString("hex"));
this._removeEntityConnection(this.id, id);
this.emit("peer-remove");
this._recalculate();
this._broadcastMessage(
{
type: Type.DISCONNECTED,
id,
},
0
);
}
protected onGetBroadcast(message: Buffer, id: Buffer) {
let decoded;
try {
decoded = Message.fromBinary(message);
} catch {
return;
}
const { type } = decoded;
if (!type) {
throw new Error("Missing Type In Message");
}
if ([Type.ADD_ITEM, Type.REMOVE_ITEM].includes(type)) {
const { data: rawData, signature } = decoded;
const bufData = b4a.from(rawData as Uint8Array) as Buffer;
if (signature && crypto.verify(bufData, signature, id)) {
if (Type.ADD_ITEM === type) {
this._ensureItem(bufData);
this._addEntityConnection(id, bufData);
this.emit("item-added", id, bufData);
}
if (Type.REMOVE_ITEM === type) {
this.removeItem(bufData);
this._pruneItems();
this.emit("item-removed", id, bufData);
}
}
} else if (type === Type.CONNECTED) {
const { id: toId } = decoded;
this._addEntityConnection(id, b4a.from(toId as Uint8Array) as Buffer);
this.emit("peer-add-seen", id, toId);
this._recalculate();
} else if (type === Type.DISCONNECTED) {
const { id: toId } = decoded;
this._removeEntityConnection(id, b4a.from(toId as Uint8Array) as Buffer);
this.emit("peer-remove-seen", id, toId);
this._recalculate();
this._pruneItems();
} else if (type === Type.BOOTSTRAP_REQUEST) {
const bootstrap = this._getBootstrapInfo();
this.broadcast(
Message.toBinary(
Message.create({
type: Type.BOOTSTRAP_RESPONSE,
bootstrap,
})
),
0
);
} else if (type === Type.BOOTSTRAP_RESPONSE) {
const { bootstrap } = decoded;
this._bootstrapFrom(bootstrap);
} else if (type === Type.HEARTBEAT) {
let { id: toId, signature, data: bufData } = decoded;
toId = b4a.from(toId as Uint8Array);
if (signature && crypto.verify(bufData, signature, id)) {
this._addEntityConnection(id, toId as Buffer);
this._setEntity(toId as Buffer, { heartbeat: Date.now() });
}
}
}
private _signItem(item: string | Buffer): Buffer {
item = this._maybeHexify(item);
return crypto.sign(b4a.from(item, "hex"), this.swarm.keyPair.secretKey);
}
private _setEntity(id: Buffer | string, data: any): void {
this.graph.addNode(this._maybeHexify(id), data);
}
private _ensureItem(id: Buffer | string) {
this._ensureEntity(id, { type: EntityType.ITEM });
}
private _ensurePeer(id: Buffer | string) {
this._ensureEntity(id, { type: EntityType.PUBKEY });
}
private _ensureEntity(id: Buffer | string, def = {}) {
id = this._maybeHexify(id);
if (!this._hasSeenEntity(id)) {
this._setEntity(id, def);
}
}
private _hasSeenEntity(id: Buffer | string): boolean {
return this.graph.hasNode(this._maybeHexify(id));
}
private _addEntityConnection(
origin: Buffer | string,
destination: Buffer | string
) {
this._ensureEntity(origin);
this._ensureEntity(destination);
this.graph.addEdge(
this._maybeHexify(origin),
this._maybeHexify(destination)
);
}
private _removeEntityConnection(origin: Buffer, destination: Buffer) {
try {
this._ensureEntity(origin);
this._ensureEntity(destination);
this.graph.removeEdge(
origin.toString("hex"),
destination.toString("hex")
);
} catch (e: any) {
if (e.name !== "JSNetworkXError") throw e;
}
}
private _removeEntity(id: Buffer | string) {
this.graph.removeNode(this._maybeHexify(id));
}
private _bootstrapFrom(bootstrap: Bootstrap) {
if (this.bootstrapped) {
return;
}
for (const id in bootstrap) {
const { connectedTo } = bootstrap[id];
if (id === this.id.toString("hex")) {
continue;
}
for (const connection of connectedTo) {
const peer = b4a.from(connection) as Buffer;
this._ensurePeer(peer);
this._addEntityConnection(id, peer);
}
}
this.bootstrapped = true;
this.emit("bootstrapped");
this._recalculate();
}
private _getBootstrapInfo() {
const state: Bootstrap = {};
for (const id of this.graph.nodesIter()) {
const item = this.graph.node.get(id);
if (item?.type !== EntityType.PUBKEY) {
continue;
}
const connectedTo = this.graph
.neighbors(id)
.filter(
(item: any) => this.graph.node.get(item)?.type === EntityType.PUBKEY
)
.map((id: string) => b4a.from(id, "hex"));
state[id] = { connectedTo };
}
return state;
}
// Calculate who's online and emit an event
private _recalculate() {
const online = new Set<string>();
const offline = new Set<string>();
for (const id of this.graph.nodesIter()) {
const item = this.graph.node.get(id);
if (item?.type !== EntityType.PUBKEY) {
continue;
}
if (
hasPath(this.graph, {
source: this._maybeHexify(this.id),
target: id,
})
) {
online.add(id);
} else {
offline.add(id);
}
}
for (const id of offline) {
this.graph.removeNode(id);
}
this._online = online;
this.emit("online", online);
}
private _maybeHexify(data: Buffer | string): string {
if (b4a.isBuffer(data)) {
return data.toString("hex");
}
return data;
}
private _pruneItems(item?: Buffer) {
let items: string[] | Buffer[];
if (!item) {
items = this.graph.nodesIter();
} else {
items = [item];
}
(async () => {
for (const id of items) {
const item = this.graph.node.get(id);
if (item?.type !== EntityType.ITEM) {
continue;
}
(async () => {
try {
if (0 === this.graph.neighbors(id)?.length) {
this.graph.removeNode(id);
}
} catch {}
})();
}
})();
}
private _heartbeatCheck() {
for (const peer of this.connectedTo) {
const pubkey = b4a.from(peer, "hex");
const heartbeat = this.graph.node.get(peer)?.heartbeat;
const conn = this.swarm._allConnections.get(pubkey);
if (!conn) {
this.onRemovePeer({ remotePublicKey: pubkey });
continue;
}
if (heartbeat > 0 && Date.now() - heartbeat > 60 * 1000) {
conn.end();
}
}
}
private _emitHeartbeat(peer?: any) {
let peers = [...this.connectedTo];
if (peer) {
// @ts-ignore
peers = [b4a.from(peer.remotePublicKey).toString("hex")];
}
for (const peer of peers) {
const pubkey = b4a.from(peer, "hex");
const conn = this.swarm._allConnections.get(pubkey);
if (!conn) {
continue;
}
const data = b4a.from(Uint8Array.from([1]));
this.send(
conn,
this._compileMessage({
type: Type.HEARTBEAT,
id: this.id,
data,
signature: crypto.sign(data, this.swarm.keyPair.secretKey),
})
);
}
}
}