Compare commits

...

2 Commits

Author SHA1 Message Date
Derrick Hammer 0ff1a24de2
*Add dist 2022-11-16 02:47:03 -05:00
Derrick Hammer 9b6a916f00
*Initial version 2022-11-16 02:45:10 -05:00
17 changed files with 1315 additions and 1 deletions

View File

@ -1,6 +1,8 @@
MIT License
Copyright (c) <year> <copyright holders>
Copyright (c) 2022 Hammer Technologies LLC
Credits to https://github.com/RangerMauve/hyper-presence for original version
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

35
dist/dhtOnlineBase.d.ts vendored Normal file
View File

@ -0,0 +1,35 @@
/// <reference types="node" />
/// <reference types="node" />
import EventEmitter from "events";
export default class DHTOnlineBase extends EventEmitter {
private id;
private bootstrapped;
private graph;
private connectedTo;
private data;
private encoding;
constructor(id: Buffer, { encoding }?: {
encoding?: string | undefined;
});
private _online;
get online(): string[];
broadcast(data: any, ttl?: number): void;
getPeerData(id: Buffer | string): any;
protected setData(data: any): void;
private _broadcastData;
protected onAddPeer(id: Buffer): void;
protected onRemovePeer(id: Buffer): void;
protected onGetBroadcast(message: Buffer, id: Buffer): void;
private _hasSeenPeer;
private _setPeer;
private _removePeer;
private _ensurePeer;
private _addPeerConnection;
private _removePeerConnection;
private _bootstrapFrom;
private _getPeerConnectedTo;
private _getBootstrapInfo;
private _recalculate;
private _maybeHexify;
}
//# sourceMappingURL=dhtOnlineBase.d.ts.map

1
dist/dhtOnlineBase.d.ts.map vendored Normal file
View File

@ -0,0 +1 @@
{"version":3,"file":"dhtOnlineBase.d.ts","sourceRoot":"","sources":["../src/dhtOnlineBase.ts"],"names":[],"mappings":";;AAAA,OAAO,YAAY,MAAM,QAAQ,CAAC;AAYlC,MAAM,CAAC,OAAO,OAAO,aAAc,SAAQ,YAAY;IACrD,OAAO,CAAC,EAAE,CAAS;IACnB,OAAO,CAAC,YAAY,CAAU;IAC9B,OAAO,CAAC,KAAK,CAAM;IACnB,OAAO,CAAC,WAAW,CAAW;IAC9B,OAAO,CAAC,IAAI,CAAK;IACjB,OAAO,CAAC,QAAQ,CAAoB;gBAExB,EAAE,EAAE,MAAM,EAAE,EAAE,QAA2B,EAAE;;KAAK;IAa5D,OAAO,CAAC,OAAO,CAAW;IAE1B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,SAAS,CAAC,IAAI,EAAE,GAAG,EAAE,GAAG,CAAC,EAAE,MAAM;IAIjC,WAAW,CAAC,EAAE,EAAE,MAAM,GAAG,MAAM;IAI/B,SAAS,CAAC,OAAO,CAAC,IAAI,EAAE,GAAG;IAQ3B,OAAO,CAAC,cAAc;IAgBtB,SAAS,CAAC,SAAS,CAAC,EAAE,EAAE,MAAM;IAqC9B,SAAS,CAAC,YAAY,CAAC,EAAE,EAAE,MAAM;IAiBjC,SAAS,CAAC,cAAc,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE,EAAE,MAAM;IA6CpD,OAAO,CAAC,YAAY;IAIpB,OAAO,CAAC,QAAQ;IAIhB,OAAO,CAAC,WAAW;IAInB,OAAO,CAAC,WAAW;IAOnB,OAAO,CAAC,kBAAkB;IAY1B,OAAO,CAAC,qBAAqB;IAa7B,OAAO,CAAC,cAAc;IA4BtB,OAAO,CAAC,mBAAmB;IAI3B,OAAO,CAAC,iBAAiB;IAczB,OAAO,CAAC,YAAY;IAwBpB,OAAO,CAAC,YAAY;CAOrB"}

229
dist/dhtOnlineBase.js vendored Normal file
View File

@ -0,0 +1,229 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = __importDefault(require("events"));
// @ts-ignore
const jsnetworkx_1 = require("jsnetworkx");
const codecs_1 = __importDefault(require("codecs"));
const messages_js_1 = require("./messages.js");
const DEFAULT_ENCODING = "json";
class DHTOnlineBase extends events_1.default {
id;
bootstrapped;
graph;
connectedTo;
data;
encoding;
constructor(id, { encoding = DEFAULT_ENCODING } = {}) {
super();
if (!id)
throw new TypeError("Must provide id for self");
this.id = id;
this.bootstrapped = false;
this.graph = new jsnetworkx_1.DiGraph();
this.connectedTo = new Set();
this.data = {};
this.encoding = (0, codecs_1.default)(encoding || DEFAULT_ENCODING);
this._online = [this._maybeHexify(this.id)];
}
_online;
get online() {
return this._online;
}
broadcast(data, ttl) {
throw new TypeError("Broadcast has not been implemented");
}
getPeerData(id) {
return this.graph.node.get(this._maybeHexify(id));
}
setData(data) {
this.data = data;
this._setPeer(this.id, data);
this._broadcastData();
}
_broadcastData() {
const rawData = this.data;
if (!Object.keys(rawData).length) {
return;
}
const data = this.encoding.encode(rawData);
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.STATE,
data,
})));
}
onAddPeer(id) {
const stringId = id.toString("hex");
if (this.connectedTo.has(stringId)) {
return;
} // Already know we're connected here
this.connectedTo.add(stringId);
this._addPeerConnection(this.id, id);
this.emit("peer-add", id);
this._recalculate();
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.CONNECTED,
id,
})));
this._broadcastData();
if (this.bootstrapped) {
return;
}
// If this is the first person we've met, get their graph
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.BOOTSTRAP_REQUEST,
})), 0);
}
onRemovePeer(id) {
this.connectedTo.delete(id.toString("hex"));
this._removePeerConnection(this.id, id);
this.emit("peer-remove");
this._recalculate();
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.DISCONNECTED,
id,
})));
}
onGetBroadcast(message, id) {
let decoded;
try {
decoded = messages_js_1.Message.fromBinary(message);
}
catch {
return;
}
const { type } = decoded;
if (!type) {
throw new Error("Missing Type In Message");
}
if (type === messages_js_1.Type.STATE) {
const { data: rawData } = decoded;
const data = this.encoding.decode(rawData);
this._setPeer(id, data);
this.emit("peer-data", data, id);
this._recalculate();
}
else if (type === messages_js_1.Type.CONNECTED) {
const { id: toId } = decoded;
this._addPeerConnection(id, Buffer.from(toId));
this.emit("peer-add-seen", id, toId);
this._recalculate();
}
else if (type === messages_js_1.Type.DISCONNECTED) {
const { id: toId } = decoded;
this._removePeerConnection(id, Buffer.from(toId));
this.emit("peer-remove-seen", id, toId);
this._recalculate();
}
else if (type === messages_js_1.Type.BOOTSTRAP_REQUEST) {
const bootstrap = this._getBootstrapInfo();
this.broadcast(messages_js_1.Message.toBinary(messages_js_1.Message.create({
type: messages_js_1.Type.BOOTSTRAP_RESPONSE,
bootstrap,
})), 0);
}
else if (type === messages_js_1.Type.BOOTSTRAP_RESPONSE) {
const { bootstrap } = decoded;
this._bootstrapFrom(bootstrap);
}
}
_hasSeenPeer(id) {
return this.graph.hasNode(this._maybeHexify(id));
}
_setPeer(id, data) {
this.graph.addNode(this._maybeHexify(id), data);
}
_removePeer(id) {
this.graph.removeNode(this._maybeHexify(id));
}
_ensurePeer(id) {
id = this._maybeHexify(id);
if (!this._hasSeenPeer(id)) {
this._setPeer(id, {});
}
}
_addPeerConnection(origin, destination) {
this._ensurePeer(origin);
this._ensurePeer(destination);
this.graph.addEdge(this._maybeHexify(origin), this._maybeHexify(destination));
}
_removePeerConnection(origin, destination) {
try {
this._ensurePeer(origin);
this._ensurePeer(destination);
this.graph.removeEdge(origin.toString("hex"), destination.toString("hex"));
}
catch (e) {
if (e.name !== "JSNetworkXError")
throw e;
}
}
_bootstrapFrom(bootstrap) {
if (this.bootstrapped) {
return;
}
for (const id in bootstrap) {
const { data, connectedTo } = bootstrap[id];
const parsedData = data ? this.encoding.decode(data) : null;
let peerData = parsedData || {};
if (id === this.id.toString("hex"))
continue;
// If we're already tracking them
if (this._hasSeenPeer(id)) {
// See what data we already have for them
// Add their existing data to what we got from the bootstrap
const existingPeerData = this.getPeerData(id);
peerData = { ...existingPeerData, ...peerData };
}
this._setPeer(id, peerData);
for (const connection of connectedTo) {
this._addPeerConnection(id, Buffer.from(connection));
}
}
this.emit("bootstrapped");
this._recalculate();
}
_getPeerConnectedTo(id) {
return this.graph.successors(id);
}
_getBootstrapInfo() {
const state = {};
for (const [id, rawData] of this.graph.nodes(true)) {
const connectedTo = this.graph
.neighbors(id)
.map((id) => Buffer.from(id, "hex"));
const data = rawData ? this.encoding.encode(rawData) : null;
state[id] = { data, connectedTo };
}
return state;
}
// Calculate who's online and emit an event
_recalculate() {
const online = this.graph.nodes().filter((id) => {
return (0, jsnetworkx_1.hasPath)(this.graph, {
source: this._maybeHexify(this.id),
target: id,
});
});
const offline = this.graph.nodes().filter((id) => {
return !(0, jsnetworkx_1.hasPath)(this.graph, {
source: this._maybeHexify(this.id),
target: id,
});
});
for (const id of offline) {
this.graph.removeNode(id);
}
this._online = online;
this.emit("online", online);
}
_maybeHexify(data) {
if (Buffer.isBuffer(data)) {
return data.toString("hex");
}
return data;
}
}
exports.default = DHTOnlineBase;

13
dist/index.d.ts vendored Normal file
View File

@ -0,0 +1,13 @@
import DHTOnlineBase from "./dhtOnlineBase.js";
export default class DHTOnline extends DHTOnlineBase {
private flood;
private swarm;
constructor(swarm: any, { id, data, ...opts }?: {
id?: any;
data?: {} | undefined;
});
handlePeerAdd(peer: any): void;
handlePeerRemove(peer: any): void;
broadcast(message: any, ttl?: number): void;
}
//# sourceMappingURL=index.d.ts.map

1
dist/index.d.ts.map vendored Normal file
View File

@ -0,0 +1 @@
{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,aAAa,MAAM,oBAAoB,CAAC;AAK/C,MAAM,CAAC,OAAO,OAAO,SAAU,SAAQ,aAAa;IAClD,OAAO,CAAC,KAAK,CAAW;IACxB,OAAO,CAAC,KAAK,CAAM;gBAGjB,KAAK,EAAE,GAAG,EACV,EAAE,EAA4B,EAAE,IAAS,EAAE,GAAG,IAAI,EAAE;;;KAAK;IAmB3D,aAAa,CAAC,IAAI,EAAE,GAAG;IAKvB,gBAAgB,CAAC,IAAI,EAAE,GAAG;IAW1B,SAAS,CAAC,OAAO,EAAE,GAAG,EAAE,GAAG,CAAC,EAAE,MAAM;CAGrC"}

41
dist/index.js vendored Normal file
View File

@ -0,0 +1,41 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const dhtOnlineBase_js_1 = __importDefault(require("./dhtOnlineBase.js"));
const dht_flood_1 = __importDefault(require("@lumeweb/dht-flood"));
const DISCONNECT_SMOOTH = 500;
class DHTOnline extends dhtOnlineBase_js_1.default {
flood;
swarm;
constructor(swarm, { id = swarm.keyPair.publicKey, data = {}, ...opts } = {}) {
super(id, opts);
this.flood = new dht_flood_1.default({ id, swarm, ...opts });
this.swarm = swarm;
this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer));
this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer));
this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
this.swarm.on("connection", (peer) => this.flood.send(peer, Buffer.from("hello"), 0));
this.setData(data);
[...this.swarm.peers.values()].forEach(this.handlePeerAdd.bind(this));
}
handlePeerAdd(peer) {
const id = peer.remotePublicKey;
this.onAddPeer(id);
}
handlePeerRemove(peer) {
const id = peer.remotePublicKey;
// Wait for a bit and check if we're still disconnected before removing the peer
setTimeout(() => {
if (this.swarm._allConnections.has(id)) {
return;
}
this.onRemovePeer(id);
}, DISCONNECT_SMOOTH);
}
broadcast(message, ttl) {
this.flood.broadcast(message, ttl);
}
}
exports.default = DHTOnline;

94
dist/messages.d.ts vendored Normal file
View File

@ -0,0 +1,94 @@
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
import type { IBinaryWriter } from "@protobuf-ts/runtime";
import type { BinaryReadOptions } from "@protobuf-ts/runtime";
import type { IBinaryReader } from "@protobuf-ts/runtime";
import type { PartialMessage } from "@protobuf-ts/runtime";
import { MessageType } from "@protobuf-ts/runtime";
/**
* @generated from protobuf message Message
*/
export interface Message {
/**
* @generated from protobuf field: Type type = 1;
*/
type: Type;
/**
* @generated from protobuf field: map<string, State> bootstrap = 2;
*/
bootstrap: {
[key: string]: State;
};
/**
* @generated from protobuf field: optional bytes data = 3;
*/
data?: Uint8Array;
/**
* @generated from protobuf field: optional bytes id = 4;
*/
id?: Uint8Array;
}
/**
* @generated from protobuf message State
*/
export interface State {
/**
* @generated from protobuf field: repeated bytes connectedTo = 1;
*/
connectedTo: Uint8Array[];
/**
* @generated from protobuf field: optional bytes data = 2;
*/
data?: Uint8Array;
}
/**
* @generated from protobuf enum Type
*/
export declare enum Type {
/**
* @generated synthetic value - protobuf-ts requires all enums to have a 0 value
*/
UNSPECIFIED$ = 0,
/**
* @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1;
*/
BOOTSTRAP_REQUEST = 1,
/**
* @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2;
*/
BOOTSTRAP_RESPONSE = 2,
/**
* @generated from protobuf enum value: CONNECTED = 3;
*/
CONNECTED = 3,
/**
* @generated from protobuf enum value: DISCONNECTED = 4;
*/
DISCONNECTED = 4,
/**
* @generated from protobuf enum value: STATE = 5;
*/
STATE = 5
}
declare class Message$Type extends MessageType<Message> {
constructor();
create(value?: PartialMessage<Message>): Message;
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message;
private binaryReadMap2;
internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter;
}
/**
* @generated MessageType for protobuf message Message
*/
export declare const Message: Message$Type;
declare class State$Type extends MessageType<State> {
constructor();
create(value?: PartialMessage<State>): State;
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: State): State;
internalBinaryWrite(message: State, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter;
}
/**
* @generated MessageType for protobuf message State
*/
export declare const State: State$Type;
export {};
//# sourceMappingURL=messages.d.ts.map

1
dist/messages.d.ts.map vendored Normal file
View File

@ -0,0 +1 @@
{"version":3,"file":"messages.d.ts","sourceRoot":"","sources":["../src/messages.ts"],"names":[],"mappings":"AAGA,OAAO,KAAK,EAAE,kBAAkB,EAAE,MAAM,sBAAsB,CAAC;AAC/D,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,sBAAsB,CAAC;AAE1D,OAAO,KAAK,EAAE,iBAAiB,EAAE,MAAM,sBAAsB,CAAC;AAC9D,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,sBAAsB,CAAC;AAE1D,OAAO,KAAK,EAAE,cAAc,EAAE,MAAM,sBAAsB,CAAC;AAG3D,OAAO,EAAE,WAAW,EAAE,MAAM,sBAAsB,CAAC;AACnD;;GAEG;AACH,MAAM,WAAW,OAAO;IACpB;;OAEG;IACH,IAAI,EAAE,IAAI,CAAC;IACX;;OAEG;IACH,SAAS,EAAE;QACP,CAAC,GAAG,EAAE,MAAM,GAAG,KAAK,CAAC;KACxB,CAAC;IACF;;OAEG;IACH,IAAI,CAAC,EAAE,UAAU,CAAC;IAClB;;OAEG;IACH,EAAE,CAAC,EAAE,UAAU,CAAC;CACnB;AACD;;GAEG;AACH,MAAM,WAAW,KAAK;IAClB;;OAEG;IACH,WAAW,EAAE,UAAU,EAAE,CAAC;IAC1B;;OAEG;IACH,IAAI,CAAC,EAAE,UAAU,CAAC;CACrB;AACD;;GAEG;AACH,oBAAY,IAAI;IACZ;;OAEG;IACH,YAAY,IAAI;IAChB;;OAEG;IACH,iBAAiB,IAAI;IACrB;;OAEG;IACH,kBAAkB,IAAI;IACtB;;OAEG;IACH,SAAS,IAAI;IACb;;OAEG;IACH,YAAY,IAAI;IAChB;;OAEG;IACH,KAAK,IAAI;CACZ;AAED,cAAM,YAAa,SAAQ,WAAW,CAAC,OAAO,CAAC;;IAS3C,MAAM,CAAC,KAAK,CAAC,EAAE,cAAc,CAAC,OAAO,CAAC,GAAG,OAAO;IAOhD,kBAAkB,CAAC,MAAM,EAAE,aAAa,EAAE,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,MAAM,CAAC,EAAE,OAAO,GAAG,OAAO;IA4BhH,OAAO,CAAC,cAAc;IAgBtB,mBAAmB,CAAC,OAAO,EAAE,OAAO,EAAE,MAAM,EAAE,aAAa,EAAE,OAAO,EAAE,kBAAkB,GAAG,aAAa;CAsB3G;AACD;;GAEG;AACH,eAAO,MAAM,OAAO,cAAqB,CAAC;AAE1C,cAAM,UAAW,SAAQ,WAAW,CAAC,KAAK,CAAC;;IAOvC,MAAM,CAAC,KAAK,CAAC,EAAE,cAAc,CAAC,KAAK,CAAC,GAAG,KAAK;IAO5C,kBAAkB,CAAC,MAAM,EAAE,aAAa,EAAE,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,MAAM,CAAC,EAAE,KAAK,GAAG,KAAK;IAsB5G,mBAAmB,CAAC,OAAO,EAAE,KAAK,EAAE,MAAM,EAAE,aAAa,EAAE,OAAO,EAAE,kBAAkB,GAAG,aAAa;CAYzG;AACD;;GAEG;AACH,eAAO,MAAM,KAAK,YAAmB,CAAC"}

180
dist/messages.js vendored Normal file
View File

@ -0,0 +1,180 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.State = exports.Message = exports.Type = void 0;
const runtime_1 = require("@protobuf-ts/runtime");
const runtime_2 = require("@protobuf-ts/runtime");
const runtime_3 = require("@protobuf-ts/runtime");
const runtime_4 = require("@protobuf-ts/runtime");
const runtime_5 = require("@protobuf-ts/runtime");
/**
* @generated from protobuf enum Type
*/
var Type;
(function (Type) {
/**
* @generated synthetic value - protobuf-ts requires all enums to have a 0 value
*/
Type[Type["UNSPECIFIED$"] = 0] = "UNSPECIFIED$";
/**
* @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1;
*/
Type[Type["BOOTSTRAP_REQUEST"] = 1] = "BOOTSTRAP_REQUEST";
/**
* @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2;
*/
Type[Type["BOOTSTRAP_RESPONSE"] = 2] = "BOOTSTRAP_RESPONSE";
/**
* @generated from protobuf enum value: CONNECTED = 3;
*/
Type[Type["CONNECTED"] = 3] = "CONNECTED";
/**
* @generated from protobuf enum value: DISCONNECTED = 4;
*/
Type[Type["DISCONNECTED"] = 4] = "DISCONNECTED";
/**
* @generated from protobuf enum value: STATE = 5;
*/
Type[Type["STATE"] = 5] = "STATE";
})(Type = exports.Type || (exports.Type = {}));
// @generated message type with reflection information, may provide speed optimized methods
class Message$Type extends runtime_5.MessageType {
constructor() {
super("Message", [
{ no: 1, name: "type", kind: "enum", T: () => ["Type", Type] },
{ no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => exports.State } },
{ no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value) {
const message = { type: 0, bootstrap: {} };
globalThis.Object.defineProperty(message, runtime_4.MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
(0, runtime_3.reflectionMergePartial)(this, message, value);
return message;
}
internalBinaryRead(reader, length, options, target) {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* Type type */ 1:
message.type = reader.int32();
break;
case /* map<string, State> bootstrap */ 2:
this.binaryReadMap2(message.bootstrap, reader, options);
break;
case /* optional bytes data */ 3:
message.data = reader.bytes();
break;
case /* optional bytes id */ 4:
message.id = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? runtime_2.UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
binaryReadMap2(map, reader, options) {
let len = reader.uint32(), end = reader.pos + len, key, val;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case 1:
key = reader.string();
break;
case 2:
val = exports.State.internalBinaryRead(reader, reader.uint32(), options);
break;
default: throw new globalThis.Error("unknown map entry field for field Message.bootstrap");
}
}
map[key ?? ""] = val ?? exports.State.create();
}
internalBinaryWrite(message, writer, options) {
/* Type type = 1; */
if (message.type !== 0)
writer.tag(1, runtime_1.WireType.Varint).int32(message.type);
/* map<string, State> bootstrap = 2; */
for (let k of Object.keys(message.bootstrap)) {
writer.tag(2, runtime_1.WireType.LengthDelimited).fork().tag(1, runtime_1.WireType.LengthDelimited).string(k);
writer.tag(2, runtime_1.WireType.LengthDelimited).fork();
exports.State.internalBinaryWrite(message.bootstrap[k], writer, options);
writer.join().join();
}
/* optional bytes data = 3; */
if (message.data !== undefined)
writer.tag(3, runtime_1.WireType.LengthDelimited).bytes(message.data);
/* optional bytes id = 4; */
if (message.id !== undefined)
writer.tag(4, runtime_1.WireType.LengthDelimited).bytes(message.id);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? runtime_2.UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message Message
*/
exports.Message = new Message$Type();
// @generated message type with reflection information, may provide speed optimized methods
class State$Type extends runtime_5.MessageType {
constructor() {
super("State", [
{ no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ },
{ no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value) {
const message = { connectedTo: [] };
globalThis.Object.defineProperty(message, runtime_4.MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
(0, runtime_3.reflectionMergePartial)(this, message, value);
return message;
}
internalBinaryRead(reader, length, options, target) {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* repeated bytes connectedTo */ 1:
message.connectedTo.push(reader.bytes());
break;
case /* optional bytes data */ 2:
message.data = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? runtime_2.UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message, writer, options) {
/* repeated bytes connectedTo = 1; */
for (let i = 0; i < message.connectedTo.length; i++)
writer.tag(1, runtime_1.WireType.LengthDelimited).bytes(message.connectedTo[i]);
/* optional bytes data = 2; */
if (message.data !== undefined)
writer.tag(2, runtime_1.WireType.LengthDelimited).bytes(message.data);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? runtime_2.UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message State
*/
exports.State = new State$Type();

21
messages.proto Normal file
View File

@ -0,0 +1,21 @@
syntax = "proto2";
enum Type {
BOOTSTRAP_REQUEST = 1;
BOOTSTRAP_RESPONSE = 2;
CONNECTED = 3;
DISCONNECTED = 4;
STATE = 5;
}
message Message {
required Type type = 1;
map<string, State> bootstrap = 2; // For bootstrap events
optional bytes data = 3; // For state event
optional bytes id = 4; // For connected and disconnected events
}
message State {
repeated bytes connectedTo = 1;
optional bytes data = 2;
}

29
package.json Normal file
View File

@ -0,0 +1,29 @@
{
"name": "@lumeweb/dht-online",
"type": "commonjs",
"version": "0.1.0",
"main": "dist/index.js",
"dependencies": {
"@lumeweb/dht-flood": "https://git.lumeweb.com/LumeWeb/dht-flood.git",
"@protobuf-ts/plugin": "^2.8.1",
"@types/codecs": "^2.2.3",
"codecs": "^3.0.0",
"compact-encoding": "^2.11.0",
"jsnetworkx": "^0.3.4",
"lru": "^3.1.0",
"protocol-buffers-encodings": "^1.2.0",
"protomux-rpc": "^1.3.0"
},
"devDependencies": {
"@types/b4a": "^1.6.0",
"@types/debug": "^4.1.7",
"crypto": "^1.0.1",
"debug": "^4.3.4",
"hyperswarm": "^4.3.5",
"prettier": "^2.7.1",
"protoc": "^1.1.3",
"sodium-universal": "^3.1.0",
"tape": "^5.6.1",
"ts-proto": "^1.131.2"
}
}

292
src/dhtOnlineBase.ts Normal file
View File

@ -0,0 +1,292 @@
import EventEmitter from "events";
// @ts-ignore
import { DiGraph, hasPath } from "jsnetworkx";
import codecs from "codecs";
import { Message, State, Type } from "./messages.js";
const DEFAULT_ENCODING = "json";
type Bootstrap = {
[key: string]: State;
};
export default class DHTOnlineBase extends EventEmitter {
private id: Buffer;
private bootstrapped: boolean;
private graph: any;
private connectedTo: Set<any>;
private data: {};
private encoding: codecs.Codec<any>;
constructor(id: Buffer, { encoding = DEFAULT_ENCODING } = {}) {
super();
if (!id) throw new TypeError("Must provide id for self");
this.id = id;
this.bootstrapped = false;
this.graph = new DiGraph();
this.connectedTo = new Set();
this.data = {};
this.encoding = codecs(encoding || DEFAULT_ENCODING);
this._online = [this._maybeHexify(this.id)];
}
private _online: string[];
get online(): string[] {
return this._online;
}
broadcast(data: any, ttl?: number) {
throw new TypeError("Broadcast has not been implemented");
}
getPeerData(id: Buffer | string) {
return this.graph.node.get(this._maybeHexify(id));
}
protected setData(data: any) {
this.data = data;
this._setPeer(this.id, data);
this._broadcastData();
}
private _broadcastData() {
const rawData = this.data;
if (!Object.keys(rawData).length) {
return;
}
const data = this.encoding.encode(rawData);
this.broadcast(
Message.toBinary(
Message.create({
type: Type.STATE,
data,
})
)
);
}
protected onAddPeer(id: Buffer) {
const stringId = id.toString("hex");
if (this.connectedTo.has(stringId)) {
return;
} // Already know we're connected here
this.connectedTo.add(stringId);
this._addPeerConnection(this.id, id);
this.emit("peer-add", id);
this._recalculate();
this.broadcast(
Message.toBinary(
Message.create({
type: Type.CONNECTED,
id,
})
)
);
this._broadcastData();
if (this.bootstrapped) {
return;
}
// If this is the first person we've met, get their graph
this.broadcast(
Message.toBinary(
Message.create({
type: Type.BOOTSTRAP_REQUEST,
})
),
0
);
}
protected onRemovePeer(id: Buffer) {
this.connectedTo.delete(id.toString("hex"));
this._removePeerConnection(this.id, id);
this.emit("peer-remove");
this._recalculate();
this.broadcast(
Message.toBinary(
Message.create({
type: Type.DISCONNECTED,
id,
})
)
);
}
protected onGetBroadcast(message: Buffer, id: Buffer) {
let decoded;
try {
decoded = Message.fromBinary(message);
} catch {
return;
}
const { type } = decoded;
if (!type) {
throw new Error("Missing Type In Message");
}
if (type === Type.STATE) {
const { data: rawData } = decoded;
const data = this.encoding.decode(rawData);
this._setPeer(id, data);
this.emit("peer-data", data, id);
this._recalculate();
} else if (type === Type.CONNECTED) {
const { id: toId } = decoded;
this._addPeerConnection(id, Buffer.from(toId as Uint8Array));
this.emit("peer-add-seen", id, toId);
this._recalculate();
} else if (type === Type.DISCONNECTED) {
const { id: toId } = decoded;
this._removePeerConnection(id, Buffer.from(toId as Uint8Array));
this.emit("peer-remove-seen", id, toId);
this._recalculate();
} else if (type === Type.BOOTSTRAP_REQUEST) {
const bootstrap = this._getBootstrapInfo();
this.broadcast(
Message.toBinary(
Message.create({
type: Type.BOOTSTRAP_RESPONSE,
bootstrap,
})
),
0
);
} else if (type === Type.BOOTSTRAP_RESPONSE) {
const { bootstrap } = decoded;
this._bootstrapFrom(bootstrap);
}
}
private _hasSeenPeer(id: Buffer | string) {
return this.graph.hasNode(this._maybeHexify(id));
}
private _setPeer(id: Buffer | string, data: any) {
this.graph.addNode(this._maybeHexify(id), data);
}
private _removePeer(id: Buffer | string) {
this.graph.removeNode(this._maybeHexify(id));
}
private _ensurePeer(id: Buffer | string) {
id = this._maybeHexify(id);
if (!this._hasSeenPeer(id)) {
this._setPeer(id, {});
}
}
private _addPeerConnection(
origin: Buffer | string,
destination: Buffer | string
) {
this._ensurePeer(origin);
this._ensurePeer(destination);
this.graph.addEdge(
this._maybeHexify(origin),
this._maybeHexify(destination)
);
}
private _removePeerConnection(origin: Buffer, destination: Buffer) {
try {
this._ensurePeer(origin);
this._ensurePeer(destination);
this.graph.removeEdge(
origin.toString("hex"),
destination.toString("hex")
);
} catch (e: any) {
if (e.name !== "JSNetworkXError") throw e;
}
}
private _bootstrapFrom(bootstrap: Bootstrap) {
if (this.bootstrapped) {
return;
}
for (const id in bootstrap) {
const { data, connectedTo } = bootstrap[id];
const parsedData = data ? this.encoding.decode(data) : null;
let peerData = parsedData || {};
if (id === this.id.toString("hex")) continue;
// If we're already tracking them
if (this._hasSeenPeer(id)) {
// See what data we already have for them
// Add their existing data to what we got from the bootstrap
const existingPeerData = this.getPeerData(id);
peerData = { ...existingPeerData, ...peerData };
}
this._setPeer(id, peerData);
for (const connection of connectedTo) {
this._addPeerConnection(id, Buffer.from(connection));
}
}
this.emit("bootstrapped");
this._recalculate();
}
private _getPeerConnectedTo(id: string) {
return this.graph.successors(id);
}
private _getBootstrapInfo() {
const state: Bootstrap = {};
for (const [id, rawData] of this.graph.nodes(true)) {
const connectedTo = this.graph
.neighbors(id)
.map((id: string) => Buffer.from(id, "hex"));
const data = rawData ? this.encoding.encode(rawData) : null;
state[id] = { data, connectedTo };
}
return state;
}
// Calculate who's online and emit an event
private _recalculate() {
const online = this.graph.nodes().filter((id: string) => {
return hasPath(this.graph, {
source: this._maybeHexify(this.id),
target: id,
});
});
const offline = this.graph.nodes().filter((id: string) => {
return !hasPath(this.graph, {
source: this._maybeHexify(this.id),
target: id,
});
});
for (const id of offline) {
this.graph.removeNode(id);
}
this._online = online;
this.emit("online", online);
}
private _maybeHexify(data: Buffer | string): string {
if (Buffer.isBuffer(data)) {
return data.toString("hex");
}
return data;
}
}

50
src/index.ts Normal file
View File

@ -0,0 +1,50 @@
import DHTOnlineBase from "./dhtOnlineBase.js";
import DHTFlood from "@lumeweb/dht-flood";
const DISCONNECT_SMOOTH = 500;
export default class DHTOnline extends DHTOnlineBase {
private flood: DHTFlood;
private swarm: any;
constructor(
swarm: any,
{ id = swarm.keyPair.publicKey, data = {}, ...opts } = {}
) {
super(id, opts);
this.flood = new DHTFlood({ id, swarm, ...opts });
this.swarm = swarm;
this.flood.on("peer-open", (peer) => this.handlePeerAdd(peer));
this.flood.on("peer-remove", (peer) => this.handlePeerRemove(peer));
this.flood.on("message", (message, id) => this.onGetBroadcast(message, id));
this.swarm.on("connection", (peer: any) =>
this.flood.send(peer, Buffer.from("hello"), 0)
);
this.setData(data);
[...this.swarm.peers.values()].forEach(this.handlePeerAdd.bind(this));
}
handlePeerAdd(peer: any) {
const id = peer.remotePublicKey;
this.onAddPeer(id);
}
handlePeerRemove(peer: any) {
const id = peer.remotePublicKey;
// Wait for a bit and check if we're still disconnected before removing the peer
setTimeout(() => {
if (this.swarm._allConnections.has(id)) {
return;
}
this.onRemovePeer(id);
}, DISCONNECT_SMOOTH);
}
broadcast(message: any, ttl?: number) {
this.flood.broadcast(message, ttl);
}
}

220
src/messages.ts Normal file
View File

@ -0,0 +1,220 @@
// @generated by protobuf-ts 2.8.1
// @generated from protobuf file "messages.proto" (syntax proto2)
// tslint:disable
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
import type { IBinaryWriter } from "@protobuf-ts/runtime";
import { WireType } from "@protobuf-ts/runtime";
import type { BinaryReadOptions } from "@protobuf-ts/runtime";
import type { IBinaryReader } from "@protobuf-ts/runtime";
import { UnknownFieldHandler } from "@protobuf-ts/runtime";
import type { PartialMessage } from "@protobuf-ts/runtime";
import { reflectionMergePartial } from "@protobuf-ts/runtime";
import { MESSAGE_TYPE } from "@protobuf-ts/runtime";
import { MessageType } from "@protobuf-ts/runtime";
/**
* @generated from protobuf message Message
*/
export interface Message {
/**
* @generated from protobuf field: Type type = 1;
*/
type: Type;
/**
* @generated from protobuf field: map<string, State> bootstrap = 2;
*/
bootstrap: {
[key: string]: State;
}; // For bootstrap events
/**
* @generated from protobuf field: optional bytes data = 3;
*/
data?: Uint8Array; // For state event
/**
* @generated from protobuf field: optional bytes id = 4;
*/
id?: Uint8Array; // For connected and disconnected events
}
/**
* @generated from protobuf message State
*/
export interface State {
/**
* @generated from protobuf field: repeated bytes connectedTo = 1;
*/
connectedTo: Uint8Array[];
/**
* @generated from protobuf field: optional bytes data = 2;
*/
data?: Uint8Array;
}
/**
* @generated from protobuf enum Type
*/
export enum Type {
/**
* @generated synthetic value - protobuf-ts requires all enums to have a 0 value
*/
UNSPECIFIED$ = 0,
/**
* @generated from protobuf enum value: BOOTSTRAP_REQUEST = 1;
*/
BOOTSTRAP_REQUEST = 1,
/**
* @generated from protobuf enum value: BOOTSTRAP_RESPONSE = 2;
*/
BOOTSTRAP_RESPONSE = 2,
/**
* @generated from protobuf enum value: CONNECTED = 3;
*/
CONNECTED = 3,
/**
* @generated from protobuf enum value: DISCONNECTED = 4;
*/
DISCONNECTED = 4,
/**
* @generated from protobuf enum value: STATE = 5;
*/
STATE = 5
}
// @generated message type with reflection information, may provide speed optimized methods
class Message$Type extends MessageType<Message> {
constructor() {
super("Message", [
{ no: 1, name: "type", kind: "enum", T: () => ["Type", Type] },
{ no: 2, name: "bootstrap", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "message", T: () => State } },
{ no: 3, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "id", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value?: PartialMessage<Message>): Message {
const message = { type: 0, bootstrap: {} };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<Message>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* Type type */ 1:
message.type = reader.int32();
break;
case /* map<string, State> bootstrap */ 2:
this.binaryReadMap2(message.bootstrap, reader, options);
break;
case /* optional bytes data */ 3:
message.data = reader.bytes();
break;
case /* optional bytes id */ 4:
message.id = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
private binaryReadMap2(map: Message["bootstrap"], reader: IBinaryReader, options: BinaryReadOptions): void {
let len = reader.uint32(), end = reader.pos + len, key: keyof Message["bootstrap"] | undefined, val: Message["bootstrap"][any] | undefined;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case 1:
key = reader.string();
break;
case 2:
val = State.internalBinaryRead(reader, reader.uint32(), options);
break;
default: throw new globalThis.Error("unknown map entry field for field Message.bootstrap");
}
}
map[key ?? ""] = val ?? State.create();
}
internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* Type type = 1; */
if (message.type !== 0)
writer.tag(1, WireType.Varint).int32(message.type);
/* map<string, State> bootstrap = 2; */
for (let k of Object.keys(message.bootstrap)) {
writer.tag(2, WireType.LengthDelimited).fork().tag(1, WireType.LengthDelimited).string(k);
writer.tag(2, WireType.LengthDelimited).fork();
State.internalBinaryWrite(message.bootstrap[k], writer, options);
writer.join().join();
}
/* optional bytes data = 3; */
if (message.data !== undefined)
writer.tag(3, WireType.LengthDelimited).bytes(message.data);
/* optional bytes id = 4; */
if (message.id !== undefined)
writer.tag(4, WireType.LengthDelimited).bytes(message.id);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message Message
*/
export const Message = new Message$Type();
// @generated message type with reflection information, may provide speed optimized methods
class State$Type extends MessageType<State> {
constructor() {
super("State", [
{ no: 1, name: "connectedTo", kind: "scalar", repeat: 2 /*RepeatType.UNPACKED*/, T: 12 /*ScalarType.BYTES*/ },
{ no: 2, name: "data", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value?: PartialMessage<State>): State {
const message = { connectedTo: [] };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<State>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: State): State {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* repeated bytes connectedTo */ 1:
message.connectedTo.push(reader.bytes());
break;
case /* optional bytes data */ 2:
message.data = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: State, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* repeated bytes connectedTo = 1; */
for (let i = 0; i < message.connectedTo.length; i++)
writer.tag(1, WireType.LengthDelimited).bytes(message.connectedTo[i]);
/* optional bytes data = 2; */
if (message.data !== undefined)
writer.tag(2, WireType.LengthDelimited).bytes(message.data);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message State
*/
export const State = new State$Type();

82
test.js Normal file
View File

@ -0,0 +1,82 @@
const test = require("tape");
const Hyperswarm = require("hyperswarm");
const sodium = require("sodium-universal");
const b4a = require("b4a");
const { default: DHTOnline } = require("./");
const crypto = require("crypto");
const topicName = crypto.randomBytes(10);
const topic = b4a.allocUnsafe(32);
sodium.crypto_generichash(topic, b4a.from(topicName));
test("Basic presence test / data propagation", (t) => {
t.plan(6);
const peer1 = createPeer();
const peer2 = createPeer();
Promise.all([peer1, peer2]).then((peers) => {
const peer1 = peers.shift();
const peer2 = peers.shift();
const p1 = new DHTOnline(peer1);
const p2 = new DHTOnline(peer2);
p1.setData({ message: "Hello" });
p2.setData({ message: "World!" });
t.ok(p1.id, "Generated id 1");
t.ok(p2.id, "Generated id 2");
p1.on("online", handleOnline);
p2.on("online", handleOnline);
p1.on("peer-remove", handleDisconnect);
let hasFinished = false;
function handleOnline(list) {
if (list.length === 2) {
const peerData1 = p1.getPeerData(p2.id);
const peerData2 = p2.getPeerData(p1.id);
const hasP1 = peerData1 && Object.keys(peerData1).length;
const hasP2 = peerData2 && Object.keys(peerData2).length;
if (!hasP1 || !hasP2) {
return;
}
p1.removeListener("online", handleOnline);
p2.removeListener("online", handleOnline);
setTimeout(() => {
t.pass("Seeing everyone online");
t.deepEqual(peerData1, p2.data, "Got peer data from peer 2");
t.deepEqual(peerData2, p1.data, "Got peer data from peer 2");
hasFinished = true;
peer2._allConnections.get(peer1.keyPair.publicKey).end();
}, 1000);
}
}
function handleDisconnect() {
if (!hasFinished) return t.error("Disconnected before finished");
t.pass("Peer removed on disconnect");
t.end();
}
t.teardown(() => {
[peer1, peer2].forEach((item) => item.destroy());
});
});
});
async function createPeer() {
const swarm = new Hyperswarm();
await swarm.dht.ready();
await swarm.listen();
swarm.join(topic);
return swarm;
}

23
tsconfig.json Normal file
View File

@ -0,0 +1,23 @@
{
"compilerOptions": {
"declaration": true,
"strict": true,
"module": "commonjs",
"target": "esnext",
"esModuleInterop": true,
"sourceMap": false,
"rootDir": "src",
"outDir": "dist",
"typeRoots": [
"node_modules/@types",
],
"moduleResolution": "node",
"declarationMap": true,
"declarationDir": "dist",
"emitDeclarationOnly": false,
"allowJs": true
},
"include": [
"src"
]
}