From 0c4406b8a54a46d162daf63bab6fa0d2e3b3cc3a Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 31 Aug 2022 00:28:34 -0400 Subject: [PATCH] *Use new RPC protocol --- package.json | 3 +- src/index.ts | 136 ++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 121 insertions(+), 18 deletions(-) diff --git a/package.json b/package.json index b68ddc1..2dbd3f6 100644 --- a/package.json +++ b/package.json @@ -11,11 +11,12 @@ "dependencies": { "@lumeweb/dht-rpc-client": "https://github.com/LumeWeb/dht-rpc-client.git", "@lumeweb/kernel-dht-client": "https://github.com/LumeWeb/kernel-dht-client.git", + "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", "libkmodule": "^0.2.46" }, "devDependencies": { "@types/events": "^3.0.0", - "@types/node": "^18.0.3", + "@types/node": "^18.7.14", "@types/read": "^0.0.29", "esbuild": "^0.14.49", "libskynetnode": "^0.1.3", diff --git a/src/index.ts b/src/index.ts index 718fc0d..4fad3e9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,12 @@ import { addHandler, handleMessage } from "libkmodule"; import type { ActiveQuery } from "libkmodule"; import { DHT } from "@lumeweb/kernel-dht-client"; -import { RpcNetwork, RPCRequest } from "@lumeweb/dht-rpc-client"; +import { + RpcNetwork, + RpcQueryOptions, + StreamingRpcQueryOptions, +} from "@lumeweb/dht-rpc-client"; +import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types"; onmessage = handleMessage; @@ -11,7 +16,9 @@ const dht = network.dht as DHT; addHandler("addRelay", handleAddRelay); addHandler("removeRelay", handleRemoveRelay); addHandler("clearRelays", handleClearRelays); -addHandler("query", handleQuery); +addHandler("simpleQuery", handleSimpleQuery); +addHandler("streamingQuery", handleStreamingQuery); +addHandler("wisdomQuery", handleWisdomQuery); addHandler("ready", handleReady); async function handleAddRelay(aq: ActiveQuery) { @@ -48,37 +55,132 @@ async function handleClearRelays(aq: ActiveQuery) { aq.respond(); } -async function handleQuery(aq: ActiveQuery) { - const query: RPCRequest = aq.callerInput; +async function handleSimpleQuery(aq: ActiveQuery) { + const { + query = null, + relay = null, + options = undefined, + } = aq.callerInput as { + query: RPCRequest; + options: RpcQueryOptions; + relay: Buffer | string; + }; - if (!("query" in query)) { - aq.reject("query required"); + if (!query) { + aq.reject("RPCRequest query required"); return; } - if (!("chain" in query)) { - aq.reject("chain required"); - return; - } - if (!("data" in query)) { - aq.reject("data required"); + if (!relay) { + aq.reject("relay required"); return; } - let resp; + let resp: RPCResponse | null = null; try { - const rpcQuery = await network.query( - query.query, - query.chain, + const rpcQuery = network.simpleQuery( + relay as Buffer | string, + query.method, + query.module, query.data, - query.force ?? false + query.bypassCache, + options ); resp = await rpcQuery.result; } catch (e: any) { aq.reject(e); } + if (resp?.error) { + aq.reject(resp?.error); + return; + } + + aq.respond(resp); +} + +async function handleStreamingQuery(aq: ActiveQuery) { + const { + query = null, + relay = null, + options = undefined, + } = aq.callerInput as { + query: RPCRequest; + options: StreamingRpcQueryOptions; + relay: Buffer | string; + }; + + if (!query) { + aq.reject("RPCRequest query required"); + return; + } + + if (!relay) { + aq.reject("relay required"); + return; + } + + if (!options || !options?.streamHandler) { + aq.reject("RPCRequest query required"); + return; + } + + let resp: RPCResponse | null = null; + + try { + const rpcQuery = network.streamingQuery( + relay as Buffer | string, + query.method, + query.module, + query.data, + { ...options, streamHandler: aq.sendUpdate } + ); + resp = await rpcQuery.result; + } catch (e: any) { + aq.reject(e); + } + + if (resp?.error) { + aq.reject(resp?.error); + return; + } + + aq.respond(resp); +} + +async function handleWisdomQuery(aq: ActiveQuery) { + const { query = null, options = undefined } = aq.callerInput as { + query: RPCRequest; + options: RpcQueryOptions; + relay: Buffer | string; + }; + + if (!query) { + aq.reject("RPCRequest query required"); + return; + } + + let resp: RPCResponse | null = null; + + try { + const rpcQuery = network.wisdomQuery( + query.method, + query.module, + query.data, + query.bypassCache ?? undefined, + options + ); + resp = await rpcQuery.result; + } catch (e: any) { + aq.reject(e); + } + + if (resp?.error) { + aq.reject(resp?.error); + return; + } + aq.respond(resp); }