diff --git a/src/rpc/connection.ts b/src/rpc/connection.ts index 9db7ba3..70763fd 100644 --- a/src/rpc/connection.ts +++ b/src/rpc/connection.ts @@ -8,8 +8,13 @@ import { pack, unpack } from "msgpackr"; import log from "loglevel"; import { getRpcServer } from "./server"; +interface CancelRequest { + cancel: true; +} + export default class RPCConnection { private _socket: any; + private _canceled = false; constructor(socket: any) { this._socket = socket; socket.rawStream._ondestroy = () => false; @@ -19,9 +24,21 @@ export default class RPCConnection { private async checkRpc(data: Buffer) { if (data.toString() === "rpc") { this._socket.once("data", this.processRequest); + this._socket.on("data", this.listenForCancel); } } + private async listenForCancel(data: Buffer) { + let request: any; + try { + request = unpack(data) as CancelRequest; + } catch (e) { + return; + } + if (request.cancel) { + this._canceled = true; + } + } private async processRequest(data: Buffer) { let request: RPCRequest; try { @@ -44,6 +61,9 @@ export default class RPCConnection { } as StreamFileResponse, }; for await (const chunk of stream) { + if (this._canceled) { + break; + } streamResp.data.data = chunk as unknown as Uint8Array; that.write(pack(streamResp)); } @@ -61,7 +81,9 @@ export default class RPCConnection { that.end(); return; } - that.write(pack(response)); + if (!this._canceled) { + that.write(pack(response)); + } that.end(); } }