*Update to use new RPC protocol

*Add support for multiple RPC query types
This commit is contained in:
Derrick Hammer 2022-08-30 21:39:26 -04:00
parent 80fcb215b6
commit 5345135b02
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
3 changed files with 138 additions and 21 deletions

View File

@ -11,6 +11,7 @@
"libskynet": "^0.0.62" "libskynet": "^0.0.62"
}, },
"devDependencies": { "devDependencies": {
"@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git",
"@types/node": "^18.0.6", "@types/node": "^18.0.6",
"prettier": "^2.7.1" "prettier": "^2.7.1"
} }

View File

@ -1,5 +1,11 @@
import { ErrTuple } from "libskynet"; import { ErrTuple } from "libskynet";
import type { RPCRequest } from "@lumeweb/dht-rpc-client"; import type { RPCRequest } from "@lumeweb/relay-types";
import {
RpcQueryOptions,
StreamHandlerFunction,
StreamingRpcQueryOptions,
} from "@lumeweb/dht-rpc-client";
import { Buffer } from "buffer";
const RPC_MODULE = "AQDaEPIo_lpdvz7AKbeafERBHR331RiyvweJ6OrFTplzyg"; const RPC_MODULE = "AQDaEPIo_lpdvz7AKbeafERBHR331RiyvweJ6OrFTplzyg";
@ -21,8 +27,6 @@ async function loadLibs() {
} }
} }
type PromiseCB = () => Promise<ErrTuple>;
export class RpcNetwork { export class RpcNetwork {
private _actionQueue: [string, any][] = []; private _actionQueue: [string, any][] = [];
private _addQueue: string[] = []; private _addQueue: string[] = [];
@ -52,22 +56,62 @@ export class RpcNetwork {
if (array.includes(item)) { if (array.includes(item)) {
let queue = new Set(array); let queue = new Set(array);
queue.delete(item); queue.delete(item);
[].splice.apply(array, [0, array.length].concat([...queue])); [].splice.apply(array, [0, array.length].concat([...queue]) as any);
} }
} }
public query( public wisdomQuery(
query: string, method: string,
chain: string, module: string,
data: object | any[] = {}, data: object | any[] = {},
force: boolean = false bypassCache: boolean = false,
): RpcQuery { options: RpcQueryOptions = {}
return new RpcQuery(this, { ): WisdomRpcQuery {
query, return new WisdomRpcQuery(
chain, this,
data, {
force: force, method,
}); module,
data,
bypassCache,
},
options
).run();
}
public streamingQuery(
relay: Buffer | string,
method: string,
module: string,
streamHandler: StreamHandlerFunction,
data: object | any[] = {},
options: RpcQueryOptions = {}
): StreamingRpcQuery {
return new StreamingRpcQuery(
this,
relay,
{ method, module, data },
{ streamHandler, ...options }
).run();
}
public simpleQuery(
relay: Buffer | string,
method: string,
module: string,
data: object | any[] = {},
options: RpcQueryOptions = {}
): SimpleRpcQuery {
return new SimpleRpcQuery(
this,
relay,
{
method,
module,
data,
},
options
).run();
} }
public async processQueue(): Promise<void> { public async processQueue(): Promise<void> {
@ -95,17 +139,35 @@ export class RpcNetwork {
} }
} }
export class RpcQuery { export abstract class RpcQueryBase {
private _promise: Promise<any>; protected _promise?: Promise<any>;
protected _network: RpcNetwork;
protected _query: RPCRequest;
protected _options: RpcQueryOptions;
protected _queryType: string;
constructor(network: RpcNetwork, query: RPCRequest) { constructor(
this._promise = network network: RpcNetwork,
query: RPCRequest,
options: RpcQueryOptions = {},
queryType: string
) {
this._network = network;
this._query = query;
this._options = options;
this._queryType = queryType;
}
public run(): this {
this._promise = this._network
.processQueue() .processQueue()
.then(() => callModule(RPC_MODULE, "query", query)); .then(() => callModule(RPC_MODULE, this._queryType, this._query));
return this;
} }
get result(): Promise<any> { get result(): Promise<any> {
return this._promise.then((result) => { return (this._promise as Promise<any>).then((result) => {
if (result[1]) { if (result[1]) {
throw new Error(result[1]); throw new Error(result[1]);
} }
@ -113,3 +175,53 @@ export class RpcQuery {
}); });
} }
} }
export class SimpleRpcQuery extends RpcQueryBase {
constructor(
network: RpcNetwork,
relay: string | Buffer,
query: RPCRequest,
options: RpcQueryOptions
) {
super(network, query, options, "simpleQuery");
}
}
export class StreamingRpcQuery extends RpcQueryBase {
protected _options: StreamingRpcQueryOptions;
constructor(
network: RpcNetwork,
relay: string | Buffer,
query: RPCRequest,
options: StreamingRpcQueryOptions
) {
super(network, query, options, "streamingQuery");
this._options = options;
}
public run(): this {
this._promise = this._network
.processQueue()
.then(() =>
connectModule(
RPC_MODULE,
this._queryType,
this._query,
this._options.streamHandler
)
);
return this;
}
}
export class WisdomRpcQuery extends RpcQueryBase {
constructor(
network: RpcNetwork,
query: RPCRequest,
options: RpcQueryOptions = {}
) {
super(network, query, options, "wisdomQuery");
}
}

View File

@ -46,6 +46,10 @@
libskynet "^0.0.61" libskynet "^0.0.61"
msgpackr "^1.6.1" msgpackr "^1.6.1"
"@lumeweb/relay-types@https://github.com/LumeWeb/relay-types.git":
version "0.1.0"
resolved "https://github.com/LumeWeb/relay-types.git#9768e2dc9dd8ccc2ee8569424bb9d7a2d8cde13e"
"@msgpackr-extract/msgpackr-extract-darwin-arm64@2.1.2": "@msgpackr-extract/msgpackr-extract-darwin-arm64@2.1.2":
version "2.1.2" version "2.1.2"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-2.1.2.tgz#9571b87be3a3f2c46de05585470bc4f3af2f6f00" resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-2.1.2.tgz#9571b87be3a3f2c46de05585470bc4f3af2f6f00"