From 5345135b02118cd48a4e24c19058c5dbd081c371 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Tue, 30 Aug 2022 21:39:26 -0400 Subject: [PATCH] *Update to use new RPC protocol *Add support for multiple RPC query types --- package.json | 1 + src/index.ts | 154 ++++++++++++++++++++++++++++++++++++++++++++------- yarn.lock | 4 ++ 3 files changed, 138 insertions(+), 21 deletions(-) diff --git a/package.json b/package.json index cea887b..f9f2f52 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "libskynet": "^0.0.62" }, "devDependencies": { + "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", "@types/node": "^18.0.6", "prettier": "^2.7.1" } diff --git a/src/index.ts b/src/index.ts index d433bf3..e5a54a8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,11 @@ import { ErrTuple } from "libskynet"; -import type { RPCRequest } from "@lumeweb/dht-rpc-client"; +import type { RPCRequest } from "@lumeweb/relay-types"; +import { + RpcQueryOptions, + StreamHandlerFunction, + StreamingRpcQueryOptions, +} from "@lumeweb/dht-rpc-client"; +import { Buffer } from "buffer"; const RPC_MODULE = "AQDaEPIo_lpdvz7AKbeafERBHR331RiyvweJ6OrFTplzyg"; @@ -21,8 +27,6 @@ async function loadLibs() { } } -type PromiseCB = () => Promise; - export class RpcNetwork { private _actionQueue: [string, any][] = []; private _addQueue: string[] = []; @@ -52,22 +56,62 @@ export class RpcNetwork { if (array.includes(item)) { let queue = new Set(array); queue.delete(item); - [].splice.apply(array, [0, array.length].concat([...queue])); + [].splice.apply(array, [0, array.length].concat([...queue]) as any); } } - public query( - query: string, - chain: string, + public wisdomQuery( + method: string, + module: string, data: object | any[] = {}, - force: boolean = false - ): RpcQuery { - return new RpcQuery(this, { - query, - chain, - data, - force: force, - }); + bypassCache: boolean = false, + options: RpcQueryOptions = {} + ): WisdomRpcQuery { + return new WisdomRpcQuery( + this, + { + method, + module, + data, + bypassCache, + }, + options + ).run(); + } + + public streamingQuery( + relay: Buffer | string, + method: string, + module: string, + streamHandler: StreamHandlerFunction, + data: object | any[] = {}, + options: RpcQueryOptions = {} + ): StreamingRpcQuery { + return new StreamingRpcQuery( + this, + relay, + { method, module, data }, + { streamHandler, ...options } + ).run(); + } + + public simpleQuery( + relay: Buffer | string, + method: string, + module: string, + data: object | any[] = {}, + options: RpcQueryOptions = {} + ): SimpleRpcQuery { + return new SimpleRpcQuery( + this, + relay, + { + method, + module, + data, + }, + options + ).run(); } public async processQueue(): Promise { @@ -95,17 +139,35 @@ export class RpcNetwork { } } -export class RpcQuery { - private _promise: Promise; +export abstract class RpcQueryBase { + protected _promise?: Promise; + protected _network: RpcNetwork; + protected _query: RPCRequest; + protected _options: RpcQueryOptions; + protected _queryType: string; - constructor(network: RpcNetwork, query: RPCRequest) { - this._promise = network + constructor( + network: RpcNetwork, + query: RPCRequest, + options: RpcQueryOptions = {}, + queryType: string + ) { + this._network = network; + this._query = query; + this._options = options; + this._queryType = queryType; + } + + public run(): this { + this._promise = this._network .processQueue() - .then(() => callModule(RPC_MODULE, "query", query)); + .then(() => callModule(RPC_MODULE, this._queryType, this._query)); + + return this; } get result(): Promise { - return this._promise.then((result) => { + return (this._promise as Promise).then((result) => { if (result[1]) { throw new Error(result[1]); } @@ -113,3 +175,53 @@ export class RpcQuery { }); } } + +export class SimpleRpcQuery extends RpcQueryBase { + constructor( + network: RpcNetwork, + relay: string | Buffer, + query: RPCRequest, + options: RpcQueryOptions + ) { + super(network, query, options, "simpleQuery"); + } +} + +export class StreamingRpcQuery extends RpcQueryBase { + protected _options: StreamingRpcQueryOptions; + + constructor( + network: RpcNetwork, + relay: string | Buffer, + query: RPCRequest, + options: StreamingRpcQueryOptions + ) { + super(network, query, options, "streamingQuery"); + this._options = options; + } + + public run(): this { + this._promise = this._network + .processQueue() + .then(() => + connectModule( + RPC_MODULE, + this._queryType, + this._query, + this._options.streamHandler + ) + ); + + return this; + } +} + +export class WisdomRpcQuery extends RpcQueryBase { + constructor( + network: RpcNetwork, + query: RPCRequest, + options: RpcQueryOptions = {} + ) { + super(network, query, options, "wisdomQuery"); + } +} diff --git a/yarn.lock b/yarn.lock index 8d3dda6..57236c7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -46,6 +46,10 @@ libskynet "^0.0.61" msgpackr "^1.6.1" +"@lumeweb/relay-types@https://github.com/LumeWeb/relay-types.git": + version "0.1.0" + resolved "https://github.com/LumeWeb/relay-types.git#9768e2dc9dd8ccc2ee8569424bb9d7a2d8cde13e" + "@msgpackr-extract/msgpackr-extract-darwin-arm64@2.1.2": version "2.1.2" resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-2.1.2.tgz#9571b87be3a3f2c46de05585470bc4f3af2f6f00"