From 1da4a4967e8a3161c14cf1a2a594de5e1b8637e9 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 19 Jun 2023 02:31:53 -0400 Subject: [PATCH] fix: fix broadcasting to find peers on topic correctly, and use the discovery object to refresh if required --- src/index.ts | 69 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/src/index.ts b/src/index.ts index 6536eec..88e3368 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,6 +28,7 @@ export default class DHTFlood extends EventEmitter { private topic: Buffer; private symbol: Symbol; private socketMap: Set = new Set(); + private discovery: any; constructor({ lruSize = LRU_SIZE, @@ -60,7 +61,7 @@ export default class DHTFlood extends EventEmitter { this.topic = topicHash as Buffer; - this.swarm.join(topicHash); + this.discovery = this.swarm.join(topicHash); this.symbol = Symbol.for(this.protocol); } @@ -155,30 +156,54 @@ export default class DHTFlood extends EventEmitter { return chan.messages[0]; } - broadcast(data: any, ttl = this.ttl) { - this.messageNumber++; - const { id, messageNumber } = this; + async broadcast(data: any, ttl = this.ttl) { + let failed: Buffer[] = []; - let topicString = this.topic.toString("hex"); - - let peers: Buffer[] = [...this.swarm.peers.values()] - .filter((peerInfo: any) => peerInfo._seenTopics.has(topicString)) - .map((peerInfo) => peerInfo.publicKey); - - for (const peer of peers) { - const conn = this.swarm._allConnections.get(peer); - if (!conn) { - continue; + for (const peer of this.getAllPeers()) { + if (!this.trySendDataToPubkey(peer, data, ttl)) { + failed.push(peer); } - - const message = this.setupPeer(conn); - message.send({ - originId: id, - messageNumber, - ttl, - data: b4a.from(data), - }); } + + if (failed.length) { + await this.discovery.refresh(); + + for (const peer of failed) { + this.trySendDataToPubkey(peer, data, ttl); + } + } + } + + private getTopicPeers(): Buffer[] { + return [...this.swarm.peers.values()] + .filter((peerInfo: any) => + peerInfo._seenTopics.has(this.topic.toString("hex")) + ) + .map((peerInfo) => peerInfo.publicKey); + } + + private getAllPeers(): Buffer[] { + return [...this.swarm._allConnections[Symbol.iterator]()].map( + (peer: any) => peer.remotePublicKey + ); + } + + private trySendDataToPubkey(peer: any, data: any, ttl: number) { + const topicPeers = this.getTopicPeers(); + + const found = topicPeers.filter((item) => b4a.equals(item, peer)); + + if (!found.length) { + return false; + } + + const conn = this.swarm._allConnections.get(peer); + if (!conn) { + return false; + } + + this.send(conn, data, ttl); + return true; } send(peer: any, data: any, ttl = this.ttl) {