*Use new RPC protocol

This commit is contained in:
Derrick Hammer 2022-08-31 00:28:34 -04:00
parent 1e3b565b59
commit 0c4406b8a5
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 121 additions and 18 deletions

View File

@ -11,11 +11,12 @@
"dependencies": { "dependencies": {
"@lumeweb/dht-rpc-client": "https://github.com/LumeWeb/dht-rpc-client.git", "@lumeweb/dht-rpc-client": "https://github.com/LumeWeb/dht-rpc-client.git",
"@lumeweb/kernel-dht-client": "https://github.com/LumeWeb/kernel-dht-client.git", "@lumeweb/kernel-dht-client": "https://github.com/LumeWeb/kernel-dht-client.git",
"@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git",
"libkmodule": "^0.2.46" "libkmodule": "^0.2.46"
}, },
"devDependencies": { "devDependencies": {
"@types/events": "^3.0.0", "@types/events": "^3.0.0",
"@types/node": "^18.0.3", "@types/node": "^18.7.14",
"@types/read": "^0.0.29", "@types/read": "^0.0.29",
"esbuild": "^0.14.49", "esbuild": "^0.14.49",
"libskynetnode": "^0.1.3", "libskynetnode": "^0.1.3",

View File

@ -1,7 +1,12 @@
import { addHandler, handleMessage } from "libkmodule"; import { addHandler, handleMessage } from "libkmodule";
import type { ActiveQuery } from "libkmodule"; import type { ActiveQuery } from "libkmodule";
import { DHT } from "@lumeweb/kernel-dht-client"; import { DHT } from "@lumeweb/kernel-dht-client";
import { RpcNetwork, RPCRequest } from "@lumeweb/dht-rpc-client"; import {
RpcNetwork,
RpcQueryOptions,
StreamingRpcQueryOptions,
} from "@lumeweb/dht-rpc-client";
import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
onmessage = handleMessage; onmessage = handleMessage;
@ -11,7 +16,9 @@ const dht = network.dht as DHT;
addHandler("addRelay", handleAddRelay); addHandler("addRelay", handleAddRelay);
addHandler("removeRelay", handleRemoveRelay); addHandler("removeRelay", handleRemoveRelay);
addHandler("clearRelays", handleClearRelays); addHandler("clearRelays", handleClearRelays);
addHandler("query", handleQuery); addHandler("simpleQuery", handleSimpleQuery);
addHandler("streamingQuery", handleStreamingQuery);
addHandler("wisdomQuery", handleWisdomQuery);
addHandler("ready", handleReady); addHandler("ready", handleReady);
async function handleAddRelay(aq: ActiveQuery) { async function handleAddRelay(aq: ActiveQuery) {
@ -48,37 +55,132 @@ async function handleClearRelays(aq: ActiveQuery) {
aq.respond(); aq.respond();
} }
async function handleQuery(aq: ActiveQuery) { async function handleSimpleQuery(aq: ActiveQuery) {
const query: RPCRequest = aq.callerInput; const {
query = null,
relay = null,
options = undefined,
} = aq.callerInput as {
query: RPCRequest;
options: RpcQueryOptions;
relay: Buffer | string;
};
if (!("query" in query)) { if (!query) {
aq.reject("query required"); aq.reject("RPCRequest query required");
return; return;
} }
if (!("chain" in query)) { if (!relay) {
aq.reject("chain required"); aq.reject("relay required");
return;
}
if (!("data" in query)) {
aq.reject("data required");
return; return;
} }
let resp; let resp: RPCResponse | null = null;
try { try {
const rpcQuery = await network.query( const rpcQuery = network.simpleQuery(
query.query, relay as Buffer | string,
query.chain, query.method,
query.module,
query.data, query.data,
query.force ?? false query.bypassCache,
options
); );
resp = await rpcQuery.result; resp = await rpcQuery.result;
} catch (e: any) { } catch (e: any) {
aq.reject(e); aq.reject(e);
} }
if (resp?.error) {
aq.reject(resp?.error);
return;
}
aq.respond(resp);
}
async function handleStreamingQuery(aq: ActiveQuery) {
const {
query = null,
relay = null,
options = undefined,
} = aq.callerInput as {
query: RPCRequest;
options: StreamingRpcQueryOptions;
relay: Buffer | string;
};
if (!query) {
aq.reject("RPCRequest query required");
return;
}
if (!relay) {
aq.reject("relay required");
return;
}
if (!options || !options?.streamHandler) {
aq.reject("RPCRequest query required");
return;
}
let resp: RPCResponse | null = null;
try {
const rpcQuery = network.streamingQuery(
relay as Buffer | string,
query.method,
query.module,
query.data,
{ ...options, streamHandler: aq.sendUpdate }
);
resp = await rpcQuery.result;
} catch (e: any) {
aq.reject(e);
}
if (resp?.error) {
aq.reject(resp?.error);
return;
}
aq.respond(resp);
}
async function handleWisdomQuery(aq: ActiveQuery) {
const { query = null, options = undefined } = aq.callerInput as {
query: RPCRequest;
options: RpcQueryOptions;
relay: Buffer | string;
};
if (!query) {
aq.reject("RPCRequest query required");
return;
}
let resp: RPCResponse | null = null;
try {
const rpcQuery = network.wisdomQuery(
query.method,
query.module,
query.data,
query.bypassCache ?? undefined,
options
);
resp = await rpcQuery.result;
} catch (e: any) {
aq.reject(e);
}
if (resp?.error) {
aq.reject(resp?.error);
return;
}
aq.respond(resp); aq.respond(resp);
} }