diff --git a/package.json b/package.json index 7e53fa1..ac543a8 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,8 @@ "libkmodule": "^0.2.53", "libskynet": "^0.0.62", "noise-handshake": "^3.0.2", - "protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git", + "p-defer": "^4.0.0", + "protomux": "^3.4.1", "randombytes": "github:LumeWeb/randombytes-browser" }, "devDependencies": { @@ -62,10 +63,5 @@ "sodium-universal": "@screamingvoid/sodium-universal", "crypto": "crypto-browserify", "stream": "stream-browserify" - }, - "pnpm": { - "overrides": { - "protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git" - } } } diff --git a/src/index.ts b/src/index.ts index 5b4a807..f45b172 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,7 @@ import Hyperswarm from "@lumeweb/hyperswarm-web"; import type { ActiveQuery } from "libkmodule"; import { addHandler, getSeed, handleMessage } from "libkmodule"; import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js"; -import type { Buffer } from "buffer"; +import { Buffer } from "buffer"; import * as ed from "@noble/ed25519"; import b4a from "b4a"; import { pubKeyToIpv6 } from "./addr.js"; @@ -11,13 +11,14 @@ import { EventEmitter2 as EventEmitter } from "eventemitter2"; import { logErr } from "libkmodule/dist"; // @ts-ignore import Protomux from "protomux"; -import { Mutex } from "async-mutex"; +import defer, { DeferredPromise } from "p-defer"; const MAX_PEER_LISTENERS = 20; interface SwarmConnection { swarm: number; conn: any; + channels: Map; } interface SwarmEvents { @@ -50,6 +51,7 @@ function idFactory(start = 1) { const getSwarmId = idFactory(); const getSocketId = idFactory(); +const getChannelId = idFactory(); addHandler("presentSeed", handlePresentSeed); addHandler("join", handleJoin); @@ -72,7 +74,10 @@ addHandler("socketListenEvent", handleSocketListenEvent, { }); addHandler("socketWrite", handleWriteSocketEvent); addHandler("socketClose", handleCloseSocketEvent); -addHandler("syncProtomux", handleSyncProtomux, { +addHandler("createProtomuxChannel", createProtomuxChannel, { + receiveUpdates: true, +}); +addHandler("createProtomuxMessage", createProtomuxMessage, { receiveUpdates: true, }); @@ -107,6 +112,7 @@ async function createSwarm(): Promise { connections.set(socketId, { swarm: id, conn: peer, + channels: new Map(), }); peer.once("close", () => { @@ -418,69 +424,152 @@ async function handleGetSocketInfo(aq: ActiveQuery) { }); } -async function handleSyncProtomux(aq: ActiveQuery) { +async function createProtomuxChannel(aq: ActiveQuery) { const socket = validateConnection(aq); if (!socket) { return; } - if (!("action" in aq.callerInput)) { - aq.reject("action required"); + if (!("data" in aq.callerInput)) { + aq.reject("data required"); return; } const mux = Protomux.from(socket); - let mutex: Mutex | null = null; - if (mux.mutex) { - mutex = mux.mutex; - } - if (!mutex) { - mutex = new Mutex(); - mux.mutex = mutex; + const data = aq.callerInput.data; + + const handleCallback = (name: string, enabled: boolean) => { + if (!enabled && name !== "destroy") { + return undefined; + } + return (...args: any) => { + args = args.filter( + (item: any) => item.constructor.name.toLowerCase() !== "channel" + ); + + if (name === "destroy") { + connections.get(aq.callerInput.id)?.channels.delete(channelId); + aq.respond(); + } + + if (!enabled) { + return; + } + + aq.sendUpdate({ + action: name, + args, + }); + }; + }; + + aq.setReceiveUpdate?.((data: any) => { + switch (data.action) { + case "open": + channel.open(); + } + }); + + const channel = mux.createChannel({ + protocol: data?.protocol, + id: data?.id, + handshake: data?.handshake, + onopen: handleCallback("onopen", data?.onopen ?? undefined), + onclose: handleCallback("onclose", data?.onclose ?? undefined), + ondestroy: handleCallback("ondestroy", data?.ondestroy ?? undefined), + }); + + if (channel === null) { + aq.reject("duplicate channel"); + return; } - await mutex.acquire(); + const channelId = getChannelId(); + + connections.get(aq.callerInput.id)?.channels.set(channelId, channel); + + aq.sendUpdate(channelId); +} + +async function createProtomuxMessage(aq: ActiveQuery) { + const socket = validateConnection(aq); + + if (!socket) { + return; + } + + if (!("data" in aq.callerInput)) { + aq.reject("action required"); + return; + } + + if (!("channelId" in aq.callerInput)) { + aq.reject("channel id required"); + return; + } + + const channel = connections + .get(aq.callerInput.id) + ?.channels.get(aq.callerInput.channelId); + + if (!channel) { + aq.reject("invalid channel"); + } const data = aq.callerInput.data; - let ret; + const defers: { [action: string]: DeferredPromise } = {}; - switch (aq.callerInput.action) { - case "popFree": - if (mux._free.includes(data)) { - mux._free[mux._free.indexOf(data)] = null; - mux._free = mux._free.filter((item: number) => item !== null); - } - break; - case "pushFree": - mux._free.push(data); - mux._free = Array.from(new Set(mux._free)); - break; - case "pushLocal": - if (typeof mux._local[data] === "undefined") { - mux._local[data] = null; - } - break; - case "pushRemote": - if (typeof mux._remote[data] === "undefined") { - mux._remote[data] = null; - } - break; - case "pullFree": - ret = Object.values(mux._free); - break; - case "pullLocal": - ret = Object.keys(mux._local); - break; - case "pullRemote": - ret = Object.keys(mux._remote); - break; - } + const handleEncoding = (enabled: boolean) => { + if (!enabled) { + return undefined; + } - aq.respond(ret); + const update = async (action: string, args: any) => { + await defers[action]?.promise; + defers[action] = defer(); + aq.sendUpdate({ + action, + args, + }); - mutex.release(); + const ret = await defers[action]?.promise; + + if (ret[1]) { + args[0].buffer = Buffer.from(ret[1].buffer); + args[0].start = ret[1].start; + args[0].end = ret[1].end; + } + + return ret[0]; + }; + + return { + async preencode(...args: any) { + return update("preencode", args); + }, + async encode(...args: any) { + return update("encode", args); + }, + async decode(...args: any) { + return update("encode", args); + }, + }; + }; + + aq.setReceiveUpdate?.((data) => { + defers[data.action]?.resolve(data.args[0]); + }); + + const message = channel.addMessage({ + encoding: handleEncoding(data.encoding ?? false), + onmessage: data.encoding ?? undefined, + }); + + aq.sendUpdate({ + action: "created", + }); } function getSwarmToSocketConnectionId(socket: any) {