// dht-cache/dist/index.js
//
// 476 lines
// 17 KiB
// JavaScript

"use strict";
/**
 * Interop helper for default imports of CommonJS modules.
 *
 * Reuses an already-defined `this.__importDefault` when one exists; otherwise
 * returns `mod` unchanged when it is flagged with `__esModule`, and wraps any
 * other value as `{ default: mod }`.
 *
 * @param {*} mod - The value returned by `require`.
 * @returns {*} `mod` itself, or an object exposing it under `default`.
 */
var __importDefault = (this && this.__importDefault) || function (mod) {
    if (mod && mod.__esModule) {
        return mod;
    }
    return { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = __importDefault(require("events"));
// @ts-ignore
const jsnetworkx_1 = require("jsnetworkx");
// @ts-ignore
const hypercore_crypto_1 = __importDefault(require("hypercore-crypto"));
const b4a_1 = __importDefault(require("b4a"));
const messages_js_1 = require("./messages.js");
const dht_flood_1 = __importDefault(require("@lumeweb/dht-flood"));
/**
 * Tags stored as the `type` attribute of graph nodes, distinguishing peer
 * public keys from cached items. Frozen so the shared table cannot be
 * modified at runtime.
 */
const EntityType = Object.freeze({
    PUBKEY: Symbol.for("PUBKEY"),
    ITEM: Symbol.for("ITEM"),
});
/**
 * Delay, in milliseconds, between a peer being reported as removed and the
 * check that decides whether it is really gone.
 */
const DISCONNECT_SMOOTH = 500;
/**
 * Presence and item cache shared between peers of a swarm.
 *
 * State is kept in a directed graph whose node ids are hex strings. Nodes are
 * tagged `EntityType.PUBKEY` (peers) or `EntityType.ITEM` (cached items); an
 * edge `peer -> peer` records a connection and an edge `peer -> item` records
 * that the peer announced the item. Changes are exchanged with other peers
 * through the flood transport as encoded `Message` values.
 *
 * Events emitted: "peer-add", "peer-remove", "peer-add-seen",
 * "peer-remove-seen", "item-added", "item-removed", "bootstrapped", "online".
 */
class DHTCache extends events_1.default {
    swarm;
    id;
    bootstrapped;
    graph;
    connectedTo;
    heartBeatInterval;
    flood;
    log;
    _cache;
    _online;
    /**
     * @param {object} swarm - Swarm instance; `keyPair`, `peers`,
     *   `_allConnections`, `joinPeer`, `leavePeer` and `on` are used.
     * @param {object} [options]
     * @param {*} [options.id] - Own id; defaults to `swarm.keyPair.publicKey`.
     * @param {object} options.logger - Logger; only `debug` is called.
     * @param {number} [options.heartBeatInterval=60] - Heartbeat period in seconds.
     *   Remaining options are forwarded to the flood transport.
     * @throws {TypeError} When no id or no logger is available.
     */
    constructor(swarm, { id = swarm.keyPair.publicKey, logger, heartBeatInterval = 60, ...opts } = {}) {
        super();
        if (!id) {
            throw new TypeError("Must provide id for self");
        }
        if (!logger) {
            throw new TypeError("Must provide logger for self");
        }
        this.id = b4a_1.default.from(id);
        this.bootstrapped = false;
        this.graph = new jsnetworkx_1.DiGraph();
        this.connectedTo = new Set();
        this.heartBeatInterval = heartBeatInterval;
        this._cache = new Set();
        this._online = new Set([this._maybeHexify(this.id)]);
        this.swarm = swarm;
        this.flood = new dht_flood_1.default({ id, swarm, ...opts });
        this.log = logger;
        this.flood.on("peer-open", (peer) => this.addPeerHandler(peer));
        this.flood.on("peer-remove", (peer) => this.removePeerHandler(peer));
        this.flood.on("message", (message, from) => this.onGetBroadcast(message, from));
        this.swarm.on("connection", this._hello.bind(this));
        // Register ourselves first so our own node is typed as a peer before
        // any connection edge is attached to it.
        this._ensurePeer(this.id);
        // Pick up peers that were already connected before this instance was
        // created. Only peers with a live connection are registered, because
        // addPeerHandler sends messages to the peer it is given.
        for (const info of this.swarm.peers.values()) {
            const connection = this.swarm._allConnections.get(info.publicKey);
            if (connection) {
                this.addPeerHandler(connection);
            }
        }
        // The liveness check runs four times per heartbeat period.
        setInterval(() => this._heartbeatCheck(), (this.heartBeatInterval / 4) * 1000);
        setInterval(() => this._emitHeartbeat(), this.heartBeatInterval * 1000);
    }
    /** Sorted copy of the ids of the items held locally. */
    get cache() {
        return [...this._cache].sort();
    }
    /** Sorted ids of every item node currently in the graph. */
    get allCache() {
        const items = [];
        for (const id of this.graph.nodesIter()) {
            if (this.graph.node.get(id)?.type === EntityType.ITEM) {
                items.push(id);
            }
        }
        return items.sort();
    }
    /** Set of ids considered online, as computed by the last recalculation. */
    get online() {
        return this._online;
    }
    /**
     * Tells whether the graph has an edge from `peer` to `item`.
     * @param {*} peer - Peer id, buffer or hex string.
     * @param {*} item - Item id, buffer or hex string.
     * @returns {boolean}
     */
    peerHasItem(peer, item) {
        return this.graph.hasSuccessor(this._maybeHexify(peer), this._maybeHexify(item));
    }
    /**
     * Adds an item to the local cache and announces it. The announcement is
     * deferred until the "bootstrapped" event when not yet bootstrapped.
     * @param {*} item - Item id, buffer or hex string.
     */
    addItem(item) {
        const itemId = this._maybeHexify(item);
        this._cache.add(itemId);
        this._ensureItem(itemId);
        this._addEntityConnection(this.id, itemId);
        const announce = () => {
            this._broadcastMessage(this._createAddItemRequest(itemId));
        };
        if (this.bootstrapped) {
            announce();
        }
        else {
            this.once("bootstrapped", announce);
        }
    }
    /**
     * Removes an item from the local cache and from the graph, then announces
     * the removal (deferred until bootstrapped, like addItem).
     * @param {*} item - Item id, buffer or hex string.
     * @returns {boolean} `false` when the item was not held locally.
     */
    removeItem(item) {
        const itemId = this._maybeHexify(item);
        if (!this._cache.has(itemId)) {
            return false;
        }
        this._cache.delete(itemId);
        this._removeEntity(itemId);
        const announce = () => {
            this._broadcastMessage({
                type: messages_js_1.Type.REMOVE_ITEM,
                data: b4a_1.default.from(itemId, "hex"),
                signature: this._signItem(itemId),
            });
        };
        if (this.bootstrapped) {
            announce();
        }
        else {
            this.once("bootstrapped", announce);
        }
        return true;
    }
    /** Broadcasts an already-encoded message through the flood transport. */
    broadcast(message, ttl) {
        this.flood.broadcast(message, ttl);
    }
    /** Sends an already-encoded message to one peer with a ttl of 0. */
    send(peer, message) {
        this.flood.send(peer, message, 0);
    }
    /**
     * Registers a connected peer. A peer not yet in the graph is added,
     * announced with a CONNECTED message, asked for its bootstrap data and
     * sent a heartbeat. Afterwards the local items are sent to it when we are
     * bootstrapped, otherwise a bootstrap request is broadcast.
     * @param {object} peer - Object exposing `remotePublicKey`.
     */
    addPeerHandler(peer) {
        const id = peer.remotePublicKey;
        const stringId = id.toString("hex");
        this.connectedTo.add(stringId);
        if (!this._hasSeenEntity(id)) {
            this._ensurePeer(id);
            this._addEntityConnection(this.id, id);
            this.emit("peer-add", id);
            this._recalculate();
            this._broadcastMessage({
                type: messages_js_1.Type.CONNECTED,
                id,
            });
            this.send(peer, this._compileMessage({ type: messages_js_1.Type.BOOTSTRAP_REQUEST }));
            this._emitHeartbeat(peer);
            this.log.debug(`Relay peer connected: ${stringId}`);
        }
        if (this.bootstrapped) {
            this._sendItemsToPeer(peer);
            return;
        }
        this.log.debug(`Broadcasting bootstrap request`);
        // If this is the first person we've met, get their graph
        this._broadcastMessage({ type: messages_js_1.Type.BOOTSTRAP_REQUEST }, 0);
    }
    /**
     * Handles a possible disconnection: the peer is only removed if it is
     * still not connected after DISCONNECT_SMOOTH milliseconds.
     * @param {object} peer - Object exposing `remotePublicKey`.
     */
    removePeerHandler(peer) {
        const id = peer.remotePublicKey;
        // Wait for a bit and check if we're still disconnected before removing the peer
        setTimeout(() => {
            if (this.swarm._allConnections.has(id)) {
                return;
            }
            this.onRemovePeer(peer);
        }, DISCONNECT_SMOOTH);
        this.log.debug(`Relay peer might have disconnected: ${id.toString("hex")}`);
    }
    /**
     * Removes our connection to a peer, recalculates who is online and
     * broadcasts a DISCONNECTED message.
     * @param {object} peer - Object exposing `remotePublicKey`.
     */
    onRemovePeer(peer) {
        const id = peer.remotePublicKey;
        const stringId = id.toString("hex");
        this.connectedTo.delete(stringId);
        this._removeEntityConnection(this.id, id);
        // Pass the id along, mirroring the "peer-add" event.
        this.emit("peer-remove", id);
        this._recalculate();
        this._broadcastMessage({
            type: messages_js_1.Type.DISCONNECTED,
            id,
        }, 0);
        this.log.debug(`Relay peer confirmed disconnected: ${stringId}`);
    }
    /**
     * Handles a message received from the flood transport. Messages that
     * cannot be decoded or carry no type are dropped; item and heartbeat
     * messages are also dropped when their signature does not verify.
     * @param {*} message - Encoded message.
     * @param {*} id - Public key the message is attributed to; item
     *   signatures are verified against it.
     */
    onGetBroadcast(message, id) {
        let decoded;
        try {
            decoded = messages_js_1.Message.fromBinary(message);
        }
        catch {
            return;
        }
        const { type } = decoded;
        if (!type) {
            // The message comes from the network: drop it instead of
            // throwing out of the transport's event listener.
            this.log.debug(`Dropping message without a type`);
            return;
        }
        switch (type) {
            case messages_js_1.Type.ADD_ITEM:
            case messages_js_1.Type.REMOVE_ITEM: {
                const { data: rawData, signature } = decoded;
                const bufData = b4a_1.default.from(rawData);
                if (!(signature && hypercore_crypto_1.default.verify(bufData, signature, id))) {
                    return;
                }
                if (type === messages_js_1.Type.ADD_ITEM) {
                    // Type the sender as a peer so it is never created as an
                    // untyped node by the edge insertion below.
                    this._ensurePeer(id);
                    this._ensureItem(bufData);
                    this._addEntityConnection(id, bufData);
                    this.emit("item-added", id, bufData);
                    this.log.debug(`New item added: ${bufData.toString("hex")} from ${id.toString("hex")}`);
                }
                else {
                    // NOTE(review): this also drops our own copy of the item
                    // when we hold it; kept as-is — confirm this is intended.
                    this.removeItem(bufData);
                    // Forget that the sender holds the item, then drop the
                    // item node if nobody holds it any more.
                    this._removeEntityConnection(id, bufData);
                    this._pruneItems();
                    this.emit("item-removed", id, bufData);
                    this.log.debug(`Item removed: ${bufData.toString("hex")} from ${id.toString("hex")}`);
                }
                return;
            }
            case messages_js_1.Type.CONNECTED: {
                const bufId = b4a_1.default.from(decoded.id);
                this._ensurePeer(id);
                this._ensurePeer(bufId);
                this._addEntityConnection(id, bufId);
                this.emit("peer-add-seen", id, bufId);
                this._recalculate();
                this.log.debug(`Network peer connected: ${bufId.toString("hex")}`);
                return;
            }
            case messages_js_1.Type.DISCONNECTED: {
                const bufId = b4a_1.default.from(decoded.id);
                this._removeEntityConnection(id, bufId);
                // Emit the buffer form, like "peer-add-seen" does.
                this.emit("peer-remove-seen", id, bufId);
                this._recalculate();
                this._pruneItems();
                this.log.debug(`Network peer disconnected: ${bufId.toString("hex")}`);
                return;
            }
            case messages_js_1.Type.BOOTSTRAP_REQUEST: {
                this._broadcastMessage({
                    type: messages_js_1.Type.BOOTSTRAP_RESPONSE,
                    bootstrap: this._getBootstrapInfo(),
                }, 0);
                this.log.debug(`Bootstrap request received`);
                return;
            }
            case messages_js_1.Type.BOOTSTRAP_RESPONSE: {
                this._bootstrapFrom(decoded.bootstrap);
                if (this.swarm._allConnections.has(id)) {
                    this._sendItemsToPeer(this.swarm._allConnections.get(id));
                }
                this.log.debug(`Bootstrap response received`);
                return;
            }
            case messages_js_1.Type.HEARTBEAT: {
                const { id: toId, signature, data: bufData } = decoded;
                const bufId = b4a_1.default.from(toId);
                if (!(signature && hypercore_crypto_1.default.verify(bufData, signature, bufId))) {
                    return;
                }
                this.addPeerHandler({
                    remotePublicKey: bufId,
                });
                this._setEntity(bufId, { heartbeat: Date.now() });
                this._heartbeatCheck();
                this.log.debug(`Heartbeat received from ${bufId.toString("hex")}`);
                return;
            }
            default:
                return;
        }
    }
    /** Encodes a plain message description into its binary form. */
    _compileMessage(message) {
        return messages_js_1.Message.toBinary(messages_js_1.Message.create(message));
    }
    /** Encodes a message description and broadcasts it. */
    _broadcastMessage(message, ttl) {
        this.broadcast(this._compileMessage(message), ttl);
    }
    /** Signs the bytes of an item id with the swarm's secret key. */
    _signItem(item) {
        const itemId = this._maybeHexify(item);
        return hypercore_crypto_1.default.sign(b4a_1.default.from(itemId, "hex"), this.swarm.keyPair.secretKey);
    }
    /** Adds the node to the graph, or applies `data` to it if it exists. */
    _setEntity(id, data) {
        this.graph.addNode(this._maybeHexify(id), data);
    }
    /** Adds `id` as an item node unless the graph already has that id. */
    _ensureItem(id) {
        this._ensureEntity(id, { type: EntityType.ITEM });
    }
    /** Adds `id` as a peer node unless the graph already has that id. */
    _ensurePeer(id) {
        this._ensureEntity(id, { type: EntityType.PUBKEY });
    }
    /** Adds `id` with attributes `def` unless the graph already has that id. */
    _ensureEntity(id, def = {}) {
        const key = this._maybeHexify(id);
        if (!this._hasSeenEntity(key)) {
            this._setEntity(key, def);
        }
    }
    /** @returns {boolean} Whether the graph has a node for `id`. */
    _hasSeenEntity(id) {
        return this.graph.hasNode(this._maybeHexify(id));
    }
    /** Adds the edge `origin -> destination`, creating missing nodes. */
    _addEntityConnection(origin, destination) {
        this._ensureEntity(origin);
        this._ensureEntity(destination);
        this.graph.addEdge(this._maybeHexify(origin), this._maybeHexify(destination));
    }
    /**
     * Removes the edge `origin -> destination` if present. Unknown ids are
     * left alone: creating a node here would add an untyped entry that later
     * makes addPeerHandler treat the peer as already registered.
     */
    _removeEntityConnection(origin, destination) {
        const from = this._maybeHexify(origin);
        const to = this._maybeHexify(destination);
        if (!this.graph.hasNode(from) || !this.graph.hasNode(to)) {
            return;
        }
        try {
            this.graph.removeEdge(from, to);
        }
        catch (e) {
            // A missing edge is expected; anything else is a real error.
            if (e.name !== "JSNetworkXError")
                throw e;
        }
    }
    /** Removes the node for `id`, and its edges, when it is in the graph. */
    _removeEntity(id) {
        const key = this._maybeHexify(id);
        if (this.graph.hasNode(key)) {
            this.graph.removeNode(key);
        }
    }
    /**
     * Merges a bootstrap response into the graph, asks the swarm to join the
     * peers we are not connected to, and marks this instance as bootstrapped
     * (emitting "bootstrapped" the first time).
     * @param {object} bootstrap - Map of hex peer id to `{ connectedTo }`.
     */
    _bootstrapFrom(bootstrap) {
        const selfId = this.id.toString("hex");
        for (const [id, entry] of Object.entries(bootstrap ?? {})) {
            if (id === selfId) {
                continue;
            }
            // Type the origin as a peer; otherwise the edge insertion below
            // would create it untyped and it would be ignored afterwards.
            this._ensurePeer(id);
            if (!this.connectedTo.has(id)) {
                this.swarm.joinPeer(b4a_1.default.from(id, "hex"));
            }
            for (const connection of entry?.connectedTo ?? []) {
                const peer = b4a_1.default.from(connection);
                if (b4a_1.default.equals(peer, this.id)) {
                    continue;
                }
                this._ensurePeer(peer);
                this._addEntityConnection(id, peer);
                if (!this.connectedTo.has(peer.toString("hex"))) {
                    this.swarm.joinPeer(peer);
                }
            }
        }
        if (!this.bootstrapped) {
            this.bootstrapped = true;
            this.emit("bootstrapped");
        }
        this._recalculate();
    }
    /**
     * Builds the payload of a bootstrap response: for every peer node, the
     * list of peer nodes it has an edge to.
     * @returns {object} Map of hex peer id to `{ connectedTo: Buffer[] }`.
     */
    _getBootstrapInfo() {
        const state = {};
        for (const id of this.graph.nodesIter()) {
            if (this.graph.node.get(id)?.type !== EntityType.PUBKEY) {
                continue;
            }
            const connectedTo = this.graph
                .neighbors(id)
                .filter((other) => this.graph.node.get(other)?.type === EntityType.PUBKEY)
                .map((other) => b4a_1.default.from(other, "hex"));
            state[id] = { connectedTo };
        }
        return state;
    }
    // Calculate who's online and emit an event
    _recalculate() {
        const selfId = this._maybeHexify(this.id);
        const online = new Set();
        const offline = new Set();
        for (const id of this.graph.nodesIter()) {
            if (this.graph.node.get(id)?.type !== EntityType.PUBKEY) {
                continue;
            }
            const reachable = (0, jsnetworkx_1.hasPath)(this.graph, {
                source: selfId,
                target: id,
            });
            (reachable ? online : offline).add(id);
        }
        // Peers that cannot be reached from us are dropped from the graph.
        for (const id of offline) {
            this.graph.removeNode(id);
        }
        // Reachable peers only count as online once flagged by a heartbeat.
        for (const id of online) {
            if (b4a_1.default.equals(b4a_1.default.from(id, "hex"), this.id)) {
                continue;
            }
            if (!this.graph.node.get(id)?.online) {
                online.delete(id);
            }
        }
        this._online = online;
        this.emit("online", online);
        this.log.debug(`Online list updated: ${online.size - 1} network peers online, ${offline.size} network peers offline and removed from DAG`);
    }
    /** Returns the hex form of a buffer; any other value is returned as is. */
    _maybeHexify(data) {
        if (b4a_1.default.isBuffer(data)) {
            return data.toString("hex");
        }
        return data;
    }
    /**
     * Removes item nodes that no node points to any more.
     * @param {*} [item] - Restrict the check to this item; all nodes otherwise.
     */
    _pruneItems(item) {
        // Work on a snapshot so nodes can be removed while looping.
        const candidates = item
            ? [this._maybeHexify(item)]
            : [...this.graph.nodesIter()];
        for (const id of candidates) {
            if (this.graph.node.get(id)?.type !== EntityType.ITEM) {
                continue;
            }
            try {
                // Edges point from holder to item, so holders are the
                // predecessors; neighbors() would list successors instead.
                if (this.graph.predecessors(id).length === 0) {
                    this.graph.removeNode(id);
                }
            }
            catch (e) {
                // Best effort: one failing node must not stop the pruning.
                this.log.debug(`Failed to prune item ${id}: ${e?.message}`);
            }
        }
    }
    /**
     * Refreshes the `online` flag of every directly connected peer. A peer is
     * online when it has a connection and a heartbeat no older than one
     * heartbeat interval. Recalculates the online list when a flag changed.
     */
    _heartbeatCheck() {
        let changed = false;
        for (const peer of this.connectedTo) {
            const pubkey = b4a_1.default.from(peer, "hex");
            const node = this.graph.node.get(peer);
            const heartbeat = node?.heartbeat;
            const conn = this.swarm._allConnections.get(pubkey);
            // Strict boolean, so a missing value and `false` compare equal.
            const online = Boolean(conn) &&
                heartbeat > 0 &&
                Date.now() - heartbeat <= this.heartBeatInterval * 1000;
            if (Boolean(node?.online) !== online) {
                changed = true;
            }
            this._setEntity(peer, { online });
        }
        if (changed) {
            this._recalculate();
        }
    }
    /**
     * Sends a signed heartbeat to one peer, or to every peer in
     * `connectedTo` when none is given. Peers without a connection are skipped.
     * @param {object} [peer] - Object exposing `remotePublicKey`.
     */
    _emitHeartbeat(peer) {
        const targets = peer
            ? [b4a_1.default.from(peer.remotePublicKey).toString("hex")]
            : [...this.connectedTo];
        for (const target of targets) {
            const conn = this.swarm._allConnections.get(b4a_1.default.from(target, "hex"));
            if (!conn) {
                continue;
            }
            const data = b4a_1.default.from(Uint8Array.from([1]));
            this.send(conn, this._compileMessage({
                type: messages_js_1.Type.HEARTBEAT,
                id: this.id,
                data,
                signature: hypercore_crypto_1.default.sign(data, this.swarm.keyPair.secretKey),
            }));
        }
    }
    /** Swarm "connection" listener: sends "hello" and calls `leavePeer`. */
    _hello(peer) {
        this.send(peer, b4a_1.default.from("hello"));
        this.swarm.leavePeer(peer.remotePublicKey);
    }
    /** Sends one ADD_ITEM message per locally held item to `peer`. */
    _sendItemsToPeer(peer) {
        for (const item of this.cache) {
            this.send(peer, this._createAddItemRequestMessage(this._maybeHexify(item)));
        }
    }
    /** Builds the encoded ADD_ITEM message for `item`. */
    _createAddItemRequestMessage(item) {
        return this._compileMessage(this._createAddItemRequest(this._maybeHexify(item)));
    }
    /** Builds the signed ADD_ITEM message description for a hex item id. */
    _createAddItemRequest(item) {
        return {
            type: messages_js_1.Type.ADD_ITEM,
            data: b4a_1.default.from(item, "hex"),
            signature: this._signItem(item),
        };
    }
}
// Expose the class as the module's default export (read as `.default` by
// CommonJS consumers).
exports.default = DHTCache;