Compare commits

...

66 Commits

Author SHA1 Message Date
semantic-release-bot 0379757dcd chore(release): 0.0.2-develop.1 [skip ci]
## [0.0.2-develop.1](https://git.lumeweb.com/LumeWeb/kernel-swarm/compare/v0.0.1...v0.0.2-develop.1) (2023-07-01)
2023-07-01 08:38:58 +00:00
Derrick Hammer 8a80ba9ac5
dep: update @lumeweb/hyperswarm-web 2023-07-01 04:35:27 -04:00
Derrick Hammer cc929a903a
refactor: update to new sdks and portal 2023-07-01 04:21:09 -04:00
Derrick Hammer 87c7d1e02d
*update deps 2023-04-16 22:43:17 -04:00
Derrick Hammer 402508ca64
*rename handlers to match naming scheme 2023-04-15 03:58:53 -04:00
Derrick Hammer ba38f7670f
*Add api method to create swarms 2023-04-15 03:57:34 -04:00
Derrick Hammer 2832694cd5
*Switch to using message id based tracking 2023-04-09 02:27:56 -04:00
Derrick Hammer acade9801f
Revert "*Add support for destroy"
This reverts commit b42069231c.
2023-04-08 22:55:05 -04:00
Derrick Hammer b42069231c
*Add support for destroy 2023-04-08 22:50:56 -04:00
Derrick Hammer 40c042bb49
*Update to use kernel fork of protomux 2023-04-08 22:02:59 -04:00
Derrick Hammer 7206624cb5
* Refactor index.ts by checking if ret[1].buffer exists before assigning it to args[0].buffer, updating the update function to decode instead of encode, sending message if the action is "send", updating onmessage to include only relevant arguments, and adding a noop function. 2023-04-08 20:07:10 -04:00
Derrick Hammer 64ec4bb91c
*Just open the channel on creation 2023-04-08 14:47:12 -04:00
Derrick Hammer 4712b50447
*Fix LICENSE 2023-04-08 14:05:53 -04:00
Derrick Hammer dc806ba98e
*Use ?. on constructor 2023-04-07 21:50:05 -04:00
Derrick Hammer 875af27733
*Refactor again to use an api for protomux channels and messages 2023-04-07 19:48:23 -04:00
Derrick Hammer 41751c7d2b
*If for some reason we responded by the time we try to send an update, just abort 2023-04-06 18:07:51 -04:00
Derrick Hammer 86a3881cfb
*remove debug 2023-04-06 17:46:21 -04:00
Derrick Hammer 44a9838490
*release mutex 2023-04-06 17:02:54 -04:00
Derrick Hammer 1bd159c19e
*Switch to a different, simpler method of syncing protomux state 2023-04-06 16:33:08 -04:00
Derrick Hammer 2e62597cd6
*Don't need to toggle our event hook 2023-04-06 14:26:57 -04:00
Derrick Hammer b8b72450e6
*Notify the slave protomux that we have synced 2023-04-06 13:19:06 -04:00
Derrick Hammer b8d0af64c7
*Update deps 2023-04-06 13:17:06 -04:00
Derrick Hammer 33b11cbde6
*In getSwarm and swarm init, await on .opened to ensure we are ready, only when we have an active relay
*Only set mux.syncState if not previously set
*change mux.syncState to emit syncProtomux
*ensure the state data are numbers
*check for undefined in the state data
2023-04-06 13:16:46 -04:00
Derrick Hammer 4f6f4eacfd
*Need to explicitly add override for protomux 2023-04-05 04:50:25 -04:00
Derrick Hammer 369b1d19a4
*remove the current listeners sync function from the syncProtomux before emitting to not create an infinite loop, and add back after 2023-04-05 03:51:04 -04:00
Derrick Hammer 04528830cf
*switch to protomux fork 2023-04-05 03:47:50 -04:00
Derrick Hammer 053e309d98
*Initial version of syncProtomux api protocol that will keep Protomux channel/message tracking in sync between workers 2023-04-05 02:41:57 -04:00
Derrick Hammer 2f30b743f6
*Move existing connection logic up 2023-04-04 11:22:37 -04:00
Derrick Hammer c0b99e8146
*When listening for connections, send over any existing connections to the requester as if they are new 2023-04-04 08:07:02 -04:00
Derrick Hammer dd3d4948d4
*Switch to webcrypto fork 2023-04-04 06:27:27 -04:00
Derrick Hammer 21feb9fa17
*Add crypto subtle polyfill 2023-04-04 06:08:30 -04:00
Derrick Hammer c31eedd40d
*Update deps 2023-03-29 16:56:08 -04:00
Derrick Hammer 2c3d15c734
*Use swarmEvents not swarm.onceSelf to emit the close 2023-03-29 14:48:15 -04:00
Derrick Hammer 2e1780e28f
*Refactor swarm event management to use a dedicated event emitter per swarm for efficiency 2023-03-19 15:16:51 -04:00
Derrick Hammer e7a0c381b9
*Set the max event listeners via const on the peer object 2023-03-19 07:19:19 -04:00
Derrick Hammer 6e61420bfd
*update deps 2023-03-18 14:41:30 -04:00
Derrick Hammer b76f0f12dd
*change close hook to be one time as it does not need to stay hooked, else memory leak 2023-03-18 14:41:20 -04:00
Derrick Hammer 5ea823679f
*remove debug line 2023-02-18 08:44:35 -05:00
Derrick Hammer 2ad6b1fd5d
*Wrap connection event in an init call event to survive the relay being recreated 2023-02-17 22:37:05 -05:00
Derrick Hammer db079ce214
*If we have an active relay, add close hook, else queue on ready hook to add close hook 2023-02-17 19:16:22 -05:00
Derrick Hammer a84ea18680
*Update deps 2023-02-17 08:10:22 -05:00
Derrick Hammer ac7d1a0db5
*only return early if both activeRelay and ready are set 2023-02-17 08:10:14 -05:00
Derrick Hammer 5f733195c4
*Fix typo 2023-02-16 21:57:49 -05:00
Derrick Hammer 2a533d6f7f
*If the dht closes, then remove the connection handler and emit the close event on the swarm 2023-02-16 21:40:40 -05:00
Derrick Hammer 3b6448fec8
*Update deps 2023-02-06 12:33:27 -05:00
Derrick Hammer 379649b006
*Create helper getSwarmToSocketConnectionId
*on join method, disable server mode
*rename handleJoinPeer to handleJoin
*rename handleSocketListenEvent to handleSocketListenEvent
*on socketListenEvent, if setReceiveUpdate is ever called, assume we want to terminate
* Ensure full 64 bit private key is passed to handlePresentSeedModule
*Re-organize api methods
*Add new socket methods
*Add socketGetInfo method which creates an IPV6 address from the pubkey
2023-02-06 12:33:11 -05:00
Derrick Hammer 236e445ff8
*Change ready to init
*Add new ready method that listens for a ready event but returns if activeRelay is set
2023-02-01 12:07:46 -05:00
Derrick Hammer 4e443f4ccb
*Replace dht references with swarm 2023-02-01 08:18:31 -05:00
Derrick Hammer d608392532
*Rewrite to use new design 2023-02-01 07:50:45 -05:00
Derrick Hammer 4a62c94ee3
*Rename package 2023-02-01 07:50:11 -05:00
Derrick Hammer c6497655c7
*Temp build script 2023-02-01 07:49:26 -05:00
Derrick Hammer 44ed118571
*Switch to iife 2023-02-01 07:49:00 -05:00
Derrick Hammer ce3fbcad0b
*Bug fix and improve response handling 2022-09-19 08:11:54 -04:00
Derrick Hammer b25706e8ac
*Update DHT management 2022-08-31 17:14:42 -04:00
Derrick Hammer 4956592d1d
*add getRelayServers api method 2022-08-14 07:28:40 -04:00
Derrick Hammer 37fbd1ee62
*Fix build order 2022-08-14 07:26:50 -04:00
Derrick Hammer 5652eccd1b
*Add method to get current relays in a DHT instance 2022-08-14 06:49:24 -04:00
Derrick Hammer f8229abf66
*Need to shift off the first element from hexToBuf 2022-08-13 20:05:51 -04:00
Derrick Hammer cd33e1508b
*add socketExists api method 2022-08-13 15:12:49 -04:00
Derrick Hammer 4e4a22542f
*Switch from Buffer to hexToBuf 2022-08-13 15:11:18 -04:00
Derrick Hammer 77c473611a
*Support multiple dht connections with a default DHT connection 2022-08-03 12:01:11 -04:00
Derrick Hammer e01d4fc9b5 *Use noise-handshake fork to fix handshake communication bug 2022-07-31 04:10:04 -04:00
Derrick Hammer 5a3e8d7fb5 *Add browser polyfills 2022-07-27 18:46:42 -04:00
Derrick Hammer b351708da7 *Add process global 2022-07-27 18:46:15 -04:00
Derrick Hammer 869f53938b *Override sodium library with WASM version 2022-07-21 14:59:25 -04:00
Derrick Hammer 05f3916b4a *Initial version 2022-07-21 13:03:18 -04:00
10 changed files with 22738 additions and 3 deletions

13
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,13 @@
name: Build/Publish
on:
push:
branches:
- master
- develop
- develop-*
jobs:
main:
uses: lumeweb/github-node-deploy-workflow/.github/workflows/main.yml@master
secrets: inherit

8
.presetterrc.json Normal file
View File

@ -0,0 +1,8 @@
{
"preset": [
"@lumeweb/presetter-kernel-module-preset"
],
"config": {
"official": true
}
}

1
CHANGELOG.md Normal file
View File

@ -0,0 +1 @@
## [0.0.2-develop.1](https://git.lumeweb.com/LumeWeb/kernel-swarm/compare/v0.0.1...v0.0.2-develop.1) (2023-07-01)

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2022 Lume Web
Copyright (c) 2022 Hammer Technologies LLC
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1,2 +1,2 @@
# kernel-dht
Skynet kernel module for DHT requests using the @lumeweb/dht-rpc-client package
# kernel-swarm
Skynet kernel module for DHT requests using the @lumeweb/swarm-rpc-client package

21931
npm-shrinkwrap.json generated Normal file

File diff suppressed because it is too large Load Diff

38
package.json Normal file
View File

@ -0,0 +1,38 @@
{
"name": "@lumeweb/kernel-swarm",
"author": {
"name": "Hammer Technologies LLC",
"email": "contact@lumeweb.com"
},
"repository": {
"type": "git",
"url": "gitea@git.lumeweb.com:LumeWeb/kernel-swarm.git"
},
"scripts": {
"prepare": "presetter bootstrap",
"build": "run build",
"semantic-release": "semantic-release"
},
"type": "module",
"dependencies": {
"@lumeweb/hyperswarm-web": "0.0.2-develop.4",
"@lumeweb/libkernel": "^0.1.0-develop.7",
"@lumeweb/rpc": "0.0.2-develop.1",
"@noble/curves": "^1.1.0",
"@peculiar/webcrypto": "^1.4.3",
"async-mutex": "^0.4.0",
"b4a": "^1.6.4",
"eventemitter2": "^6.4.9",
"hyperswarm": "^4.5.1",
"noise-handshake": "^3.0.3",
"p-defer": "^4.0.0",
"protomux": "^3.5.0"
},
"devDependencies": {
"@lumeweb/presetter-kernel-module-preset": "^0.1.0-develop.20",
"@types/b4a": "^1.6.0",
"presetter": "*"
},
"version": "0.0.2-develop.1",
"readme": "ERROR: No README data found!"
}

View File

@ -0,0 +1,45 @@
diff --git a/node_modules/protomux/index.js b/node_modules/protomux/index.js
index 07bf0ad..c8db59c 100644
--- a/node_modules/protomux/index.js
+++ b/node_modules/protomux/index.js
@@ -195,34 +195,34 @@ class Channel {
type,
encoding,
onmessage,
- recv (state, session) {
+ async recv (state, session) {
session._track(m.onmessage(encoding.decode(state), session))
},
- send (m, session = s) {
+ async send (m, session = s) {
if (session.closed === true) return false
const mux = session._mux
const state = { buffer: null, start: 0, end: typeLen }
if (mux._batch !== null) {
- encoding.preencode(state, m)
+ await encoding.preencode(state, m)
state.buffer = mux._alloc(state.end)
c.uint.encode(state, type)
- encoding.encode(state, m)
+ await encoding.encode(state, m)
mux._pushBatch(session._localId, state.buffer)
return true
}
c.uint.preencode(state, session._localId)
- encoding.preencode(state, m)
+ await encoding.preencode(state, m)
state.buffer = mux._alloc(state.end)
c.uint.encode(state, session._localId)
c.uint.encode(state, type)
- encoding.encode(state, m)
+ await encoding.encode(state, m)
mux.drained = mux.stream.write(state.buffer)

72
src/addr.ts Normal file
View File

@ -0,0 +1,72 @@
/*
The following is based on https://github.com/yggdrasil-network/yggdrasil-go/blob/develop/src/address/address.go, which is licensed LGPL3. Full credit to them for the idea and original algorithm
*/
export function pubKeyToIpv6(publicKey: Uint8Array) {
const keySize = 32;
if (publicKey.length !== keySize) {
return null;
}
const buf = new Uint8Array(keySize);
for (let i = 0; i < keySize; i++) {
buf[i] = buf[i] = publicKey[i] ^ 0xff;
}
const prefix = [0x02];
const ones = getLeadingOnes(buf);
const nodeId = getTruncatedNodeID(buf);
const addr = new Uint8Array(prefix.length + 1 + nodeId.length);
addr.set(prefix, 0);
addr[prefix.length] = ones;
addr.set(nodeId, prefix.length + 1);
const result: string[] = [];
for (let i = 0; i < 8; i++) {
const num1 = addr[i * 2].toString(16).padStart(2, "0");
const num2 = addr[i * 2 + 1].toString(16).padStart(2, "0");
result.push(`${num1}${num2}`);
}
return result.join(":");
}
function getLeadingOnes(buf: Uint8Array) {
let done = false;
let ones = 0;
for (let i = 0; i < buf.length * 8; i++) {
const bit = (buf[i >>> 3] & (0x80 >> (i & 7))) >> (7 - (i & 7));
if (!done && bit !== 0) {
ones++;
} else if (!done && bit === 0) {
done = true;
}
}
return ones;
}
function getTruncatedNodeID(buf: Uint8Array) {
const result: number[] = [];
let done = false;
let bits = 0;
let nBits = 0;
for (let i = 0; i < buf.length * 8; i++) {
const bit = (buf[i >>> 3] & (0x80 >> (i & 7))) >> (7 - (i & 7));
if (!done && bit !== 0) {
continue;
}
if (!done && bit === 0) {
done = true;
continue;
}
bits = (bits << 1) | bit;
nBits++;
if (nBits === 8) {
nBits = 0;
result.push(bits);
}
}
return result;
}

627
src/index.ts Normal file
View File

@ -0,0 +1,627 @@
// @ts-ignore
import Hyperswarm from "@lumeweb/hyperswarm-web";
import type { ActiveQuery } from "@lumeweb/libkernel/module";
import {
addHandler,
getKey,
handleMessage,
handlePresentKey as handlePresentKeyModule,
} from "@lumeweb/libkernel/module";
import { Buffer } from "buffer";
import { ed25519 } from "@noble/curves/ed25519";
import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js";
import { EventEmitter2 as EventEmitter } from "eventemitter2";
import { logErr } from "@lumeweb/libkernel";
// @ts-ignore
import Protomux from "protomux";
import defer, { DeferredPromise } from "p-defer";
const MAX_PEER_LISTENERS = 20;
interface SwarmConnection {
swarm: number;
conn: any;
channels: Map<number, Protomux>;
}
interface SwarmEvents {
swarm: number;
events: EventEmitter;
}
const connections = new Map<number, SwarmConnection>();
const swarmInstances = new Map<number, Hyperswarm>();
const swarmEvents = new Map<number, SwarmEvents>();
let defaultSwarm: Hyperswarm;
let moduleReadyResolve: Function;
let moduleReady: Promise<void> = new Promise((resolve) => {
moduleReadyResolve = resolve;
});
onmessage = handleMessage;
function idFactory(start = 1) {
let id = start;
return function nextId() {
const nextId = id;
id += 1;
return nextId;
};
}
const getSwarmId = idFactory();
const getSocketId = idFactory();
const getChannelId = idFactory();
const getMessageId = idFactory();
addHandler("presentKey", handlePresentKey);
addHandler("join", handleJoin);
addHandler("getPeerByPubkey", handleGetPeerByPubkey);
addHandler("addRelay", handleAddRelay);
addHandler("removeRelay", handleRemoveRelay);
addHandler("clearRelays", handleClearRelays);
addHandler("getRelays", handleGetRelays);
addHandler("init", handleInit);
addHandler("ready", handleReady);
addHandler("listenConnections", handleListenConnections, {
receiveUpdates: true,
});
addHandler("socketGetInfo", handleGetSocketInfo);
addHandler("socketExists", handleSocketExists);
addHandler("socketListenEvent", handleSocketListenEvent, {
receiveUpdates: true,
});
addHandler("socketWrite", handleWriteSocketEvent);
addHandler("socketClose", handleCloseSocketEvent);
addHandler("createProtomuxChannel", handleCreateProtomuxChannel, {
receiveUpdates: true,
});
addHandler("createProtomuxMessage", handleCreateProtomuxMessage, {
receiveUpdates: true,
});
addHandler("createSwarm", handleCreateSwarm);
async function handlePresentKey(aq: ActiveQuery) {
const pubkey = ed25519.getPublicKey(aq.callerInput.rootKey);
handlePresentKeyModule({
callerInput: {
seed: {
publicKey: ed25519.getPublicKey(aq.callerInput.rootKey),
secretKey: b4a.concat([aq.callerInput.rootKey, pubkey]),
},
},
} as ActiveQuery);
if (!defaultSwarm) {
defaultSwarm = swarmInstances.get(await createSwarm()) as Hyperswarm;
}
moduleReadyResolve();
}
async function createSwarm(): Promise<number> {
const privateKey = await getKey();
const swarmInstance = new Hyperswarm({
keyPair: {
publicKey: ed25519.getPublicKey(privateKey),
secretKey: privateKey,
},
});
const id = getSwarmId();
swarmInstances.set(id, swarmInstance);
swarmInstance.onSelf("init", () => {
const swarmInstanceEvents = new EventEmitter();
swarmInstanceEvents.setMaxListeners(MAX_PEER_LISTENERS);
swarmEvents.set(id, { swarm: id, events: swarmInstanceEvents });
swarmInstance.on("connection", (peer: any) => {
const socketId = getSocketId();
connections.set(socketId, {
swarm: id,
conn: peer,
channels: new Map<number, Protomux>(),
});
peer.once("close", () => {
connections.delete(socketId);
});
swarmInstanceEvents.emit("connection", peer);
});
});
swarmInstance.onSelf("close", (...args) => {
swarmEvents.get(id)?.events.emit("close", ...args);
swarmEvents.get(id)?.events.removeAllListeners();
swarmEvents.delete(id);
});
return id;
}
function handleSocketListenEvent(aq: ActiveQuery) {
const { event = null } = aq.callerInput;
const socket = validateConnection(aq);
if (!socket) {
return;
}
if (!event) {
aq.reject("Invalid event");
return;
}
let responded = false;
const respond = () => {
if (responded) {
return;
}
responded = true;
aq.respond();
};
const cb = async (data: Buffer) => {
await socket.mutex?.waitForUnlock();
if (responded) {
return;
}
aq.sendUpdate(data);
};
socket.on(event, cb);
socket.once("close", () => {
socket.off(event, cb);
respond();
});
aq.setReceiveUpdate?.(() => {
socket.off(event, cb);
respond();
});
}
async function handleSocketExists(aq: ActiveQuery) {
const { id = null } = aq.callerInput;
aq.respond(connections.has(Number(id)));
}
function handleCloseSocketEvent(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
socket.end();
aq.respond();
}
async function handleWriteSocketEvent(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
const { message = null } = aq.callerInput;
if (!message) {
aq.reject("empty message");
return;
}
await socket.mutex?.waitForUnlock();
socket.write(message);
aq.respond();
}
function validateConnection(aq: ActiveQuery): any | boolean {
const { id = null } = aq.callerInput;
if (!id || !connections.has(id)) {
aq.reject("Invalid connection id");
return false;
}
return connections.get(id)?.conn;
}
async function getSwarm(aq: ActiveQuery): Promise<Hyperswarm> {
await moduleReady;
let swarm;
if ("callerInput" in aq && aq.callerInput) {
swarm = aq.callerInput.swarm ?? null;
if (swarm && !swarmInstances.has(swarm)) {
const error = "Invalid swarm id";
aq.reject(error);
throw new Error(error);
}
}
if (!swarm) {
if (defaultSwarm.activeRelay && defaultSwarm.ready) {
await defaultSwarm.activeRelay.dht._protocol.opened;
}
return defaultSwarm;
}
if (swarm.activeRelay && swarm.ready) {
await swarm.activeRelay.dht._protocol.opened;
}
return swarmInstances.get(swarm) as Hyperswarm;
}
async function handleAddRelay(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
if (!pubkey) {
aq.reject("invalid pubkey");
return;
}
const swarm = await getSwarm(aq);
aq.respond(await swarm.addRelay(pubkey));
}
async function handleRemoveRelay(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
if (!pubkey) {
aq.reject("invalid pubkey");
return;
}
const swarm = await getSwarm(aq);
aq.respond(swarm.removeRelay(pubkey));
}
async function handleClearRelays(aq: ActiveQuery) {
const swarm = await getSwarm(aq);
swarm.clearRelays();
aq.respond();
}
async function handleGetRelays(aq: ActiveQuery) {
aq.respond(await (await getSwarm(aq)).relays);
}
async function handleJoin(aq: ActiveQuery) {
const { topic = null } = aq.callerInput;
const swarm = await getSwarm(aq);
if (!topic) {
aq.reject("invalid topic");
return;
}
if (!b4a.isBuffer(topic)) {
aq.reject("topic must be a buffer");
return;
}
// @ts-ignore
swarm.join(topic, { server: false });
aq.respond();
}
async function handleGetPeerByPubkey(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
const swarm = await getSwarm(aq);
if (!pubkey) {
aq.reject("invalid topic");
return;
}
if (!b4a.isBuffer(pubkey)) {
aq.reject("pubkey must be a buffer");
return;
}
// @ts-ignore
if (!swarm._allConnections.has(pubkey)) {
aq.reject("peer does not exist");
return;
}
// @ts-ignore
const peer = swarm._allConnections.get(pubkey);
aq.respond(getSwarmToSocketConnectionId(peer));
}
async function handleInit(aq: ActiveQuery) {
const swarm = await getSwarm(aq);
try {
await swarm.init();
} catch (e) {
aq.reject((e as Error).message);
return;
}
aq.respond();
}
async function handleReady(aq: ActiveQuery) {
const swarm = await getSwarm(aq);
if (swarm.activeRelay && swarm.ready) {
aq.respond();
await swarm.activeRelay.dht._protocol.opened;
return;
}
swarm.once("ready", async () => {
await swarm.activeRelay.dht._protocol.opened;
aq.respond();
});
}
async function handleListenConnections(aq: ActiveQuery) {
const swarm = await getSwarm(aq);
const swarmId = getSwarmToSwarmId(swarm);
const listener = (peer: any) => {
aq.sendUpdate(getSwarmToSocketConnectionId(peer));
};
const swarmEvent = swarmEvents.get(swarmId as number)?.events;
if (!swarmEvent) {
logErr("swarm event object is missing");
}
swarmEvent?.on("connection", listener);
aq.setReceiveUpdate?.(() => {
swarmEvent?.off("connection", listener);
aq.respond();
});
for (const conn of connections) {
if (conn[1].swarm === swarmId) {
listener(conn[1].conn);
}
}
const closeCb = () => {
swarmEvent?.off("connection", listener);
swarmEvent?.emit("close");
aq.respond();
};
const hookClose = () => {
swarmEvent?.once("close", closeCb);
};
if (swarm.activeRelay) {
hookClose();
return;
}
swarm.onceSelf("ready", hookClose);
}
async function handleGetSocketInfo(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
aq.respond({
remotePublicKey: socket.remotePublicKey,
publicKey: socket.publicKey,
rawStream: {
remoteHost: pubKeyToIpv6(socket.remotePublicKey),
remotePort: 0,
remoteFamily: "IPv6",
},
});
}
async function handleCreateProtomuxChannel(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
if (!("data" in aq.callerInput)) {
aq.reject("data required");
return;
}
const mux = Protomux.from(socket);
const data = aq.callerInput.data;
const handleCallback = (name: string, enabled: boolean) => {
if (!enabled && name !== "destroy") {
return undefined;
}
return (...args: any) => {
args = args.filter(
(item: any) => item?.constructor.name.toLowerCase() !== "channel",
);
if (name === "destroy") {
connections.get(aq.callerInput.id)?.channels.delete(channelId);
aq.respond();
}
if (!enabled) {
return;
}
aq.sendUpdate({
action: name,
args,
});
};
};
let channel = mux.createChannel({
protocol: data?.protocol,
id: data?.id,
handshake: data?.handshake,
onopen: handleCallback("onopen", data?.onopen ?? undefined),
onclose: handleCallback("onclose", data?.onclose ?? undefined),
ondestroy: handleCallback("ondestroy", data?.ondestroy ?? undefined),
});
if (channel === null) {
aq.reject("duplicate channel");
return;
}
channel.open();
const channelId = getChannelId();
connections.get(aq.callerInput.id)?.channels.set(channelId, channel);
aq.sendUpdate(channelId);
}
async function handleCreateProtomuxMessage(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
if (!("data" in aq.callerInput)) {
aq.reject("action required");
return;
}
if (!("channelId" in aq.callerInput)) {
aq.reject("channel id required");
return;
}
const channel = connections
.get(aq.callerInput.id)
?.channels.get(aq.callerInput.channelId);
if (!channel) {
aq.reject("invalid channel");
}
const data = aq.callerInput.data;
const defers = new Map<number, DeferredPromise<any>>();
const handleEncoding = (enabled: boolean) => {
if (!enabled) {
return undefined;
}
const update = async (action: string, args: any) => {
const messageId = getMessageId();
const d = defer();
defers.set(messageId, d);
aq.sendUpdate({
id: messageId,
action,
args,
});
const ret = (await d.promise) as any;
if (ret[1]) {
if (ret[1].buffer) {
args[0].buffer = b4a.from(ret[1].buffer);
}
args[0].start = ret[1].start;
args[0].end = ret[1].end;
}
return ret[0];
};
return {
async preencode(...args: any) {
return update("preencode", args);
},
async encode(...args: any) {
return update("encode", args);
},
async decode(...args: any) {
return update("decode", args);
},
};
};
aq.setReceiveUpdate?.((data: any) => {
if (data.action === "send") {
message.send(...data.args);
}
defers.get(data.id)?.resolve(data.args);
defers.delete(data.id);
});
if (data.onmessage) {
data.onmessage = (...args: any) => {
args = args.filter(
(item: any) => item?.constructor.name.toLowerCase() !== "channel",
);
aq.sendUpdate({
action: "onmessage",
args,
});
};
}
const message = channel.addMessage({
encoding: handleEncoding(data.encoding ?? false),
onmessage: data.onmessage ?? noop,
});
aq.sendUpdate({
action: "created",
});
}
async function handleCreateSwarm(aq: ActiveQuery) {
aq.respond(await createSwarm());
}
function getSwarmToSocketConnectionId(socket: any) {
for (const conn of connections) {
if (conn[1].conn === socket) {
return conn[0];
}
}
return false;
}
function getSwarmToSwarmId(swarm: any) {
for (const swarmInstance of swarmInstances) {
if (swarmInstance[1] === swarm) {
return swarmInstance[0];
}
}
return false;
}
function noop() {}