*Update to use new multisocket proxy
This commit is contained in:
parent
998d05a74c
commit
90c6f789cc
12
package.json
12
package.json
|
@ -8,21 +8,21 @@
|
|||
"devDependencies": {
|
||||
"@libp2p/interface-connection-encrypter": "^3.0.6",
|
||||
"@libp2p/interface-peer-id": "^2.0.1",
|
||||
"@libp2p/multistream-select": "^3.1.2",
|
||||
"@libp2p/multistream-select": "^3.1.4",
|
||||
"@lumeweb/relay-types": "git+https://git.lumeweb.com/LumeWeb/relay-types.git",
|
||||
"@types/b4a": "^1.6.0",
|
||||
"@types/streamx": "^2.9.1",
|
||||
"esbuild": "^0.15.18",
|
||||
"prettier": "^2.8.4",
|
||||
"prettier": "^2.8.7",
|
||||
"rimraf": "^3.0.2"
|
||||
},
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-noise": "^11.0.1",
|
||||
"@libp2p/interface-connection": "^3.1.0",
|
||||
"@libp2p/peer-id": "^2.0.2",
|
||||
"@chainsafe/libp2p-noise": "^11.0.4",
|
||||
"@libp2p/interface-connection": "^3.1.1",
|
||||
"@libp2p/peer-id": "^2.0.3",
|
||||
"@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git",
|
||||
"@multiformats/multiaddr": "^11.6.1",
|
||||
"b4a": "^1.6.2",
|
||||
"b4a": "^1.6.3",
|
||||
"compact-encoding": "^2.11.0",
|
||||
"debug-stream": "^3.0.1",
|
||||
"serialize-error": "^11.0.0",
|
||||
|
|
22
src/index.ts
22
src/index.ts
|
@ -1,12 +1,5 @@
|
|||
import type { Plugin, PluginAPI } from "@lumeweb/relay-types";
|
||||
import { Peer, Proxy, Socket } from "@lumeweb/libhyperproxy";
|
||||
// @ts-ignore
|
||||
import debugStream from "debug-stream";
|
||||
// @ts-ignore
|
||||
import toIterable from "stream-to-it";
|
||||
// @ts-ignore
|
||||
import { fixed32, raw } from "compact-encoding";
|
||||
import PeerManager from "./peerManager";
|
||||
import { MultiSocketProxy } from "@lumeweb/libhyperproxy";
|
||||
|
||||
const PROTOCOL = "lumeweb.proxy.ipfs";
|
||||
|
||||
|
@ -18,22 +11,17 @@ interface PeerInfoResult {
|
|||
const plugin: Plugin = {
|
||||
name: "ipfs",
|
||||
async plugin(api: PluginAPI): Promise<void> {
|
||||
api.swarm.join(api.util.crypto.createHash(PROTOCOL));
|
||||
const proxy = new Proxy({
|
||||
const proxy = new MultiSocketProxy({
|
||||
swarm: api.swarm,
|
||||
protocol: PROTOCOL,
|
||||
allowedPorts: [5001, 5002],
|
||||
server: true,
|
||||
});
|
||||
api.swarm.join(api.util.crypto.createHash(PROTOCOL));
|
||||
api.protocols.register(PROTOCOL, (peer: any, muxer: any) => {
|
||||
proxy.handlePeer({
|
||||
peer,
|
||||
muxer,
|
||||
createDefaultMessage: false,
|
||||
onchannel(peer: Peer, channel: any) {
|
||||
PeerManager.instance(api).handleNewPeerChannel(peer, channel);
|
||||
},
|
||||
onclose(peer: Peer) {
|
||||
PeerManager.instance(api).handleClosePeer(peer);
|
||||
},
|
||||
});
|
||||
});
|
||||
},
|
||||
|
|
|
@ -1,220 +0,0 @@
|
|||
import { PluginAPI } from "@lumeweb/relay-types";
|
||||
import { Peer, Socket } from "@lumeweb/libhyperproxy";
|
||||
import net from "net";
|
||||
// @ts-ignore
|
||||
import { fixed32, json, raw, uint } from "compact-encoding";
|
||||
import b4a from "b4a";
|
||||
import {
|
||||
CloseSocketRequest,
|
||||
ErrorSocketRequest,
|
||||
PeerEntity,
|
||||
PeerInfoResult,
|
||||
SocketRequest,
|
||||
WriteSocketRequest,
|
||||
} from "./types";
|
||||
import { TCPSocket } from "./socket";
|
||||
import { serializeError } from "serialize-error";
|
||||
|
||||
const socketEncoding = {
|
||||
preencode(state: any, m: SocketRequest) {
|
||||
uint.preencode(state, m.id);
|
||||
uint.preencode(state, m.remoteId);
|
||||
},
|
||||
encode(state: any, m: SocketRequest) {
|
||||
uint.encode(state, m.id);
|
||||
uint.encode(state, m.remoteId);
|
||||
},
|
||||
decode(state: any, m: any): SocketRequest {
|
||||
return {
|
||||
remoteId: uint.decode(state, m),
|
||||
id: uint.decode(state, m),
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const writeSocketEncoding = {
|
||||
preencode(state: any, m: WriteSocketRequest) {
|
||||
socketEncoding.preencode(state, m);
|
||||
raw.preencode(state, m.data);
|
||||
},
|
||||
encode(state: any, m: WriteSocketRequest) {
|
||||
socketEncoding.encode(state, m);
|
||||
raw.encode(state, m.data);
|
||||
},
|
||||
decode(state: any, m: any): WriteSocketRequest {
|
||||
const socket = socketEncoding.decode(state, m);
|
||||
return {
|
||||
...socket,
|
||||
data: raw.decode(state, m),
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const errorSocketEncoding = {
|
||||
preencode(state: any, m: ErrorSocketRequest) {
|
||||
socketEncoding.preencode(state, m);
|
||||
json.preencode(state, serializeError(m.err));
|
||||
},
|
||||
encode(state: any, m: ErrorSocketRequest) {
|
||||
socketEncoding.encode(state, m);
|
||||
json.encode(state, serializeError(m.err));
|
||||
},
|
||||
};
|
||||
|
||||
function idFactory(start: number, step = 1, limit = 2 ** 32) {
|
||||
let id = start;
|
||||
|
||||
return function nextId() {
|
||||
const nextId = id;
|
||||
id += step;
|
||||
if (id >= limit) id = start;
|
||||
return nextId;
|
||||
};
|
||||
}
|
||||
|
||||
const nextSocketId = idFactory(1);
|
||||
|
||||
export default class PeerManager {
|
||||
private static _instance: PeerManager;
|
||||
private _api: PluginAPI;
|
||||
private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>();
|
||||
|
||||
constructor(api: PluginAPI) {
|
||||
this._api = api;
|
||||
}
|
||||
|
||||
private _sockets = new Map<number, TCPSocket>();
|
||||
|
||||
get sockets(): Map<number, TCPSocket> {
|
||||
return this._sockets;
|
||||
}
|
||||
|
||||
private _socketMap = new Map<number, number>();
|
||||
|
||||
get socketMap(): Map<number, number> {
|
||||
return this._socketMap;
|
||||
}
|
||||
|
||||
public static instance(api?: PluginAPI): PeerManager {
|
||||
if (!PeerManager._instance) {
|
||||
if (!api) {
|
||||
throw new Error("api argument required");
|
||||
}
|
||||
PeerManager._instance = new PeerManager(api as PluginAPI);
|
||||
}
|
||||
|
||||
return PeerManager._instance;
|
||||
}
|
||||
|
||||
handleNewPeerChannel(peer: Peer, channel: any) {
|
||||
this._registerOpenSocketMessage(peer, channel);
|
||||
this._registerWriteSocketMessage(peer, channel);
|
||||
this._registerCloseSocketMessage(peer, channel);
|
||||
this._registerTimeoutSocketMessage(peer, channel);
|
||||
this._registerErrorSocketMessage(peer, channel);
|
||||
}
|
||||
|
||||
public get(pubkey: Uint8Array): PeerEntity | undefined {
|
||||
if (this._peers.has(this._toString(pubkey))) {
|
||||
return this._peers.get(this._toString(pubkey)) as PeerEntity;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
public update(pubkey: Uint8Array, data: Partial<PeerEntity>): void {
|
||||
const peer = this.get(pubkey) ?? ({} as PeerEntity);
|
||||
|
||||
this._peers.set(this._toString(pubkey), {
|
||||
...peer,
|
||||
...data,
|
||||
...{
|
||||
messages: {
|
||||
...peer?.messages,
|
||||
...data?.messages,
|
||||
},
|
||||
},
|
||||
} as PeerEntity);
|
||||
}
|
||||
async handleClosePeer(peer: Peer) {
|
||||
for (const item of this._sockets) {
|
||||
if (item[1].peer.peer === peer) {
|
||||
item[1].end();
|
||||
}
|
||||
}
|
||||
|
||||
const pubkey = this._toString(peer.socket.remotePublicKey);
|
||||
|
||||
if (this._peers.has(pubkey)) {
|
||||
this._peers.delete(pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
private _registerOpenSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: {
|
||||
...socketEncoding,
|
||||
decode: json.decode,
|
||||
},
|
||||
async onmessage(m: any) {
|
||||
// @ts-ignore
|
||||
new TCPSocket(
|
||||
nextSocketId(),
|
||||
m.id,
|
||||
self,
|
||||
self.get(peer.socket.remotePublicKey) as PeerEntity,
|
||||
m
|
||||
).connect();
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { openSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerWriteSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: writeSocketEncoding,
|
||||
onmessage(m: WriteSocketRequest) {
|
||||
self._sockets.get(m.id)?.push(m.data);
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { writeSocket: message },
|
||||
});
|
||||
}
|
||||
private _registerCloseSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
async onmessage(m: CloseSocketRequest) {
|
||||
self._sockets.get(m.id)?.end();
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { closeSocket: message },
|
||||
});
|
||||
}
|
||||
private _registerTimeoutSocketMessage(peer: Peer, channel: any) {
|
||||
const message = channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { timeoutSocket: message },
|
||||
});
|
||||
}
|
||||
private _registerErrorSocketMessage(peer: Peer, channel: any) {
|
||||
const message = channel.addMessage({
|
||||
encoding: errorSocketEncoding,
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { errorSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _toString(pubkey: Uint8Array) {
|
||||
return b4a.from(pubkey).toString("hex");
|
||||
}
|
||||
}
|
101
src/socket.ts
101
src/socket.ts
|
@ -1,101 +0,0 @@
|
|||
import {
|
||||
Callback,
|
||||
Duplex,
|
||||
DuplexEvents,
|
||||
EventName,
|
||||
EventListener,
|
||||
} from "streamx";
|
||||
import net, { TcpSocketConnectOpts } from "net";
|
||||
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types";
|
||||
import PeerManager from "./peerManager";
|
||||
import { Socket } from "net";
|
||||
|
||||
export class TCPSocket extends Duplex {
|
||||
private _options;
|
||||
private _id: number;
|
||||
private _remoteId: number;
|
||||
private _manager: PeerManager;
|
||||
|
||||
private _socket?: Socket;
|
||||
|
||||
constructor(
|
||||
id: number,
|
||||
remoteId: number,
|
||||
manager: PeerManager,
|
||||
peer: PeerEntity,
|
||||
options: TcpSocketConnectOpts
|
||||
) {
|
||||
super();
|
||||
this._remoteId = remoteId;
|
||||
this._manager = manager;
|
||||
this._id = id;
|
||||
this._peer = peer;
|
||||
this._options = options;
|
||||
|
||||
this._manager.sockets.set(this._id, this);
|
||||
this._manager.socketMap.set(this._id, this._remoteId);
|
||||
console.log(options);
|
||||
}
|
||||
|
||||
private _peer;
|
||||
|
||||
get peer() {
|
||||
return this._peer;
|
||||
}
|
||||
|
||||
public _write(data: any, cb: any): void {
|
||||
this._peer.messages.writeSocket?.send({
|
||||
...this._getSocketRequest(),
|
||||
data,
|
||||
} as WriteSocketRequest);
|
||||
cb();
|
||||
}
|
||||
|
||||
public _destroy(cb: Callback) {
|
||||
this._manager.sockets.delete(this._id);
|
||||
this._manager.socketMap.delete(this._id);
|
||||
this._peer.messages.closeSocket?.send(this._getSocketRequest());
|
||||
}
|
||||
|
||||
public connect() {
|
||||
this.on("error", (err: Error) => {
|
||||
this._peer.messages.errorSocket?.send({
|
||||
...this._getSocketRequest(),
|
||||
err,
|
||||
});
|
||||
});
|
||||
|
||||
// @ts-ignore
|
||||
this.on("timeout", () => {
|
||||
this._peer.messages.timeoutSocket?.send(this._getSocketRequest());
|
||||
});
|
||||
// @ts-ignore
|
||||
this.on("connect", () => {
|
||||
this._peer.messages.openSocket?.send(this._getSocketRequest());
|
||||
});
|
||||
|
||||
if (![4001, 4002].includes(this._options.port)) {
|
||||
this.emit("error", new Error(`port ${this._options.port} not allowed`));
|
||||
return;
|
||||
}
|
||||
|
||||
this._socket = net.connect(this._options);
|
||||
["timeout", "error", "connect", "end", "destroy", "close"].forEach(
|
||||
(event) => {
|
||||
this._socket?.on(event, (...args: any) =>
|
||||
this.emit(event as any, ...args)
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
this._socket.pipe(this as any);
|
||||
this.pipe(this._socket);
|
||||
}
|
||||
|
||||
private _getSocketRequest(): SocketRequest {
|
||||
return {
|
||||
id: this._id,
|
||||
remoteId: this._remoteId,
|
||||
};
|
||||
}
|
||||
}
|
36
src/types.ts
36
src/types.ts
|
@ -1,36 +0,0 @@
|
|||
import { Peer } from "@lumeweb/libhyperproxy";
|
||||
|
||||
export interface PeerInfoResult {
|
||||
publicKey: Uint8Array;
|
||||
libp2pPublicKey: Uint8Array;
|
||||
}
|
||||
|
||||
export type Message = {
|
||||
send: (pubkey: Uint8Array | any) => void;
|
||||
};
|
||||
|
||||
export interface PeerEntityMessages {
|
||||
keyExchange: Message;
|
||||
openSocket: Message;
|
||||
writeSocket: Message;
|
||||
closeSocket: Message;
|
||||
timeoutSocket: Message;
|
||||
errorSocket: Message;
|
||||
}
|
||||
|
||||
export interface PeerEntity {
|
||||
messages: PeerEntityMessages | Partial<PeerEntityMessages>;
|
||||
peer: Peer;
|
||||
}
|
||||
export interface SocketRequest {
|
||||
remoteId: number;
|
||||
id: number;
|
||||
}
|
||||
export type CloseSocketRequest = SocketRequest;
|
||||
|
||||
export interface WriteSocketRequest extends SocketRequest {
|
||||
data: Uint8Array;
|
||||
}
|
||||
export interface ErrorSocketRequest extends SocketRequest {
|
||||
err: Error;
|
||||
}
|
Loading…
Reference in New Issue