feat: change workers to use a dedicated iframe that for now is hosted. This should unlock a dedicated thread per worker and prevent cpu fighting
This commit is contained in:
parent
9bf9f386f7
commit
cfeca11300
|
@ -11,6 +11,10 @@ export const handleIncomingMessage = function (event: any) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (event.origin.endsWith(".module.kernel.lumeweb.com")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Input validation.
|
// Input validation.
|
||||||
if (!("method" in event.data)) {
|
if (!("method" in event.data)) {
|
||||||
logErr("handleIncomingMessage", "kernel request is missing 'method' field");
|
logErr("handleIncomingMessage", "kernel request is missing 'method' field");
|
||||||
|
|
|
@ -23,10 +23,11 @@ import {
|
||||||
store as moduleStore,
|
store as moduleStore,
|
||||||
} from "./modules.js";
|
} from "./modules.js";
|
||||||
import { blake3 } from "@noble/hashes/blake3";
|
import { blake3 } from "@noble/hashes/blake3";
|
||||||
|
import Worker from "./worker.js";
|
||||||
|
|
||||||
// WorkerLaunchFn is the type signature of the function that launches the
|
// WorkerLaunchFn is the type signature of the function that launches the
|
||||||
// worker to set up for processing a query.
|
// worker to set up for processing a query.
|
||||||
type WorkerLaunchFn = () => [Worker, Err];
|
type WorkerLaunchFn = () => Promise<[Worker, Err]>;
|
||||||
|
|
||||||
// modules is a hashmap that maps from a domain to the module that handles
|
// modules is a hashmap that maps from a domain to the module that handles
|
||||||
// queries to that domain. It maintains the domain and URL of the module so
|
// queries to that domain. It maintains the domain and URL of the module so
|
||||||
|
@ -36,7 +37,7 @@ type WorkerLaunchFn = () => [Worker, Err];
|
||||||
// a new worker gets launched for every query.
|
// a new worker gets launched for every query.
|
||||||
interface Module {
|
interface Module {
|
||||||
domain: string;
|
domain: string;
|
||||||
url: string;
|
code: Uint8Array;
|
||||||
launchWorker: WorkerLaunchFn;
|
launchWorker: WorkerLaunchFn;
|
||||||
worker?: Worker;
|
worker?: Worker;
|
||||||
}
|
}
|
||||||
|
@ -223,20 +224,16 @@ async function createModule(
|
||||||
workerCode: Uint8Array,
|
workerCode: Uint8Array,
|
||||||
domain: string,
|
domain: string,
|
||||||
): Promise<[Module | null, Err]> {
|
): Promise<[Module | null, Err]> {
|
||||||
// Generate the URL for the worker code.
|
|
||||||
const url = URL.createObjectURL(new Blob([workerCode]));
|
|
||||||
|
|
||||||
// Create the module object.
|
// Create the module object.
|
||||||
const mod: Module = {
|
const mod: Module = {
|
||||||
domain,
|
domain,
|
||||||
url,
|
code: workerCode,
|
||||||
launchWorker: function (): [Worker, Err] {
|
launchWorker: function (): Promise<[Worker, Err]> {
|
||||||
return launchWorker(mod);
|
return launchWorker(mod);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start worker
|
// Start worker
|
||||||
const [worker, err] = mod.launchWorker();
|
const [worker, err] = await mod.launchWorker();
|
||||||
if (err !== null) {
|
if (err !== null) {
|
||||||
return [{} as Module, addContextToErr(err, "unable to launch worker")];
|
return [{} as Module, addContextToErr(err, "unable to launch worker")];
|
||||||
}
|
}
|
||||||
|
@ -247,11 +244,12 @@ async function createModule(
|
||||||
|
|
||||||
// launchWorker will launch a worker and perform all the setup so that the
|
// launchWorker will launch a worker and perform all the setup so that the
|
||||||
// worker is ready to receive a query.
|
// worker is ready to receive a query.
|
||||||
function launchWorker(mod: Module): [Worker, Err] {
|
async function launchWorker(mod: Module): Promise<[Worker, Err]> {
|
||||||
// Create and launch the worker.
|
// Create and launch the worker.
|
||||||
let worker: Worker;
|
let worker: Worker;
|
||||||
try {
|
try {
|
||||||
worker = new Worker(mod.url);
|
worker = new Worker(mod.code, CID.decode(mod.domain));
|
||||||
|
await worker.ready;
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
logErr("worker", mod.domain, "unable to create worker", mod.domain, err);
|
logErr("worker", mod.domain, "unable to create worker", mod.domain, err);
|
||||||
return [
|
return [
|
||||||
|
@ -260,6 +258,9 @@ function launchWorker(mod: Module): [Worker, Err] {
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clean up memory, we don't need the code anymore
|
||||||
|
mod.code = new Uint8Array();
|
||||||
|
|
||||||
// Set the onmessage and onerror functions.
|
// Set the onmessage and onerror functions.
|
||||||
worker.onmessage = function (event: MessageEvent) {
|
worker.onmessage = function (event: MessageEvent) {
|
||||||
handleWorkerMessage(event, mod, worker);
|
handleWorkerMessage(event, mod, worker);
|
||||||
|
@ -321,7 +322,6 @@ async function handleModuleCall(
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let validCid = false;
|
let validCid = false;
|
||||||
let isResolver = false;
|
let isResolver = false;
|
||||||
if (
|
if (
|
||||||
|
@ -588,8 +588,12 @@ async function handleModuleCall(
|
||||||
await (await moduleStore()).put(finalModule, moduleData);
|
await (await moduleStore()).put(finalModule, moduleData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!(moduleData instanceof Uint8Array)) {
|
||||||
|
moduleData = new TextEncoder().encode(moduleData);
|
||||||
|
}
|
||||||
|
|
||||||
// Create a new module.
|
// Create a new module.
|
||||||
const [mod, errCM] = await createModule(moduleData, moduleDomain);
|
const [mod, errCM] = await createModule(moduleData, finalModule);
|
||||||
if (errCM !== null) {
|
if (errCM !== null) {
|
||||||
const err = addContextToErr(errCM, "unable to create module");
|
const err = addContextToErr(errCM, "unable to create module");
|
||||||
respondErr(event, messagePortal, isWorker, isInternal, err);
|
respondErr(event, messagePortal, isWorker, isInternal, err);
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
import { CID } from "@lumeweb/libs5";
|
||||||
|
import { defer } from "@lumeweb/libkernel/module";
|
||||||
|
|
||||||
|
type WorkerMessage = (this: Worker, ev: MessageEvent) => any;
|
||||||
|
type WorkerError = (this: Worker, ev: ErrorEvent) => any;
|
||||||
|
|
||||||
|
export default class Worker {
|
||||||
|
private _code: Uint8Array;
|
||||||
|
private _errorHandler?: WorkerError;
|
||||||
|
private _messageHandler?: WorkerMessage;
|
||||||
|
private _iframe: HTMLIFrameElement;
|
||||||
|
private _iframeDefer = defer();
|
||||||
|
private _cid: CID;
|
||||||
|
|
||||||
|
constructor(code: Uint8Array, cid: CID) {
|
||||||
|
this._code = code;
|
||||||
|
this._cid = cid;
|
||||||
|
|
||||||
|
const iframe = document.createElement("iframe");
|
||||||
|
iframe.src = this.getModuleUrl();
|
||||||
|
iframe.onload = () => {
|
||||||
|
this._postMessage({
|
||||||
|
method: "workerInit",
|
||||||
|
module: this._code,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
this._iframe = iframe;
|
||||||
|
|
||||||
|
document.body.appendChild(iframe);
|
||||||
|
|
||||||
|
window.addEventListener("message", (event: MessageEvent) => {
|
||||||
|
if (event.source !== iframe.contentWindow) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.data.method === "workerMessage") {
|
||||||
|
this._messageHandler?.(
|
||||||
|
new MessageEvent("worker", { data: event.data.data }),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.data.method === "workerError") {
|
||||||
|
this._errorHandler?.(
|
||||||
|
new ErrorEvent("worker", {
|
||||||
|
message: event.data.data.message,
|
||||||
|
error: event.data.data.error,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.data.method === "workerInited") {
|
||||||
|
this._iframeDefer.resolve();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
set onmessage(handler: (this: Worker, ev: MessageEvent) => any) {
|
||||||
|
this._messageHandler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
set onerror(handler: (this: Worker, ev: ErrorEvent) => any) {
|
||||||
|
this._errorHandler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
get ready(): Promise<any> {
|
||||||
|
return this._iframeDefer.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
postMessage(message: any) {
|
||||||
|
this._iframeDefer.promise.then(() => {
|
||||||
|
this._postMessage(message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_postMessage(message: any) {
|
||||||
|
this._iframe.contentWindow!.postMessage(message, this.getModuleUrl());
|
||||||
|
}
|
||||||
|
|
||||||
|
private getModuleUrl() {
|
||||||
|
return `https://${this._cid.toBase32()}.module.kernel.lumeweb.com`;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue