*Add support for a stream being canceled or aborted

This commit is contained in:
Derrick Hammer 2022-08-31 20:26:02 -04:00
parent 231c4a36b4
commit 8e881a7dc1
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 23 additions and 1 deletions

View File

@ -8,8 +8,13 @@ import { pack, unpack } from "msgpackr";
import log from "loglevel"; import log from "loglevel";
import { getRpcServer } from "./server"; import { getRpcServer } from "./server";
interface CancelRequest {
cancel: true;
}
export default class RPCConnection { export default class RPCConnection {
private _socket: any; private _socket: any;
private _canceled = false;
constructor(socket: any) { constructor(socket: any) {
this._socket = socket; this._socket = socket;
socket.rawStream._ondestroy = () => false; socket.rawStream._ondestroy = () => false;
@ -19,9 +24,21 @@ export default class RPCConnection {
private async checkRpc(data: Buffer) { private async checkRpc(data: Buffer) {
if (data.toString() === "rpc") { if (data.toString() === "rpc") {
this._socket.once("data", this.processRequest); this._socket.once("data", this.processRequest);
this._socket.on("data", this.listenForCancel);
} }
} }
private async listenForCancel(data: Buffer) {
let request: any;
try {
request = unpack(data) as CancelRequest;
} catch (e) {
return;
}
if (request.cancel) {
this._canceled = true;
}
}
private async processRequest(data: Buffer) { private async processRequest(data: Buffer) {
let request: RPCRequest; let request: RPCRequest;
try { try {
@ -44,6 +61,9 @@ export default class RPCConnection {
} as StreamFileResponse, } as StreamFileResponse,
}; };
for await (const chunk of stream) { for await (const chunk of stream) {
if (this._canceled) {
break;
}
streamResp.data.data = chunk as unknown as Uint8Array; streamResp.data.data = chunk as unknown as Uint8Array;
that.write(pack(streamResp)); that.write(pack(streamResp));
} }
@ -61,7 +81,9 @@ export default class RPCConnection {
that.end(); that.end();
return; return;
} }
if (!this._canceled) {
that.write(pack(response)); that.write(pack(response));
}
that.end(); that.end();
} }
} }