// kernel-swarm/src/index.ts
//
// 699 lines
// 15 KiB
// TypeScript

// @ts-ignore
import Hyperswarm from "@lumeweb/hyperswarm-web";
import type { ActiveQuery } from "@lumeweb/libkernel/module";
import {
addHandler,
getKey,
handleMessage,
handlePresentKey as handlePresentKeyModule,
logErr,
} from "@lumeweb/libkernel/module";
import { Buffer } from "buffer";
import { ed25519 } from "@noble/curves/ed25519";
import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js";
import { EventEmitter2 as EventEmitter } from "eventemitter2";
// @ts-ignore
import Protomux from "protomux";
import defer, { DeferredPromise } from "p-defer";
import { concatBytes, hexToBytes } from "@lumeweb/libweb";
// Listener cap passed to setMaxListeners on each swarm's event emitter (see createSwarm).
const MAX_PEER_LISTENERS = 20;
/**
 * Bookkeeping record for one peer connection. Records are stored in
 * `connections`, keyed by the numeric socket id assigned in createSwarm.
 */
interface SwarmConnection {
// Id of the swarm (key in `swarmInstances`) whose "connection" event produced this record.
swarm: number;
// The peer object delivered by the swarm's "connection" event.
conn: any;
// Channels created on this connection, keyed by channel id (filled by handleCreateProtomuxChannel).
channels: Map<number, Protomux>;
// Count of active event subscriptions per caller domain (maintained by handleSocketListenEvent).
listeners: Map<string, number>;
}
/**
 * Pairs a swarm id with the event emitter used to fan out that swarm's
 * "connection" and "close" events to interested queries.
 */
interface SwarmEvents {
// Id of the swarm these events belong to (key in `swarmInstances`).
swarm: number;
// Emitter created in createSwarm when the swarm fires "init".
events: EventEmitter;
}
// Live peer connections keyed by socket id; entries are removed when the peer emits "close".
const connections = new Map<number, SwarmConnection>();
// Every swarm created by this module, keyed by swarm id.
const swarmInstances = new Map<number, Hyperswarm>();
// Per-swarm event emitters keyed by swarm id; an entry exists only between the swarm's "init" and "close".
const swarmEvents = new Map<number, SwarmEvents>();
// Swarm used when a query does not name one; created by the first presentKey call.
let defaultSwarm: Hyperswarm;
// Resolver for `moduleReady`; invoked at the end of handlePresentKey.
let moduleReadyResolve: Function;
// Settles once handlePresentKey has run; getSwarm awaits it before touching any swarm.
let moduleReady: Promise<void> = new Promise((resolve) => {
moduleReadyResolve = resolve;
});
// Hand every incoming message to libkernel's handleMessage.
onmessage = handleMessage;
/**
 * Creates an independent sequential id generator.
 *
 * @param start - First id the generator hands out (defaults to 1).
 * @returns A function that returns the current id and then advances the
 *   counter by one on every call.
 */
function idFactory(start = 1) {
  let upcoming = start;
  return function nextId() {
    // Post-increment: hand out the current value, then move on.
    return upcoming++;
  };
}
// Independent id sequences (each starts at 1) for swarms, sockets, channels and protomux messages.
const getSwarmId = idFactory();
const getSocketId = idFactory();
const getChannelId = idFactory();
const getMessageId = idFactory();
// Register one handler per method name exposed by this module.
// Key presentation, topic joining, peer lookup and relay management.
addHandler("presentKey", handlePresentKey);
addHandler("join", handleJoin);
addHandler("getPeerByPubkey", handleGetPeerByPubkey);
addHandler("addRelay", handleAddRelay);
addHandler("removeRelay", handleRemoveRelay);
addHandler("clearRelays", handleClearRelays);
addHandler("getRelays", handleGetRelays);
addHandler("init", handleInit);
addHandler("ready", handleReady);
// NOTE(review): `receiveUpdates: true` presumably lets the caller send follow-up
// updates to the query (libkernel semantics) — confirm against libkernel docs.
addHandler("listenConnections", handleListenConnections, {
receiveUpdates: true,
});
// Per-socket operations; each expects the socket id in `callerInput.id`.
addHandler("socketGetInfo", handleGetSocketInfo);
addHandler("socketExists", handleSocketExists);
addHandler("socketListenEvent", handleSocketListenEvent, {
receiveUpdates: true,
});
addHandler("socketListeners", handleSocketListenersEvent);
addHandler("socketWrite", handleWriteSocketEvent);
addHandler("socketClose", handleCloseSocketEvent);
addHandler("socketSetKeepAlive", handleSocketSetKeepAliveEvent);
// Protomux channel and message plumbing on top of an existing socket.
addHandler("createProtomuxChannel", handleCreateProtomuxChannel, {
receiveUpdates: true,
});
addHandler("createProtomuxMessage", handleCreateProtomuxMessage, {
receiveUpdates: true,
});
addHandler("createSwarm", handleCreateSwarm);
/**
 * Handles the "presentKey" method: forwards the caller's `rootKey` to
 * libkernel's own presentKey handler (under the `key` field), makes sure a
 * default swarm exists, then resolves `moduleReady`.
 *
 * @param aq - Incoming query; `aq.callerInput.rootKey` carries the key.
 */
async function handlePresentKey(aq: ActiveQuery) {
  const forwarded = {
    callerInput: {
      key: aq.callerInput.rootKey,
    },
  } as ActiveQuery;
  handlePresentKeyModule(forwarded);

  // Only the first call creates the default swarm; later calls reuse it.
  if (!defaultSwarm) {
    const swarmId = await createSwarm();
    defaultSwarm = swarmInstances.get(swarmId) as Hyperswarm;
  }

  moduleReadyResolve();
}
/**
 * Creates a new Hyperswarm instance keyed with the module's private key,
 * registers it in `swarmInstances` and wires its lifecycle events.
 *
 * On the swarm's "init" event a fresh emitter is stored in `swarmEvents`,
 * incoming peer connections are tracked in `connections` (and removed again
 * when the peer closes), and every configured relay is passed to
 * `activeRelay.joinPeer`. On "close" the emitter is notified, stripped of
 * its listeners and dropped from `swarmEvents`.
 *
 * @returns The numeric id under which the swarm was registered.
 */
async function createSwarm(): Promise<number> {
  const privateKey = await getKey();
  // Derive the public key once; it is used on its own and as the tail of the secret key.
  const publicKey = ed25519.getPublicKey(privateKey);
  const swarmInstance = new Hyperswarm({
    keyPair: {
      publicKey,
      secretKey: concatBytes(privateKey, publicKey),
    },
  });
  const id = getSwarmId();
  swarmInstances.set(id, swarmInstance);

  swarmInstance.onSelf("init", () => {
    const swarmInstanceEvents = new EventEmitter();
    swarmInstanceEvents.setMaxListeners(MAX_PEER_LISTENERS);
    swarmEvents.set(id, { swarm: id, events: swarmInstanceEvents });

    swarmInstance.on("connection", (peer: any) => {
      const socketId = getSocketId();
      connections.set(socketId, {
        swarm: id,
        conn: peer,
        channels: new Map<number, Protomux>(),
        listeners: new Map<string, number>(),
      });
      // Forget the connection as soon as the peer goes away.
      peer.once("close", () => {
        connections.delete(socketId);
      });
      swarmInstanceEvents.emit("connection", peer);
    });

    swarmInstance.relays.forEach((relay) => {
      swarmInstance.activeRelay.joinPeer(hexToBytes(relay));
    });
  });

  swarmInstance.onSelf("close", (...args) => {
    swarmEvents.get(id)?.events.emit("close", ...args);
    swarmEvents.get(id)?.events.removeAllListeners();
    swarmEvents.delete(id);
  });

  return id;
}
/**
 * Handles "socketListenEvent": subscribes the caller to `callerInput.event`
 * on the socket identified by `callerInput.id` and streams every event
 * payload back through `aq.sendUpdate`.
 *
 * The subscription ends, and the query is responded to exactly once, when
 * the socket emits "close" or when the caller sends any update. The
 * per-domain subscription counter in `conn.listeners` is kept in sync.
 *
 * Rejects with "Invalid connection id" (via validateConnection) or
 * "Invalid event".
 *
 * @param aq - Incoming query; reads `callerInput.id`, `callerInput.event`
 *   and `aq.domain`.
 */
function handleSocketListenEvent(aq: ActiveQuery) {
  const { event = null } = aq.callerInput;
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }
  if (!event) {
    aq.reject("Invalid event");
    return;
  }

  const conn = connections.get(aq.callerInput.id) as SwarmConnection;
  let responded = false;

  const cb = (data: Buffer) => {
    if (responded) {
      return;
    }
    aq.sendUpdate(data);
  };

  // Single teardown path: detaches BOTH socket listeners so an unsubscribed
  // query does not leave its "close" handler behind on the socket.
  const finish = () => {
    socket.off(event, cb);
    socket.off("close", finish);
    if (responded) {
      return;
    }
    responded = true;
    const remaining = (conn.listeners.get(aq.domain) ?? 0) - 1;
    if (remaining > 0) {
      conn.listeners.set(aq.domain, remaining);
    } else {
      conn.listeners.delete(aq.domain);
    }
    aq.respond();
  };

  socket.on(event, cb);
  // `finish` removes itself, so a plain `on` behaves like `once` here.
  socket.on("close", finish);
  // Any update from the caller is treated as "stop listening".
  aq.setReceiveUpdate?.(finish);

  conn.listeners.set(aq.domain, (conn.listeners.get(aq.domain) ?? 0) + 1);
}
/**
 * Handles "socketListeners": responds with the caller domains that
 * currently hold at least one event subscription on the socket identified
 * by `callerInput.id`.
 *
 * @param aq - Incoming query; rejected by validateConnection when the id is
 *   unknown.
 */
async function handleSocketListenersEvent(aq: ActiveQuery) {
  if (!validateConnection(aq)) {
    return;
  }

  const { listeners } = connections.get(aq.callerInput.id) as SwarmConnection;
  const activeDomains: string[] = [];
  for (const [domain, count] of listeners) {
    if (count > 0) {
      activeDomains.push(domain);
    }
  }
  aq.respond(activeDomains);
}
/**
 * Handles "socketExists": responds with `true` when a connection is
 * registered under `Number(callerInput.id)`, otherwise `false`.
 *
 * @param aq - Incoming query; a missing id is treated as `null`.
 */
async function handleSocketExists(aq: ActiveQuery) {
  const socketId = Number(aq.callerInput.id ?? null);
  aq.respond(connections.has(socketId));
}
/**
 * Handles "socketClose": calls `end()` on the socket identified by
 * `callerInput.id` and responds to the query.
 *
 * @param aq - Incoming query; rejected by validateConnection when the id is
 *   unknown.
 */
function handleCloseSocketEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (socket) {
    socket.end();
    aq.respond();
  }
}
/**
 * Handles "socketSetKeepAlive": forwards `callerInput.alive` to the
 * socket's `setKeepAlive` and responds to the query.
 *
 * Rejects with "alive required" only when the field is absent or null;
 * falsy values such as `0` or `false` are forwarded so callers can turn
 * keep-alive off.
 *
 * @param aq - Incoming query; reads `callerInput.id` and `callerInput.alive`.
 */
function handleSocketSetKeepAliveEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  // The destructuring default maps a missing field to null, so one check covers both cases.
  const { alive = null } = aq.callerInput;
  if (alive === null) {
    aq.reject("alive required");
    return;
  }

  socket.setKeepAlive(alive);
  aq.respond();
}
/**
 * Handles "socketWrite": writes `callerInput.message` to the socket
 * identified by `callerInput.id` and responds to the query.
 *
 * Rejects with "empty message" when the message is missing or falsy.
 *
 * @param aq - Incoming query; reads `callerInput.id` and
 *   `callerInput.message`.
 */
async function handleWriteSocketEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  const payload = aq.callerInput.message ?? null;
  if (!payload) {
    aq.reject("empty message");
    return;
  }

  socket.write(payload);
  aq.respond();
}
/**
 * Resolves the connection referenced by `callerInput.id`.
 *
 * @param aq - Incoming query; rejected with "Invalid connection id" when the
 *   id is missing, falsy or not present in `connections`.
 * @returns The stored peer connection object, or `false` after rejecting.
 */
function validateConnection(aq: ActiveQuery): any | boolean {
  const { id = null } = aq.callerInput;
  const isKnown = Boolean(id) && connections.has(id);

  if (isKnown) {
    return connections.get(id)?.conn;
  }

  aq.reject("Invalid connection id");
  return false;
}
/**
 * Resolves the swarm a query refers to, after waiting for `moduleReady`.
 *
 * `callerInput.swarm` (a swarm id) selects an explicit swarm; when it is
 * absent or falsy the default swarm is used. If the chosen swarm already
 * has an active relay and reports ready, this also waits for
 * `activeRelay.dht._protocol.opened` before returning it.
 *
 * @param aq - Incoming query; reads the optional `callerInput.swarm`.
 * @returns The resolved swarm instance.
 * @throws Error("Invalid swarm id") after rejecting the query when the id
 *   is not registered in `swarmInstances`.
 */
async function getSwarm(aq: ActiveQuery): Promise<Hyperswarm> {
  await moduleReady;

  let swarmId;
  if ("callerInput" in aq && aq.callerInput) {
    swarmId = aq.callerInput.swarm ?? null;
    if (swarmId && !swarmInstances.has(swarmId)) {
      const error = "Invalid swarm id";
      aq.reject(error);
      throw new Error(error);
    }
  }

  // Look the instance up BEFORE the readiness check: the check has to run on
  // the swarm object, not on its numeric id.
  const swarm = swarmId
    ? (swarmInstances.get(swarmId) as Hyperswarm)
    : defaultSwarm;

  if (swarm.activeRelay && swarm.ready) {
    await swarm.activeRelay.dht._protocol.opened;
  }
  return swarm;
}
/**
 * Handles "addRelay": passes `callerInput.pubkey` to the resolved swarm's
 * `addRelay` and responds with the awaited result.
 *
 * Rejects with "invalid pubkey" when the pubkey is missing or falsy.
 *
 * @param aq - Incoming query; reads `callerInput.pubkey` and the optional
 *   `callerInput.swarm`.
 */
async function handleAddRelay(aq: ActiveQuery) {
  const relayKey = aq.callerInput.pubkey ?? null;
  if (!relayKey) {
    aq.reject("invalid pubkey");
    return;
  }

  const swarm = await getSwarm(aq);
  const outcome = await swarm.addRelay(relayKey);
  aq.respond(outcome);
}
/**
 * Handles "removeRelay": passes `callerInput.pubkey` to the resolved
 * swarm's `removeRelay` and responds with its return value.
 *
 * Rejects with "invalid pubkey" when the pubkey is missing or falsy.
 *
 * @param aq - Incoming query; reads `callerInput.pubkey` and the optional
 *   `callerInput.swarm`.
 */
async function handleRemoveRelay(aq: ActiveQuery) {
  const relayKey = aq.callerInput.pubkey ?? null;
  if (!relayKey) {
    aq.reject("invalid pubkey");
    return;
  }

  const swarm = await getSwarm(aq);
  const outcome = swarm.removeRelay(relayKey);
  aq.respond(outcome);
}
/**
 * Handles "clearRelays": calls `clearRelays()` on the resolved swarm and
 * responds with no payload.
 *
 * @param aq - Incoming query; reads the optional `callerInput.swarm`.
 */
async function handleClearRelays(aq: ActiveQuery) {
  (await getSwarm(aq)).clearRelays();
  aq.respond();
}
/**
 * Handles "getRelays": responds with the resolved swarm's `relays` value.
 *
 * @param aq - Incoming query; reads the optional `callerInput.swarm`.
 */
async function handleGetRelays(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);
  const relays = await swarm.relays;
  aq.respond(relays);
}
/**
 * Handles "join": joins `callerInput.topic` on the resolved swarm with
 * `{ server: false }` and responds with no payload.
 *
 * Rejects with "invalid topic" when the topic is missing or falsy, and
 * with "topic must be a buffer" when `b4a.isBuffer` does not accept it.
 *
 * @param aq - Incoming query; reads `callerInput.topic` and the optional
 *   `callerInput.swarm`.
 */
async function handleJoin(aq: ActiveQuery) {
  const requestedTopic = aq.callerInput.topic ?? null;
  const swarm = await getSwarm(aq);

  let problem: string | null = null;
  if (!requestedTopic) {
    problem = "invalid topic";
  } else if (!b4a.isBuffer(requestedTopic)) {
    problem = "topic must be a buffer";
  }
  if (problem !== null) {
    aq.reject(problem);
    return;
  }

  // @ts-ignore
  swarm.join(requestedTopic, { server: false });
  aq.respond();
}
/**
 * Handles "getPeerByPubkey": looks `callerInput.pubkey` up in the resolved
 * swarm's `_allConnections` and responds with the socket id under which
 * that peer is tracked in `connections` (or `false` if it is not tracked).
 *
 * Rejects with "invalid pubkey" when the pubkey is missing or falsy,
 * "pubkey must be a buffer" when `b4a.isBuffer` does not accept it, and
 * "peer does not exist" when the swarm has no such connection.
 *
 * @param aq - Incoming query; reads `callerInput.pubkey` and the optional
 *   `callerInput.swarm`.
 */
async function handleGetPeerByPubkey(aq: ActiveQuery) {
  const { pubkey = null } = aq.callerInput;
  const swarm = await getSwarm(aq);

  if (!pubkey) {
    // Message used to read "invalid topic" (copied from handleJoin).
    aq.reject("invalid pubkey");
    return;
  }
  if (!b4a.isBuffer(pubkey)) {
    aq.reject("pubkey must be a buffer");
    return;
  }

  // @ts-ignore
  if (!swarm._allConnections.has(pubkey)) {
    aq.reject("peer does not exist");
    return;
  }

  // @ts-ignore
  const peer = swarm._allConnections.get(pubkey);
  aq.respond(getSwarmToSocketConnectionId(peer));
}
/**
 * Handles "init": awaits `init()` on the resolved swarm and responds with
 * no payload. If `init()` throws, the query is rejected with the error's
 * `message` instead.
 *
 * @param aq - Incoming query; reads the optional `callerInput.swarm`.
 */
async function handleInit(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);

  try {
    await swarm.init();
  } catch (err) {
    const { message } = err as Error;
    aq.reject(message);
    return;
  }

  aq.respond();
}
/**
 * Handles "ready": responds once the resolved swarm has an active relay
 * whose `dht._protocol.opened` promise has settled.
 *
 * If the swarm is already ready, the response is sent after awaiting
 * `opened`; otherwise the same sequence runs on the swarm's next "ready"
 * event. Both paths now respond only AFTER `opened` resolves.
 *
 * @param aq - Incoming query; reads the optional `callerInput.swarm`.
 */
async function handleReady(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);

  if (swarm.activeRelay && swarm.ready) {
    // Wait first, then respond, matching the deferred branch below.
    await swarm.activeRelay.dht._protocol.opened;
    aq.respond();
    return;
  }

  swarm.once("ready", async () => {
    await swarm.activeRelay.dht._protocol.opened;
    aq.respond();
  });
}
/**
 * Handles "listenConnections": streams the socket id of every connection
 * on the resolved swarm to the caller via `aq.sendUpdate`, first for
 * connections that arrive from now on and then once for each connection
 * already tracked in `connections`.
 *
 * The query is responded to when the caller sends any update or when the
 * swarm's emitter fires "close". Rejects with "swarm event object is
 * missing" when the swarm has no entry in `swarmEvents`.
 *
 * @param aq - Incoming query; reads the optional `callerInput.swarm`.
 */
async function handleListenConnections(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);
  const swarmId = getSwarmToSwarmId(swarm);

  const emitter = swarmEvents.get(swarmId as number)?.events;
  if (!emitter) {
    aq.reject("swarm event object is missing");
    return;
  }

  const announce = (peer: any) => {
    aq.sendUpdate(getSwarmToSocketConnectionId(peer));
  };

  emitter.on("connection", announce);
  // Any update from the caller means "stop listening".
  aq.setReceiveUpdate?.(() => {
    emitter.off("connection", announce);
    aq.respond();
  });

  // Replay the connections that were established before the caller subscribed.
  for (const [, record] of connections) {
    if (record.swarm === swarmId) {
      announce(record.conn);
    }
  }

  const onSwarmClose = () => {
    emitter.off("connection", announce);
    // NOTE(review): re-emits "close" on the same emitter from inside a "close" listener — confirm intent.
    emitter.emit("close");
    aq.respond();
  };
  const armCloseHook = () => {
    emitter.once("close", onSwarmClose);
  };

  // Arm the close hook now if a relay is active, otherwise on the swarm's next "ready".
  if (swarm.activeRelay) {
    armCloseHook();
  } else {
    swarm.onceSelf("ready", armCloseHook);
  }
}
/**
 * Handles "socketGetInfo": responds with the socket's `remotePublicKey`
 * and `publicKey` plus a `rawStream` object whose `remoteHost` is the value
 * returned by `pubKeyToIpv6` for the remote key, with a fixed port of 0 and
 * family "IPv6".
 *
 * @param aq - Incoming query; rejected by validateConnection when the id is
 *   unknown.
 */
async function handleGetSocketInfo(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  const { remotePublicKey, publicKey } = socket;
  const rawStream = {
    remoteHost: pubKeyToIpv6(remotePublicKey),
    remotePort: 0,
    remoteFamily: "IPv6",
  };

  aq.respond({ remotePublicKey, publicKey, rawStream });
}
/**
 * Handles "createProtomuxChannel": creates and opens a Protomux channel on
 * the socket identified by `callerInput.id`, using `protocol`, `id` and
 * `handshake` from `callerInput.data`, and sends the new channel id to the
 * caller as an update.
 *
 * For each of `onopen`, `onclose` and `ondestroy` that is truthy in
 * `callerInput.data`, the matching channel hook forwards
 * `{ action, args }` updates to the caller (arguments whose constructor is
 * named "channel" are stripped). The destroy hook is always installed: it
 * removes the channel from the connection record and responds to the query.
 *
 * Rejects with "data required" when `callerInput.data` is absent and with
 * "duplicate channel" when `createChannel` returns null.
 *
 * @param aq - Incoming query; reads `callerInput.id` and `callerInput.data`.
 */
async function handleCreateProtomuxChannel(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }
  if (!("data" in aq.callerInput)) {
    aq.reject("data required");
    return;
  }

  const mux = Protomux.from(socket);
  const data = aq.callerInput.data;
  // Assigned once the channel exists; declared up front so the destroy hook
  // can never hit an uninitialised binding.
  let channelId: number | undefined;

  const handleCallback = (name: string, enabled: boolean) => {
    // The hook is registered as "ondestroy", so that is the name to test for
    // (the previous comparison against "destroy" could never match).
    const isDestroy = name === "ondestroy";
    if (!enabled && !isDestroy) {
      return undefined;
    }
    return (...args: any) => {
      if (enabled) {
        aq.sendUpdate({
          action: name,
          args: args.filter(
            (item: any) => item?.constructor.name.toLowerCase() !== "channel",
          ),
        });
      }
      if (isDestroy) {
        if (channelId !== undefined) {
          connections.get(aq.callerInput.id)?.channels.delete(channelId);
        }
        // Respond last so the final update is delivered before the query ends.
        aq.respond();
      }
    };
  };

  const channel = mux.createChannel({
    protocol: data?.protocol,
    id: data?.id,
    handshake: data?.handshake,
    onopen: handleCallback("onopen", Boolean(data?.onopen)),
    onclose: handleCallback("onclose", Boolean(data?.onclose)),
    ondestroy: handleCallback("ondestroy", Boolean(data?.ondestroy)),
  });
  if (channel === null) {
    aq.reject("duplicate channel");
    return;
  }

  channel.open();
  channelId = getChannelId();
  connections.get(aq.callerInput.id)?.channels.set(channelId, channel);
  aq.sendUpdate(channelId);
}
/**
 * Handles "createProtomuxMessage": adds a message type to an existing
 * Protomux channel and bridges its encoding and delivery to the caller.
 *
 * - When `callerInput.data.encoding` is truthy, `preencode` / `encode` /
 *   `decode` are proxied to the caller: each call sends an
 *   `{ id, action, args }` update and waits for the caller to answer with
 *   an update carrying the same `id`. The answer's `args[0]` is the return
 *   value; `args[1]`, if present, is copied back onto the codec state
 *   (`buffer`, `start`, `end`).
 * - When `callerInput.data.onmessage` is truthy, incoming messages are
 *   forwarded as `{ action: "onmessage", args }` updates.
 * - A caller update with `action === "send"` sends `args` on the message.
 *
 * Rejects with "action required" when `callerInput.data` is absent,
 * "channel id required" when `callerInput.channelId` is absent, and
 * "invalid channel" when the channel id is unknown for this connection.
 *
 * @param aq - Incoming query; reads `callerInput.id`,
 *   `callerInput.channelId` and `callerInput.data`.
 */
async function handleCreateProtomuxMessage(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }
  if (!("data" in aq.callerInput)) {
    aq.reject("action required");
    return;
  }
  if (!("channelId" in aq.callerInput)) {
    aq.reject("channel id required");
    return;
  }

  const channel = connections
    .get(aq.callerInput.id)
    ?.channels.get(aq.callerInput.channelId);
  if (!channel) {
    aq.reject("invalid channel");
    // Stop here: without a channel the addMessage call below would throw.
    return;
  }

  const data = aq.callerInput.data;
  // Pending codec round-trips, keyed by the message id sent to the caller.
  const defers = new Map<number, DeferredPromise<any>>();

  const handleEncoding = (enabled: boolean) => {
    if (!enabled) {
      return undefined;
    }
    const update = async (action: string, args: any) => {
      const messageId = getMessageId();
      const d = defer();
      defers.set(messageId, d);
      aq.sendUpdate({
        id: messageId,
        action,
        args,
      });
      const ret = (await d.promise) as any;
      // ret[1], when present, is the caller's view of the codec state; mirror it locally.
      if (ret[1]) {
        if (ret[1].buffer) {
          args[0].buffer = b4a.from(ret[1].buffer);
        }
        args[0].start = ret[1].start;
        args[0].end = ret[1].end;
      }
      return ret[0];
    };
    return {
      async preencode(...args: any) {
        return update("preencode", args);
      },
      async encode(...args: any) {
        return update("encode", args);
      },
      async decode(...args: any) {
        return update("decode", args);
      },
    };
  };

  if (data.onmessage) {
    data.onmessage = async (...args: any) => {
      // Arguments may be promises; settle them before forwarding.
      for (let i = 0; i < args.length; i++) {
        if (isPromise(args[i])) {
          args[i] = await args[i];
        }
      }
      args = args.filter(
        (item: any) => item?.constructor.name.toLowerCase() !== "channel",
      );
      aq.sendUpdate({
        action: "onmessage",
        args,
      });
    };
  }

  const message = channel.addMessage({
    encoding: handleEncoding(data.encoding ?? false),
    onmessage: data.onmessage ?? noop,
  });

  aq.setReceiveUpdate?.((data: any) => {
    if (data.action === "send") {
      message.send(...data.args);
    }
    // Complete the matching codec round-trip, if any.
    defers.get(data.id)?.resolve(data.args);
    defers.delete(data.id);
  });

  aq.sendUpdate({
    action: "created",
  });
}
/**
 * Handles "createSwarm": creates a new swarm and responds with its id.
 *
 * @param aq - Incoming query; no input fields are read.
 */
async function handleCreateSwarm(aq: ActiveQuery) {
  const swarmId = await createSwarm();
  aq.respond(swarmId);
}
/**
 * Reverse lookup from a peer connection object to its socket id.
 *
 * @param socket - Peer connection object to search for (compared by identity).
 * @returns The socket id under which the object is stored in `connections`,
 *   or `false` when it is not tracked.
 */
function getSwarmToSocketConnectionId(socket: any) {
  for (const [socketId, record] of connections) {
    if (record.conn === socket) {
      return socketId;
    }
  }
  return false;
}
/**
 * Reverse lookup from a swarm instance to its swarm id.
 *
 * @param swarm - Swarm instance to search for (compared by identity).
 * @returns The id under which the instance is stored in `swarmInstances`,
 *   or `false` when it is not registered.
 */
function getSwarmToSwarmId(swarm: any) {
  for (const [swarmId, instance] of swarmInstances) {
    if (instance === swarm) {
      return swarmId;
    }
  }
  return false;
}
/** Does nothing; used as the default `onmessage` handler. */
function noop(): void {
  // Intentionally empty.
}
/**
 * Duck-type check for thenables.
 *
 * @param obj - Value to inspect.
 * @returns `true` when `obj` is a truthy object or function whose `then`
 *   property is a function; `false` otherwise.
 */
function isPromise(obj: Promise<any>) {
  if (!obj) {
    return false;
  }
  const kind = typeof obj;
  if (kind !== "object" && kind !== "function") {
    return false;
  }
  return typeof obj.then === "function";
}