*we don't need a slave mode, just use this async fork on the slave webworkers

This commit is contained in:
Derrick Hammer 2023-04-07 03:34:19 -04:00
parent 2f0deaf2bf
commit f9955c18d2
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 2 additions and 135 deletions

137
index.js
View File

@ -305,7 +305,6 @@ module.exports = class Protomux {
this._infos = new Map(); this._infos = new Map();
this._notify = new Map(); this._notify = new Map();
this._slave = slave;
this.stream.on("data", this._ondata.bind(this)); this.stream.on("data", this._ondata.bind(this));
this.stream.on("end", this._onend.bind(this)); this.stream.on("end", this._onend.bind(this));
@ -357,67 +356,7 @@ module.exports = class Protomux {
const info = this._infos.get(key); const info = this._infos.get(key);
return info ? info.opened > 0 : false; return info ? info.opened > 0 : false;
} }
async createChannel({
_createChannel({
userData = null,
protocol,
aliases = [],
id = null,
unique = true,
handshake = null,
messages = [],
onopen = noop,
onclose = noop,
ondestroy = noop,
}) {
if (this.stream.destroyed) return null;
const info = this._get(protocol, id, aliases);
if (unique && info.opened > 0) return null;
if (info.incoming.length === 0) {
return new Channel(
this,
info,
userData,
protocol,
aliases,
id,
handshake,
messages,
onopen,
onclose,
ondestroy
);
}
this._remoteBacklog--;
const remoteId = info.incoming.shift();
const r = this._remote[remoteId - 1];
if (r === null) return null;
const session = new Channel(
this,
info,
userData,
protocol,
aliases,
id,
handshake,
messages,
onopen,
onclose,
ondestroy
);
session._remoteId = remoteId;
session._fullyOpenSoon();
return session;
}
async _createChannelAsync({
userData = null, userData = null,
protocol, protocol,
aliases = [], aliases = [],
@ -481,14 +420,6 @@ module.exports = class Protomux {
return session; return session;
} }
createChannel(options) {
if (this._slave) {
return this._createChannelAsync(options);
}
return this._createChannel(options);
}
_pushBatch(localId, buffer) { _pushBatch(localId, buffer) {
if (this._batchState.end >= MAX_BATCH) { if (this._batchState.end >= MAX_BATCH) {
this._sendBatch(this._batch, this._batchState); this._sendBatch(this._batch, this._batchState);
@ -530,43 +461,7 @@ module.exports = class Protomux {
this.stream.write(state.buffer); this.stream.write(state.buffer);
} }
_get(protocol, id, aliases = []) { async _get(protocol, id, aliases = []) {
if (this._slave) {
return this._getAsync(protocol, id, aliases);
}
return this.__get(protocol, id, aliases);
}
__get(protocol, id, aliases) {
const key = toKey(protocol, id);
let info = this._infos.get(key);
if (info) return info;
info = {
key,
protocol,
aliases: [],
id,
pairing: 0,
opened: 0,
incoming: [],
outgoing: [],
};
this._infos.set(key, info);
for (const alias of aliases) {
const key = toKey(alias, id);
info.aliases.push(key);
this._infos.set(key, info);
}
return info;
}
async _getAsync(protocol, id, aliases = []) {
const key = toKey(protocol, id); const key = toKey(protocol, id);
await this.pullInfos(); await this.pullInfos();
@ -904,33 +799,18 @@ module.exports = class Protomux {
} }
async popFree(id) { async popFree(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("popFree", id); await this.stream.syncProtomux("popFree", id);
} }
async pushFree(id) { async pushFree(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushFree", id); await this.stream.syncProtomux("pushFree", id);
} }
async pushLocal(id) { async pushLocal(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushLocal", id); await this.stream.syncProtomux("pushLocal", id);
} }
async pushRemote(id) { async pushRemote(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushRemote", id); await this.stream.syncProtomux("pushRemote", id);
} }
async pushInfos() { async pushInfos() {
if (!this._slave) {
return;
}
await this.stream.syncProtomux( await this.stream.syncProtomux(
"pushInfos", "pushInfos",
Array.from(this._infos.entries()) Array.from(this._infos.entries())
@ -938,9 +818,6 @@ module.exports = class Protomux {
} }
async pullLocal() { async pullLocal() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullLocal"); const ids = await this.stream.syncProtomux("pullLocal");
ids.forEach((item) => { ids.forEach((item) => {
@ -951,18 +828,12 @@ module.exports = class Protomux {
}); });
} }
async pullFree() { async pullFree() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullFree"); const ids = await this.stream.syncProtomux("pullFree");
this._free = Array.from(new Set([...this._free, ...ids])); this._free = Array.from(new Set([...this._free, ...ids]));
this._free = this._free.filter((item) => item !== null); this._free = this._free.filter((item) => item !== null);
} }
async pullRemote() { async pullRemote() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullRemote"); const ids = await this.stream.syncProtomux("pullRemote");
ids.forEach((item) => { ids.forEach((item) => {
@ -973,10 +844,6 @@ module.exports = class Protomux {
}); });
} }
async pullInfos() { async pullInfos() {
if (!this._slave) {
return;
}
const info = await this.stream.syncProtomux( const info = await this.stream.syncProtomux(
"pullInfos", "pullInfos",
Array.from(this._infos.entries()) Array.from(this._infos.entries())