diff --git a/src/index.ts b/src/index.ts index cd97eff..429822e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,11 @@ import type { S5NodeConfig } from "#node.js"; import type { SignedRegistryEntry } from "#service/registry.js"; export * from "./types.js"; +export { + createTransportSocket, + isTransport, + createTransportPeer, +} from "./transports/index.js"; export type { S5NodeConfig, SignedRegistryEntry }; export function createNode(config: S5NodeConfig) { diff --git a/src/service/p2p.ts b/src/service/p2p.ts index 2d05856..7ed03d1 100644 --- a/src/service/p2p.ts +++ b/src/service/p2p.ts @@ -25,9 +25,11 @@ import StorageLocation from "#storage.js"; import { addStorageLocation, S5Node, stringifyNode } from "#node.js"; import { URL } from "url"; import { Buffer } from "buffer"; -import { connect as tcpConnect, TcpPeer } from "../peer/tcp.js"; -import { connect as wsConnect, WebSocketPeer } from "../peer/webSocket.js"; - +import { + createTransportPeer, + createTransportSocket, + isTransport, +} from "#transports/index.js"; export class P2PService { private node: S5Node; private logger: Logger; @@ -199,7 +201,7 @@ export class P2PService { peer.id = pId; } else { if (!peer.id.equals(pId)) { - throw "Invalid peer id on initial list"; + throw "Invalid transports id on initial list"; } } @@ -271,7 +273,7 @@ export class P2PService { const pk = u.unpackBinary(); const sre = node.registry.getFromDB(pk); if (sre !== null) { - peer.sendMessage(node.registry.serializeRegistryEntry(sre)); + transports.sendMessage(node.registry.serializeRegistryEntry(sre)); } }*/ }, @@ -456,7 +458,7 @@ export class P2PService { ); } - const protocol = connectionUri.protocol; + const protocol = connectionUri.protocol.replace(":", ""); if (!connectionUri.username) { throw new Error("Connection URI does not contain node id"); @@ -475,21 +477,12 @@ export class P2PService { try { this.logger.verbose(`[connect] ${connectionUri}`); - if (protocol === "tcp:") { - const ip = connectionUri.hostname; - const port = parseInt(connectionUri.port); - const socket = await tcpConnect(port, ip); - const peer = new TcpPeer(socket, [connectionUri]); - peer.id = id; - - await this.onNewPeer(peer, true); - } else { - const channel = await wsConnect(connectionUri.toString()); - const peer = new WebSocketPeer(channel, [connectionUri]); - peer.id = id; - await this.onNewPeer(peer, true); - } + const socket = await createTransportSocket(protocol, connectionUri); + await this.onNewPeer( + createTransportPeer(protocol, socket, [connectionUri]), + true, + ); } catch (e) { if (retried) { return; diff --git a/src/transports/index.ts b/src/transports/index.ts new file mode 100644 index 0000000..ee3b81d --- /dev/null +++ b/src/transports/index.ts @@ -0,0 +1,42 @@ +import { URL } from "url"; +import { TcpPeer } from "#transports/tcp.js"; +import { WebSocketPeer } from "#transports/webSocket.js"; +import { PeerStatic } from "#types.js"; + +const transports = new Map(); + +export function registerTransport(type: string, transport: PeerStatic) { + transports.set(type, transport); +} + +export function isTransport(type: string) { + return transports.has(type); +} + +export function createTransportSocket(type: string, uri: URL) { + if (!isTransport(type)) { + throw new Error(`transport ${type} does not exist`); + } + + const transport = transports.get(type) as PeerStatic; + + return transport.connect(uri); +} + +export function createTransportPeer( + type: string, + socket: any, + connectionUris: URL[] = [], +) { + if (!isTransport(type)) { + throw new Error(`transport ${type} does not exist`); + } + + const transport = transports.get(type) as PeerStatic; + + return new transport(socket, connectionUris); +} + +registerTransport("tcp", TcpPeer); +registerTransport("ws", WebSocketPeer); +registerTransport("wss", WebSocketPeer); diff --git a/src/peer/tcp.ts b/src/transports/tcp.ts similarity index 84% rename from src/peer/tcp.ts rename to src/transports/tcp.ts index 62f9b0e..1a0ffe0 100644 --- a/src/peer/tcp.ts +++ b/src/transports/tcp.ts @@ -3,6 +3,7 @@ import NodeId from "../nodeId.js"; import * as net from "net"; import { URL } from "url"; import { decodeEndian } from "../util.js"; +import * as console from "console"; export class TcpPeer implements Peer { connectionUris: Array; @@ -79,15 +80,17 @@ export class TcpPeer implements Peer { this._socket.on("error", onError); } } -} -export async function connect(port: number, host: string): Promise { - return new Promise((resolve, reject) => { - const socket = net.connect(port, host, () => { - resolve(socket); + public static async connect(uri: URL): Promise { + const host = uri.hostname; + const port = parseInt(uri.port); + return new Promise((resolve, reject) => { + const socket = net.connect(port, host, () => { + resolve(socket); + }); + socket.on("error", (err) => { + reject(err); + }); }); - socket.on("error", (err) => { - reject(err); - }); - }); + } } diff --git a/src/peer/webSocket.ts b/src/transports/webSocket.ts similarity index 81% rename from src/peer/webSocket.ts rename to src/transports/webSocket.ts index d0a5d56..0db4b9e 100644 --- a/src/peer/webSocket.ts +++ b/src/transports/webSocket.ts @@ -56,15 +56,16 @@ export class WebSocketPeer implements Peer { this._socket.addEventListener("error", onError); } } -} -export async function connect(uri: string): Promise { - return new Promise((resolve, reject) => { - const socket = new WebSocket(uri); - socket.addEventListener("open", () => { - resolve(socket); + + public static async connect(uri: URL): Promise { + return new Promise((resolve, reject) => { + const socket = new WebSocket(uri); + socket.addEventListener("open", () => { + resolve(socket); + }); + socket.addEventListener("error", (err) => { + reject(err); + }); }); - socket.addEventListener("error", (err) => { - reject(err); - }); - }); + } } diff --git a/src/types.ts b/src/types.ts index f765b1c..47e7815 100644 --- a/src/types.ts +++ b/src/types.ts @@ -9,7 +9,6 @@ export interface Peer { connectionUris: Array; isConnected: boolean; challenge: Uint8Array; - sendMessage(message: Uint8Array): void; listenForMessages( @@ -28,6 +27,12 @@ export interface Peer { renderLocationUri(): string; } +// Define the static side of the class +export interface PeerStatic { + new (_socket: any, uri: URL[]): Peer; + connect(uri: URL): Promise; +} + export interface Logger { info(s: string): void; verbose(s: string): void;