diff --git a/src/dns/tcpClient.ts b/src/dns/tcpClient.ts new file mode 100644 index 0000000..82b24e3 --- /dev/null +++ b/src/dns/tcpClient.ts @@ -0,0 +1,132 @@ +import { Client as Base, TCPSocket } from "bns/lib/internal/net.js"; +import IP from "binet"; +import { Duplex } from "streamx"; +import { MultiSocketProxy } from "@lumeweb/libhyperproxy"; +import defer from "p-defer"; + +class Stream extends Duplex { + public remoteAddress = "127.0.0.1"; + public family = "IPv4"; + public port = 0; + public remotePort = 0; + public parent: any; + + constructor(options) { + super(options); + this.parent = options.parent; + } + + setNoDelay() {} + + connect() { + setTimeout(() => { + // @ts-ignore + this.emit("connect"); + }, 1); + } + + unref() {} +} + +const PROTOCOL = "lumeweb.proxy.handshake.dns"; + +export default class TCPClient extends Base { + private node: any; + private proxy: MultiSocketProxy; + private peerConnected = defer(); + + constructor(options) { + super(options); + + this.node = options.node; + this.proxy = new MultiSocketProxy({ + protocol: PROTOCOL, + swarm: options.swarm, + server: false, + autostart: true, + listen: true, + }); + + this.proxy.on("peerChannelOpen", () => { + this.peerConnected.resolve(); + }); + } + + protected declare sockets: Map; + + async write(msg, port, host) { + const local = host === "127.0.0.1"; + const key = IP.toHost(host, port); + const cache = this.sockets.get(key); + + if (cache) { + cache.write(msg); + return; + } + + let socket: any = null; + + try { + if (local) { + socket = await this.createLocalSocket(); + } else { + await this.peerConnected.promise; + socket = new TCPSocket(this); + socket.socket = this.proxy.createSocket({ + host, + port, + }); + socket.socket.remoteAddress = host; + socket.socket.remotePort = port; + + await socket.connect(); + } + } catch (e) { + return; + } + + if (this.sockets.has(key)) { + socket.destroy(); + socket = this.sockets.get(key); + } else { + socket.parent = this; + this.sockets.set(key, socket); + } + + socket.write(msg); + } + + async createLocalSocket() { + const socket = new TCPSocket(this); + const remoteSocket = new Stream({ + parent: this, + write: (data, cb) => { + // @ts-ignore + socket.socket.push(data); + cb?.(null); + }, + }); + // @ts-ignore + socket.socket = new Stream({ + parent: this, + write: (data, cb) => { + // @ts-ignore + remoteSocket.push(data); + cb?.(null); + }, + }); + const port = this.node.rs.hns.getAuthority().servers.slice().pop().port; + remoteSocket.port = port; + remoteSocket.remotePort = port; + // @ts-ignore + socket.socket.port = port; + // @ts-ignore + socket.socket.remotePort = port; + + await socket.connect(); + + this.node.ns.server.server.emit("connection", remoteSocket); + + return socket; + } +} diff --git a/src/index.ts b/src/index.ts index a97946c..c00926a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,8 @@ import { SPVNode } from "hsd/lib/node"; import defer from "p-defer"; import dns from "@i2labs/dns"; import assert from "assert"; +import { wire } from "bns"; +import TCPClient from "./dns/tcpClient.js"; const PROTOCOL = "lumeweb.proxy.handshake"; const TYPES = ["blockchain"]; @@ -29,6 +31,7 @@ addHandler("status", handleStatus, { receiveUpdates: true }); addHandler("name", handleName); addHandler("ready", handleReady); addHandler("query", handleQuery); +addHandler("dnsQuery", handleDnsQuery); let swarm; let proxy: MultiSocketProxy; @@ -75,7 +78,6 @@ async function handlePresentKey(aq: ActiveQuery) { config: false, argv: false, env: false, - noDns: true, memory: false, logFile: false, logConsole: true, @@ -143,6 +145,10 @@ async function handlePresentKey(aq: ActiveQuery) { node.http.http.listen = (port: number, host: string, cb: Function) => cb(); } + node.rs.hns.forceTCP = true; + node.rs.hns.socket = new TCPClient({ node, swarm }); + node.rs.hns.init(); + proxy = new MultiSocketProxy({ protocol: PROTOCOL, swarm, @@ -196,6 +202,33 @@ async function handleQuery(aq: ActiveQuery) { } } +async function handleDnsQuery(aq: ActiveQuery) { + if (!node.chain.synced || !node.pool.peers.head()) { + aq.reject("not ready"); + return; + } + + if (!("fqdn" in aq.callerInput)) { + aq.reject("fqdn required"); + return; + } + + if (!("type" in aq.callerInput)) { + aq.reject("type required"); + return; + } + + const msg = new wire.Message(); + const q = new wire.Question(aq.callerInput.fqdn, aq.callerInput.type); + msg.question.push(q); + + const ret = await node.rs.answer(msg); + + aq.respond( + ret.collect(aq.callerInput.fqdn, wire.stringToType(aq.callerInput.type)), + ); +} + async function handleRegister(aq: ActiveQuery) { await networkRegistry.registerNetwork(TYPES);