diff --git a/package.json b/package.json index 92fe4be..346a301 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "b4a": "^1.6.1", "debug": "^4.3.4", "hyperswarm": "^4.3.5", + "prettier": "^2.7.1", "protoc": "^1.1.3", "sodium-universal": "^3.1.0", "tape": "^5.6.1" diff --git a/src/index.ts b/src/index.ts index 580e558..c2b5204 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,129 +5,140 @@ import LRU from "lru"; import debug0 from "debug"; // @ts-ignore import Protomux from "protomux"; -import {Packet, PacketType} from "./messages.js"; +import { Packet, PacketType } from "./messages.js"; // @ts-ignore -import c from "compact-encoding" -import b4a from "b4a" +import c from "compact-encoding"; +import b4a from "b4a"; -const debug = debug0('dht-flood') +const debug = debug0("dht-flood"); -const LRU_SIZE = 255 -const TTL = 255 -const PROTOCOL = "lumeweb.flood" +const LRU_SIZE = 255; +const TTL = 255; +const PROTOCOL = "lumeweb.flood"; -const FLOOD_SYMBOL = Symbol.for(PROTOCOL) +const FLOOD_SYMBOL = Symbol.for(PROTOCOL); export default class DHTFlood extends EventEmitter { - private id: Buffer; - private ttl: number; - private messageNumber: number; - private lru: LRU; - private swarm: any; + private id: Buffer; + private ttl: number; + private messageNumber: number; + private lru: LRU; + private swarm: any; - constructor({ - lruSize = LRU_SIZE, - ttl = TTL, - messageNumber = 0, - id = crypto.randomBytes(32), - swarm = null - } = {}) { - super() + constructor({ + lruSize = LRU_SIZE, + ttl = TTL, + messageNumber = 0, + id = crypto.randomBytes(32), + swarm = null, + } = {}) { + super(); - this.id = id - this.ttl = ttl - this.messageNumber = messageNumber - this.lru = new LRU(lruSize) - if (!swarm) { - throw new Error('swarm is required'); - } - this.swarm = swarm; + this.id = id; + this.ttl = ttl; + this.messageNumber = messageNumber; + this.lru = new LRU(lruSize); + if (!swarm) { + throw new Error("swarm is required"); + } + this.swarm = swarm; - this.swarm.on("connection", (peer: any) => { - const mux = Protomux.from(peer); - mux.pair({protocol: PROTOCOL}, () => this.setupPeer(peer)); - }); + this.swarm.on("connection", (peer: any) => { + const mux = Protomux.from(peer); + mux.pair({ protocol: PROTOCOL }, () => this.setupPeer(peer)); + }); + } + + private handleMessage( + { originId, messageNumber, ttl, data }: PacketType, + messenger: any + ) { + const originIdBuf = b4a.from(originId) as Buffer; + + // Ignore messages from ourselves + if (originIdBuf.equals(this.id)) + return debug("Got message from self", originId, messageNumber); + + // Ignore messages we've already seen + const key = originIdBuf.toString("hex") + messageNumber; + if (this.lru.get(key)) + return debug( + "Got message that was already seen", + originId, + messageNumber + ); + this.lru.set(key, true); + + this.emit("message", data, originId, messageNumber); + + if (ttl <= 0) + return debug("Got message at end of TTL", originId, messageNumber, ttl); + + messenger.send({ + originId, + messageNumber, + data, + ttl: ttl - 1, + }); + } + + private setupPeer(peer: any) { + const mux = Protomux.from(peer); + let chan: any; + + const self = this; + + if (!mux.opened({ protocol: PROTOCOL })) { + chan = mux.createChannel({ + protocol: PROTOCOL, + async onopen() { + self.emit("peer-open", peer); + }, + async ondestroy() { + self.emit("peer-remove", peer); + }, + }); + peer[FLOOD_SYMBOL] = chan; } - private handleMessage({originId, messageNumber, ttl, data}: PacketType, messenger: any) { - const originIdBuf = b4a.from(originId) as Buffer; - - // Ignore messages from ourselves - if (originIdBuf.equals(this.id)) return debug('Got message from self', originId, messageNumber) - - // Ignore messages we've already seen - const key = originIdBuf.toString('hex') + messageNumber - if (this.lru.get(key)) return debug('Got message that was already seen', originId, messageNumber) - this.lru.set(key, true) - - this.emit('message', data, originId, messageNumber) - - if (ttl <= 0) return debug('Got message at end of TTL', originId, messageNumber, ttl) - - messenger.send({ - originId, - messageNumber, - data, - ttl: ttl - 1 - }); + chan = peer[FLOOD_SYMBOL]; + if (!chan) { + throw new Error("could not find channel"); } - private setupPeer(peer: any) { - const mux = Protomux.from(peer); - let chan: any; - - const self = this; - - if (!mux.opened({protocol: PROTOCOL})) { - chan = mux.createChannel({ - protocol: PROTOCOL, - async onopen() { - self.emit('peer-open', peer) - }, - async ondestroy() { - self.emit('peer-remove', peer) - }, - }); - peer[FLOOD_SYMBOL] = chan; - } - - chan = peer[FLOOD_SYMBOL]; - - if (!chan) { - throw new Error('could not find channel'); - } - - if (!chan.messages.length) { - chan.addMessage({ - encoding: { - preencode: (state: any, m: any) => c.raw.preencode(state, Packet.toBinary(Packet.create(m))), - encode: (state: any, m: any) => c.raw.encode(state, Packet.toBinary(Packet.create(m))), - decode: (state: any) => Packet.fromBinary(c.raw.decode(state)), - }, - onmessage: (msg: any) => this.handleMessage(msg, chan.messages[0]), - }) - } - - if (!chan.opened) { - chan.open(); - } - - return chan.messages[0]; + if (!chan.messages.length) { + chan.addMessage({ + encoding: { + preencode: (state: any, m: any) => + c.raw.preencode(state, Packet.toBinary(Packet.create(m))), + encode: (state: any, m: any) => + c.raw.encode(state, Packet.toBinary(Packet.create(m))), + decode: (state: any) => Packet.fromBinary(c.raw.decode(state)), + }, + onmessage: (msg: any) => this.handleMessage(msg, chan.messages[0]), + }); } - broadcast(data: any, ttl = this.ttl) { - this.messageNumber++ - const {id, messageNumber} = this - - for (const peer of this.swarm.connections.values()) { - const message = this.setupPeer(peer); - message.send({ - originId: id, - messageNumber, - ttl, - data - }) - } + if (!chan.opened) { + chan.open(); } + + return chan.messages[0]; + } + + broadcast(data: any, ttl = this.ttl) { + this.messageNumber++; + const { id, messageNumber } = this; + + for (const peer of this.swarm.connections.values()) { + const message = this.setupPeer(peer); + message.send({ + originId: id, + messageNumber, + ttl, + data, + }); + } + } } diff --git a/src/messages.ts b/src/messages.ts index 8b2717b..c68402b 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -15,86 +15,120 @@ import { MessageType } from "@protobuf-ts/runtime"; * @generated from protobuf message Packet */ export interface PacketType { - /** - * @generated from protobuf field: bytes originId = 1; - */ - originId: Uint8Array; - /** - * @generated from protobuf field: uint32 messageNumber = 2; - */ - messageNumber: number; - /** - * @generated from protobuf field: uint32 ttl = 3; - */ - ttl: number; - /** - * @generated from protobuf field: bytes data = 4; - */ - data: Uint8Array; + /** + * @generated from protobuf field: bytes originId = 1; + */ + originId: Uint8Array; + /** + * @generated from protobuf field: uint32 messageNumber = 2; + */ + messageNumber: number; + /** + * @generated from protobuf field: uint32 ttl = 3; + */ + ttl: number; + /** + * @generated from protobuf field: bytes data = 4; + */ + data: Uint8Array; } // @generated message type with reflection information, may provide speed optimized methods class Packet$Type extends MessageType { - constructor() { - super("Packet", [ - { no: 1, name: "originId", kind: "scalar", T: 12 /*ScalarType.BYTES*/ }, - { no: 2, name: "messageNumber", kind: "scalar", T: 13 /*ScalarType.UINT32*/ }, - { no: 3, name: "ttl", kind: "scalar", T: 13 /*ScalarType.UINT32*/ }, - { no: 4, name: "data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ } - ]); - } - create(value?: PartialMessage): PacketType { - const message = { originId: new Uint8Array(0), messageNumber: 0, ttl: 0, data: new Uint8Array(0) }; - globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this }); - if (value !== undefined) - reflectionMergePartial(this, message, value); - return message; - } - internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: PacketType): PacketType { - let message = target ?? this.create(), end = reader.pos + length; - while (reader.pos < end) { - let [fieldNo, wireType] = reader.tag(); - switch (fieldNo) { - case /* bytes originId */ 1: - message.originId = reader.bytes(); - break; - case /* uint32 messageNumber */ 2: - message.messageNumber = reader.uint32(); - break; - case /* uint32 ttl */ 3: - message.ttl = reader.uint32(); - break; - case /* bytes data */ 4: - message.data = reader.bytes(); - break; - default: - let u = options.readUnknownField; - if (u === "throw") - throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); - let d = reader.skip(wireType); - if (u !== false) - (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); - } - } - return message; - } - internalBinaryWrite(message: PacketType, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { - /* bytes originId = 1; */ - if (message.originId.length) - writer.tag(1, WireType.LengthDelimited).bytes(message.originId); - /* uint32 messageNumber = 2; */ - if (message.messageNumber !== 0) - writer.tag(2, WireType.Varint).uint32(message.messageNumber); - /* uint32 ttl = 3; */ - if (message.ttl !== 0) - writer.tag(3, WireType.Varint).uint32(message.ttl); - /* bytes data = 4; */ - if (message.data.length) - writer.tag(4, WireType.LengthDelimited).bytes(message.data); - let u = options.writeUnknownFields; - if (u !== false) - (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); - return writer; + constructor() { + super("Packet", [ + { no: 1, name: "originId", kind: "scalar", T: 12 /*ScalarType.BYTES*/ }, + { + no: 2, + name: "messageNumber", + kind: "scalar", + T: 13 /*ScalarType.UINT32*/, + }, + { no: 3, name: "ttl", kind: "scalar", T: 13 /*ScalarType.UINT32*/ }, + { no: 4, name: "data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ }, + ]); + } + create(value?: PartialMessage): PacketType { + const message = { + originId: new Uint8Array(0), + messageNumber: 0, + ttl: 0, + data: new Uint8Array(0), + }; + globalThis.Object.defineProperty(message, MESSAGE_TYPE, { + enumerable: false, + value: this, + }); + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead( + reader: IBinaryReader, + length: number, + options: BinaryReadOptions, + target?: PacketType + ): PacketType { + let message = target ?? this.create(), + end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* bytes originId */ 1: + message.originId = reader.bytes(); + break; + case /* uint32 messageNumber */ 2: + message.messageNumber = reader.uint32(); + break; + case /* uint32 ttl */ 3: + message.ttl = reader.uint32(); + break; + case /* bytes data */ 4: + message.data = reader.bytes(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error( + `Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}` + ); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)( + this.typeName, + message, + fieldNo, + wireType, + d + ); + } } + return message; + } + internalBinaryWrite( + message: PacketType, + writer: IBinaryWriter, + options: BinaryWriteOptions + ): IBinaryWriter { + /* bytes originId = 1; */ + if (message.originId.length) + writer.tag(1, WireType.LengthDelimited).bytes(message.originId); + /* uint32 messageNumber = 2; */ + if (message.messageNumber !== 0) + writer.tag(2, WireType.Varint).uint32(message.messageNumber); + /* uint32 ttl = 3; */ + if (message.ttl !== 0) writer.tag(3, WireType.Varint).uint32(message.ttl); + /* bytes data = 4; */ + if (message.data.length) + writer.tag(4, WireType.LengthDelimited).bytes(message.data); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)( + this.typeName, + message, + writer + ); + return writer; + } } /** * @generated MessageType for protobuf message Packet