refactor: change transport handling to be dynamic, so new ones can be registered

This commit is contained in:
Derrick Hammer 2023-08-31 05:04:33 -04:00
parent 08123762ce
commit db72e1eefd
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
6 changed files with 89 additions and 40 deletions

View File

@ -3,6 +3,11 @@ import type { S5NodeConfig } from "#node.js";
import type { SignedRegistryEntry } from "#service/registry.js";
export * from "./types.js";
export {
createTransportSocket,
isTransport,
createTransportPeer,
} from "./transports/index.js";
export type { S5NodeConfig, SignedRegistryEntry };
export function createNode(config: S5NodeConfig) {

View File

@ -25,9 +25,11 @@ import StorageLocation from "#storage.js";
import { addStorageLocation, S5Node, stringifyNode } from "#node.js";
import { URL } from "url";
import { Buffer } from "buffer";
import { connect as tcpConnect, TcpPeer } from "../peer/tcp.js";
import { connect as wsConnect, WebSocketPeer } from "../peer/webSocket.js";
import {
createTransportPeer,
createTransportSocket,
isTransport,
} from "#transports/index.js";
export class P2PService {
private node: S5Node;
private logger: Logger;
@ -199,7 +201,7 @@ export class P2PService {
peer.id = pId;
} else {
if (!peer.id.equals(pId)) {
throw "Invalid peer id on initial list";
throw "Invalid transports id on initial list";
}
}
@ -271,7 +273,7 @@ export class P2PService {
const pk = u.unpackBinary();
const sre = node.registry.getFromDB(pk);
if (sre !== null) {
peer.sendMessage(node.registry.serializeRegistryEntry(sre));
transports.sendMessage(node.registry.serializeRegistryEntry(sre));
}
}*/
},
@ -456,7 +458,7 @@ export class P2PService {
);
}
const protocol = connectionUri.protocol;
const protocol = connectionUri.protocol.replace(":", "");
if (!connectionUri.username) {
throw new Error("Connection URI does not contain node id");
@ -475,21 +477,12 @@ export class P2PService {
try {
this.logger.verbose(`[connect] ${connectionUri}`);
if (protocol === "tcp:") {
const ip = connectionUri.hostname;
const port = parseInt(connectionUri.port);
const socket = await tcpConnect(port, ip);
const peer = new TcpPeer(socket, [connectionUri]);
peer.id = id;
await this.onNewPeer(peer, true);
} else {
const channel = await wsConnect(connectionUri.toString());
const peer = new WebSocketPeer(channel, [connectionUri]);
peer.id = id;
await this.onNewPeer(peer, true);
}
const socket = await createTransportSocket(protocol, connectionUri);
await this.onNewPeer(
createTransportPeer(protocol, socket, [connectionUri]),
true,
);
} catch (e) {
if (retried) {
return;

42
src/transports/index.ts Normal file
View File

@ -0,0 +1,42 @@
import { URL } from "url";
import { TcpPeer } from "#transports/tcp.js";
import { WebSocketPeer } from "#transports/webSocket.js";
import { PeerStatic } from "#types.js";
const transports = new Map<string, PeerStatic>();
export function registerTransport(type: string, transport: PeerStatic) {
transports.set(type, transport);
}
export function isTransport(type: string) {
return transports.has(type);
}
export function createTransportSocket(type: string, uri: URL) {
if (!isTransport(type)) {
throw new Error(`transport ${type} does not exist`);
}
const transport = transports.get(type) as PeerStatic;
return transport.connect(uri);
}
export function createTransportPeer(
type: string,
socket: any,
connectionUris: URL[] = [],
) {
if (!isTransport(type)) {
throw new Error(`transport ${type} does not exist`);
}
const transport = transports.get(type) as PeerStatic;
return new transport(socket, connectionUris);
}
registerTransport("tcp", TcpPeer);
registerTransport("ws", WebSocketPeer);
registerTransport("wss", WebSocketPeer);

View File

@ -3,6 +3,7 @@ import NodeId from "../nodeId.js";
import * as net from "net";
import { URL } from "url";
import { decodeEndian } from "../util.js";
import * as console from "console";
export class TcpPeer implements Peer {
connectionUris: Array<URL>;
@ -79,15 +80,17 @@ export class TcpPeer implements Peer {
this._socket.on("error", onError);
}
}
}
export async function connect(port: number, host: string): Promise<net.Socket> {
return new Promise((resolve, reject) => {
const socket = net.connect(port, host, () => {
resolve(socket);
public static async connect(uri: URL): Promise<net.Socket> {
const host = uri.hostname;
const port = parseInt(uri.port);
return new Promise((resolve, reject) => {
const socket = net.connect(port, host, () => {
resolve(socket);
});
socket.on("error", (err) => {
reject(err);
});
});
socket.on("error", (err) => {
reject(err);
});
});
}
}

View File

@ -56,15 +56,16 @@ export class WebSocketPeer implements Peer {
this._socket.addEventListener("error", onError);
}
}
}
export async function connect(uri: string): Promise<WebSocket> {
return new Promise((resolve, reject) => {
const socket = new WebSocket(uri);
socket.addEventListener("open", () => {
resolve(socket);
public static async connect(uri: URL): Promise<WebSocket> {
return new Promise((resolve, reject) => {
const socket = new WebSocket(uri);
socket.addEventListener("open", () => {
resolve(socket);
});
socket.addEventListener("error", (err) => {
reject(err);
});
});
socket.addEventListener("error", (err) => {
reject(err);
});
});
}
}

View File

@ -9,7 +9,6 @@ export interface Peer {
connectionUris: Array<URL>;
isConnected: boolean;
challenge: Uint8Array;
sendMessage(message: Uint8Array): void;
listenForMessages(
@ -28,6 +27,12 @@ export interface Peer {
renderLocationUri(): string;
}
// Define the static side of the class
export interface PeerStatic {
new (_socket: any, uri: URL[]): Peer;
connect(uri: URL): Promise<any>;
}
export interface Logger {
info(s: string): void;
verbose(s: string): void;