From 1a844804ec000b160ae8a941160e9b9d7b01dab6 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 23 Mar 2023 12:42:21 -0400 Subject: [PATCH] *Refactor to use new RPC module API and new kernel client API --- package.json | 5 +- src/index.ts | 279 ++++++++++----------------------------------------- 2 files changed, 54 insertions(+), 230 deletions(-) diff --git a/package.json b/package.json index f9f2f52..996c565 100644 --- a/package.json +++ b/package.json @@ -4,14 +4,15 @@ "type": "module", "main": "dist/index.js", "dependencies": { - "@lumeweb/dht-rpc-client": "https://github.com/LumeWeb/dht-rpc-client.git", + "@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git", + "@lumeweb/rpc-client": "git+https://git.lumeweb.com/LumeWeb/rpc-client.git", "buffer": "^6.0.3", "libkernel": "^0.1.43", "libkmodule": "^0.2.44", "libskynet": "^0.0.62" }, "devDependencies": { - "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", + "@lumeweb/interface-relay": "git+https://git.lumeweb.com/LumeWeb/interface-relay.git", "@types/node": "^18.0.6", "prettier": "^2.7.1" } diff --git a/src/index.ts b/src/index.ts index 247d5be..7703e84 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,40 +1,23 @@ import { ErrTuple } from "libskynet"; -import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types"; -import { - RpcQueryOptions, - StreamHandlerFunction, - StreamingRpcQueryOptions, -} from "@lumeweb/dht-rpc-client"; +import type { + RPCRequest, + RPCResponse, + ClientRPCRequest, +} from "@lumeweb/interface-relay"; +import { RpcQueryOptions } from "@lumeweb/rpc-client"; import { Buffer } from "buffer"; -import { DataFn } from "libskynet/dist"; +import { Client } from "@lumeweb/libkernel-universal"; -const RPC_MODULE = "AQDaEPIo_lpdvz7AKbeafERBHR331RiyvweJ6OrFTplzyg"; +const RPC_MODULE = "PAMz6NHrYRxDqQ-Am5HW_l0tBHouFBCbMXjnnjurJLXpTQ"; -let callModule: any, connectModule: any; - -async function loadLibs() { - if (callModule && connectModule) { - return; - } - - if (typeof window !== "undefined" && window?.document) { - const pkg = await import("libkernel"); - callModule = pkg.callModule; - connectModule = pkg.connectModule; - } else { - const pkg = await import("libkmodule"); - callModule = pkg.callModule; - connectModule = pkg.connectModule; - } -} - -export class RpcNetwork { +export class RpcNetwork extends Client { private _actionQueue: [string, any][] = []; private _addQueue: string[] = []; private _removeQueue: string[] = []; private _def: boolean; constructor(def: boolean = true) { + super(); this._def = def; } @@ -45,134 +28,34 @@ export class RpcNetwork { } get ready(): Promise { - let promise = loadLibs(); - if (this._def) { this._networkId = 1; } else { - promise = promise - .then(() => callModule(RPC_MODULE, "createNetwork")) + Promise.resolve() + .then(() => this.callModuleReturn(RPC_MODULE, "createNetwork")) .then((ret: ErrTuple) => (this._networkId = ret[0])); } - return promise.then(() => - callModule(RPC_MODULE, "ready", { network: this._networkId }) - ); + return this.callModuleReturn("ready", { + network: this._networkId, + }); } - - private static deleteItem(array: Array, item: string): void { - if (array.includes(item)) { - let queue = new Set(array); - queue.delete(item); - [].splice.apply(array, [0, array.length].concat([...queue]) as any); - } - } - - public addRelay(pubkey: string): void { - this._addQueue.push(pubkey); - this._addQueue = [...new Set(this._addQueue)]; - RpcNetwork.deleteItem(this._removeQueue, pubkey); - } - - public removeRelay(pubkey: string): void { - this._removeQueue.push(pubkey); - this._removeQueue = [...new Set(this._removeQueue)]; - RpcNetwork.deleteItem(this._addQueue, pubkey); - } - - public clearRelays(): void { - this._actionQueue.push(["clearRelays", {}]); - } - - public wisdomQuery( - method: string, - module: string, - data: object | any[] = {}, - bypassCache: boolean = false, - options: RpcQueryOptions = {} - ): WisdomRpcQuery { - return new WisdomRpcQuery( - this, - { - method, - module, - data, - bypassCache, - }, - options - ).run(); - } - - public streamingQuery( - relay: Buffer | string, - method: string, - module: string, - streamHandler: StreamHandlerFunction, - data: object | any[] = {}, - options: RpcQueryOptions = {} - ): StreamingRpcQuery { - return new StreamingRpcQuery( - this, - relay, - { method, module, data }, - { streamHandler, ...options } - ).run(); - } - public simpleQuery( relay: Buffer | string, - method: string, - module: string, + query: ClientRPCRequest, data: object | any[] = {}, options: RpcQueryOptions = {} ): SimpleRpcQuery { - return new SimpleRpcQuery( - this, + return new SimpleRpcQuery({ + network: this, relay, - { - method, - module, - data, - }, - options - ).run(); - } - - public async processQueue(): Promise { - await loadLibs(); - for (const action of this._actionQueue) { - try { - await callModule(RPC_MODULE, action[0], { - ...action[1], - network: this._networkId, - }); - } catch (e: any) {} - } - - await Promise.allSettled( - this._removeQueue.map((item: string) => - callModule(RPC_MODULE, "removeRelay", { - pubkey: item, - network: this._networkId, - }) - ) - ); - await Promise.allSettled( - this._addQueue.map((item: string) => - callModule(RPC_MODULE, "addRelay", { - pubkey: item, - network: this._networkId, - }) - ) - ); - - this._actionQueue = []; - this._removeQueue = []; - this._addQueue = []; + query, + options, + }).run(); } } -export abstract class RpcQueryBase { +export abstract class RpcQueryBase extends Client { protected _promise?: Promise; protected _network: RpcNetwork; protected _query: RPCRequest; @@ -185,6 +68,7 @@ export abstract class RpcQueryBase { options: RpcQueryOptions = {}, queryType: string ) { + super(); this._network = network; this._query = query; this._options = options; @@ -192,22 +76,21 @@ export abstract class RpcQueryBase { } get result(): Promise { - return (this._promise as Promise).then((result): RPCResponse => { - if (result[1]) { - return { error: result[1] }; - } - return result[0]; - }); + return (this._promise as Promise) + .then((result: ErrTuple): RPCResponse => { + return result[0]; + }) + .catch((error: Error) => { + return { error: error.message }; + }); } public run(): this { - this._promise = this._network.processQueue().then(() => - callModule(RPC_MODULE, this._queryType, { - query: this._query, - options: this._options, - network: this._network.networkId, - }) - ); + this._promise = this.callModule(this._queryType, { + query: this._query, + options: this._options, + network: this._network.networkId, + }); return this; } @@ -215,88 +98,28 @@ export abstract class RpcQueryBase { export class SimpleRpcQuery extends RpcQueryBase { protected _relay: string | Buffer; - constructor( - network: RpcNetwork, - relay: string | Buffer, - query: RPCRequest, - options: RpcQueryOptions - ) { + constructor({ + network, + relay, + query, + options, + }: { + network: RpcNetwork; + relay: string | Buffer; + query: RPCRequest; + options: RpcQueryOptions; + }) { super(network, query, options, "simpleQuery"); this._relay = relay; } public run(): this { - this._promise = this._network.processQueue().then(() => - callModule(RPC_MODULE, this._queryType, { - relay: this._relay, - query: this._query, - options: this._options, - network: this._network.networkId, - }) - ); - - return this; - } -} - -export class StreamingRpcQuery extends SimpleRpcQuery { - protected _options: StreamingRpcQueryOptions; - private _sendUpdate?: DataFn; - - constructor( - network: RpcNetwork, - relay: string | Buffer, - query: RPCRequest, - options: StreamingRpcQueryOptions - ) { - super(network, relay, query, options); - this._options = options; - this._queryType = "streamingQuery"; - } - - public cancel() { - if (this._sendUpdate) { - this._sendUpdate({ cancel: true }); - } - } - - public run(): this { - this._promise = this._network.processQueue().then(() => { - const ret = connectModule( - RPC_MODULE, - this._queryType, - { - relay: this._relay, - query: this._query, - options: { ...this._options, streamHandler: true }, - network: this._network.networkId, - }, - this._options.streamHandler - ); - this._sendUpdate = ret[0]; - return ret[1]; + this._promise = this.callModule(this._queryType, { + relay: this._relay, + query: this._query, + options: this._options, + network: this._network.networkId, }); return this; } - - get result(): Promise { - return (this._promise as Promise) - .then((result): Promise => result) - .then((response: ErrTuple) => { - if (response[1]) { - return { error: response[1] }; - } - return response[0]; - }); - } -} - -export class WisdomRpcQuery extends RpcQueryBase { - constructor( - network: RpcNetwork, - query: RPCRequest, - options: RpcQueryOptions = {} - ) { - super(network, query, options, "wisdomQuery"); - } }