diff --git a/messages.proto b/messages.proto index 815754c..0f5cf13 100644 --- a/messages.proto +++ b/messages.proto @@ -7,6 +7,7 @@ enum Type { DISCONNECTED = 4; ADD_ITEM = 5; REMOVE_ITEM = 6; + HEARTBEAT = 7; } message Message { diff --git a/src/index.ts b/src/index.ts index 8efcaae..2eec873 100644 --- a/src/index.ts +++ b/src/index.ts @@ -56,7 +56,7 @@ export default class DHTCache extends EventEmitter { this.flood.on("message", (message, id) => this.onGetBroadcast(message, id)); this.swarm.on("connection", (peer: any) => - this.flood.send(peer, b4a.from("hello"), 0) + this.send(peer, b4a.from("hello")) ); [...this.swarm.peers.values()] @@ -69,6 +69,8 @@ export default class DHTCache extends EventEmitter { }); this._ensurePeer(this.id); + setInterval(() => this._heartbeatCheck(), 5 * 1000); + setInterval(() => this._emitHeartbeat(), 60 * 1000); } private _cache: Set; @@ -163,8 +165,8 @@ export default class DHTCache extends EventEmitter { this.flood.broadcast(message, ttl); } - public send(message: any) { - this.flood.send(message, 0); + public send(peer: any, message: any) { + this.flood.send(peer, message, 0); } protected addPeerHandler(peer: any) { @@ -186,6 +188,8 @@ export default class DHTCache extends EventEmitter { id, }); + this._emitHeartbeat(peer); + if (this.bootstrapped) { return; } @@ -278,6 +282,15 @@ export default class DHTCache extends EventEmitter { } else if (type === Type.BOOTSTRAP_RESPONSE) { const { bootstrap } = decoded; this._bootstrapFrom(bootstrap); + } else if (type === Type.HEARTBEAT) { + let { id: toId, signature, data: bufData } = decoded; + + toId = b4a.from(toId as Uint8Array); + + if (signature && crypto.verify(bufData, signature, id)) { + this._addEntityConnection(id, toId as Buffer); + this._setEntity(toId as Buffer, { heartbeat: Date.now() }); + } } } @@ -447,4 +460,49 @@ export default class DHTCache extends EventEmitter { } })(); } + + private _heartbeatCheck() { + for (const peer of this.connectedTo) { + const pubkey = b4a.from(peer, "hex"); + const heartbeat = this.graph.node.get(peer)?.heartbeat; + + const conn = this.swarm._allConnections.get(pubkey); + if (!conn) { + this.onRemovePeer({ remotePublicKey: pubkey }); + continue; + } + + if (heartbeat > 0 && Date.now() - heartbeat > 60 * 1000) { + conn.end(); + } + } + } + + private _emitHeartbeat(peer?: any) { + let peers = [...this.connectedTo]; + + if (peer) { + // @ts-ignore + peers = [b4a.from(peer.remotePublicKey).toString("hex")]; + } + + for (const peer of peers) { + const pubkey = b4a.from(peer, "hex"); + const conn = this.swarm._allConnections.get(pubkey); + if (!conn) { + continue; + } + const data = b4a.from(Uint8Array.from([1])); + + this.send( + conn, + this._compileMessage({ + type: Type.HEARTBEAT, + id: this.id, + data, + signature: crypto.sign(data, this.swarm.keyPair.secretKey), + }) + ); + } + } } diff --git a/src/messages.ts b/src/messages.ts index cc1cd5a..4d13bc8 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -78,7 +78,11 @@ export enum Type { /** * @generated from protobuf enum value: REMOVE_ITEM = 6; */ - REMOVE_ITEM = 6 + REMOVE_ITEM = 6, + /** + * @generated from protobuf enum value: HEARTBEAT = 7; + */ + HEARTBEAT = 7 } // @generated message type with reflection information, may provide speed optimized methods class Message$Type extends MessageType {