Compare commits

...

14 Commits

Author SHA1 Message Date
Derrick Hammer 875af27733
*Refactor again to use an api for protomux channels and messages 2023-04-07 19:48:23 -04:00
Derrick Hammer 41751c7d2b
*If for some reason we responded by the time we try to send an update, just abort 2023-04-06 18:07:51 -04:00
Derrick Hammer 86a3881cfb
*remove debug 2023-04-06 17:46:21 -04:00
Derrick Hammer 44a9838490
*release mutex 2023-04-06 17:02:54 -04:00
Derrick Hammer 1bd159c19e
*Switch to a different, simpler method of syncing protomux state 2023-04-06 16:33:08 -04:00
Derrick Hammer 2e62597cd6
*Don't need to toggle our event hook 2023-04-06 14:26:57 -04:00
Derrick Hammer b8b72450e6
*Notify the slave protomux that we have synced 2023-04-06 13:19:06 -04:00
Derrick Hammer b8d0af64c7
*Update deps 2023-04-06 13:17:06 -04:00
Derrick Hammer 33b11cbde6
*In getSwarm and swarm init, await on .opened to ensure we are ready, only when we have an active relay
*Only set mux.syncState if not previously set
*change mux.syncState to emit syncProtomux
*ensure the state data are numbers
*check for undefined in the state data
2023-04-06 13:16:46 -04:00
Derrick Hammer 4f6f4eacfd
*Need to explicitly add override for protomux 2023-04-05 04:50:25 -04:00
Derrick Hammer 369b1d19a4
*remove the current listeners sync function from the syncProtomux before emitting to not create an infinite loop, and add back after 2023-04-05 03:51:04 -04:00
Derrick Hammer 04528830cf
*switch to protomux fork 2023-04-05 03:47:50 -04:00
Derrick Hammer 053e309d98
*Initial version of syncProtomux api protocol that will keep Protomux channel/message tracking in sync between workers 2023-04-05 02:41:57 -04:00
Derrick Hammer 2f30b743f6
*Move existing connection logic up 2023-04-04 11:22:37 -04:00
2 changed files with 196 additions and 11 deletions

View File

@ -14,14 +14,18 @@
"type": "module", "type": "module",
"dependencies": { "dependencies": {
"@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git", "@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git",
"@lumeweb/rpc": "git+https://git.lumeweb.com/LumeWeb/rpc.git",
"@noble/ed25519": "^1.7.3", "@noble/ed25519": "^1.7.3",
"@peculiar/webcrypto": "git+https://git.lumeweb.com/LumeWeb/webcrypto.git", "@peculiar/webcrypto": "git+https://git.lumeweb.com/LumeWeb/webcrypto.git",
"async-mutex": "^0.4.0",
"b4a": "^1.6.3", "b4a": "^1.6.3",
"eventemitter2": "^6.4.9", "eventemitter2": "^6.4.9",
"hyperswarm": "^4.4.0", "hyperswarm": "^4.4.0",
"libkmodule": "^0.2.53", "libkmodule": "^0.2.53",
"libskynet": "^0.0.62", "libskynet": "^0.0.62",
"noise-handshake": "^3.0.2", "noise-handshake": "^3.0.2",
"p-defer": "^4.0.0",
"protomux": "^3.4.1",
"randombytes": "github:LumeWeb/randombytes-browser" "randombytes": "github:LumeWeb/randombytes-browser"
}, },
"devDependencies": { "devDependencies": {
@ -52,7 +56,7 @@
"stream-browserify": "^3.0.0", "stream-browserify": "^3.0.0",
"ts-loader": "^9.4.2", "ts-loader": "^9.4.2",
"typescript": "^4.9.5", "typescript": "^4.9.5",
"webpack": "^5.77.0", "webpack": "^5.78.0",
"webpack-cli": "^4.10.0" "webpack-cli": "^4.10.0"
}, },
"browser": { "browser": {

View File

@ -3,18 +3,22 @@ import Hyperswarm from "@lumeweb/hyperswarm-web";
import type { ActiveQuery } from "libkmodule"; import type { ActiveQuery } from "libkmodule";
import { addHandler, getSeed, handleMessage } from "libkmodule"; import { addHandler, getSeed, handleMessage } from "libkmodule";
import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js"; import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js";
import type { Buffer } from "buffer"; import { Buffer } from "buffer";
import * as ed from "@noble/ed25519"; import * as ed from "@noble/ed25519";
import b4a from "b4a"; import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js"; import { pubKeyToIpv6 } from "./addr.js";
import { EventEmitter2 as EventEmitter } from "eventemitter2"; import { EventEmitter2 as EventEmitter } from "eventemitter2";
import { logErr } from "libkmodule/dist"; import { logErr } from "libkmodule/dist";
// @ts-ignore
import Protomux from "protomux";
import defer, { DeferredPromise } from "p-defer";
const MAX_PEER_LISTENERS = 20; const MAX_PEER_LISTENERS = 20;
interface SwarmConnection { interface SwarmConnection {
swarm: number; swarm: number;
conn: any; conn: any;
channels: Map<number, Protomux>;
} }
interface SwarmEvents { interface SwarmEvents {
@ -34,6 +38,7 @@ let moduleReady: Promise<void> = new Promise((resolve) => {
}); });
onmessage = handleMessage; onmessage = handleMessage;
function idFactory(start = 1) { function idFactory(start = 1) {
let id = start; let id = start;
@ -46,6 +51,7 @@ function idFactory(start = 1) {
const getSwarmId = idFactory(); const getSwarmId = idFactory();
const getSocketId = idFactory(); const getSocketId = idFactory();
const getChannelId = idFactory();
addHandler("presentSeed", handlePresentSeed); addHandler("presentSeed", handlePresentSeed);
addHandler("join", handleJoin); addHandler("join", handleJoin);
@ -68,6 +74,13 @@ addHandler("socketListenEvent", handleSocketListenEvent, {
}); });
addHandler("socketWrite", handleWriteSocketEvent); addHandler("socketWrite", handleWriteSocketEvent);
addHandler("socketClose", handleCloseSocketEvent); addHandler("socketClose", handleCloseSocketEvent);
addHandler("createProtomuxChannel", createProtomuxChannel, {
receiveUpdates: true,
});
addHandler("createProtomuxMessage", createProtomuxMessage, {
receiveUpdates: true,
});
async function handlePresentSeed(aq: ActiveQuery) { async function handlePresentSeed(aq: ActiveQuery) {
const pubkey = await ed.getPublicKey(aq.callerInput.rootKey); const pubkey = await ed.getPublicKey(aq.callerInput.rootKey);
handlePresentSeedModule({ handlePresentSeedModule({
@ -99,6 +112,7 @@ async function createSwarm(): Promise<number> {
connections.set(socketId, { connections.set(socketId, {
swarm: id, swarm: id,
conn: peer, conn: peer,
channels: new Map<number, Protomux>(),
}); });
peer.once("close", () => { peer.once("close", () => {
@ -142,7 +156,11 @@ function handleSocketListenEvent(aq: ActiveQuery) {
aq.respond(); aq.respond();
}; };
const cb = (data: Buffer) => { const cb = async (data: Buffer) => {
await socket.mutex?.waitForUnlock();
if (responded) {
return;
}
aq.sendUpdate(data); aq.sendUpdate(data);
}; };
@ -176,7 +194,7 @@ function handleCloseSocketEvent(aq: ActiveQuery) {
aq.respond(); aq.respond();
} }
function handleWriteSocketEvent(aq: ActiveQuery) { async function handleWriteSocketEvent(aq: ActiveQuery) {
const socket = validateConnection(aq); const socket = validateConnection(aq);
if (!socket) { if (!socket) {
@ -189,6 +207,8 @@ function handleWriteSocketEvent(aq: ActiveQuery) {
return false; return false;
} }
await socket.mutex?.waitForUnlock();
socket.write(message); socket.write(message);
aq.respond(); aq.respond();
@ -219,9 +239,16 @@ async function getSwarm(aq: ActiveQuery): Promise<Hyperswarm> {
} }
if (!swarm) { if (!swarm) {
if (defaultSwarm.activeRelay && defaultSwarm.ready) {
await defaultSwarm.activeRelay.dht._protocol.opened;
}
return defaultSwarm; return defaultSwarm;
} }
if (swarm.activeRelay && swarm.ready) {
await swarm.activeRelay.dht._protocol.opened;
}
return swarmInstances.get(swarm) as Hyperswarm; return swarmInstances.get(swarm) as Hyperswarm;
} }
@ -281,6 +308,7 @@ async function handleJoin(aq: ActiveQuery) {
swarm.join(topic, { server: false }); swarm.join(topic, { server: false });
aq.respond(); aq.respond();
} }
async function handleGetPeerByPubkey(aq: ActiveQuery) { async function handleGetPeerByPubkey(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput; const { pubkey = null } = aq.callerInput;
@ -319,17 +347,21 @@ async function handleInit(aq: ActiveQuery) {
aq.respond(); aq.respond();
} }
async function handleReady(aq: ActiveQuery) { async function handleReady(aq: ActiveQuery) {
const swarm = await getSwarm(aq); const swarm = await getSwarm(aq);
if (swarm.activeRelay && swarm.ready) { if (swarm.activeRelay && swarm.ready) {
aq.respond(); aq.respond();
await swarm.activeRelay.dht._protocol.opened;
return; return;
} }
swarm.once("ready", () => { swarm.once("ready", async () => {
await swarm.activeRelay.dht._protocol.opened;
aq.respond(); aq.respond();
}); });
} }
async function handleListenConnections(aq: ActiveQuery) { async function handleListenConnections(aq: ActiveQuery) {
const swarm = await getSwarm(aq); const swarm = await getSwarm(aq);
const swarmId = getSwarmToSwarmId(swarm); const swarmId = getSwarmToSwarmId(swarm);
@ -351,6 +383,12 @@ async function handleListenConnections(aq: ActiveQuery) {
aq.respond(); aq.respond();
}); });
for (const conn of connections) {
if (conn[1].swarm === swarmId) {
listener(conn[1].conn);
}
}
const closeCb = () => { const closeCb = () => {
swarmEvent?.off("connection", listener); swarmEvent?.off("connection", listener);
swarmEvent?.emit("close"); swarmEvent?.emit("close");
@ -366,12 +404,6 @@ async function handleListenConnections(aq: ActiveQuery) {
return; return;
} }
swarm.onceSelf("ready", hookClose); swarm.onceSelf("ready", hookClose);
for (const conn of connections) {
if (conn[1].swarm === swarmId) {
listener(conn[1].conn);
}
}
} }
async function handleGetSocketInfo(aq: ActiveQuery) { async function handleGetSocketInfo(aq: ActiveQuery) {
@ -392,6 +424,154 @@ async function handleGetSocketInfo(aq: ActiveQuery) {
}); });
} }
async function createProtomuxChannel(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
if (!("data" in aq.callerInput)) {
aq.reject("data required");
return;
}
const mux = Protomux.from(socket);
const data = aq.callerInput.data;
const handleCallback = (name: string, enabled: boolean) => {
if (!enabled && name !== "destroy") {
return undefined;
}
return (...args: any) => {
args = args.filter(
(item: any) => item.constructor.name.toLowerCase() !== "channel"
);
if (name === "destroy") {
connections.get(aq.callerInput.id)?.channels.delete(channelId);
aq.respond();
}
if (!enabled) {
return;
}
aq.sendUpdate({
action: name,
args,
});
};
};
aq.setReceiveUpdate?.((data: any) => {
switch (data.action) {
case "open":
channel.open();
}
});
const channel = mux.createChannel({
protocol: data?.protocol,
id: data?.id,
handshake: data?.handshake,
onopen: handleCallback("onopen", data?.onopen ?? undefined),
onclose: handleCallback("onclose", data?.onclose ?? undefined),
ondestroy: handleCallback("ondestroy", data?.ondestroy ?? undefined),
});
if (channel === null) {
aq.reject("duplicate channel");
return;
}
const channelId = getChannelId();
connections.get(aq.callerInput.id)?.channels.set(channelId, channel);
aq.sendUpdate(channelId);
}
async function createProtomuxMessage(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
if (!("data" in aq.callerInput)) {
aq.reject("action required");
return;
}
if (!("channelId" in aq.callerInput)) {
aq.reject("channel id required");
return;
}
const channel = connections
.get(aq.callerInput.id)
?.channels.get(aq.callerInput.channelId);
if (!channel) {
aq.reject("invalid channel");
}
const data = aq.callerInput.data;
const defers: { [action: string]: DeferredPromise<any> } = {};
const handleEncoding = (enabled: boolean) => {
if (!enabled) {
return undefined;
}
const update = async (action: string, args: any) => {
await defers[action]?.promise;
defers[action] = defer();
aq.sendUpdate({
action,
args,
});
const ret = await defers[action]?.promise;
if (ret[1]) {
args[0].buffer = Buffer.from(ret[1].buffer);
args[0].start = ret[1].start;
args[0].end = ret[1].end;
}
return ret[0];
};
return {
async preencode(...args: any) {
return update("preencode", args);
},
async encode(...args: any) {
return update("encode", args);
},
async decode(...args: any) {
return update("encode", args);
},
};
};
aq.setReceiveUpdate?.((data) => {
defers[data.action]?.resolve(data.args[0]);
});
const message = channel.addMessage({
encoding: handleEncoding(data.encoding ?? false),
onmessage: data.encoding ?? undefined,
});
aq.sendUpdate({
action: "created",
});
}
function getSwarmToSocketConnectionId(socket: any) { function getSwarmToSocketConnectionId(socket: any) {
for (const conn of connections) { for (const conn of connections) {
if (conn[1].conn === socket) { if (conn[1].conn === socket) {
@ -401,6 +581,7 @@ function getSwarmToSocketConnectionId(socket: any) {
return false; return false;
} }
function getSwarmToSwarmId(swarm: any) { function getSwarmToSwarmId(swarm: any) {
for (const swarmInstance of swarmInstances) { for (const swarmInstance of swarmInstances) {
if (swarmInstance[1] === swarm) { if (swarmInstance[1] === swarm) {