From 31eca18b6eee93b388759732f3c93ca5c2f53066 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 8 Apr 2023 14:33:41 -0400 Subject: [PATCH] *Initial version --- LICENSE | 37 +++++++ package.json | 17 +++ src/index.ts | 296 ++++++++++++++++++++++++++++++++++++++++++++++++++ tsconfig.json | 13 +++ 4 files changed, 363 insertions(+) create mode 100644 LICENSE create mode 100644 package.json create mode 100644 src/index.ts create mode 100644 tsconfig.json diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9436ce7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,37 @@ +MIT License + +Copyright (c) 2022 Hammer Technologies LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +ISC License + +Copyright (c) 2022 Kasper Isager Dalsgarð & Contributors + +Permission to use, copy, modify, and/or distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +PERFORMANCE OF THIS SOFTWARE. diff --git a/package.json b/package.json new file mode 100644 index 0000000..a932f7b --- /dev/null +++ b/package.json @@ -0,0 +1,17 @@ +{ + "name": "@lumeweb/protomux-rpc-web", + "version": "0.1.0", + "type": "module", + "dependencies": { + "bits-to-bytes": "^1.3.0", + "compact-encoding": "^2.11.0", + "compact-encoding-bitfield": "^1.0.0", + "events": "^3.3.0", + "protomux": "^3.4.1" + }, + "devDependencies": { + "@types/node": "^18.15.11", + "prettier": "^2.8.7", + "typescript": "^5.0.4" + } +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..1dccad6 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,296 @@ +import EventEmitter from "events"; +// @ts-ignore +import Protomux from "protomux"; +// @ts-ignore +import c from "compact-encoding"; +// @ts-ignore +import bitfield from "compact-encoding-bitfield"; +// @ts-ignore +import bits from "bits-to-bytes"; +import * as buffer from "buffer"; + +export default class ProtomuxRPC extends EventEmitter { + private _id: number; + private _ending: boolean; + private _error?: Error; + private _responding: number; + private _requests: Map; + private _defaultValueEncoding: any; + private _responders: Map; + private _channel: any; + private _request: any; + private _response: any; + + constructor(stream: any, options: any = {}) { + super(); + + this._mux = Protomux.from(stream); + this._defaultValueEncoding = options?.valueEncoding; + + this._id = 1; + this._ending = false; + this._responding = 0; + + this._requests = new Map(); + this._responders = new Map(); + this._ready = this._init(options); + } + + private _mux: any; + + get mux() { + return this._mux; + } + + private _ready; + + get ready() { + return this._ready; + } + + get closed() { + return this._channel.closed; + } + + get stream() { + return this._mux.stream; + } + + private async _init(options: any) { + this._channel = await this._mux.createChannel({ + protocol: "protomux-rpc", + id: options?.id, + handshake: options?.handshake + ? options?.handshakeEncoding || c.raw + : null, + onopen: this._onopen.bind(this), + onclose: this._onclose.bind(this), + ondestroy: this._ondestroy.bind(this), + }); + + if (this._channel === null) throw new Error("duplicate channel"); + + this._request = await this._channel.addMessage({ + encoding: request, + onmessage: this._onrequest.bind(this), + }); + + this._response = await this._channel.addMessage({ + encoding: response, + onmessage: this._onresponse.bind(this), + }); + + this._channel.open(options?.handshake); + + await this._channel.ready; + } + + _onopen(handshake: any) { + this.emit("open", handshake); + } + + _onclose() { + const err = this._error || new Error("channel closed"); + + for (const request of this._requests.values()) { + request.reject(err); + } + + this._requests.clear(); + this._responders.clear(); + + this.emit("close"); + } + + _ondestroy() { + this.emit("destroy"); + } + + async _onrequest({ + id, + method, + value, + }: { + id: number; + method: string; + value: any; + }) { + let error = null; + + const responder = this._responders.get(method); + + if (responder === undefined) error = `unknown method '${method}'`; + else { + const { + valueEncoding = this._defaultValueEncoding, + requestEncoding = valueEncoding, + responseEncoding = valueEncoding, + } = responder.options; + + if (requestEncoding) value = c.decode(requestEncoding, value); + + this._responding++; + + try { + value = await responder.handler(value); + } catch (err: any) { + error = (err as Error).message; + } + + this._responding--; + + if (!error && responseEncoding && id) { + value = c.encode(responseEncoding, value); + } + } + + if (id) this._response.send({ id, error, value }); + + this._endMaybe(); + } + + _onresponse({ id, error, value }: { id: number; error: string; value: any }) { + if (id === 0) return; + + const request = this._requests.get(id); + + if (request === undefined) return; + + this._requests.delete(id); + + if (error) request.reject(new Error(error)); + else { + const { + valueEncoding = this._defaultValueEncoding, + responseEncoding = valueEncoding, + } = request.options; + + if (responseEncoding) value = c.decode(responseEncoding, value); + + request.resolve(value); + } + + this._endMaybe(); + } + + respond(method: string, options: Function | any, handler: Function) { + if (typeof options === "function") { + handler = options; + options = {}; + } + + this._responders.set(method, { options, handler }); + + return this; + } + + unrespond(method: string) { + this._responders.delete(method); + + return this; + } + + async request(method: string, value: any, options: any = {}) { + if (this.closed) throw new Error("channel closed"); + + const { + valueEncoding = this._defaultValueEncoding, + requestEncoding = valueEncoding, + } = options; + + if (requestEncoding) value = c.encode(requestEncoding, value); + + const id = this._id++; + + this._request.send({ id, method, value }); + + return new Promise((resolve, reject) => { + this._requests.set(id, { options, resolve, reject }); + }); + } + + event(method: string, value: any, options: any = {}) { + if (this.closed) throw new Error("channel closed"); + + const { + valueEncoding = this._defaultValueEncoding, + requestEncoding = valueEncoding, + } = options; + + if (requestEncoding) value = c.encode(requestEncoding, value); + + this._request.send({ id: 0, method, value }); + } + + cork() { + this._channel.cork(); + } + + uncork() { + this._channel.uncork(); + } + + async end() { + this._ending = true; + this._endMaybe(); + + await EventEmitter.once(this, "close"); + } + + _endMaybe() { + if (this._ending && this._responding === 0 && this._requests.size === 0) { + this._channel.close(); + } + } + + destroy(err: Error) { + this._error = err || new Error("channel destroyed"); + this._channel.close(); + } +} + +const request = { + preencode(state: Buffer, m: any) { + c.uint.preencode(state, m.id); + c.string.preencode(state, m.method); + c.raw.preencode(state, m.value); + }, + encode(state: Buffer, m: any) { + c.uint.encode(state, m.id); + c.string.encode(state, m.method); + c.raw.encode(state, m.value); + }, + decode(state: Buffer) { + return { + id: c.uint.decode(state), + method: c.string.decode(state), + value: c.raw.decode(state), + }; + }, +}; + +const flags = bitfield(1); + +const response = { + preencode(state: Buffer, m: any) { + flags.preencode(state); + c.uint.preencode(state, m.id); + if (m.error) c.string.preencode(state, m.error); + else c.raw.preencode(state, m.value); + }, + encode(state: Buffer, m: any) { + flags.encode(state, bits.of(m.error)); + c.uint.encode(state, m.id); + if (m.error) c.string.encode(state, m.error); + else c.raw.encode(state, m.value); + }, + decode(state: Buffer) { + const [error] = bits.iterator(flags.decode(state)); + + return { + id: c.uint.decode(state), + error: error ? c.string.decode(state) : null, + value: !error ? c.raw.decode(state) : null, + }; + }, +}; diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..e00e2c7 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "esnext", + "declaration": true, + "moduleResolution": "node", + "outDir": "./dist", + "strict": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true + }, + "include": ["src"], + "exclude": ["node_modules", "**/__tests__/*"] +}