kernel-s5/src/index.ts

185 lines
3.9 KiB
TypeScript
Raw Normal View History

import { concatBytes, ensureBytes } from "@lumeweb/libkernel";
2023-09-01 10:55:37 +00:00
import {
addHandler,
defer,
getKey,
handlePresentKey as handlePresentKeyModule,
} from "@lumeweb/libkernel/module";
import type { ActiveQuery } from "@lumeweb/libkernel/module";
import {
createClient as createSwarmClient,
SwarmClient,
} from "@lumeweb/kernel-swarm-client";
2023-09-01 10:55:37 +00:00
import Protomux from "@lumeweb/kernel-protomux-client";
import {
CID_HASH_TYPES,
2023-09-01 10:55:37 +00:00
createKeyPair,
createNode,
NodeId,
S5NodeConfig,
} from "@lumeweb/libs5";
import type { S5Node } from "@lumeweb/libs5";
import KeyPairEd25519 from "@lumeweb/libs5/lib/ed25519.js";
2023-09-01 10:55:37 +00:00
import { Level } from "level";
import HyperTransportPeer from "@lumeweb/libs5-transport-hyper";
const PROTOCOL = "lumeweb.service.s5";
const moduleReadyDefer = defer();
let swarm: SwarmClient;
let node: S5Node;
2023-09-01 10:55:37 +00:00
addHandler("presentKey", handlePresentKey);
addHandler("ready", ready);
addHandler("getRegistryEntry", handleGetRegistryEntry);
addHandler("setRegistryEntry", handleSetRegistryEntry);
addHandler("registrySubscription", handleRegistrySubscription, {
receiveUpdates: true,
});
2023-09-01 10:55:37 +00:00
async function handlePresentKey(aq: ActiveQuery) {
handlePresentKeyModule({
callerInput: {
key: aq.callerInput.rootKey,
},
} as ActiveQuery);
await setup();
moduleReadyDefer.resolve();
}
async function setup() {
swarm = createSwarmClient();
const peerConnectedDefer = defer();
2023-09-01 10:55:37 +00:00
const db = new Level<string, Uint8Array>("s5");
await db.open();
let config = {
keyPair: createKeyPair(await getKey()),
db,
p2p: {
peers: {
initial: [],
},
},
} as S5NodeConfig;
swarm.join(PROTOCOL);
await swarm.start();
await swarm.ready();
node = createNode(config);
2023-09-01 10:55:37 +00:00
await node.start();
swarm.on("connection", async (peer: any) => {
const muxer = Protomux.from(peer);
const s5peer = new HyperTransportPeer({
muxer,
peer,
protocol: PROTOCOL,
});
s5peer.id = new NodeId(
concatBytes(
Uint8Array.from([CID_HASH_TYPES.ED25519]),
peer.remotePublicKey,
),
2023-09-01 10:55:37 +00:00
);
await s5peer.init();
node.services.p2p.onNewPeer(s5peer, true);
node.services.p2p.once("peerConnected", peerConnectedDefer.resolve);
2023-09-01 10:55:37 +00:00
});
return peerConnectedDefer.promise;
2023-09-01 10:55:37 +00:00
}
2023-09-01 11:18:23 +00:00
async function ready(aq: ActiveQuery) {
2023-09-01 10:55:37 +00:00
await moduleReadyDefer.promise;
2023-09-01 11:18:23 +00:00
aq.respond();
2023-09-01 10:55:37 +00:00
}
async function handleGetRegistryEntry(aq: ActiveQuery) {
if (!("pubkey" in aq.callerInput)) {
aq.reject("pubkey required");
}
await moduleReadyDefer.promise;
let { pubkey } = aq.callerInput;
pubkey = ensureBytes("registry entry ", pubkey, 32);
if (pubkey[0] !== CID_HASH_TYPES.ED25519) {
pubkey = concatBytes(Uint8Array.from([CID_HASH_TYPES.ED25519]), pubkey);
}
2023-09-03 04:56:34 +00:00
const ret = await node.services.registry.get(pubkey);
if (!ret) {
aq.reject("could not find registry entry");
return;
}
aq.respond(ret);
}
2023-09-03 04:56:34 +00:00
async function handleSetRegistryEntry(aq: ActiveQuery) {
for (const field of ["key", "data", "revision"]) {
if (!(field in aq.callerInput)) {
aq.reject(`${field} required`);
return;
}
}
await moduleReadyDefer.promise;
let { key, data, revision } = aq.callerInput;
key = ensureBytes("registry entry private key", key, 32);
const sre = node.services.registry.signRegistryEntry({
kp: new KeyPairEd25519(key),
data,
revision,
});
try {
await node.services.registry.set(sre);
aq.respond(sre);
} catch (e) {
aq.reject(e);
}
}
async function handleRegistrySubscription(aq: ActiveQuery) {
if (!("pubkey" in aq.callerInput)) {
aq.reject("pubkey required");
}
await moduleReadyDefer.promise;
let { pubkey } = aq.callerInput;
pubkey = ensureBytes("registry entry ", pubkey, 32);
const wait = defer();
const done = node.services.registry.listen(pubkey, (sre) => {
aq.sendUpdate(sre);
});
aq.setReceiveUpdate?.(() => {
done();
wait.resolve();
});
await wait.promise;
aq.respond();
}