From 76731d435464a6921abc9ef42bd7013c4fa092c9 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Tue, 15 Nov 2022 06:34:01 -0500 Subject: [PATCH] *Initial version --- LICENSE | 4 +- messages.proto | 8 +++ package.json | 23 ++++++++ src/index.ts | 126 ++++++++++++++++++++++++++++++++++++++++ src/messages.ts | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ test.js | 62 ++++++++++++++++++++ tsconfig.json | 23 ++++++++ 7 files changed, 394 insertions(+), 1 deletion(-) create mode 100644 messages.proto create mode 100644 package.json create mode 100644 src/index.ts create mode 100644 src/messages.ts create mode 100644 test.js create mode 100644 tsconfig.json diff --git a/LICENSE b/LICENSE index 2071b23..fb9b493 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,8 @@ MIT License -Copyright (c) +Copyright (c) 2022 Hammer Technologies LLC + +Credits to https://github.com/RangerMauve/hyper-flood for original version Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/messages.proto b/messages.proto new file mode 100644 index 0000000..1037516 --- /dev/null +++ b/messages.proto @@ -0,0 +1,8 @@ +// type=0 +message Packet { + required bytes originId = 1; + required uint32 messageNumber = 2; + required uint32 ttl = 3; + required bytes data = 4; +} + diff --git a/package.json b/package.json new file mode 100644 index 0000000..9d982c8 --- /dev/null +++ b/package.json @@ -0,0 +1,23 @@ +{ + "name": "@lumeweb/dht-flood", + "type": "commonjs", + "version": "0.1.0", + "main": "dist/index.js", + "dependencies": { + "compact-encoding": "^2.11.0", + "lru": "^3.1.0", + "protocol-buffers-encodings": "^1.2.0", + "protomux-rpc": "^1.3.0" + }, + "devDependencies": { + "@types/b4a": "^1.6.0", + "@types/debug": "^4.1.7", + "b4a": "^1.6.1", + "debug": "^4.3.4", + "hyperswarm": "^4.3.5", + "protoc": "^1.1.3", + "sodium-universal": "^3.1.0", + "tape": "^5.6.1", + "ts-proto": "^1.131.2" + } +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..4f42aaf --- /dev/null +++ b/src/index.ts @@ -0,0 +1,126 @@ +import EventEmitter from "events"; +import crypto from "crypto"; +// @ts-ignore +import LRU from "lru"; +import debug0 from "debug"; +// @ts-ignore +import Protomux from "protomux"; +import {Packet} from "./messages.js"; +// @ts-ignore +import c from "compact-encoding" +import b4a from "b4a" + +const debug = debug0('dht-flood') + +const LRU_SIZE = 255 +const TTL = 255 +const PROTOCOL = "lumeweb.flood" + +const FLOOD_SYMBOL = Symbol.for(PROTOCOL) + +export default class DHTFlood extends EventEmitter { + private id: Buffer; + private ttl: number; + private messageNumber: number; + private lru: LRU; + private swarm: any; + private mux: any; + + constructor({ + lruSize = LRU_SIZE, + ttl = TTL, + messageNumber = 0, + id = crypto.randomBytes(32), + swarm = null + } = {}) { + super() + + this.id = id + this.ttl = ttl + this.messageNumber = messageNumber + this.lru = new LRU(lruSize) + if (!swarm) { + throw new Error('swarm is required'); + } + this.swarm = swarm; + + this.swarm.on("connection", (peer: any) => { + const mux = Protomux.from(peer); + mux.pair({protocol: PROTOCOL}, () => this.setupPeer(peer)); + }); + } + + private handleMessage({originId, messageNumber, ttl, data}: Packet, messenger: any) { + const originIdBuf = b4a.from(originId) as Buffer; + + // Ignore messages from ourselves + if (originIdBuf.equals(this.id)) return debug('Got message from self', originId, messageNumber) + + // Ignore messages we've already seen + const key = originIdBuf.toString('hex') + messageNumber + if (this.lru.get(key)) return debug('Got message that was already seen', originId, messageNumber) + this.lru.set(key, true) + + this.emit('message', data, originId, messageNumber) + + if (ttl <= 0) return debug('Got message at end of TTL', originId, messageNumber, ttl) + + messenger.send({ + originId, + messageNumber, + data, + ttl: ttl - 1 + }); + + } + + private setupPeer(peer: any) { + const mux = Protomux.from(peer); + let chan: any; + + if (!mux.opened({protocol: PROTOCOL})) { + chan = mux.createChannel({ + protocol: PROTOCOL, + }); + peer[FLOOD_SYMBOL] = chan; + } + + chan = peer[FLOOD_SYMBOL]; + + if (!chan) { + throw new Error('could not find channel'); + } + + if (!chan.messages.length) { + chan.addMessage({ + encoding: { + preencode: (state: any, m: any) => c.raw.preencode(state, Packet.encode(m).finish()), + encode: (state: any, m: any) => c.raw.encode(state, Packet.encode(m).finish()), + decode: (state: any) => Packet.decode(c.raw.decode(state)), + }, + onmessage: (msg: any) => this.handleMessage(msg, chan.messages[0]), + }) + } + + if (!chan.opened) { + chan.open(); + } + + return chan.messages[0]; + } + + broadcast(data: any, ttl = this.ttl) { + this.messageNumber++ + const {id, messageNumber} = this + + for (const peer of this.swarm.connections.values()) { + const message = this.setupPeer(peer); + message.send({ + originId: id, + messageNumber, + ttl, + data + }) + } + } +} diff --git a/src/messages.ts b/src/messages.ts new file mode 100644 index 0000000..6af8f9d --- /dev/null +++ b/src/messages.ts @@ -0,0 +1,149 @@ +/* eslint-disable */ +import * as _m0 from "protobufjs/minimal"; + +export const protobufPackage = ""; + +/** type=0 */ +export interface Packet { + originId: Uint8Array; + messageNumber: number; + ttl: number; + data: Uint8Array; +} + +function createBasePacket(): Packet { + return { originId: new Uint8Array(), messageNumber: 0, ttl: 0, data: new Uint8Array() }; +} + +export const Packet = { + encode(message: Packet, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.originId.length !== 0) { + writer.uint32(10).bytes(message.originId); + } + if (message.messageNumber !== 0) { + writer.uint32(16).uint32(message.messageNumber); + } + if (message.ttl !== 0) { + writer.uint32(24).uint32(message.ttl); + } + if (message.data.length !== 0) { + writer.uint32(34).bytes(message.data); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): Packet { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePacket(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.originId = reader.bytes(); + break; + case 2: + message.messageNumber = reader.uint32(); + break; + case 3: + message.ttl = reader.uint32(); + break; + case 4: + message.data = reader.bytes(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): Packet { + return { + originId: isSet(object.originId) ? bytesFromBase64(object.originId) : new Uint8Array(), + messageNumber: isSet(object.messageNumber) ? Number(object.messageNumber) : 0, + ttl: isSet(object.ttl) ? Number(object.ttl) : 0, + data: isSet(object.data) ? bytesFromBase64(object.data) : new Uint8Array(), + }; + }, + + toJSON(message: Packet): unknown { + const obj: any = {}; + message.originId !== undefined && + (obj.originId = base64FromBytes(message.originId !== undefined ? message.originId : new Uint8Array())); + message.messageNumber !== undefined && (obj.messageNumber = Math.round(message.messageNumber)); + message.ttl !== undefined && (obj.ttl = Math.round(message.ttl)); + message.data !== undefined && + (obj.data = base64FromBytes(message.data !== undefined ? message.data : new Uint8Array())); + return obj; + }, + + fromPartial, I>>(object: I): Packet { + const message = createBasePacket(); + message.originId = object.originId ?? new Uint8Array(); + message.messageNumber = object.messageNumber ?? 0; + message.ttl = object.ttl ?? 0; + message.data = object.data ?? new Uint8Array(); + return message; + }, +}; + +declare var self: any | undefined; +declare var window: any | undefined; +declare var global: any | undefined; +var globalThis: any = (() => { + if (typeof globalThis !== "undefined") { + return globalThis; + } + if (typeof self !== "undefined") { + return self; + } + if (typeof window !== "undefined") { + return window; + } + if (typeof global !== "undefined") { + return global; + } + throw "Unable to locate global object"; +})(); + +function bytesFromBase64(b64: string): Uint8Array { + if (globalThis.Buffer) { + return Uint8Array.from(globalThis.Buffer.from(b64, "base64")); + } else { + const bin = globalThis.atob(b64); + const arr = new Uint8Array(bin.length); + for (let i = 0; i < bin.length; ++i) { + arr[i] = bin.charCodeAt(i); + } + return arr; + } +} + +function base64FromBytes(arr: Uint8Array): string { + if (globalThis.Buffer) { + return globalThis.Buffer.from(arr).toString("base64"); + } else { + const bin: string[] = []; + arr.forEach((byte) => { + bin.push(String.fromCharCode(byte)); + }); + return globalThis.btoa(bin.join("")); + } +} + +type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; + +export type DeepPartial = T extends Builtin ? T + : T extends Array ? Array> : T extends ReadonlyArray ? ReadonlyArray> + : T extends {} ? { [K in keyof T]?: DeepPartial } + : Partial; + +type KeysOfUnion = T extends T ? keyof T : never; +export type Exact = P extends Builtin ? P + : P & { [K in keyof P]: Exact } & { [K in Exclude>]: never }; + +function isSet(value: any): boolean { + return value !== null && value !== undefined; +} diff --git a/test.js b/test.js new file mode 100644 index 0000000..d814e4d --- /dev/null +++ b/test.js @@ -0,0 +1,62 @@ +const test = require('tape') +const Hyperswarm = require('hyperswarm') +const sodium = require('sodium-universal') +const b4a = require('b4a') +const { default: DHTFlood } = require('./') +const crypto = require('crypto') + +const topicName = crypto.randomBytes(10) + +test('Broadcast through several peers', (t) => { + const peer1 = createPeer() + const peer2 = createPeer() + const peer3 = createPeer() + t.plan(2) + + Promise.all([peer1, peer2, peer3]).then((peers) => { + const peer1 = peers.shift() + const peer2 = peers.shift() + const peer3 = peers.shift() + + const flood1 = new DHTFlood({ swarm: peer1 }) + const flood2 = new DHTFlood({ swarm: peer2 }) + const flood3 = new DHTFlood({ swarm: peer3 }) + const data = Buffer.from('Hello World') + + flood1.on('message', () => t.error('Got own message')) + + flood2.on('message', (message) => { + t.deepEquals(message, data, 'Data got broadcast') + }) + flood3.on('message', (message) => { + t.deepEquals(message, data, 'Data got broadcast') + }) + + function maybeFlood () { + if (peer1.peers.size === 2) { + flood1.broadcast(data) + } + } + + peer1.on('connection', () => { + maybeFlood() + }) + + t.teardown(() => { + [peer1, peer2, peer3].forEach((item) => item.destroy()) + }) + }) +}) + +async function createPeer () { + const swarm = new Hyperswarm() + await swarm.dht.ready() + await swarm.listen() + + const topic = b4a.allocUnsafe(32) + sodium.crypto_generichash(topic, b4a.from(topicName)) + + swarm.join(topic) + + return swarm +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..5bbcd3b --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "declaration": true, + "strict": true, + "module": "commonjs", + "target": "esnext", + "esModuleInterop": true, + "sourceMap": false, + "rootDir": "src", + "outDir": "dist", + "typeRoots": [ + "node_modules/@types", + ], + "moduleResolution": "node", + "declarationMap": true, + "declarationDir": "dist", + "emitDeclarationOnly": false, + "allowJs": true + }, + "include": [ + "src" + ] +}