*Add prettier

This commit is contained in:
Derrick Hammer 2022-11-15 14:28:28 -05:00
parent 8d57c62994
commit aab5254ef2
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
3 changed files with 230 additions and 184 deletions

View File

@ -16,6 +16,7 @@
"b4a": "^1.6.1",
"debug": "^4.3.4",
"hyperswarm": "^4.3.5",
"prettier": "^2.7.1",
"protoc": "^1.1.3",
"sodium-universal": "^3.1.0",
"tape": "^5.6.1"

View File

@ -5,129 +5,140 @@ import LRU from "lru";
import debug0 from "debug";
// @ts-ignore
import Protomux from "protomux";
import {Packet, PacketType} from "./messages.js";
import { Packet, PacketType } from "./messages.js";
// @ts-ignore
import c from "compact-encoding"
import b4a from "b4a"
import c from "compact-encoding";
import b4a from "b4a";
const debug = debug0('dht-flood')
const debug = debug0("dht-flood");
const LRU_SIZE = 255
const TTL = 255
const PROTOCOL = "lumeweb.flood"
const LRU_SIZE = 255;
const TTL = 255;
const PROTOCOL = "lumeweb.flood";
const FLOOD_SYMBOL = Symbol.for(PROTOCOL)
const FLOOD_SYMBOL = Symbol.for(PROTOCOL);
export default class DHTFlood extends EventEmitter {
private id: Buffer;
private ttl: number;
private messageNumber: number;
private lru: LRU;
private swarm: any;
private id: Buffer;
private ttl: number;
private messageNumber: number;
private lru: LRU;
private swarm: any;
constructor({
lruSize = LRU_SIZE,
ttl = TTL,
messageNumber = 0,
id = crypto.randomBytes(32),
swarm = null
} = {}) {
super()
constructor({
lruSize = LRU_SIZE,
ttl = TTL,
messageNumber = 0,
id = crypto.randomBytes(32),
swarm = null,
} = {}) {
super();
this.id = id
this.ttl = ttl
this.messageNumber = messageNumber
this.lru = new LRU(lruSize)
if (!swarm) {
throw new Error('swarm is required');
}
this.swarm = swarm;
this.id = id;
this.ttl = ttl;
this.messageNumber = messageNumber;
this.lru = new LRU(lruSize);
if (!swarm) {
throw new Error("swarm is required");
}
this.swarm = swarm;
this.swarm.on("connection", (peer: any) => {
const mux = Protomux.from(peer);
mux.pair({protocol: PROTOCOL}, () => this.setupPeer(peer));
});
this.swarm.on("connection", (peer: any) => {
const mux = Protomux.from(peer);
mux.pair({ protocol: PROTOCOL }, () => this.setupPeer(peer));
});
}
private handleMessage(
{ originId, messageNumber, ttl, data }: PacketType,
messenger: any
) {
const originIdBuf = b4a.from(originId) as Buffer;
// Ignore messages from ourselves
if (originIdBuf.equals(this.id))
return debug("Got message from self", originId, messageNumber);
// Ignore messages we've already seen
const key = originIdBuf.toString("hex") + messageNumber;
if (this.lru.get(key))
return debug(
"Got message that was already seen",
originId,
messageNumber
);
this.lru.set(key, true);
this.emit("message", data, originId, messageNumber);
if (ttl <= 0)
return debug("Got message at end of TTL", originId, messageNumber, ttl);
messenger.send({
originId,
messageNumber,
data,
ttl: ttl - 1,
});
}
private setupPeer(peer: any) {
const mux = Protomux.from(peer);
let chan: any;
const self = this;
if (!mux.opened({ protocol: PROTOCOL })) {
chan = mux.createChannel({
protocol: PROTOCOL,
async onopen() {
self.emit("peer-open", peer);
},
async ondestroy() {
self.emit("peer-remove", peer);
},
});
peer[FLOOD_SYMBOL] = chan;
}
private handleMessage({originId, messageNumber, ttl, data}: PacketType, messenger: any) {
const originIdBuf = b4a.from(originId) as Buffer;
// Ignore messages from ourselves
if (originIdBuf.equals(this.id)) return debug('Got message from self', originId, messageNumber)
// Ignore messages we've already seen
const key = originIdBuf.toString('hex') + messageNumber
if (this.lru.get(key)) return debug('Got message that was already seen', originId, messageNumber)
this.lru.set(key, true)
this.emit('message', data, originId, messageNumber)
if (ttl <= 0) return debug('Got message at end of TTL', originId, messageNumber, ttl)
messenger.send({
originId,
messageNumber,
data,
ttl: ttl - 1
});
chan = peer[FLOOD_SYMBOL];
if (!chan) {
throw new Error("could not find channel");
}
private setupPeer(peer: any) {
const mux = Protomux.from(peer);
let chan: any;
const self = this;
if (!mux.opened({protocol: PROTOCOL})) {
chan = mux.createChannel({
protocol: PROTOCOL,
async onopen() {
self.emit('peer-open', peer)
},
async ondestroy() {
self.emit('peer-remove', peer)
},
});
peer[FLOOD_SYMBOL] = chan;
}
chan = peer[FLOOD_SYMBOL];
if (!chan) {
throw new Error('could not find channel');
}
if (!chan.messages.length) {
chan.addMessage({
encoding: {
preencode: (state: any, m: any) => c.raw.preencode(state, Packet.toBinary(Packet.create(m))),
encode: (state: any, m: any) => c.raw.encode(state, Packet.toBinary(Packet.create(m))),
decode: (state: any) => Packet.fromBinary(c.raw.decode(state)),
},
onmessage: (msg: any) => this.handleMessage(msg, chan.messages[0]),
})
}
if (!chan.opened) {
chan.open();
}
return chan.messages[0];
if (!chan.messages.length) {
chan.addMessage({
encoding: {
preencode: (state: any, m: any) =>
c.raw.preencode(state, Packet.toBinary(Packet.create(m))),
encode: (state: any, m: any) =>
c.raw.encode(state, Packet.toBinary(Packet.create(m))),
decode: (state: any) => Packet.fromBinary(c.raw.decode(state)),
},
onmessage: (msg: any) => this.handleMessage(msg, chan.messages[0]),
});
}
broadcast(data: any, ttl = this.ttl) {
this.messageNumber++
const {id, messageNumber} = this
for (const peer of this.swarm.connections.values()) {
const message = this.setupPeer(peer);
message.send({
originId: id,
messageNumber,
ttl,
data
})
}
if (!chan.opened) {
chan.open();
}
return chan.messages[0];
}
broadcast(data: any, ttl = this.ttl) {
this.messageNumber++;
const { id, messageNumber } = this;
for (const peer of this.swarm.connections.values()) {
const message = this.setupPeer(peer);
message.send({
originId: id,
messageNumber,
ttl,
data,
});
}
}
}

View File

@ -15,86 +15,120 @@ import { MessageType } from "@protobuf-ts/runtime";
* @generated from protobuf message Packet
*/
export interface PacketType {
/**
* @generated from protobuf field: bytes originId = 1;
*/
originId: Uint8Array;
/**
* @generated from protobuf field: uint32 messageNumber = 2;
*/
messageNumber: number;
/**
* @generated from protobuf field: uint32 ttl = 3;
*/
ttl: number;
/**
* @generated from protobuf field: bytes data = 4;
*/
data: Uint8Array;
/**
* @generated from protobuf field: bytes originId = 1;
*/
originId: Uint8Array;
/**
* @generated from protobuf field: uint32 messageNumber = 2;
*/
messageNumber: number;
/**
* @generated from protobuf field: uint32 ttl = 3;
*/
ttl: number;
/**
* @generated from protobuf field: bytes data = 4;
*/
data: Uint8Array;
}
// @generated message type with reflection information, may provide speed optimized methods
class Packet$Type extends MessageType<PacketType> {
constructor() {
super("Packet", [
{ no: 1, name: "originId", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
{ no: 2, name: "messageNumber", kind: "scalar", T: 13 /*ScalarType.UINT32*/ },
{ no: 3, name: "ttl", kind: "scalar", T: 13 /*ScalarType.UINT32*/ },
{ no: 4, name: "data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value?: PartialMessage<PacketType>): PacketType {
const message = { originId: new Uint8Array(0), messageNumber: 0, ttl: 0, data: new Uint8Array(0) };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<PacketType>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: PacketType): PacketType {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* bytes originId */ 1:
message.originId = reader.bytes();
break;
case /* uint32 messageNumber */ 2:
message.messageNumber = reader.uint32();
break;
case /* uint32 ttl */ 3:
message.ttl = reader.uint32();
break;
case /* bytes data */ 4:
message.data = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: PacketType, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* bytes originId = 1; */
if (message.originId.length)
writer.tag(1, WireType.LengthDelimited).bytes(message.originId);
/* uint32 messageNumber = 2; */
if (message.messageNumber !== 0)
writer.tag(2, WireType.Varint).uint32(message.messageNumber);
/* uint32 ttl = 3; */
if (message.ttl !== 0)
writer.tag(3, WireType.Varint).uint32(message.ttl);
/* bytes data = 4; */
if (message.data.length)
writer.tag(4, WireType.LengthDelimited).bytes(message.data);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
constructor() {
super("Packet", [
{ no: 1, name: "originId", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
{
no: 2,
name: "messageNumber",
kind: "scalar",
T: 13 /*ScalarType.UINT32*/,
},
{ no: 3, name: "ttl", kind: "scalar", T: 13 /*ScalarType.UINT32*/ },
{ no: 4, name: "data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
]);
}
create(value?: PartialMessage<PacketType>): PacketType {
const message = {
originId: new Uint8Array(0),
messageNumber: 0,
ttl: 0,
data: new Uint8Array(0),
};
globalThis.Object.defineProperty(message, MESSAGE_TYPE, {
enumerable: false,
value: this,
});
if (value !== undefined)
reflectionMergePartial<PacketType>(this, message, value);
return message;
}
internalBinaryRead(
reader: IBinaryReader,
length: number,
options: BinaryReadOptions,
target?: PacketType
): PacketType {
let message = target ?? this.create(),
end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* bytes originId */ 1:
message.originId = reader.bytes();
break;
case /* uint32 messageNumber */ 2:
message.messageNumber = reader.uint32();
break;
case /* uint32 ttl */ 3:
message.ttl = reader.uint32();
break;
case /* bytes data */ 4:
message.data = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(
`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`
);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(
this.typeName,
message,
fieldNo,
wireType,
d
);
}
}
return message;
}
internalBinaryWrite(
message: PacketType,
writer: IBinaryWriter,
options: BinaryWriteOptions
): IBinaryWriter {
/* bytes originId = 1; */
if (message.originId.length)
writer.tag(1, WireType.LengthDelimited).bytes(message.originId);
/* uint32 messageNumber = 2; */
if (message.messageNumber !== 0)
writer.tag(2, WireType.Varint).uint32(message.messageNumber);
/* uint32 ttl = 3; */
if (message.ttl !== 0) writer.tag(3, WireType.Varint).uint32(message.ttl);
/* bytes data = 4; */
if (message.data.length)
writer.tag(4, WireType.LengthDelimited).bytes(message.data);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(
this.typeName,
message,
writer
);
return writer;
}
}
/**
* @generated MessageType for protobuf message Packet