Compare commits

..

No commits in common. "v0.0.2-develop.1" and "v0.0.1" have entirely different histories.

16 changed files with 3 additions and 19938 deletions

View File

@ -1,13 +0,0 @@
name: Build/Publish
on:
push:
branches:
- master
- develop
- develop-*
jobs:
main:
uses: lumeweb/github-node-deploy-workflow/.github/workflows/main.yml@master
secrets: inherit

View File

@ -1,5 +0,0 @@
{
"preset": [
"@lumeweb/node-library-preset"
]
}

View File

@ -1,7 +0,0 @@
## [0.0.2-develop.1](https://git.lumeweb.com/LumeWeb/rpc-client/compare/v0.0.1...v0.0.2-develop.1) (2023-07-04)
### Reverts
* Revert "*add a _timeoutCanceled property and abort handeTimeout if true" ([1d73d23](https://git.lumeweb.com/LumeWeb/rpc-client/commit/1d73d2370057c6efca45a00f1676722ccd3ac3ab))
* Revert "*Switch from clearing the timeout to setting the _timeoutCanceled flag" ([f9036e1](https://git.lumeweb.com/LumeWeb/rpc-client/commit/f9036e1c73572e38e27fdf748cc164c4f2ca32e1))

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2022 Hammer Technologies LLC
Copyright (c) 2022 Lume Web
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1 +1,2 @@
# rpc-client
# dht-rpc-client
A client library that uses hypercore and the @lumeweb/relay server along with Skynet for web, to perform "Wisdom of the crowd" RPC requests.

19218
npm-shrinkwrap.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,27 +0,0 @@
{
"name": "@lumeweb/rpc-client",
"type": "module",
"version": "0.0.2-develop.1",
"main": "lib/index.js",
"repository": {
"type": "git",
"url": "gitea@git.lumeweb.com:LumeWeb/rpc-client.git"
},
"devDependencies": {
"@lumeweb/node-library-preset": "^0.2.7",
"presetter": "*"
},
"readme": "ERROR: No README data found!",
"scripts": {
"prepare": "presetter bootstrap",
"build": "run build",
"semantic-release": "semantic-release"
},
"dependencies": {
"@lumeweb/interface-relay": "0.0.2-develop.1",
"@lumeweb/rpc": "0.0.2-develop.2"
},
"publishConfig": {
"access": "public"
}
}

View File

@ -1 +0,0 @@
export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE";

View File

@ -1,4 +0,0 @@
export * from "./types.js";
export * from "./query/index.js";
export * from "./network.js";
export * from "./util.js";

View File

@ -1,169 +0,0 @@
// @ts-ignore
import Hyperswarm from "hyperswarm";
import RpcNetworkQueryFactory from "./query/index.js";
import b4a from "b4a";
import { createHash, maybeGetAsyncProperty } from "./util.js";
export default class RpcNetwork {
private _relaysAvailablePromise?: Promise<void>;
private _relaysAvailableResolve?: Function;
constructor(swarm = new Hyperswarm()) {
this._swarm = swarm;
this.init();
}
private _methods: Map<string, Set<string>> = new Map<string, Set<string>>();
get methods(): Map<string, Set<string>> {
return this._methods;
}
private _factory = new RpcNetworkQueryFactory(this);
get factory(): RpcNetworkQueryFactory {
return this._factory;
}
private _swarm: typeof Hyperswarm;
get swarm() {
return this._swarm;
}
private _majorityThreshold = 0.75;
get majorityThreshold(): number {
return this._majorityThreshold;
}
set majorityThreshold(value: number) {
this._majorityThreshold = value;
}
private _queryTimeout = 30;
get queryTimeout(): number {
return this._queryTimeout;
}
set queryTimeout(value: number) {
this._queryTimeout = value;
}
private _relayTimeout = 2;
get relayTimeout(): number {
return this._relayTimeout;
}
set relayTimeout(value: number) {
this._relayTimeout = value;
}
private _relays: Map<string, any> = new Map<string, string[]>();
get relays(): Map<string, string[]> {
return this._relays;
}
private _ready?: Promise<void>;
get ready(): Promise<void> {
if (!this._ready) {
this._ready = maybeGetAsyncProperty(this._swarm.dht).then((dht: any) =>
dht.ready()
) as Promise<void>;
}
return this._ready as Promise<void>;
}
get readyWithRelays(): Promise<void> {
return this.ready.then(() => this._relaysAvailablePromise);
}
private _bypassCache: boolean = false;
get bypassCache(): boolean {
return this._bypassCache;
}
set bypassCache(value: boolean) {
this._bypassCache = value;
}
public getAvailableRelay(module: string, method: string) {
method = `${module}.${method}`;
let relays = this._methods.get(method) ?? new Set();
if (!relays.size) {
throw Error("no available relay");
}
return this._relays.get(
Array.from(relays)[Math.floor(Math.random() * relays.size)]
);
}
public getRelay(pubkey: string) {
if (this._relays.has(pubkey)) {
return this._relays.get(pubkey);
}
return undefined;
}
private init() {
this._swarm.join(createHash("lumeweb"));
this.setupRelayPromise();
this._swarm.on("connection", async (relay: any) => {
const pubkey = b4a
.from(await maybeGetAsyncProperty(relay.remotePublicKey))
.toString("hex");
relay.once("close", () => {
this._methods.forEach((item) => {
if (item.has(pubkey)) {
item.delete(pubkey);
}
});
this.relays.delete(pubkey);
if (!this._relays.size) {
this.setupRelayPromise();
}
});
const query = this._factory.simple({
relay,
query: { module: "core", method: "get_methods", data: null },
});
const resp = await query.result;
if (resp.error) {
relay.end();
return;
}
if (resp.data) {
this._relays.set(pubkey, relay);
(resp.data as string[]).forEach((item) => {
const methods: Set<string> =
this._methods.get(item) ?? new Set<string>();
methods.add(pubkey);
this._methods.set(item, methods);
});
this._relaysAvailableResolve?.();
}
});
}
private setupRelayPromise() {
this._relaysAvailablePromise = new Promise<void>((resolve) => {
this._relaysAvailableResolve = resolve;
});
}
}

View File

@ -1,121 +0,0 @@
import { clearTimeout, setTimeout } from "timers";
import RpcNetwork from "../network.js";
import { RpcQueryOptions } from "../types.js";
import type {
ClientRPCRequest,
RPCRequest,
RPCResponse,
} from "@lumeweb/interface-relay";
export default abstract class RpcQueryBase {
protected _network: RpcNetwork;
protected _query: RPCRequest;
protected _options: RpcQueryOptions;
protected _promise?: Promise<any>;
protected _timeoutTimer?: any;
protected _timeout: boolean = false;
protected _completed: boolean = false;
protected _response?: RPCResponse;
protected _error?: string;
protected _promiseResolve?: (data: any) => void;
constructor({
network,
query,
options = {},
}: {
network: RpcNetwork;
query: ClientRPCRequest | RPCRequest;
options: RpcQueryOptions;
}) {
this._network = network;
this._query = query;
this._options = options;
}
get result(): Promise<RPCResponse> {
return this._promise as Promise<RPCResponse>;
}
protected handeTimeout() {
this.resolve(undefined, true);
}
protected resolve(data?: RPCResponse, timeout: boolean = false): void {
clearTimeout(this._timeoutTimer);
this._timeout = timeout;
this._completed = true;
if (timeout) {
data = {
error: "timeout",
};
}
this._promiseResolve?.(data);
}
public run(): this {
this._promise =
this._promise ??
new Promise<any>((resolve) => {
this._promiseResolve = resolve;
});
this._timeoutTimer =
this._timeoutTimer ??
setTimeout(
this.handeTimeout.bind(this),
(this._options?.queryTimeout || this._network.queryTimeout) * 1000
);
this._doRun();
return this;
}
private async _doRun() {
try {
await this._network.ready;
await this._run();
} catch (e: any) {
this._promiseResolve?.({ error: e?.message || e?.error });
}
}
protected setupRelayTimeout(reject: Function): NodeJS.Timeout {
return setTimeout(() => {
this._error = "timeout";
reject("timeout");
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout;
}
protected abstract _run(): void;
protected async queryRpc(rpc: any, request: RPCRequest) {
let timer: NodeJS.Timeout;
return new Promise((resolve, reject) => {
rpc
// @ts-ignore
.request(`${request.module}.${request.method}`, request.data)
.then((resp: any) => {
if (resp.error) {
throw new Error(resp.error);
}
clearTimeout(timer as any);
this._response = resp;
resolve(null);
})
.catch((e: Error) => {
this._error = e.message;
reject({ error: e.message });
clearTimeout(timer as any);
});
timer = this.setupRelayTimeout(reject);
});
}
}

View File

@ -1,49 +0,0 @@
import RpcNetwork from "../network.js";
import { RPCBroadcastRequest, RPCRequest } from "@lumeweb/interface-relay";
import { RpcQueryOptions } from "../types.js";
import { hashQuery } from "../util.js";
import SimpleRpcQuery from "./simple.js";
export default class ClearCacheRpcQuery extends SimpleRpcQuery {
protected _relays: string[];
constructor({
network,
relays,
query,
options,
}: {
network: RpcNetwork;
relays: string[];
query: RPCRequest;
options: RpcQueryOptions;
}) {
super({ network, relay: "", query, options });
this._relays = relays;
}
protected async _run(): Promise<void> {
// @ts-ignore
this._relay = getActiveRelay().stream.remotePublicKey;
await this.queryRelay();
await this.checkResponses();
}
protected async queryRelay(): Promise<any> {
return this.queryRpc(
this._network.getAvailableRelay("rpc", "broadcast_request"),
{
module: "rpc",
method: "broadcast_request",
data: {
request: {
module: "rpc",
method: "clear_cached_item",
data: hashQuery(this._query),
},
relays: this._relays,
} as RPCBroadcastRequest,
}
);
}
}

View File

@ -1,53 +0,0 @@
import { ClientRPCRequest, RPCRequest } from "@lumeweb/interface-relay";
import { RpcQueryOptions } from "../types.js";
import SimpleRpcQuery from "./simple.js";
import ClearCacheRpcQuery from "./clearCache.js";
import RpcNetwork from "../network.js";
import RpcQueryBase from "./base.js";
export default class RpcNetworkQueryFactory {
private _network: RpcNetwork;
constructor(network: RpcNetwork) {
this._network = network;
}
simple({
relay,
query,
options = {},
}: {
relay?: string | Buffer;
query: ClientRPCRequest;
options?: RpcQueryOptions;
}): SimpleRpcQuery {
return new SimpleRpcQuery({
network: this._network,
relay,
query: {
...query,
bypassCache: query?.bypassCache || this._network.bypassCache,
},
options,
}).run();
}
clearCache({
relays,
query,
options = {},
}: {
relays: string[];
query: RPCRequest;
options?: RpcQueryOptions;
}): ClearCacheRpcQuery {
return new ClearCacheRpcQuery({
network: this._network,
query,
relays,
options,
}).run();
}
}
export { RpcNetwork, RpcQueryBase, SimpleRpcQuery };

View File

@ -1,122 +0,0 @@
import RpcNetwork from "../network.js";
import { ClientRPCRequest, RPCResponse } from "@lumeweb/interface-relay";
import { RpcQueryOptions } from "../types.js";
import b4a from "b4a";
import {
hashQuery,
isPromise,
maybeGetAsyncProperty,
setupStream,
validateTimestampedResponse,
} from "../util.js";
import RPC from "@lumeweb/rpc";
import { ERR_INVALID_SIGNATURE } from "../error.js";
import RpcQueryBase from "./base.js";
export default class SimpleRpcQuery extends RpcQueryBase {
protected _relay?: string | any;
protected declare _query: ClientRPCRequest;
constructor({
network,
relay,
query,
options,
}: {
network: RpcNetwork;
relay?: string | Buffer | any;
query: ClientRPCRequest;
options: RpcQueryOptions;
}) {
super({ network, query, options });
if (b4a.isBuffer(relay)) {
relay = b4a.from(relay).toString("hex");
}
this._relay = relay;
}
protected async _run(): Promise<void> {
await this.queryRelay();
await this.checkResponses();
}
protected async queryRelay(): Promise<any> {
let socket = this._relay;
if (socket) {
if (typeof socket === "string") {
try {
const relay = this._network.getRelay(socket);
if (this._network.getRelay(socket)) {
socket = relay;
}
} catch {}
}
if (typeof socket === "string") {
try {
socket = this._network.swarm.connect(b4a.from(this._relay, "hex"));
if (isPromise(socket)) {
socket = await socket;
}
} catch {}
}
}
if (!socket) {
socket = this._network.getAvailableRelay(
this._query.module,
this._query.method
);
}
this._relay = socket;
await socket.opened;
const rpc = await setupStream(socket);
if (this._query.bypassCache) {
delete this._query.bypassCache;
await this.queryRpc(rpc, {
module: "rpc",
method: "clear_cached_item",
data: hashQuery(this._query),
});
}
if ("bypassCache" in this._query) {
delete this._query.bypassCache;
}
try {
await this.queryRpc(rpc, this._query);
} catch (e: any) {
throw e;
}
}
protected async checkResponses() {
let response: RPCResponse = this._response as RPCResponse;
if (this._error) {
response = { error: this._error };
}
if (
!response.error &&
!validateTimestampedResponse(
b4a.from(
await maybeGetAsyncProperty(this._relay.remotePublicKey),
"hex"
) as Buffer,
response
)
) {
response = { error: ERR_INVALID_SIGNATURE };
}
this.resolve(response);
}
}

View File

@ -1,4 +0,0 @@
export interface RpcQueryOptions {
queryTimeout?: number;
relayTimeout?: number;
}

View File

@ -1,143 +0,0 @@
// @ts-ignore
import stringify from "json-stringify-deterministic";
import type { RPCRequest, RPCResponse } from "@lumeweb/interface-relay";
// @ts-ignore
import crypto from "hypercore-crypto";
// @ts-ignore
import sodium from "sodium-universal";
import b4a from "b4a";
import RPC from "@lumeweb/rpc";
export const RPC_PROTOCOL_SYMBOL = Symbol.for("lumeweb");
export function isPromise(obj: Promise<any>) {
return (
!!obj &&
(typeof obj === "object" || typeof obj === "function") &&
typeof obj.then === "function"
);
}
/*
Forked from https://github.com/hughsk/flat
*/
export function flatten(target: any, opts: any = {}): any[] {
opts = opts || {};
const delimiter = opts.delimiter || ".";
const maxDepth = opts.maxDepth;
const transformKey =
opts.transformKey || ((key: any) => (isNaN(parseInt(key)) ? key : ""));
const output: any[] = [];
function step(object: any, prev?: any, currentDepth?: any) {
currentDepth = currentDepth || 1;
if (!Array.isArray(object)) {
object = Object.keys(object ?? {});
}
object.forEach(function (key: any) {
const value = object[key];
const isarray = opts.safe && Array.isArray(value);
const type = Object.prototype.toString.call(value);
const isbuffer = b4a.isBuffer(value);
const isobject = type === "[object Object]" || type === "[object Array]";
const newKey = prev
? prev + delimiter + transformKey(key)
: transformKey(key);
if (
!isarray &&
!isbuffer &&
isobject &&
Object.keys(value).length &&
(!opts.maxDepth || currentDepth < maxDepth)
) {
return step(value, newKey, currentDepth + 1);
}
output.push(`${newKey}=${value}`);
});
}
step(target);
return output;
}
export function validateResponse(
relay: Buffer,
response: RPCResponse,
timestamped = false
): boolean {
const field = response.signedField || "data";
// @ts-ignore
let json = response[field];
if (typeof json !== "string") {
json = stringify(json);
}
const updated = response.updated as number;
if (timestamped && updated) {
json = updated.toString() + json;
}
return !!crypto.verify(
b4a.from(json),
b4a.from(response.signature as string, "hex"),
relay
);
}
export function validateTimestampedResponse(
relay: Buffer,
response: RPCResponse
): boolean {
return validateResponse(relay, response, true);
}
export function hashQuery(query: RPCRequest): string {
const clonedQuery: RPCRequest = {
module: query.module,
method: query.method,
data: query.data,
};
const queryHash = Buffer.allocUnsafe(32);
sodium.crypto_generichash(queryHash, Buffer.from(stringify(clonedQuery)));
return queryHash.toString("hex");
}
export function createHash(data: string): Buffer {
const buffer = b4a.from(data);
let hash = b4a.allocUnsafe(32) as Buffer;
sodium.crypto_generichash(hash, buffer);
return hash;
}
export async function setupStream(stream: any) {
const existing = stream[RPC_PROTOCOL_SYMBOL];
if (existing) {
await existing.ready;
return existing;
}
const rpc = new RPC(stream);
stream[RPC_PROTOCOL_SYMBOL] = rpc;
await rpc.ready;
return rpc;
}
export async function maybeGetAsyncProperty(object: any) {
if (typeof object === "function") {
object = object();
}
if (isPromise(object)) {
object = await object;
}
return object;
}