From 55159d89ce5358d9c34af5b10326a45624905525 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 5 Apr 2023 03:52:53 -0400 Subject: [PATCH] *implement syncing protomux channel/message state --- package.json | 4 +++- src/index.ts | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 9015fe5..68cdcd5 100644 --- a/package.json +++ b/package.json @@ -7,9 +7,11 @@ "@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git", "@noble/hashes": "^1.2.0", "@siaweb/libweb": "git+https://git.lumeweb.com/LumeWeb/libsiaweb.git", + "async-mutex": "^0.4.0", "b4a": "^1.6.3", "backoff.js": "^1.0.4", - "eventemitter3": "^5.0.0" + "eventemitter3": "^5.0.0", + "protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git" }, "devDependencies": { "@types/b4a": "^1.6.0", diff --git a/src/index.ts b/src/index.ts index 977fe41..110700c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,6 +8,9 @@ import type { EventEmitter } from "eventemitter3"; // @ts-ignore import Backoff from "backoff.js"; +import { Mutex } from "async-mutex"; +// @ts-ignore +import Protomux from "protomux"; export class SwarmClient extends Client { private useDefaultSwarm: boolean; @@ -67,9 +70,11 @@ export class SwarmClient extends Client { throw new Error("not implemented"); } + async init(): Promise { return await this.callModuleReturn("init", { swarm: this.swarm }); } + async ready(): Promise { if (this._ready) { return this._ready; @@ -132,6 +137,7 @@ export class SwarmClient extends Client { public async clearRelays(): Promise { return this.callModuleReturn("clearRelays", { swarm: this.swarm }); } + public async getRelays(): Promise { return this.callModuleReturn("getRelays", { swarm: this.swarm }); } @@ -156,6 +162,8 @@ export class Socket extends Client { private id: number; private eventUpdates: { [event: string]: DataFn[] } = {}; + private syncMutex = new Mutex(); + constructor(id: number) { super(); this.id = id; @@ -178,6 +186,52 @@ export class Socket extends Client { this._remotePublicKey = info.remotePublicKey; this._rawStream = info.rawStream; + + this._initSync(); + } + + private async _initSync() { + const mux = Protomux.from(this); + + const [update] = this.connectModule( + "syncProtomux", + { id: this.id }, + async (data: any) => { + await this.syncMutex.acquire(); + + ["remote", "local"].forEach((field) => { + const rField = `_${field}`; + data[field].forEach((item: any) => { + if (!mux[rField][item]) { + while (item > mux[rField].length) { + mux[rField].push(null); + } + } + if (!mux[rField][item]) { + mux[rField][item] = null; + } + }); + }); + + data.free.forEach((index: number) => { + if (mux._free[index] === null) { + mux._free[index] = undefined; + } + }); + mux._free = mux._free.filter((item: any) => item !== undefined); + + this.syncMutex.release(); + } + ); + + const send = (mux: any) => { + update({ + remote: Object.keys(mux._remote), + local: Object.keys(mux._local), + free: mux._free, + }); + }; + mux.syncState = send.bind(undefined, mux); } on>(