*Add timeout support to broadcast request with a default timeout of 5 seconds.
ci/woodpecker/push/woodpecker Pipeline was successful
Details
ci/woodpecker/push/woodpecker Pipeline was successful
Details
This commit is contained in:
parent
cd2f3a415c
commit
884ba62bde
|
@ -52,6 +52,7 @@
|
||||||
"node-cache": "^5.1.2",
|
"node-cache": "^5.1.2",
|
||||||
"node-cron": "^3.0.1",
|
"node-cron": "^3.0.1",
|
||||||
"node-fetch": "2",
|
"node-fetch": "2",
|
||||||
|
"p-timeout": "^6.0.0",
|
||||||
"promise-retry": "^2.0.1",
|
"promise-retry": "^2.0.1",
|
||||||
"protomux": "^3.4.0",
|
"protomux": "^3.4.0",
|
||||||
"protomux-rpc": "^1.3.0",
|
"protomux-rpc": "^1.3.0",
|
||||||
|
@ -75,9 +76,5 @@
|
||||||
"rollup": "^2.77.0",
|
"rollup": "^2.77.0",
|
||||||
"supports-color": "https://github.com/LumeWeb/supports-color.git",
|
"supports-color": "https://github.com/LumeWeb/supports-color.git",
|
||||||
"typescript": "^4.7.4"
|
"typescript": "^4.7.4"
|
||||||
},
|
|
||||||
"resolutions": {
|
|
||||||
"**/@hyperswarm/dht": "^6.0.1",
|
|
||||||
"**/node-fetch": "^2.6.6"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,17 +11,19 @@ import {
|
||||||
import { getRpcByPeer } from "../rpc";
|
import { getRpcByPeer } from "../rpc";
|
||||||
import { get as getSwarm, LUMEWEB_TOPIC_HASH } from "../swarm";
|
import { get as getSwarm, LUMEWEB_TOPIC_HASH } from "../swarm";
|
||||||
import b4a from "b4a";
|
import b4a from "b4a";
|
||||||
|
import pTimeout, { ClearablePromise } from "p-timeout";
|
||||||
|
|
||||||
async function broadcastRequest(
|
async function broadcastRequest(
|
||||||
request: RPCRequest,
|
request: RPCRequest,
|
||||||
relays: string[]
|
relays: string[],
|
||||||
|
timeout = 5000
|
||||||
): Promise<Map<string, Promise<any>>> {
|
): Promise<Map<string, Promise<any>>> {
|
||||||
const makeRequest = async (relay: string) => {
|
const makeRequest = async (relay: string) => {
|
||||||
const rpc = await getRpcByPeer(relay);
|
const rpc = await getRpcByPeer(relay);
|
||||||
return rpc.request(`${request.module}.${request.method}`, request.data);
|
return rpc.request(`${request.module}.${request.method}`, request.data);
|
||||||
};
|
};
|
||||||
|
|
||||||
let relayMap = new Map<string, Promise<any>>();
|
let relayMap = new Map<string, ClearablePromise<any>>();
|
||||||
|
|
||||||
for (const relay of relays) {
|
for (const relay of relays) {
|
||||||
let req;
|
let req;
|
||||||
|
@ -31,10 +33,12 @@ async function broadcastRequest(
|
||||||
req = makeRequest(relay);
|
req = makeRequest(relay);
|
||||||
}
|
}
|
||||||
|
|
||||||
relayMap.set(
|
let timeoutPromise = pTimeout(req, {
|
||||||
relay,
|
milliseconds: timeout,
|
||||||
req.catch((error: Error) => error)
|
message: `relay timed out after ${timeout} milliseconds`,
|
||||||
);
|
});
|
||||||
|
|
||||||
|
relayMap.set(relay, timeoutPromise);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.allSettled([...relays.values()]);
|
await Promise.allSettled([...relays.values()]);
|
||||||
|
@ -100,7 +104,7 @@ const plugin: Plugin = {
|
||||||
throw new Error("recursive broadcast_request calls are not allowed");
|
throw new Error("recursive broadcast_request calls are not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp = await broadcastRequest(req.request, req.relays);
|
let resp = await broadcastRequest(req.request, req.relays, req.timeout);
|
||||||
|
|
||||||
const result: RPCBroadcastResponse = {
|
const result: RPCBroadcastResponse = {
|
||||||
relays: {},
|
relays: {},
|
||||||
|
|
Loading…
Reference in New Issue