dht-data/dist/DHTDataBase.js

276 lines
9.9 KiB
JavaScript

"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = __importDefault(require("events"));
// @ts-ignore
const jsnetworkx_1 = require("jsnetworkx");
// @ts-ignore
const ordered_json_1 = __importDefault(require("ordered-json"));
// @ts-ignore
const hypercore_crypto_1 = __importDefault(require("hypercore-crypto"));
const b4a_1 = __importDefault(require("b4a"));
const messages_js_1 = require("./messages.js");
const debug_1 = __importDefault(require("debug"));
class DHTDataBase extends events_1.default {
swarm;
id;
bootstrapped;
graph;
connectedTo;
constructor(id, { swarm } = {}) {
super();
if (!id)
throw new TypeError("Must provide id for self");
this.id = id;
this.bootstrapped = false;
this.graph = new jsnetworkx_1.DiGraph();
this.connectedTo = new Set();
this._data = {};
this._online = [this._maybeHexify(this.id)];
this.swarm = swarm;
}
_data;
get data() {
return { ...this._data };
}
set data(value) {
this._data = value;
const timestamp = BigInt(Date.now());
const rawData = ordered_json_1.default.stringify(value);
const signature = hypercore_crypto_1.default.sign(b4a_1.default.from(`${timestamp}${rawData}`), this.swarm.keyPair.secretKey);
this._setPeer(this.id, value, timestamp, signature);
this._broadcastData();
}
_online;
get online() {
return this._online;
}
broadcast(data, ttl) {
throw new TypeError("Broadcast has not been implemented");
}
getPeerRaw(id) {
return this.graph.node.get(this._maybeHexify(id));
}
getPeerField(id, field) {
return this.getPeerRaw(id)?.[field];
}
getPeerData(id) {
return this.getPeerField(id, "data");
}
getPeerTimestamp(id) {
return this.getPeerField(id, "timestamp");
}
getPeerSignature(id) {
return this.getPeerField(id, "signature");
}
onAddPeer(id) {
const stringId = id.toString("hex");
if (this.connectedTo.has(stringId)) {
return;
} // Already know we're connected here
this.connectedTo.add(stringId);
this._addPeerConnection(this.id, id);
this.emit("peer-add", id);
this._recalculate();
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.CONNECTED,
id,
})));
this._broadcastData();
if (this.bootstrapped) {
return;
}
// If this is the first person we've met, get their graph
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.BOOTSTRAP_REQUEST,
})), 0);
}
onRemovePeer(id) {
this.connectedTo.delete(id.toString("hex"));
this._removePeerConnection(this.id, id);
this.emit("peer-remove");
this._recalculate();
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.DISCONNECTED,
id,
})));
}
onGetBroadcast(message, id) {
let decoded;
try {
decoded = messages_js_1.Message.fromBinary(message);
}
catch {
return;
}
const { type } = decoded;
if (!type) {
throw new Error("Missing Type In Message");
}
if (type === messages_js_1.Type.STATE) {
const { data: rawData, timestamp, signature } = decoded;
if (signature &&
hypercore_crypto_1.default.verify(b4a_1.default.from(`${timestamp}${rawData}`), signature, id)) {
if ((timestamp || 0) <= this.getPeerTimestamp(id)) {
(0, debug_1.default)(`Received old data for peer ${id}`);
return;
}
const data = rawData ? ordered_json_1.default.parse(rawData) : null;
this._setPeer(id, data, timestamp, signature);
this.emit("peer-data", data, id);
this._recalculate();
return;
}
(0, debug_1.default)(`Invalid signature received for peer ${id}`);
}
else if (type === messages_js_1.Type.CONNECTED) {
const { id: toId } = decoded;
this._addPeerConnection(id, Buffer.from(toId));
this.emit("peer-add-seen", id, toId);
this._recalculate();
}
else if (type === messages_js_1.Type.DISCONNECTED) {
const { id: toId } = decoded;
this._removePeerConnection(id, Buffer.from(toId));
this.emit("peer-remove-seen", id, toId);
this._recalculate();
}
else if (type === messages_js_1.Type.BOOTSTRAP_REQUEST) {
const bootstrap = this._getBootstrapInfo();
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.BOOTSTRAP_RESPONSE,
bootstrap,
})), 0);
}
else if (type === messages_js_1.Type.BOOTSTRAP_RESPONSE) {
const { bootstrap } = decoded;
this._bootstrapFrom(bootstrap);
}
}
_broadcastData() {
const rawData = this._data;
if (!Object.keys(rawData).length) {
return;
}
const data = ordered_json_1.default.stringify(rawData);
const { timestamp, signature } = this.getPeerRaw(this.id);
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.STATE,
data: b4a_1.default.from(data),
signature,
timestamp,
})));
}
_hasSeenPeer(id) {
return this.graph.hasNode(this._maybeHexify(id));
}
_setPeer(id, data, timestamp, signature) {
this.graph.addNode(this._maybeHexify(id), {
timestamp,
signature,
data,
});
}
_ensurePeer(id) {
id = this._maybeHexify(id);
if (!this._hasSeenPeer(id)) {
this._setPeer(id, {});
}
}
_addPeerConnection(origin, destination) {
this._ensurePeer(origin);
this._ensurePeer(destination);
this.graph.addEdge(this._maybeHexify(origin), this._maybeHexify(destination));
}
_removePeerConnection(origin, destination) {
try {
this._ensurePeer(origin);
this._ensurePeer(destination);
this.graph.removeEdge(origin.toString("hex"), destination.toString("hex"));
}
catch (e) {
if (e.name !== "JSNetworkXError")
throw e;
}
}
_bootstrapFrom(bootstrap) {
if (this.bootstrapped) {
return;
}
for (const id in bootstrap) {
const { data, connectedTo, signature, timestamp } = bootstrap[id];
if (id === this.id.toString("hex")) {
continue;
}
if (signature &&
hypercore_crypto_1.default.verify(b4a_1.default.from(`${timestamp}${data}`), signature, b4a_1.default.from(id, "hex"))) {
const parsedData = data ? ordered_json_1.default.parse(data) : null;
let peerData = parsedData || {};
// If we're already tracking them
if (this._hasSeenPeer(id)) {
// Ensure we don't have old data
if ((timestamp || 0) > this.getPeerTimestamp(id)) {
// See what data we already have for them
// Add their existing data to what we got from the bootstrap
const existingPeerData = this.getPeerData(id);
peerData = { ...existingPeerData, ...peerData };
this._setPeer(id, peerData, timestamp, signature);
}
else {
(0, debug_1.default)(`Received old data for peer ${id}`);
}
}
}
else {
(0, debug_1.default)(`Invalid signature received for peer ${id}`);
}
for (const connection of connectedTo) {
this._addPeerConnection(id, Buffer.from(connection));
}
}
this.emit("bootstrapped");
this._recalculate();
}
_getBootstrapInfo() {
const state = {};
for (const [id, rawData] of this.graph.nodes(true)) {
const connectedTo = this.graph
.neighbors(id)
.map((id) => Buffer.from(id, "hex"));
const data = rawData ? ordered_json_1.default.stringify(rawData?.data) : null;
const { timestamp = undefined, signature = undefined } = rawData;
state[id] = { data: b4a_1.default.from(data), connectedTo, timestamp, signature };
}
return state;
}
// Calculate who's online and emit an event
_recalculate() {
const online = this.graph.nodes().filter((id) => {
return (0, jsnetworkx_1.hasPath)(this.graph, {
source: this._maybeHexify(this.id),
target: id,
});
});
const offline = this.graph.nodes().filter((id) => {
return !(0, jsnetworkx_1.hasPath)(this.graph, {
source: this._maybeHexify(this.id),
target: id,
});
});
for (const id of offline) {
this.graph.removeNode(id);
}
this._online = online;
this.emit("online", online);
}
_maybeHexify(data) {
if (b4a_1.default.isBuffer(data)) {
return data.toString("hex");
}
return data;
}
}
exports.default = DHTDataBase;