dht-flood/dist/index.js

130 lines
4.6 KiB
JavaScript
Raw Normal View History

2022-11-15 18:48:21 +00:00
"use strict";
// CommonJS/ES-module interop helper: reuse an already-installed helper from
// `this` when one exists; otherwise wrap plain CommonJS exports so callers can
// always read the imported value from `.default`.
var __importDefault = (this && this.__importDefault) || function (imported) {
    // Modules flagged as ES modules already carry their own `default` slot.
    if (imported && imported.__esModule) {
        return imported;
    }
    return { "default": imported };
};
Object.defineProperty(exports, "__esModule", { value: true });
2022-11-16 06:33:52 +00:00
exports.FLOOD_SYMBOL = void 0;
2022-11-15 18:48:21 +00:00
const events_1 = __importDefault(require("events"));
const crypto_1 = __importDefault(require("crypto"));
// @ts-ignore
const lru_1 = __importDefault(require("lru"));
const debug_1 = __importDefault(require("debug"));
// @ts-ignore
const protomux_1 = __importDefault(require("protomux"));
const messages_js_1 = require("./messages.js");
// @ts-ignore
const compact_encoding_1 = __importDefault(require("compact-encoding"));
const b4a_1 = __importDefault(require("b4a"));
2022-11-15 19:28:57 +00:00
const debug = (0, debug_1.default)("dht-flood");
2022-11-15 18:48:21 +00:00
// Default `lruSize` constructor option: passed to the LRU cache that records
// which (origin, message number) pairs have already been handled.
const LRU_SIZE = 255;
// Default `ttl` for the constructor, and therefore for broadcast()/send();
// handleMessage relays with `ttl - 1` and stops relaying once ttl <= 0.
const TTL = 255;
// Default protomux protocol name; also the key used to build FLOOD_SYMBOL.
const PROTOCOL = "lumeweb.flood";
2022-11-16 06:33:52 +00:00
// Global-registry symbol (Symbol.for) under which setupPeer() stores a peer's
// flood channel on the peer object itself. It is derived from the default
// PROTOCOL constant, not from a per-instance `protocol` option.
exports.FLOOD_SYMBOL = Symbol.for(PROTOCOL);
2022-11-15 18:48:21 +00:00
/**
 * Message flooding over the connections of a swarm, using one protomux
 * channel per peer.
 *
 * Events emitted by this class:
 *  - "message"     (data, originId, messageNumber) for every first-seen message
 *  - "peer-open"   (peer) when a peer's flood channel opens
 *  - "peer-remove" (peer) when a peer's flood channel is destroyed
 */
class DHTFlood extends events_1.default {
    /** Identifier stamped on outgoing messages as `originId`. */
    id;
    /** Default TTL used by broadcast() and send(). */
    ttl;
    /** Counter incremented before every broadcast()/send(). */
    messageNumber;
    /** LRU cache of already-handled "<origin hex>:<message number>" keys. */
    lru;
    /** Swarm whose connections are used as peers. */
    swarm;
    /** Protomux protocol name used for the flood channel. */
    protocol;
    /**
     * @param {object} [options]
     * @param {number} [options.lruSize=LRU_SIZE] Capacity of the seen-message cache.
     * @param {number} [options.ttl=TTL] Default TTL for outgoing messages.
     * @param {number} [options.messageNumber=0] Initial message counter.
     * @param {Uint8Array} [options.id] Origin id; defaults to 32 random bytes.
     * @param {object} options.swarm Required. Must emit "connection" and expose
     *   a `connections` collection with `.values()` (both are used below).
     * @param {string} [options.protocol=PROTOCOL] Protomux protocol name.
     * @throws {Error} "swarm is required" when no swarm is supplied.
     */
    constructor({ lruSize = LRU_SIZE, ttl = TTL, messageNumber = 0, id = crypto_1.default.randomBytes(32), swarm = null, protocol = PROTOCOL, } = {}) {
        super();
        this.id = id;
        this.ttl = ttl;
        this.messageNumber = messageNumber;
        this.lru = new lru_1.default(lruSize);
        this.protocol = protocol;
        if (!swarm) {
            throw new Error("swarm is required");
        }
        this.swarm = swarm;
        this.swarm.on("connection", (peer) => {
            const mux = protomux_1.default.from(peer);
            // Set the channel up lazily through protomux's pair callback for
            // this protocol instead of opening it eagerly on every connection.
            mux.pair({ protocol: this.protocol }, () => this.setupPeer(peer));
        });
    }
    /**
     * Processes one decoded packet: drops it when it originated here or was
     * already seen, otherwise emits "message" and relays it with `ttl - 1`
     * through `messenger` while TTL remains.
     *
     * @param {{originId: Uint8Array, messageNumber: number, ttl: number, data: Uint8Array}} packet
     * @param {{send: Function}} messenger Object used to relay the packet.
     * @returns {*} Whatever `debug(...)` returns on the early exits, else undefined.
     */
    handleMessage({ originId, messageNumber, ttl, data }, messenger) {
        const originIdBuf = b4a_1.default.from(originId);
        // Use b4a's helpers rather than Buffer instance methods: outside Node
        // b4a.from() yields a plain Uint8Array, which has no .equals() and
        // whose .toString() ignores the "hex" encoding argument.
        // Ignore messages from ourselves
        if (b4a_1.default.equals(originIdBuf, this.id)) {
            return debug("Got message from self", originId, messageNumber);
        }
        // Ignore messages we've already seen. The ":" separator keeps ids of
        // different lengths from producing the same key.
        const key = `${b4a_1.default.toString(originIdBuf, "hex")}:${messageNumber}`;
        if (this.lru.get(key)) {
            return debug("Got message that was already seen", originId, messageNumber);
        }
        this.lru.set(key, true);
        this.emit("message", data, originId, messageNumber);
        if (ttl <= 0) {
            return debug("Got message at end of TTL", originId, messageNumber, ttl);
        }
        // NOTE(review): setupPeer() passes the delivering peer's own message
        // handle as `messenger`, so this relay goes back over the channel the
        // packet arrived on. Confirm whether forwarding to the other swarm
        // connections was intended instead.
        messenger.send({
            originId,
            messageNumber,
            data,
            ttl: ttl - 1,
        });
    }
    /**
     * Ensures `peer` has a flood channel with the packet message registered
     * and opened, creating the channel on first use and caching it on the
     * peer under FLOOD_SYMBOL.
     *
     * @param {object} peer Connection accepted by `Protomux.from`.
     * @returns {object} The channel's first registered message (has `.send`).
     * @throws {Error} "could not find channel" when no channel is cached on
     *   the peer and none could be created.
     */
    setupPeer(peer) {
        const mux = protomux_1.default.from(peer);
        if (!mux.opened({ protocol: this.protocol })) {
            const created = mux.createChannel({
                protocol: this.protocol,
                onopen: async () => {
                    this.emit("peer-open", peer);
                },
                ondestroy: async () => {
                    this.emit("peer-remove", peer);
                },
            });
            // Only overwrite the cached channel when a new one was actually
            // returned; a falsy result leaves any previous entry untouched.
            if (created) {
                peer[exports.FLOOD_SYMBOL] = created;
            }
        }
        const chan = peer[exports.FLOOD_SYMBOL];
        if (!chan) {
            throw new Error("could not find channel");
        }
        if (!chan.messages.length) {
            // Packets travel as raw bytes produced by the Packet codec.
            const toBytes = (m) => messages_js_1.Packet.toBinary(messages_js_1.Packet.create(m));
            chan.addMessage({
                encoding: {
                    preencode: (state, m) => compact_encoding_1.default.raw.preencode(state, toBytes(m)),
                    encode: (state, m) => compact_encoding_1.default.raw.encode(state, toBytes(m)),
                    decode: (state) => messages_js_1.Packet.fromBinary(compact_encoding_1.default.raw.decode(state)),
                },
                onmessage: (msg) => this.handleMessage(msg, chan.messages[0]),
            });
        }
        if (!chan.opened) {
            chan.open();
        }
        return chan.messages[0];
    }
    /**
     * Sends `data` to every current swarm connection under one new message
     * number.
     *
     * @param {*} data Payload; converted with `b4a.from` for each peer.
     * @param {number} [ttl=this.ttl] TTL attached to the message.
     */
    broadcast(data, ttl = this.ttl) {
        this.messageNumber++;
        const { id, messageNumber } = this;
        for (const peer of this.swarm.connections.values()) {
            const message = this.setupPeer(peer);
            message.send({
                originId: id,
                messageNumber,
                ttl,
                data: b4a_1.default.from(data),
            });
        }
    }
    /**
     * Sends `data` to a single peer under a new message number.
     *
     * @param {object} peer Target connection (set up on demand via setupPeer).
     * @param {*} data Payload; converted with `b4a.from`.
     * @param {number} [ttl=this.ttl] TTL attached to the message.
     */
    send(peer, data, ttl = this.ttl) {
        this.messageNumber++;
        const { id, messageNumber } = this;
        const message = this.setupPeer(peer);
        message.send({
            originId: id,
            messageNumber,
            ttl,
            data: b4a_1.default.from(data),
        });
    }
}
// Expose the DHTFlood class as this module's default export.
exports.default = DHTFlood;