feat: add dns query api that uses mock sockets and a dedicated hyper proxy for DNS (port 53) requests
This commit is contained in:
parent
8bcb788286
commit
d675a44b4e
|
@ -0,0 +1,132 @@
|
||||||
|
import { Client as Base, TCPSocket } from "bns/lib/internal/net.js";
|
||||||
|
import IP from "binet";
|
||||||
|
import { Duplex } from "streamx";
|
||||||
|
import { MultiSocketProxy } from "@lumeweb/libhyperproxy";
|
||||||
|
import defer from "p-defer";
|
||||||
|
|
||||||
|
class Stream extends Duplex {
|
||||||
|
public remoteAddress = "127.0.0.1";
|
||||||
|
public family = "IPv4";
|
||||||
|
public port = 0;
|
||||||
|
public remotePort = 0;
|
||||||
|
public parent: any;
|
||||||
|
|
||||||
|
constructor(options) {
|
||||||
|
super(options);
|
||||||
|
this.parent = options.parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
setNoDelay() {}
|
||||||
|
|
||||||
|
connect() {
|
||||||
|
setTimeout(() => {
|
||||||
|
// @ts-ignore
|
||||||
|
this.emit("connect");
|
||||||
|
}, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
unref() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
const PROTOCOL = "lumeweb.proxy.handshake.dns";
|
||||||
|
|
||||||
|
export default class TCPClient extends Base {
|
||||||
|
private node: any;
|
||||||
|
private proxy: MultiSocketProxy;
|
||||||
|
private peerConnected = defer();
|
||||||
|
|
||||||
|
constructor(options) {
|
||||||
|
super(options);
|
||||||
|
|
||||||
|
this.node = options.node;
|
||||||
|
this.proxy = new MultiSocketProxy({
|
||||||
|
protocol: PROTOCOL,
|
||||||
|
swarm: options.swarm,
|
||||||
|
server: false,
|
||||||
|
autostart: true,
|
||||||
|
listen: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.proxy.on("peerChannelOpen", () => {
|
||||||
|
this.peerConnected.resolve();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected declare sockets: Map<any, any>;
|
||||||
|
|
||||||
|
async write(msg, port, host) {
|
||||||
|
const local = host === "127.0.0.1";
|
||||||
|
const key = IP.toHost(host, port);
|
||||||
|
const cache = this.sockets.get(key);
|
||||||
|
|
||||||
|
if (cache) {
|
||||||
|
cache.write(msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let socket: any = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (local) {
|
||||||
|
socket = await this.createLocalSocket();
|
||||||
|
} else {
|
||||||
|
await this.peerConnected.promise;
|
||||||
|
socket = new TCPSocket(this);
|
||||||
|
socket.socket = this.proxy.createSocket({
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
});
|
||||||
|
socket.socket.remoteAddress = host;
|
||||||
|
socket.socket.remotePort = port;
|
||||||
|
|
||||||
|
await socket.connect();
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.sockets.has(key)) {
|
||||||
|
socket.destroy();
|
||||||
|
socket = this.sockets.get(key);
|
||||||
|
} else {
|
||||||
|
socket.parent = this;
|
||||||
|
this.sockets.set(key, socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.write(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
async createLocalSocket() {
|
||||||
|
const socket = new TCPSocket(this);
|
||||||
|
const remoteSocket = new Stream({
|
||||||
|
parent: this,
|
||||||
|
write: (data, cb) => {
|
||||||
|
// @ts-ignore
|
||||||
|
socket.socket.push(data);
|
||||||
|
cb?.(null);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
// @ts-ignore
|
||||||
|
socket.socket = new Stream({
|
||||||
|
parent: this,
|
||||||
|
write: (data, cb) => {
|
||||||
|
// @ts-ignore
|
||||||
|
remoteSocket.push(data);
|
||||||
|
cb?.(null);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const port = this.node.rs.hns.getAuthority().servers.slice().pop().port;
|
||||||
|
remoteSocket.port = port;
|
||||||
|
remoteSocket.remotePort = port;
|
||||||
|
// @ts-ignore
|
||||||
|
socket.socket.port = port;
|
||||||
|
// @ts-ignore
|
||||||
|
socket.socket.remotePort = port;
|
||||||
|
|
||||||
|
await socket.connect();
|
||||||
|
|
||||||
|
this.node.ns.server.server.emit("connection", remoteSocket);
|
||||||
|
|
||||||
|
return socket;
|
||||||
|
}
|
||||||
|
}
|
35
src/index.ts
35
src/index.ts
|
@ -12,6 +12,8 @@ import { SPVNode } from "hsd/lib/node";
|
||||||
import defer from "p-defer";
|
import defer from "p-defer";
|
||||||
import dns from "@i2labs/dns";
|
import dns from "@i2labs/dns";
|
||||||
import assert from "assert";
|
import assert from "assert";
|
||||||
|
import { wire } from "bns";
|
||||||
|
import TCPClient from "./dns/tcpClient.js";
|
||||||
|
|
||||||
const PROTOCOL = "lumeweb.proxy.handshake";
|
const PROTOCOL = "lumeweb.proxy.handshake";
|
||||||
const TYPES = ["blockchain"];
|
const TYPES = ["blockchain"];
|
||||||
|
@ -29,6 +31,7 @@ addHandler("status", handleStatus, { receiveUpdates: true });
|
||||||
addHandler("name", handleName);
|
addHandler("name", handleName);
|
||||||
addHandler("ready", handleReady);
|
addHandler("ready", handleReady);
|
||||||
addHandler("query", handleQuery);
|
addHandler("query", handleQuery);
|
||||||
|
addHandler("dnsQuery", handleDnsQuery);
|
||||||
|
|
||||||
let swarm;
|
let swarm;
|
||||||
let proxy: MultiSocketProxy;
|
let proxy: MultiSocketProxy;
|
||||||
|
@ -75,7 +78,6 @@ async function handlePresentKey(aq: ActiveQuery) {
|
||||||
config: false,
|
config: false,
|
||||||
argv: false,
|
argv: false,
|
||||||
env: false,
|
env: false,
|
||||||
noDns: true,
|
|
||||||
memory: false,
|
memory: false,
|
||||||
logFile: false,
|
logFile: false,
|
||||||
logConsole: true,
|
logConsole: true,
|
||||||
|
@ -143,6 +145,10 @@ async function handlePresentKey(aq: ActiveQuery) {
|
||||||
node.http.http.listen = (port: number, host: string, cb: Function) => cb();
|
node.http.http.listen = (port: number, host: string, cb: Function) => cb();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node.rs.hns.forceTCP = true;
|
||||||
|
node.rs.hns.socket = new TCPClient({ node, swarm });
|
||||||
|
node.rs.hns.init();
|
||||||
|
|
||||||
proxy = new MultiSocketProxy({
|
proxy = new MultiSocketProxy({
|
||||||
protocol: PROTOCOL,
|
protocol: PROTOCOL,
|
||||||
swarm,
|
swarm,
|
||||||
|
@ -196,6 +202,33 @@ async function handleQuery(aq: ActiveQuery) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function handleDnsQuery(aq: ActiveQuery) {
|
||||||
|
if (!node.chain.synced || !node.pool.peers.head()) {
|
||||||
|
aq.reject("not ready");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!("fqdn" in aq.callerInput)) {
|
||||||
|
aq.reject("fqdn required");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!("type" in aq.callerInput)) {
|
||||||
|
aq.reject("type required");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const msg = new wire.Message();
|
||||||
|
const q = new wire.Question(aq.callerInput.fqdn, aq.callerInput.type);
|
||||||
|
msg.question.push(q);
|
||||||
|
|
||||||
|
const ret = await node.rs.answer(msg);
|
||||||
|
|
||||||
|
aq.respond(
|
||||||
|
ret.collect(aq.callerInput.fqdn, wire.stringToType(aq.callerInput.type)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
async function handleRegister(aq: ActiveQuery) {
|
async function handleRegister(aq: ActiveQuery) {
|
||||||
await networkRegistry.registerNetwork(TYPES);
|
await networkRegistry.registerNetwork(TYPES);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue