feat: add dns query api that uses mock sockets and a dedicated hyper proxy for DNS (port 53) requests
This commit is contained in:
parent
8bcb788286
commit
d675a44b4e
|
@ -0,0 +1,132 @@
|
|||
import { Client as Base, TCPSocket } from "bns/lib/internal/net.js";
|
||||
import IP from "binet";
|
||||
import { Duplex } from "streamx";
|
||||
import { MultiSocketProxy } from "@lumeweb/libhyperproxy";
|
||||
import defer from "p-defer";
|
||||
|
||||
class Stream extends Duplex {
|
||||
public remoteAddress = "127.0.0.1";
|
||||
public family = "IPv4";
|
||||
public port = 0;
|
||||
public remotePort = 0;
|
||||
public parent: any;
|
||||
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this.parent = options.parent;
|
||||
}
|
||||
|
||||
setNoDelay() {}
|
||||
|
||||
connect() {
|
||||
setTimeout(() => {
|
||||
// @ts-ignore
|
||||
this.emit("connect");
|
||||
}, 1);
|
||||
}
|
||||
|
||||
unref() {}
|
||||
}
|
||||
|
||||
const PROTOCOL = "lumeweb.proxy.handshake.dns";
|
||||
|
||||
export default class TCPClient extends Base {
|
||||
private node: any;
|
||||
private proxy: MultiSocketProxy;
|
||||
private peerConnected = defer();
|
||||
|
||||
constructor(options) {
|
||||
super(options);
|
||||
|
||||
this.node = options.node;
|
||||
this.proxy = new MultiSocketProxy({
|
||||
protocol: PROTOCOL,
|
||||
swarm: options.swarm,
|
||||
server: false,
|
||||
autostart: true,
|
||||
listen: true,
|
||||
});
|
||||
|
||||
this.proxy.on("peerChannelOpen", () => {
|
||||
this.peerConnected.resolve();
|
||||
});
|
||||
}
|
||||
|
||||
protected declare sockets: Map<any, any>;
|
||||
|
||||
async write(msg, port, host) {
|
||||
const local = host === "127.0.0.1";
|
||||
const key = IP.toHost(host, port);
|
||||
const cache = this.sockets.get(key);
|
||||
|
||||
if (cache) {
|
||||
cache.write(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
let socket: any = null;
|
||||
|
||||
try {
|
||||
if (local) {
|
||||
socket = await this.createLocalSocket();
|
||||
} else {
|
||||
await this.peerConnected.promise;
|
||||
socket = new TCPSocket(this);
|
||||
socket.socket = this.proxy.createSocket({
|
||||
host,
|
||||
port,
|
||||
});
|
||||
socket.socket.remoteAddress = host;
|
||||
socket.socket.remotePort = port;
|
||||
|
||||
await socket.connect();
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.sockets.has(key)) {
|
||||
socket.destroy();
|
||||
socket = this.sockets.get(key);
|
||||
} else {
|
||||
socket.parent = this;
|
||||
this.sockets.set(key, socket);
|
||||
}
|
||||
|
||||
socket.write(msg);
|
||||
}
|
||||
|
||||
async createLocalSocket() {
|
||||
const socket = new TCPSocket(this);
|
||||
const remoteSocket = new Stream({
|
||||
parent: this,
|
||||
write: (data, cb) => {
|
||||
// @ts-ignore
|
||||
socket.socket.push(data);
|
||||
cb?.(null);
|
||||
},
|
||||
});
|
||||
// @ts-ignore
|
||||
socket.socket = new Stream({
|
||||
parent: this,
|
||||
write: (data, cb) => {
|
||||
// @ts-ignore
|
||||
remoteSocket.push(data);
|
||||
cb?.(null);
|
||||
},
|
||||
});
|
||||
const port = this.node.rs.hns.getAuthority().servers.slice().pop().port;
|
||||
remoteSocket.port = port;
|
||||
remoteSocket.remotePort = port;
|
||||
// @ts-ignore
|
||||
socket.socket.port = port;
|
||||
// @ts-ignore
|
||||
socket.socket.remotePort = port;
|
||||
|
||||
await socket.connect();
|
||||
|
||||
this.node.ns.server.server.emit("connection", remoteSocket);
|
||||
|
||||
return socket;
|
||||
}
|
||||
}
|
35
src/index.ts
35
src/index.ts
|
@ -12,6 +12,8 @@ import { SPVNode } from "hsd/lib/node";
|
|||
import defer from "p-defer";
|
||||
import dns from "@i2labs/dns";
|
||||
import assert from "assert";
|
||||
import { wire } from "bns";
|
||||
import TCPClient from "./dns/tcpClient.js";
|
||||
|
||||
const PROTOCOL = "lumeweb.proxy.handshake";
|
||||
const TYPES = ["blockchain"];
|
||||
|
@ -29,6 +31,7 @@ addHandler("status", handleStatus, { receiveUpdates: true });
|
|||
addHandler("name", handleName);
|
||||
addHandler("ready", handleReady);
|
||||
addHandler("query", handleQuery);
|
||||
addHandler("dnsQuery", handleDnsQuery);
|
||||
|
||||
let swarm;
|
||||
let proxy: MultiSocketProxy;
|
||||
|
@ -75,7 +78,6 @@ async function handlePresentKey(aq: ActiveQuery) {
|
|||
config: false,
|
||||
argv: false,
|
||||
env: false,
|
||||
noDns: true,
|
||||
memory: false,
|
||||
logFile: false,
|
||||
logConsole: true,
|
||||
|
@ -143,6 +145,10 @@ async function handlePresentKey(aq: ActiveQuery) {
|
|||
node.http.http.listen = (port: number, host: string, cb: Function) => cb();
|
||||
}
|
||||
|
||||
node.rs.hns.forceTCP = true;
|
||||
node.rs.hns.socket = new TCPClient({ node, swarm });
|
||||
node.rs.hns.init();
|
||||
|
||||
proxy = new MultiSocketProxy({
|
||||
protocol: PROTOCOL,
|
||||
swarm,
|
||||
|
@ -196,6 +202,33 @@ async function handleQuery(aq: ActiveQuery) {
|
|||
}
|
||||
}
|
||||
|
||||
async function handleDnsQuery(aq: ActiveQuery) {
|
||||
if (!node.chain.synced || !node.pool.peers.head()) {
|
||||
aq.reject("not ready");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!("fqdn" in aq.callerInput)) {
|
||||
aq.reject("fqdn required");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!("type" in aq.callerInput)) {
|
||||
aq.reject("type required");
|
||||
return;
|
||||
}
|
||||
|
||||
const msg = new wire.Message();
|
||||
const q = new wire.Question(aq.callerInput.fqdn, aq.callerInput.type);
|
||||
msg.question.push(q);
|
||||
|
||||
const ret = await node.rs.answer(msg);
|
||||
|
||||
aq.respond(
|
||||
ret.collect(aq.callerInput.fqdn, wire.stringToType(aq.callerInput.type)),
|
||||
);
|
||||
}
|
||||
|
||||
async function handleRegister(aq: ActiveQuery) {
|
||||
await networkRegistry.registerNetwork(TYPES);
|
||||
|
||||
|
|
Loading…
Reference in New Issue