feat: add dns query api that uses mock sockets and a dedicated hyper proxy for DNS (port 53) requests

This commit is contained in:
Derrick Hammer 2023-08-02 17:03:06 -04:00
parent 8bcb788286
commit d675a44b4e
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 166 additions and 1 deletions

132
src/dns/tcpClient.ts Normal file
View File

@ -0,0 +1,132 @@
import { Client as Base, TCPSocket } from "bns/lib/internal/net.js";
import IP from "binet";
import { Duplex } from "streamx";
import { MultiSocketProxy } from "@lumeweb/libhyperproxy";
import defer from "p-defer";
class Stream extends Duplex {
public remoteAddress = "127.0.0.1";
public family = "IPv4";
public port = 0;
public remotePort = 0;
public parent: any;
constructor(options) {
super(options);
this.parent = options.parent;
}
setNoDelay() {}
connect() {
setTimeout(() => {
// @ts-ignore
this.emit("connect");
}, 1);
}
unref() {}
}
const PROTOCOL = "lumeweb.proxy.handshake.dns";
export default class TCPClient extends Base {
private node: any;
private proxy: MultiSocketProxy;
private peerConnected = defer();
constructor(options) {
super(options);
this.node = options.node;
this.proxy = new MultiSocketProxy({
protocol: PROTOCOL,
swarm: options.swarm,
server: false,
autostart: true,
listen: true,
});
this.proxy.on("peerChannelOpen", () => {
this.peerConnected.resolve();
});
}
protected declare sockets: Map<any, any>;
async write(msg, port, host) {
const local = host === "127.0.0.1";
const key = IP.toHost(host, port);
const cache = this.sockets.get(key);
if (cache) {
cache.write(msg);
return;
}
let socket: any = null;
try {
if (local) {
socket = await this.createLocalSocket();
} else {
await this.peerConnected.promise;
socket = new TCPSocket(this);
socket.socket = this.proxy.createSocket({
host,
port,
});
socket.socket.remoteAddress = host;
socket.socket.remotePort = port;
await socket.connect();
}
} catch (e) {
return;
}
if (this.sockets.has(key)) {
socket.destroy();
socket = this.sockets.get(key);
} else {
socket.parent = this;
this.sockets.set(key, socket);
}
socket.write(msg);
}
async createLocalSocket() {
const socket = new TCPSocket(this);
const remoteSocket = new Stream({
parent: this,
write: (data, cb) => {
// @ts-ignore
socket.socket.push(data);
cb?.(null);
},
});
// @ts-ignore
socket.socket = new Stream({
parent: this,
write: (data, cb) => {
// @ts-ignore
remoteSocket.push(data);
cb?.(null);
},
});
const port = this.node.rs.hns.getAuthority().servers.slice().pop().port;
remoteSocket.port = port;
remoteSocket.remotePort = port;
// @ts-ignore
socket.socket.port = port;
// @ts-ignore
socket.socket.remotePort = port;
await socket.connect();
this.node.ns.server.server.emit("connection", remoteSocket);
return socket;
}
}

View File

@ -12,6 +12,8 @@ import { SPVNode } from "hsd/lib/node";
import defer from "p-defer";
import dns from "@i2labs/dns";
import assert from "assert";
import { wire } from "bns";
import TCPClient from "./dns/tcpClient.js";
const PROTOCOL = "lumeweb.proxy.handshake";
const TYPES = ["blockchain"];
@ -29,6 +31,7 @@ addHandler("status", handleStatus, { receiveUpdates: true });
addHandler("name", handleName);
addHandler("ready", handleReady);
addHandler("query", handleQuery);
addHandler("dnsQuery", handleDnsQuery);
let swarm;
let proxy: MultiSocketProxy;
@ -75,7 +78,6 @@ async function handlePresentKey(aq: ActiveQuery) {
config: false,
argv: false,
env: false,
noDns: true,
memory: false,
logFile: false,
logConsole: true,
@ -143,6 +145,10 @@ async function handlePresentKey(aq: ActiveQuery) {
node.http.http.listen = (port: number, host: string, cb: Function) => cb();
}
node.rs.hns.forceTCP = true;
node.rs.hns.socket = new TCPClient({ node, swarm });
node.rs.hns.init();
proxy = new MultiSocketProxy({
protocol: PROTOCOL,
swarm,
@ -196,6 +202,33 @@ async function handleQuery(aq: ActiveQuery) {
}
}
async function handleDnsQuery(aq: ActiveQuery) {
if (!node.chain.synced || !node.pool.peers.head()) {
aq.reject("not ready");
return;
}
if (!("fqdn" in aq.callerInput)) {
aq.reject("fqdn required");
return;
}
if (!("type" in aq.callerInput)) {
aq.reject("type required");
return;
}
const msg = new wire.Message();
const q = new wire.Question(aq.callerInput.fqdn, aq.callerInput.type);
msg.question.push(q);
const ret = await node.rs.answer(msg);
aq.respond(
ret.collect(aq.callerInput.fqdn, wire.stringToType(aq.callerInput.type)),
);
}
async function handleRegister(aq: ActiveQuery) {
await networkRegistry.registerNetwork(TYPES);