222 lines
5.2 KiB
TypeScript
222 lines
5.2 KiB
TypeScript
import EventEmitter from "events";
|
|
import crypto from "crypto";
|
|
// @ts-ignore
|
|
import LRU from "lru";
|
|
import debug0 from "debug";
|
|
// @ts-ignore
|
|
import Protomux from "protomux";
|
|
import { Packet } from "./messages.js";
|
|
// @ts-ignore
|
|
import c from "compact-encoding";
|
|
import b4a from "b4a";
|
|
// @ts-ignore
|
|
import sodium from "sodium-universal";
|
|
|
|
const debug = debug0("dht-flood");
|
|
|
|
const LRU_SIZE = 255;
|
|
const TTL = 255;
|
|
const PROTOCOL = "lumeweb.flood";
|
|
|
|
export default class DHTFlood extends EventEmitter {
|
|
private id: Buffer;
|
|
private ttl: number;
|
|
private messageNumber: number;
|
|
private lru: LRU;
|
|
private swarm: any;
|
|
private protocol: string;
|
|
private topic: Buffer;
|
|
private symbol: Symbol;
|
|
private socketMap: Set<Function> = new Set<Function>();
|
|
private discovery: any;
|
|
|
|
constructor({
|
|
lruSize = LRU_SIZE,
|
|
ttl = TTL,
|
|
messageNumber = 0,
|
|
id = crypto.randomBytes(32),
|
|
swarm = null,
|
|
protocol = PROTOCOL,
|
|
} = {}) {
|
|
super();
|
|
|
|
this.id = id;
|
|
this.ttl = ttl;
|
|
this.messageNumber = messageNumber;
|
|
this.lru = new LRU(lruSize);
|
|
this.protocol = protocol;
|
|
if (!swarm) {
|
|
throw new Error("swarm is required");
|
|
}
|
|
this.swarm = swarm;
|
|
|
|
this.swarm.on("connection", (peer: any) => {
|
|
const mux = Protomux.from(peer);
|
|
mux.pair({ protocol: this.protocol }, () => this.setupPeer(peer));
|
|
});
|
|
|
|
const topic = b4a.from(this.protocol);
|
|
const topicHash = b4a.allocUnsafe(32);
|
|
sodium.crypto_generichash(topicHash, topic);
|
|
|
|
this.topic = topicHash as Buffer;
|
|
|
|
this.discovery = this.swarm.join(topicHash);
|
|
|
|
this.symbol = Symbol.for(this.protocol);
|
|
}
|
|
|
|
private handleMessage(
|
|
{ originId, messageNumber, ttl, data }: Packet,
|
|
messenger: any
|
|
) {
|
|
const originIdBuf = b4a.from(originId) as Buffer;
|
|
|
|
// Ignore messages from ourselves
|
|
if (originIdBuf.equals(this.id))
|
|
return debug("Got message from self", originId, messageNumber);
|
|
|
|
// Ignore messages we've already seen
|
|
const key = originIdBuf.toString("hex") + messageNumber;
|
|
if (this.lru.get(key))
|
|
return debug(
|
|
"Got message that was already seen",
|
|
originId,
|
|
messageNumber
|
|
);
|
|
this.lru.set(key, true);
|
|
|
|
this.emit("message", data, originId, messageNumber);
|
|
|
|
if (ttl <= 0) {
|
|
return debug("Got message at end of TTL", originId, messageNumber, ttl);
|
|
}
|
|
|
|
messenger.send({
|
|
originId,
|
|
messageNumber,
|
|
data,
|
|
ttl: ttl - 1,
|
|
});
|
|
}
|
|
|
|
private setupPeer(peer: any) {
|
|
const mux = Protomux.from(peer);
|
|
let chan: any;
|
|
|
|
const self = this;
|
|
if (!mux.opened({ protocol: this.protocol })) {
|
|
chan = mux.createChannel({
|
|
protocol: this.protocol,
|
|
async onopen() {
|
|
self.emit("peer-open", peer);
|
|
},
|
|
});
|
|
if (chan) {
|
|
// @ts-ignore
|
|
peer[this.symbol] = chan;
|
|
}
|
|
}
|
|
|
|
if (!this.socketMap.has(peer)) {
|
|
const close = () => {
|
|
self.emit("peer-remove", peer);
|
|
peer.removeListener("close", close);
|
|
this.socketMap.delete(peer);
|
|
};
|
|
|
|
peer.on("close", close);
|
|
this.socketMap.add(peer);
|
|
}
|
|
|
|
// @ts-ignore
|
|
chan = peer[this.symbol];
|
|
|
|
if (!chan) {
|
|
throw new Error("could not find channel");
|
|
}
|
|
|
|
if (!chan.messages.length) {
|
|
chan.addMessage({
|
|
encoding: {
|
|
preencode: (state: any, m: any) =>
|
|
c.raw.preencode(state, Packet.toBinary(Packet.create(m))),
|
|
encode: (state: any, m: any) =>
|
|
c.raw.encode(state, Packet.toBinary(Packet.create(m))),
|
|
decode: (state: any) => Packet.fromBinary(c.raw.decode(state)),
|
|
},
|
|
onmessage: (msg: any) => this.handleMessage(msg, chan.messages[0]),
|
|
});
|
|
}
|
|
|
|
if (!chan.opened) {
|
|
chan.open();
|
|
}
|
|
|
|
return chan.messages[0];
|
|
}
|
|
|
|
async broadcast(data: any, ttl = this.ttl) {
|
|
let failed: Buffer[] = [];
|
|
|
|
for (const peer of this.getAllPeers()) {
|
|
if (!this.trySendDataToPubkey(peer, data, ttl)) {
|
|
failed.push(peer);
|
|
}
|
|
}
|
|
|
|
if (failed.length) {
|
|
await this.discovery.refresh();
|
|
|
|
for (const peer of failed) {
|
|
this.trySendDataToPubkey(peer, data, ttl);
|
|
}
|
|
}
|
|
}
|
|
|
|
private getTopicPeers(): Buffer[] {
|
|
return [...this.swarm.peers.values()]
|
|
.filter((peerInfo: any) =>
|
|
peerInfo._seenTopics.has(this.topic.toString("hex"))
|
|
)
|
|
.map((peerInfo) => peerInfo.publicKey);
|
|
}
|
|
|
|
private getAllPeers(): Buffer[] {
|
|
return [...this.swarm._allConnections[Symbol.iterator]()].map(
|
|
(peer: any) => peer.remotePublicKey
|
|
);
|
|
}
|
|
|
|
private trySendDataToPubkey(peer: any, data: any, ttl: number) {
|
|
const topicPeers = this.getTopicPeers();
|
|
|
|
const found = topicPeers.filter((item) => b4a.equals(item, peer));
|
|
|
|
if (!found.length) {
|
|
return false;
|
|
}
|
|
|
|
const conn = this.swarm._allConnections.get(peer);
|
|
if (!conn) {
|
|
return false;
|
|
}
|
|
|
|
this.send(conn, data, ttl);
|
|
return true;
|
|
}
|
|
|
|
send(peer: any, data: any, ttl = this.ttl) {
|
|
this.messageNumber++;
|
|
const { id, messageNumber } = this;
|
|
|
|
const message = this.setupPeer(peer);
|
|
message.send({
|
|
originId: id,
|
|
messageNumber,
|
|
ttl,
|
|
data: b4a.from(data),
|
|
});
|
|
}
|
|
}
|