refactor: major refactor to put p2p message routing into map based handlers with types
This commit is contained in:
parent
db72e1eefd
commit
aed4865b73
|
@ -133,3 +133,4 @@ export const storageLocationTypeArchive = 0;
|
||||||
export const storageLocationTypeFile = 3;
|
export const storageLocationTypeFile = 3;
|
||||||
export const storageLocationTypeFull = 5;
|
export const storageLocationTypeFull = 5;
|
||||||
export const storageLocationTypeBridge = 7;
|
export const storageLocationTypeBridge = 7;
|
||||||
|
export const supportedFeatures = 3;
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
import Packer from "#serialization/pack.js";
|
||||||
|
import { protocolMethodHandshakeDone, supportedFeatures } from "#constants.js";
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
rawData: Uint8Array,
|
||||||
|
) {
|
||||||
|
const p = new Packer();
|
||||||
|
p.packInt(protocolMethodHandshakeDone);
|
||||||
|
p.packBinary(data.unpackBinary());
|
||||||
|
let peerNetworkId: string | null = null;
|
||||||
|
try {
|
||||||
|
peerNetworkId = data.unpackString();
|
||||||
|
} catch {}
|
||||||
|
|
||||||
|
if (this.networkId && peerNetworkId !== this.networkId) {
|
||||||
|
throw `Peer is in different network: ${peerNetworkId}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
p.packInt(supportedFeatures);
|
||||||
|
p.packInt(node.services.p2p.selfConnectionUris.length);
|
||||||
|
for (const uri of this.selfConnectionUris) {
|
||||||
|
p.packString(uri.toString());
|
||||||
|
}
|
||||||
|
// TODO Protocol version
|
||||||
|
// p.packInt(protocolVersion);
|
||||||
|
peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
import { P2PMessageHandler } from "#types.js";
|
||||||
|
import handshakeOpen from "#messages/handshakeOpen.js";
|
||||||
|
|
||||||
|
const messages = new Map<number, P2PMessageHandler>(
|
||||||
|
Object.entries({
|
||||||
|
protocolMethodHandshakeOpen: handshakeOpen,
|
||||||
|
}).map(([key, value]) => [Number(key), value]),
|
||||||
|
);
|
||||||
|
|
||||||
|
Object.freeze(messages);
|
||||||
|
|
||||||
|
export default messages;
|
|
@ -0,0 +1,13 @@
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
rawData: Uint8Array,
|
||||||
|
) {
|
||||||
|
const sre = node.services.registry.deserializeRegistryEntry(rawData);
|
||||||
|
await node.services.registry.set(sre, false, peer);
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
rawData: Uint8Array,
|
||||||
|
) {
|
||||||
|
const pk = data.unpackBinary();
|
||||||
|
const sre = await node.services.registry.getFromDB(pk);
|
||||||
|
if (sre !== null) {
|
||||||
|
peer.sendMessage(node.services.registry.serializeRegistryEntry(sre));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
import messages from "#messages/signedMessages/index.js";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
rawData: Uint8Array,
|
||||||
|
verifyId = true,
|
||||||
|
) {
|
||||||
|
const sm = await node.services.p2p.unpackAndVerifySignature(data);
|
||||||
|
const u = Unpacker.fromPacked(sm.message);
|
||||||
|
const method = data.unpackInt();
|
||||||
|
|
||||||
|
if (method !== null && messages.has(method)) {
|
||||||
|
await messages.get(method)?.(node, peer, u, sm, verifyId);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer, SignedMessage } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
import { equalBytes } from "@noble/curves/abstract/utils";
|
||||||
|
import { URL } from "url";
|
||||||
|
import NodeId from "#nodeId.js";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
message: SignedMessage,
|
||||||
|
verifyId: boolean,
|
||||||
|
) {
|
||||||
|
const length = data.unpackInt() as number;
|
||||||
|
for (let i = 0; i < length; i++) {
|
||||||
|
const peerIdBinary = data.unpackBinary();
|
||||||
|
const id = new NodeId(peerIdBinary);
|
||||||
|
|
||||||
|
const isConnected = data.unpackBool() as boolean;
|
||||||
|
|
||||||
|
const connectionUrisCount = data.unpackInt() as number;
|
||||||
|
|
||||||
|
const connectionUris: URL[] = [];
|
||||||
|
|
||||||
|
for (let i = 0; i < connectionUrisCount; i++) {
|
||||||
|
connectionUris.push(new URL(data.unpackString() as string));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connectionUris.length > 0) {
|
||||||
|
// TODO Fully support multiple connection uris
|
||||||
|
const uri = new URL(connectionUris[0].toString());
|
||||||
|
uri.username = id.toBase58();
|
||||||
|
if (!this.reconnectDelay.has(NodeId.decode(uri.username).toString())) {
|
||||||
|
node.services.p2p.connectToNode([uri]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer, SignedMessage } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
import { equalBytes } from "@noble/curves/abstract/utils";
|
||||||
|
import { URL } from "url";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
message: SignedMessage,
|
||||||
|
verifyId: boolean,
|
||||||
|
) {
|
||||||
|
const challenge = data.unpackBinary();
|
||||||
|
|
||||||
|
if (!equalBytes(peer.challenge, challenge)) {
|
||||||
|
throw "Invalid challenge";
|
||||||
|
}
|
||||||
|
|
||||||
|
const pId = message.nodeId;
|
||||||
|
|
||||||
|
if (!verifyId) {
|
||||||
|
peer.id = pId;
|
||||||
|
} else {
|
||||||
|
if (!peer.id.equals(pId)) {
|
||||||
|
throw "Invalid transports id on initial list";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
peer.isConnected = true;
|
||||||
|
|
||||||
|
const supportedFeatures = data.unpackInt();
|
||||||
|
|
||||||
|
if (supportedFeatures !== 3) {
|
||||||
|
throw "Remote node does not support required features";
|
||||||
|
}
|
||||||
|
|
||||||
|
node.services.p2p.peers.set(peer.id.toString(), peer);
|
||||||
|
node.services.p2p.reconnectDelay.set(peer.id.toString(), 1);
|
||||||
|
|
||||||
|
const connectionUrisCount = data.unpackInt() as number;
|
||||||
|
|
||||||
|
peer.connectionUris = [];
|
||||||
|
for (let i = 0; i < connectionUrisCount; i++) {
|
||||||
|
peer.connectionUris.push(new URL(data.unpackString() as string));
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.info(
|
||||||
|
`[+] ${peer.id.toString()} (${peer.renderLocationUri().toString()})`,
|
||||||
|
);
|
||||||
|
|
||||||
|
node.services.p2p.sendPublicPeersToPeer(
|
||||||
|
peer,
|
||||||
|
Array.from(node.services.p2p.peers.values()),
|
||||||
|
);
|
||||||
|
for (const p of this._peers.values()) {
|
||||||
|
if (p.id.equals(peer.id)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p.isConnected) {
|
||||||
|
this.sendPublicPeersToPeer(p, [peer]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
import { P2PSignedMessageHandler } from "#types.js";
|
||||||
|
import handshakeDone from "#messages/signedMessages/handshakeDone.js";
|
||||||
|
|
||||||
|
const messages = new Map<number, P2PSignedMessageHandler>(
|
||||||
|
Object.entries({
|
||||||
|
protocolMethodHandshakeOpen: handshakeDone,
|
||||||
|
}).map(([key, value]) => [Number(key), value]),
|
||||||
|
);
|
||||||
|
|
||||||
|
Object.freeze(messages);
|
||||||
|
|
||||||
|
export default messages;
|
|
@ -0,0 +1,77 @@
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import { Peer } from "#types.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
import { Multihash } from "#multihash.js";
|
||||||
|
import { decodeEndian } from "#util.js";
|
||||||
|
import { mkeyEd25519 } from "#constants.js";
|
||||||
|
import { ed25519 } from "@noble/curves/ed25519";
|
||||||
|
import NodeId from "#nodeId.js";
|
||||||
|
import StorageLocation from "#storage.js";
|
||||||
|
|
||||||
|
export default async function (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
rawData: Uint8Array,
|
||||||
|
) {
|
||||||
|
const hash = new Multihash(rawData.subarray(1, 34));
|
||||||
|
const type = rawData[34];
|
||||||
|
const expiry = decodeEndian(rawData.subarray(35, 39));
|
||||||
|
const partCount = rawData[39];
|
||||||
|
const parts: string[] = [];
|
||||||
|
let cursor = 40;
|
||||||
|
for (let i = 0; i < partCount; i++) {
|
||||||
|
const length = decodeEndian(rawData.subarray(cursor, cursor + 2));
|
||||||
|
cursor += 2;
|
||||||
|
parts.push(
|
||||||
|
new TextDecoder().decode(rawData.subarray(cursor, cursor + length)),
|
||||||
|
);
|
||||||
|
cursor += length;
|
||||||
|
}
|
||||||
|
cursor++;
|
||||||
|
|
||||||
|
const publicKey = rawData.subarray(cursor, cursor + 33);
|
||||||
|
const signature = rawData.subarray(cursor + 33);
|
||||||
|
|
||||||
|
if (publicKey[0] !== mkeyEd25519) {
|
||||||
|
throw `Unsupported public key type ${mkeyEd25519}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
!ed25519.verify(
|
||||||
|
signature,
|
||||||
|
rawData.subarray(0, cursor),
|
||||||
|
publicKey.subarray(1),
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const nodeId = new NodeId(publicKey);
|
||||||
|
await node.addStorageLocation({
|
||||||
|
hash,
|
||||||
|
nodeId,
|
||||||
|
location: new StorageLocation(type, parts, expiry),
|
||||||
|
message: rawData,
|
||||||
|
config: this.node.config,
|
||||||
|
});
|
||||||
|
|
||||||
|
const list = this.hashQueryRoutingTable.get(hash) || new Set<NodeId>();
|
||||||
|
for (const peerId of list) {
|
||||||
|
if (peerId.equals(nodeId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (peerId.equals(peer.id)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this._peers.has(peerId.toString())) {
|
||||||
|
try {
|
||||||
|
this._peers.get(peerId.toString())?.sendMessage(event);
|
||||||
|
} catch (e) {
|
||||||
|
this.logger.catched(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.hashQueryRoutingTable.delete(hash);
|
||||||
|
}
|
64
src/node.ts
64
src/node.ts
|
@ -40,11 +40,29 @@ export interface S5NodeConfig {
|
||||||
|
|
||||||
export class S5Node {
|
export class S5Node {
|
||||||
private _nodeConfig: S5NodeConfig;
|
private _nodeConfig: S5NodeConfig;
|
||||||
private _config?: S5Config;
|
|
||||||
constructor(config: S5NodeConfig) {
|
constructor(config: S5NodeConfig) {
|
||||||
this._nodeConfig = config;
|
this._nodeConfig = config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _config?: S5Config;
|
||||||
|
|
||||||
|
get config() {
|
||||||
|
return this._config as S5Config;
|
||||||
|
}
|
||||||
|
|
||||||
|
get services() {
|
||||||
|
return this._config?.services as S5Services;
|
||||||
|
}
|
||||||
|
|
||||||
|
get db() {
|
||||||
|
return this._config?.db as AbstractLevel<Uint8Array, string, Uint8Array>;
|
||||||
|
}
|
||||||
|
|
||||||
|
get logger() {
|
||||||
|
return this._config?.logger as Logger;
|
||||||
|
}
|
||||||
|
|
||||||
public async start() {
|
public async start() {
|
||||||
this._config = {
|
this._config = {
|
||||||
keyPair: this._nodeConfig.keyPair,
|
keyPair: this._nodeConfig.keyPair,
|
||||||
|
@ -62,29 +80,11 @@ export class S5Node {
|
||||||
await p2p.start();
|
await p2p.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
get services() {
|
async readStorageLocationsFromDB(
|
||||||
return this._config?.services as S5Services;
|
hash: Multihash,
|
||||||
}
|
): Promise<Map<number, Map<string, Map<number, any>>>> {
|
||||||
get config() {
|
const map = new Map<number, Map<string, Map<number, any>>>();
|
||||||
return this._config as S5Config;
|
const bytes = await this.db.get(stringifyHash(hash));
|
||||||
}
|
|
||||||
get db() {
|
|
||||||
return this._config?.db as AbstractLevel<Uint8Array, string, Uint8Array>;
|
|
||||||
}
|
|
||||||
get logger() {
|
|
||||||
return this._config?.logger as Logger;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function readStorageLocationsFromDB({
|
|
||||||
hash,
|
|
||||||
config,
|
|
||||||
}: {
|
|
||||||
hash: Multihash;
|
|
||||||
config: S5Config;
|
|
||||||
}): Promise<Map<number, Map<NodeId, Map<number, any>>>> {
|
|
||||||
const map = new Map<number, Map<NodeId, Map<number, any>>>();
|
|
||||||
const bytes = await config.db.get(stringifyHash(hash));
|
|
||||||
if (bytes === null) {
|
if (bytes === null) {
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
@ -92,18 +92,21 @@ export async function readStorageLocationsFromDB({
|
||||||
const mapLength = unpacker.unpackMapLength();
|
const mapLength = unpacker.unpackMapLength();
|
||||||
for (let i = 0; i < mapLength; i++) {
|
for (let i = 0; i < mapLength; i++) {
|
||||||
const type = unpacker.unpackInt() as number;
|
const type = unpacker.unpackInt() as number;
|
||||||
const innerMap = new Map<NodeId, Map<number, any>>();
|
const innerMap = new Map<string, Map<number, any>>();
|
||||||
map.set(type, innerMap);
|
map.set(type, innerMap);
|
||||||
const innerMapLength = unpacker.unpackMapLength();
|
const innerMapLength = unpacker.unpackMapLength();
|
||||||
for (let j = 0; j < innerMapLength; j++) {
|
for (let j = 0; j < innerMapLength; j++) {
|
||||||
const nodeId = new NodeId(unpacker.unpackBinary());
|
const nodeId = new NodeId(unpacker.unpackBinary());
|
||||||
innerMap.set(nodeId, new Map(unpacker.unpackMap() as [number, any][]));
|
innerMap.set(
|
||||||
|
nodeId.toString(),
|
||||||
|
new Map(unpacker.unpackMap() as [number, any][]),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function addStorageLocation({
|
async addStorageLocation({
|
||||||
hash,
|
hash,
|
||||||
nodeId,
|
nodeId,
|
||||||
location,
|
location,
|
||||||
|
@ -116,9 +119,9 @@ export async function addStorageLocation({
|
||||||
message?: Uint8Array;
|
message?: Uint8Array;
|
||||||
config: S5Config;
|
config: S5Config;
|
||||||
}) {
|
}) {
|
||||||
const map = this.readStorageLocationsFromDB(hash);
|
const map = await this.readStorageLocationsFromDB(hash);
|
||||||
const innerMap =
|
const innerMap =
|
||||||
map.get(location.type) || new Map<NodeId, Map<number, any>>();
|
map.get(location.type) || new Map<string, Map<number, any>>();
|
||||||
map.set(location.type, innerMap);
|
map.set(location.type, innerMap);
|
||||||
|
|
||||||
const locationMap = new Map<number, any>([
|
const locationMap = new Map<number, any>([
|
||||||
|
@ -128,12 +131,13 @@ export async function addStorageLocation({
|
||||||
[4, message],
|
[4, message],
|
||||||
]);
|
]);
|
||||||
|
|
||||||
innerMap.set(nodeId, locationMap);
|
innerMap.set(nodeId.toString(), locationMap);
|
||||||
await config.cacheDb.put(
|
await config.cacheDb.put(
|
||||||
stringifyHash(hash),
|
stringifyHash(hash),
|
||||||
new Packer().pack(map).takeBytes(),
|
new Packer().pack(map).takeBytes(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export function stringifyBytes(data: Uint8Array) {
|
export function stringifyBytes(data: Uint8Array) {
|
||||||
return String.fromCharCode(...data);
|
return String.fromCharCode(...data);
|
||||||
|
|
|
@ -1,43 +1,37 @@
|
||||||
import { Multihash } from "../multihash.js";
|
import { Multihash } from "../multihash.js";
|
||||||
import NodeId from "../nodeId.js";
|
import NodeId from "../nodeId.js";
|
||||||
import { equalBytes } from "@noble/curves/abstract/utils";
|
import { Logger, Peer, SignedMessage } from "../types.js";
|
||||||
import { Logger, Peer, S5Config, SignedMessage } from "../types.js";
|
|
||||||
import KeyPairEd25519 from "../ed25519.js";
|
import KeyPairEd25519 from "../ed25519.js";
|
||||||
import * as crypto from "crypto";
|
import * as crypto from "crypto";
|
||||||
import {
|
import {
|
||||||
mkeyEd25519,
|
|
||||||
protocolMethodAnnouncePeers,
|
protocolMethodAnnouncePeers,
|
||||||
protocolMethodHandshakeDone,
|
|
||||||
protocolMethodHandshakeOpen,
|
protocolMethodHandshakeOpen,
|
||||||
protocolMethodHashQuery,
|
protocolMethodHashQuery,
|
||||||
protocolMethodSignedMessage,
|
protocolMethodSignedMessage,
|
||||||
recordTypeRegistryEntry,
|
|
||||||
recordTypeStorageLocation,
|
recordTypeStorageLocation,
|
||||||
storageLocationTypeFull,
|
storageLocationTypeFull,
|
||||||
} from "../constants.js";
|
} from "../constants.js";
|
||||||
import defer from "p-defer";
|
import defer from "p-defer";
|
||||||
import { calculateScore, decodeEndian, encodeEndian } from "#util.js";
|
import { calculateScore, encodeEndian } from "#util.js";
|
||||||
import Packer from "#serialization/pack.js";
|
import Packer from "#serialization/pack.js";
|
||||||
import Unpacker from "#serialization/unpack.js";
|
import Unpacker from "#serialization/unpack.js";
|
||||||
import { ed25519 } from "@noble/curves/ed25519";
|
import { ed25519 } from "@noble/curves/ed25519";
|
||||||
import { AbstractLevel, AbstractSublevel } from "abstract-level";
|
import { AbstractLevel, AbstractSublevel } from "abstract-level";
|
||||||
import StorageLocation from "#storage.js";
|
import StorageLocation from "#storage.js";
|
||||||
import { addStorageLocation, S5Node, stringifyNode } from "#node.js";
|
import { S5Node, stringifyNode } from "#node.js";
|
||||||
import { URL } from "url";
|
import { URL } from "url";
|
||||||
import { Buffer } from "buffer";
|
import { Buffer } from "buffer";
|
||||||
import {
|
import {
|
||||||
createTransportPeer,
|
createTransportPeer,
|
||||||
createTransportSocket,
|
createTransportSocket,
|
||||||
isTransport,
|
|
||||||
} from "#transports/index.js";
|
} from "#transports/index.js";
|
||||||
|
import messages from "#messages/index.js";
|
||||||
|
|
||||||
export class P2PService {
|
export class P2PService {
|
||||||
private node: S5Node;
|
|
||||||
private logger: Logger;
|
private logger: Logger;
|
||||||
private nodeKeyPair: KeyPairEd25519;
|
private nodeKeyPair: KeyPairEd25519;
|
||||||
private localNodeId?: NodeId;
|
private localNodeId?: NodeId;
|
||||||
private networkId?: string;
|
private networkId?: string;
|
||||||
private reconnectDelay: Map<string, number> = new Map();
|
|
||||||
private selfConnectionUris: Array<URL> = [];
|
|
||||||
private nodesDb?: AbstractSublevel<
|
private nodesDb?: AbstractSublevel<
|
||||||
AbstractLevel<Uint8Array, string, Uint8Array>,
|
AbstractLevel<Uint8Array, string, Uint8Array>,
|
||||||
Uint8Array,
|
Uint8Array,
|
||||||
|
@ -47,7 +41,7 @@ export class P2PService {
|
||||||
private hashQueryRoutingTable: Map<Multihash, Set<NodeId>> = new Map();
|
private hashQueryRoutingTable: Map<Multihash, Set<NodeId>> = new Map();
|
||||||
|
|
||||||
constructor(node: S5Node) {
|
constructor(node: S5Node) {
|
||||||
this.node = node;
|
this._node = node;
|
||||||
this.networkId = node.config.p2p?.network;
|
this.networkId = node.config.p2p?.network;
|
||||||
this.nodeKeyPair = node.config.keyPair;
|
this.nodeKeyPair = node.config.keyPair;
|
||||||
this.logger = node.logger;
|
this.logger = node.logger;
|
||||||
|
@ -55,6 +49,24 @@ export class P2PService {
|
||||||
node.config.services.p2p = this;
|
node.config.services.p2p = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _node: S5Node;
|
||||||
|
|
||||||
|
get node(): S5Node {
|
||||||
|
return this._node;
|
||||||
|
}
|
||||||
|
|
||||||
|
private _reconnectDelay: Map<string, number> = new Map();
|
||||||
|
|
||||||
|
get reconnectDelay(): Map<string, number> {
|
||||||
|
return this._reconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
private _selfConnectionUris: Array<URL> = [];
|
||||||
|
|
||||||
|
get selfConnectionUris(): Array<URL> {
|
||||||
|
return this._selfConnectionUris;
|
||||||
|
}
|
||||||
|
|
||||||
private _peers: Map<string, Peer> = new Map();
|
private _peers: Map<string, Peer> = new Map();
|
||||||
|
|
||||||
get peers(): Map<string, Peer> {
|
get peers(): Map<string, Peer> {
|
||||||
|
@ -63,11 +75,11 @@ export class P2PService {
|
||||||
|
|
||||||
async init(): Promise<void> {
|
async init(): Promise<void> {
|
||||||
this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor
|
this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor
|
||||||
this.nodesDb = this.node.db.sublevel<string, Uint8Array>("s5-nodes", {});
|
this.nodesDb = this._node.db.sublevel<string, Uint8Array>("s5-nodes", {});
|
||||||
}
|
}
|
||||||
|
|
||||||
async start(): Promise<void> {
|
async start(): Promise<void> {
|
||||||
const initialPeers = this.node.config?.p2p?.peers?.initial || [];
|
const initialPeers = this._node.config?.p2p?.peers?.initial || [];
|
||||||
|
|
||||||
for (const p of initialPeers) {
|
for (const p of initialPeers) {
|
||||||
this.connectToNode([new URL(p)]);
|
this.connectToNode([new URL(p)]);
|
||||||
|
@ -86,196 +98,14 @@ export class P2PService {
|
||||||
|
|
||||||
const completer = defer<void>();
|
const completer = defer<void>();
|
||||||
|
|
||||||
const supportedFeatures = 3; // 0b00000011
|
|
||||||
|
|
||||||
peer.listenForMessages(
|
peer.listenForMessages(
|
||||||
async (event: Uint8Array) => {
|
async (event: Uint8Array) => {
|
||||||
let u = Unpacker.fromPacked(event);
|
let u = Unpacker.fromPacked(event);
|
||||||
const method = u.unpackInt();
|
const method = u.unpackInt();
|
||||||
if (method === protocolMethodHandshakeOpen) {
|
|
||||||
const p = new Packer();
|
|
||||||
p.packInt(protocolMethodHandshakeDone);
|
|
||||||
p.packBinary(u.unpackBinary());
|
|
||||||
let peerNetworkId: string | null = null;
|
|
||||||
try {
|
|
||||||
peerNetworkId = u.unpackString();
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
if (this.networkId && peerNetworkId !== this.networkId) {
|
if (method !== null && messages.has(method)) {
|
||||||
throw `Peer is in different network: ${peerNetworkId}`;
|
await messages.get(method)?.(this.node, peer, u, event, verifyId);
|
||||||
}
|
}
|
||||||
|
|
||||||
p.packInt(supportedFeatures);
|
|
||||||
p.packInt(this.selfConnectionUris.length);
|
|
||||||
for (const uri of this.selfConnectionUris) {
|
|
||||||
p.packString(uri.toString());
|
|
||||||
}
|
|
||||||
// TODO Protocol version
|
|
||||||
// p.packInt(protocolVersion);
|
|
||||||
peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
|
|
||||||
return;
|
|
||||||
} else if (method === recordTypeRegistryEntry) {
|
|
||||||
const sre =
|
|
||||||
this.node.services.registry.deserializeRegistryEntry(event);
|
|
||||||
await this.node.services.registry.set(sre, false, peer);
|
|
||||||
return;
|
|
||||||
} else if (method === recordTypeStorageLocation) {
|
|
||||||
const hash = new Multihash(event.subarray(1, 34));
|
|
||||||
const type = event[34];
|
|
||||||
const expiry = decodeEndian(event.subarray(35, 39));
|
|
||||||
const partCount = event[39];
|
|
||||||
const parts: string[] = [];
|
|
||||||
let cursor = 40;
|
|
||||||
for (let i = 0; i < partCount; i++) {
|
|
||||||
const length = decodeEndian(event.subarray(cursor, cursor + 2));
|
|
||||||
cursor += 2;
|
|
||||||
parts.push(
|
|
||||||
new TextDecoder().decode(event.subarray(cursor, cursor + length)),
|
|
||||||
);
|
|
||||||
cursor += length;
|
|
||||||
}
|
|
||||||
cursor++;
|
|
||||||
|
|
||||||
const publicKey = event.subarray(cursor, cursor + 33);
|
|
||||||
const signature = event.subarray(cursor + 33);
|
|
||||||
|
|
||||||
if (publicKey[0] !== mkeyEd25519) {
|
|
||||||
throw `Unsupported public key type ${mkeyEd25519}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
!ed25519.verify(
|
|
||||||
signature,
|
|
||||||
event.subarray(0, cursor),
|
|
||||||
publicKey.subarray(1),
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const nodeId = new NodeId(publicKey);
|
|
||||||
await addStorageLocation({
|
|
||||||
hash,
|
|
||||||
nodeId,
|
|
||||||
location: new StorageLocation(type, parts, expiry),
|
|
||||||
message: event,
|
|
||||||
config: this.node.config,
|
|
||||||
});
|
|
||||||
|
|
||||||
const list =
|
|
||||||
this.hashQueryRoutingTable.get(hash) || new Set<NodeId>();
|
|
||||||
for (const peerId of list) {
|
|
||||||
if (peerId.equals(nodeId)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (peerId.equals(peer.id)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._peers.has(peerId.toString())) {
|
|
||||||
try {
|
|
||||||
this._peers.get(peerId.toString())?.sendMessage(event);
|
|
||||||
} catch (e) {
|
|
||||||
this.logger.catched(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.hashQueryRoutingTable.delete(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (method === protocolMethodSignedMessage) {
|
|
||||||
const sm = await this.unpackAndVerifySignature(u);
|
|
||||||
u = Unpacker.fromPacked(sm.message);
|
|
||||||
const method2 = u.unpackInt();
|
|
||||||
|
|
||||||
if (method2 === protocolMethodHandshakeDone) {
|
|
||||||
const challenge = u.unpackBinary();
|
|
||||||
|
|
||||||
if (!equalBytes(peer.challenge, challenge)) {
|
|
||||||
throw "Invalid challenge";
|
|
||||||
}
|
|
||||||
|
|
||||||
const pId = sm.nodeId;
|
|
||||||
|
|
||||||
if (!verifyId) {
|
|
||||||
peer.id = pId;
|
|
||||||
} else {
|
|
||||||
if (!peer.id.equals(pId)) {
|
|
||||||
throw "Invalid transports id on initial list";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
peer.isConnected = true;
|
|
||||||
|
|
||||||
const supportedFeatures = u.unpackInt();
|
|
||||||
|
|
||||||
if (supportedFeatures !== 3) {
|
|
||||||
throw "Remote node does not support required features";
|
|
||||||
}
|
|
||||||
|
|
||||||
this._peers.set(peer.id.toString(), peer);
|
|
||||||
this.reconnectDelay.set(peer.id.toString(), 1);
|
|
||||||
|
|
||||||
const connectionUrisCount = u.unpackInt() as number;
|
|
||||||
|
|
||||||
peer.connectionUris = [];
|
|
||||||
for (let i = 0; i < connectionUrisCount; i++) {
|
|
||||||
peer.connectionUris.push(new URL(u.unpackString() as string));
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.info(
|
|
||||||
`[+] ${peer.id.toString()} (${peer
|
|
||||||
.renderLocationUri()
|
|
||||||
.toString()})`,
|
|
||||||
);
|
|
||||||
|
|
||||||
this.sendPublicPeersToPeer(peer, Array.from(this._peers.values()));
|
|
||||||
for (const p of this._peers.values()) {
|
|
||||||
if (p.id.equals(peer.id)) continue;
|
|
||||||
|
|
||||||
if (p.isConnected) {
|
|
||||||
this.sendPublicPeersToPeer(p, [peer]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
} else if (method2 === protocolMethodAnnouncePeers) {
|
|
||||||
const length = u.unpackInt() as number;
|
|
||||||
for (let i = 0; i < length; i++) {
|
|
||||||
const peerIdBinary = u.unpackBinary();
|
|
||||||
const id = new NodeId(peerIdBinary);
|
|
||||||
|
|
||||||
const isConnected = u.unpackBool() as boolean;
|
|
||||||
|
|
||||||
const connectionUrisCount = u.unpackInt() as number;
|
|
||||||
|
|
||||||
const connectionUris: URL[] = [];
|
|
||||||
|
|
||||||
for (let i = 0; i < connectionUrisCount; i++) {
|
|
||||||
connectionUris.push(new URL(u.unpackString() as string));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connectionUris.length > 0) {
|
|
||||||
// TODO Fully support multiple connection uris
|
|
||||||
const uri = new URL(connectionUris[0].toString());
|
|
||||||
uri.username = id.toBase58();
|
|
||||||
if (
|
|
||||||
!this.reconnectDelay.has(
|
|
||||||
NodeId.decode(uri.username).toString(),
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
this.connectToNode([uri]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} /* else if (method === protocolMethodRegistryQuery) {
|
|
||||||
const pk = u.unpackBinary();
|
|
||||||
const sre = node.registry.getFromDB(pk);
|
|
||||||
if (sre !== null) {
|
|
||||||
transports.sendMessage(node.registry.serializeRegistryEntry(sre));
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
onDone: async () => {
|
onDone: async () => {
|
||||||
|
@ -466,9 +296,9 @@ export class P2PService {
|
||||||
|
|
||||||
const id = NodeId.decode(connectionUri.username);
|
const id = NodeId.decode(connectionUri.username);
|
||||||
|
|
||||||
this.reconnectDelay.set(
|
this._reconnectDelay.set(
|
||||||
id.toString(),
|
id.toString(),
|
||||||
this.reconnectDelay.get(id.toString()) || 1,
|
this._reconnectDelay.get(id.toString()) || 1,
|
||||||
);
|
);
|
||||||
|
|
||||||
if (id.equals(this.localNodeId)) {
|
if (id.equals(this.localNodeId)) {
|
||||||
|
@ -491,8 +321,8 @@ export class P2PService {
|
||||||
|
|
||||||
this.logger.catched(e);
|
this.logger.catched(e);
|
||||||
|
|
||||||
const delay = this.reconnectDelay.get(id.toString())!;
|
const delay = this._reconnectDelay.get(id.toString())!;
|
||||||
this.reconnectDelay.set(id.toString(), delay * 2);
|
this._reconnectDelay.set(id.toString(), delay * 2);
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay * 1000));
|
await new Promise((resolve) => setTimeout(resolve, delay * 1000));
|
||||||
|
|
||||||
await this.connectToNode(connectionUris, retried);
|
await this.connectToNode(connectionUris, retried);
|
||||||
|
|
|
@ -68,7 +68,7 @@ export class RegistryService {
|
||||||
throw new Error("Data too long");
|
throw new Error("Data too long");
|
||||||
}
|
}
|
||||||
|
|
||||||
const isValid = verifyRegistryEntry(sre);
|
const isValid = this.verifyRegistryEntry(sre);
|
||||||
if (!isValid) {
|
if (!isValid) {
|
||||||
throw new Error("Invalid signature found");
|
throw new Error("Invalid signature found");
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ export class RegistryService {
|
||||||
if (existingEntry.revision === sre.revision) {
|
if (existingEntry.revision === sre.revision) {
|
||||||
return;
|
return;
|
||||||
} else if (existingEntry.revision > sre.revision) {
|
} else if (existingEntry.revision > sre.revision) {
|
||||||
const updateMessage = serializeRegistryEntry(existingEntry);
|
const updateMessage = this.serializeRegistryEntry(existingEntry);
|
||||||
receivedFrom.sendMessage(updateMessage);
|
receivedFrom.sendMessage(updateMessage);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ export class RegistryService {
|
||||||
const key = new Multihash(sre.pk);
|
const key = new Multihash(sre.pk);
|
||||||
this.streams.get(key.toString())?.emit("event", sre);
|
this.streams.get(key.toString())?.emit("event", sre);
|
||||||
|
|
||||||
this.db?.put(stringifyBytes(sre.pk), serializeRegistryEntry(sre));
|
this.db?.put(stringifyBytes(sre.pk), this.serializeRegistryEntry(sre));
|
||||||
|
|
||||||
this.broadcastEntry(sre, receivedFrom);
|
this.broadcastEntry(sre, receivedFrom);
|
||||||
}
|
}
|
||||||
|
@ -104,7 +104,7 @@ export class RegistryService {
|
||||||
// TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256)
|
// TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256)
|
||||||
broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void {
|
broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void {
|
||||||
this.logger.verbose("[registry] broadcastEntry");
|
this.logger.verbose("[registry] broadcastEntry");
|
||||||
const updateMessage = serializeRegistryEntry(sre);
|
const updateMessage = this.serializeRegistryEntry(sre);
|
||||||
|
|
||||||
for (const p of Object.values(this.node.services.p2p.peers)) {
|
for (const p of Object.values(this.node.services.p2p.peers)) {
|
||||||
if (receivedFrom == null || p.id !== receivedFrom.id) {
|
if (receivedFrom == null || p.id !== receivedFrom.id) {
|
||||||
|
@ -216,9 +216,8 @@ export class RegistryService {
|
||||||
signature: event.slice(43 + dataLength),
|
signature: event.slice(43 + dataLength),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
function verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
|
public verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
|
||||||
const list: Uint8Array = Uint8Array.from([
|
const list: Uint8Array = Uint8Array.from([
|
||||||
recordTypeRegistryEntry,
|
recordTypeRegistryEntry,
|
||||||
...encodeEndian(sre.revision, 8),
|
...encodeEndian(sre.revision, 8),
|
||||||
|
@ -228,7 +227,7 @@ function verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
|
||||||
|
|
||||||
return ed25519.verify(list, sre.signature, sre.pk.slice(1));
|
return ed25519.verify(list, sre.signature, sre.pk.slice(1));
|
||||||
}
|
}
|
||||||
function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
|
public serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
|
||||||
return Uint8Array.from([
|
return Uint8Array.from([
|
||||||
recordTypeRegistryEntry,
|
recordTypeRegistryEntry,
|
||||||
...sre.pk,
|
...sre.pk,
|
||||||
|
@ -238,3 +237,4 @@ function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
|
||||||
...sre.signature,
|
...sre.signature,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
18
src/types.ts
18
src/types.ts
|
@ -3,6 +3,8 @@ import KeyPairEd25519 from "#ed25519.js";
|
||||||
import { AbstractLevel } from "abstract-level";
|
import { AbstractLevel } from "abstract-level";
|
||||||
import { P2PService } from "./service/p2p.js";
|
import { P2PService } from "./service/p2p.js";
|
||||||
import { RegistryService } from "./service/registry.js";
|
import { RegistryService } from "./service/registry.js";
|
||||||
|
import { S5Node } from "#node.js";
|
||||||
|
import Unpacker from "#serialization/unpack.js";
|
||||||
|
|
||||||
export interface Peer {
|
export interface Peer {
|
||||||
id: NodeId;
|
id: NodeId;
|
||||||
|
@ -63,3 +65,19 @@ export interface SignedMessage {
|
||||||
nodeId: NodeId;
|
nodeId: NodeId;
|
||||||
message: Uint8Array;
|
message: Uint8Array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type P2PMessageHandler = (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
rawData: Uint8Array,
|
||||||
|
verifyId: boolean,
|
||||||
|
) => Promise<void>;
|
||||||
|
|
||||||
|
export type P2PSignedMessageHandler = (
|
||||||
|
node: S5Node,
|
||||||
|
peer: Peer,
|
||||||
|
data: Unpacker,
|
||||||
|
message: SignedMessage,
|
||||||
|
verifyId: boolean,
|
||||||
|
) => Promise<void>;
|
||||||
|
|
Loading…
Reference in New Issue