Compare commits

...

5 Commits

21 changed files with 527 additions and 335 deletions

View File

@ -1,3 +1,5 @@
# [0.1.0-develop.3](https://git.lumeweb.com/LumeWeb/libs5/compare/v0.1.0-develop.2...v0.1.0-develop.3) (2023-08-31)
# [0.1.0-develop.2](https://git.lumeweb.com/LumeWeb/libs5/compare/v0.1.0-develop.1...v0.1.0-develop.2) (2023-08-31) # [0.1.0-develop.2](https://git.lumeweb.com/LumeWeb/libs5/compare/v0.1.0-develop.1...v0.1.0-develop.2) (2023-08-31)

4
npm-shrinkwrap.json generated
View File

@ -1,12 +1,12 @@
{ {
"name": "@lumeweb/libs5", "name": "@lumeweb/libs5",
"version": "0.1.0-develop.2", "version": "0.1.0-develop.3",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "@lumeweb/libs5", "name": "@lumeweb/libs5",
"version": "0.1.0-develop.2", "version": "0.1.0-develop.3",
"dependencies": { "dependencies": {
"@noble/curves": "^1.1.0", "@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1", "@noble/hashes": "^1.3.1",

View File

@ -1,6 +1,6 @@
{ {
"name": "@lumeweb/libs5", "name": "@lumeweb/libs5",
"version": "0.1.0-develop.2", "version": "0.1.0-develop.3",
"type": "module", "type": "module",
"main": "lib/index.js", "main": "lib/index.js",
"repository": { "repository": {

View File

@ -133,3 +133,4 @@ export const storageLocationTypeArchive = 0;
export const storageLocationTypeFile = 3; export const storageLocationTypeFile = 3;
export const storageLocationTypeFull = 5; export const storageLocationTypeFull = 5;
export const storageLocationTypeBridge = 7; export const storageLocationTypeBridge = 7;
export const supportedFeatures = 3;

View File

@ -1,8 +1,14 @@
import { S5Node } from "#node.js"; import { S5Node } from "#node.js";
import type { S5NodeConfig } from "#node.js"; import type { S5NodeConfig } from "#node.js";
import type { SignedRegistryEntry } from "#service/registry.js";
export * from "./types.js"; export * from "./types.js";
export type { S5NodeConfig }; export {
createTransportSocket,
isTransport,
createTransportPeer,
} from "./transports/index.js";
export type { S5NodeConfig, SignedRegistryEntry };
export function createNode(config: S5NodeConfig) { export function createNode(config: S5NodeConfig) {
return new S5Node(config); return new S5Node(config);

View File

@ -0,0 +1,33 @@
import Packer from "#serialization/pack.js";
import { protocolMethodHandshakeDone, supportedFeatures } from "#constants.js";
import { S5Node } from "#node.js";
import { Peer } from "#types.js";
import Unpacker from "#serialization/unpack.js";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
rawData: Uint8Array,
) {
const p = new Packer();
p.packInt(protocolMethodHandshakeDone);
p.packBinary(data.unpackBinary());
let peerNetworkId: string | null = null;
try {
peerNetworkId = data.unpackString();
} catch {}
if (this.networkId && peerNetworkId !== this.networkId) {
throw `Peer is in different network: ${peerNetworkId}`;
}
p.packInt(supportedFeatures);
p.packInt(node.services.p2p.selfConnectionUris.length);
for (const uri of this.selfConnectionUris) {
p.packString(uri.toString());
}
// TODO Protocol version
// p.packInt(protocolVersion);
peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
}

12
src/messages/index.ts Normal file
View File

@ -0,0 +1,12 @@
import { P2PMessageHandler } from "#types.js";
import handshakeOpen from "#messages/handshakeOpen.js";
const messages = new Map<number, P2PMessageHandler>(
Object.entries({
protocolMethodHandshakeOpen: handshakeOpen,
}).map(([key, value]) => [Number(key), value]),
);
Object.freeze(messages);
export default messages;

View File

@ -0,0 +1,13 @@
import { S5Node } from "#node.js";
import { Peer } from "#types.js";
import Unpacker from "#serialization/unpack.js";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
rawData: Uint8Array,
) {
const sre = node.services.registry.deserializeRegistryEntry(rawData);
await node.services.registry.set(sre, false, peer);
}

View File

@ -0,0 +1,16 @@
import { S5Node } from "#node.js";
import { Peer } from "#types.js";
import Unpacker from "#serialization/unpack.js";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
rawData: Uint8Array,
) {
const pk = data.unpackBinary();
const sre = await node.services.registry.getFromDB(pk);
if (sre !== null) {
peer.sendMessage(node.services.registry.serializeRegistryEntry(sre));
}
}

View File

@ -0,0 +1,20 @@
import { S5Node } from "#node.js";
import { Peer } from "#types.js";
import Unpacker from "#serialization/unpack.js";
import messages from "#messages/signedMessages/index.js";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
rawData: Uint8Array,
verifyId = true,
) {
const sm = await node.services.p2p.unpackAndVerifySignature(data);
const u = Unpacker.fromPacked(sm.message);
const method = data.unpackInt();
if (method !== null && messages.has(method)) {
await messages.get(method)?.(node, peer, u, sm, verifyId);
}
}

View File

@ -0,0 +1,39 @@
import { S5Node } from "#node.js";
import { Peer, SignedMessage } from "#types.js";
import Unpacker from "#serialization/unpack.js";
import { equalBytes } from "@noble/curves/abstract/utils";
import { URL } from "url";
import NodeId from "#nodeId.js";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
message: SignedMessage,
verifyId: boolean,
) {
const length = data.unpackInt() as number;
for (let i = 0; i < length; i++) {
const peerIdBinary = data.unpackBinary();
const id = new NodeId(peerIdBinary);
const isConnected = data.unpackBool() as boolean;
const connectionUrisCount = data.unpackInt() as number;
const connectionUris: URL[] = [];
for (let i = 0; i < connectionUrisCount; i++) {
connectionUris.push(new URL(data.unpackString() as string));
}
if (connectionUris.length > 0) {
// TODO Fully support multiple connection uris
const uri = new URL(connectionUris[0].toString());
uri.username = id.toBase58();
if (!this.reconnectDelay.has(NodeId.decode(uri.username).toString())) {
node.services.p2p.connectToNode([uri]);
}
}
}
}

View File

@ -0,0 +1,65 @@
import { S5Node } from "#node.js";
import { Peer, SignedMessage } from "#types.js";
import Unpacker from "#serialization/unpack.js";
import { equalBytes } from "@noble/curves/abstract/utils";
import { URL } from "url";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
message: SignedMessage,
verifyId: boolean,
) {
const challenge = data.unpackBinary();
if (!equalBytes(peer.challenge, challenge)) {
throw "Invalid challenge";
}
const pId = message.nodeId;
if (!verifyId) {
peer.id = pId;
} else {
if (!peer.id.equals(pId)) {
throw "Invalid transports id on initial list";
}
}
peer.isConnected = true;
const supportedFeatures = data.unpackInt();
if (supportedFeatures !== 3) {
throw "Remote node does not support required features";
}
node.services.p2p.peers.set(peer.id.toString(), peer);
node.services.p2p.reconnectDelay.set(peer.id.toString(), 1);
const connectionUrisCount = data.unpackInt() as number;
peer.connectionUris = [];
for (let i = 0; i < connectionUrisCount; i++) {
peer.connectionUris.push(new URL(data.unpackString() as string));
}
this.logger.info(
`[+] ${peer.id.toString()} (${peer.renderLocationUri().toString()})`,
);
node.services.p2p.sendPublicPeersToPeer(
peer,
Array.from(node.services.p2p.peers.values()),
);
for (const p of this._peers.values()) {
if (p.id.equals(peer.id)) {
continue;
}
if (p.isConnected) {
this.sendPublicPeersToPeer(p, [peer]);
}
}
}

View File

@ -0,0 +1,12 @@
import { P2PSignedMessageHandler } from "#types.js";
import handshakeDone from "#messages/signedMessages/handshakeDone.js";
const messages = new Map<number, P2PSignedMessageHandler>(
Object.entries({
protocolMethodHandshakeOpen: handshakeDone,
}).map(([key, value]) => [Number(key), value]),
);
Object.freeze(messages);
export default messages;

View File

@ -0,0 +1,77 @@
import { S5Node } from "#node.js";
import { Peer } from "#types.js";
import Unpacker from "#serialization/unpack.js";
import { Multihash } from "#multihash.js";
import { decodeEndian } from "#util.js";
import { mkeyEd25519 } from "#constants.js";
import { ed25519 } from "@noble/curves/ed25519";
import NodeId from "#nodeId.js";
import StorageLocation from "#storage.js";
export default async function (
node: S5Node,
peer: Peer,
data: Unpacker,
rawData: Uint8Array,
) {
const hash = new Multihash(rawData.subarray(1, 34));
const type = rawData[34];
const expiry = decodeEndian(rawData.subarray(35, 39));
const partCount = rawData[39];
const parts: string[] = [];
let cursor = 40;
for (let i = 0; i < partCount; i++) {
const length = decodeEndian(rawData.subarray(cursor, cursor + 2));
cursor += 2;
parts.push(
new TextDecoder().decode(rawData.subarray(cursor, cursor + length)),
);
cursor += length;
}
cursor++;
const publicKey = rawData.subarray(cursor, cursor + 33);
const signature = rawData.subarray(cursor + 33);
if (publicKey[0] !== mkeyEd25519) {
throw `Unsupported public key type ${mkeyEd25519}`;
}
if (
!ed25519.verify(
signature,
rawData.subarray(0, cursor),
publicKey.subarray(1),
)
) {
return;
}
const nodeId = new NodeId(publicKey);
await node.addStorageLocation({
hash,
nodeId,
location: new StorageLocation(type, parts, expiry),
message: rawData,
config: this.node.config,
});
const list = this.hashQueryRoutingTable.get(hash) || new Set<NodeId>();
for (const peerId of list) {
if (peerId.equals(nodeId)) {
continue;
}
if (peerId.equals(peer.id)) {
continue;
}
if (this._peers.has(peerId.toString())) {
try {
this._peers.get(peerId.toString())?.sendMessage(event);
} catch (e) {
this.logger.catched(e);
}
}
}
this.hashQueryRoutingTable.delete(hash);
}

View File

@ -40,11 +40,29 @@ export interface S5NodeConfig {
export class S5Node { export class S5Node {
private _nodeConfig: S5NodeConfig; private _nodeConfig: S5NodeConfig;
private _config?: S5Config;
constructor(config: S5NodeConfig) { constructor(config: S5NodeConfig) {
this._nodeConfig = config; this._nodeConfig = config;
} }
private _config?: S5Config;
get config() {
return this._config as S5Config;
}
get services() {
return this._config?.services as S5Services;
}
get db() {
return this._config?.db as AbstractLevel<Uint8Array, string, Uint8Array>;
}
get logger() {
return this._config?.logger as Logger;
}
public async start() { public async start() {
this._config = { this._config = {
keyPair: this._nodeConfig.keyPair, keyPair: this._nodeConfig.keyPair,
@ -62,29 +80,11 @@ export class S5Node {
await p2p.start(); await p2p.start();
} }
get services() { async readStorageLocationsFromDB(
return this._config?.services as S5Services; hash: Multihash,
} ): Promise<Map<number, Map<string, Map<number, any>>>> {
get config() { const map = new Map<number, Map<string, Map<number, any>>>();
return this._config as S5Config; const bytes = await this.db.get(stringifyHash(hash));
}
get db() {
return this._config?.db as AbstractLevel<Uint8Array, string, Uint8Array>;
}
get logger() {
return this._config?.logger as Logger;
}
}
export async function readStorageLocationsFromDB({
hash,
config,
}: {
hash: Multihash;
config: S5Config;
}): Promise<Map<number, Map<NodeId, Map<number, any>>>> {
const map = new Map<number, Map<NodeId, Map<number, any>>>();
const bytes = await config.db.get(stringifyHash(hash));
if (bytes === null) { if (bytes === null) {
return map; return map;
} }
@ -92,33 +92,36 @@ export async function readStorageLocationsFromDB({
const mapLength = unpacker.unpackMapLength(); const mapLength = unpacker.unpackMapLength();
for (let i = 0; i < mapLength; i++) { for (let i = 0; i < mapLength; i++) {
const type = unpacker.unpackInt() as number; const type = unpacker.unpackInt() as number;
const innerMap = new Map<NodeId, Map<number, any>>(); const innerMap = new Map<string, Map<number, any>>();
map.set(type, innerMap); map.set(type, innerMap);
const innerMapLength = unpacker.unpackMapLength(); const innerMapLength = unpacker.unpackMapLength();
for (let j = 0; j < innerMapLength; j++) { for (let j = 0; j < innerMapLength; j++) {
const nodeId = new NodeId(unpacker.unpackBinary()); const nodeId = new NodeId(unpacker.unpackBinary());
innerMap.set(nodeId, new Map(unpacker.unpackMap() as [number, any][])); innerMap.set(
nodeId.toString(),
new Map(unpacker.unpackMap() as [number, any][]),
);
} }
} }
return map; return map;
} }
export async function addStorageLocation({ async addStorageLocation({
hash, hash,
nodeId, nodeId,
location, location,
message, message,
config, config,
}: { }: {
hash: Multihash; hash: Multihash;
nodeId: NodeId; nodeId: NodeId;
location: StorageLocation; location: StorageLocation;
message?: Uint8Array; message?: Uint8Array;
config: S5Config; config: S5Config;
}) { }) {
const map = this.readStorageLocationsFromDB(hash); const map = await this.readStorageLocationsFromDB(hash);
const innerMap = const innerMap =
map.get(location.type) || new Map<NodeId, Map<number, any>>(); map.get(location.type) || new Map<string, Map<number, any>>();
map.set(location.type, innerMap); map.set(location.type, innerMap);
const locationMap = new Map<number, any>([ const locationMap = new Map<number, any>([
@ -128,11 +131,12 @@ export async function addStorageLocation({
[4, message], [4, message],
]); ]);
innerMap.set(nodeId, locationMap); innerMap.set(nodeId.toString(), locationMap);
await config.cacheDb.put( await config.cacheDb.put(
stringifyHash(hash), stringifyHash(hash),
new Packer().pack(map).takeBytes(), new Packer().pack(map).takeBytes(),
); );
}
} }
export function stringifyBytes(data: Uint8Array) { export function stringifyBytes(data: Uint8Array) {

View File

@ -1,41 +1,37 @@
import { Multihash } from "../multihash.js"; import { Multihash } from "../multihash.js";
import NodeId from "../nodeId.js"; import NodeId from "../nodeId.js";
import { equalBytes } from "@noble/curves/abstract/utils"; import { Logger, Peer, SignedMessage } from "../types.js";
import { Logger, Peer, S5Config, SignedMessage } from "../types.js";
import KeyPairEd25519 from "../ed25519.js"; import KeyPairEd25519 from "../ed25519.js";
import * as crypto from "crypto"; import * as crypto from "crypto";
import { import {
mkeyEd25519,
protocolMethodAnnouncePeers, protocolMethodAnnouncePeers,
protocolMethodHandshakeDone,
protocolMethodHandshakeOpen, protocolMethodHandshakeOpen,
protocolMethodHashQuery, protocolMethodHashQuery,
protocolMethodSignedMessage, protocolMethodSignedMessage,
recordTypeRegistryEntry,
recordTypeStorageLocation, recordTypeStorageLocation,
storageLocationTypeFull, storageLocationTypeFull,
} from "../constants.js"; } from "../constants.js";
import defer from "p-defer"; import defer from "p-defer";
import { calculateScore, decodeEndian, encodeEndian } from "#util.js"; import { calculateScore, encodeEndian } from "#util.js";
import Packer from "#serialization/pack.js"; import Packer from "#serialization/pack.js";
import Unpacker from "#serialization/unpack.js"; import Unpacker from "#serialization/unpack.js";
import { ed25519 } from "@noble/curves/ed25519"; import { ed25519 } from "@noble/curves/ed25519";
import { AbstractLevel, AbstractSublevel } from "abstract-level"; import { AbstractLevel, AbstractSublevel } from "abstract-level";
import StorageLocation from "#storage.js"; import StorageLocation from "#storage.js";
import { addStorageLocation, S5Node, stringifyNode } from "#node.js"; import { S5Node, stringifyNode } from "#node.js";
import { URL } from "url"; import { URL } from "url";
import { Buffer } from "buffer"; import { Buffer } from "buffer";
import { connect as tcpConnect, TcpPeer } from "../peer/tcp.js"; import {
import { connect as wsConnect, WebSocketPeer } from "../peer/webSocket.js"; createTransportPeer,
createTransportSocket,
} from "#transports/index.js";
import messages from "#messages/index.js";
export class P2PService { export class P2PService {
private node: S5Node;
private logger: Logger; private logger: Logger;
private nodeKeyPair: KeyPairEd25519; private nodeKeyPair: KeyPairEd25519;
private localNodeId?: NodeId; private localNodeId?: NodeId;
private networkId?: string; private networkId?: string;
private reconnectDelay: Map<string, number> = new Map();
private selfConnectionUris: Array<URL> = [];
private nodesDb?: AbstractSublevel< private nodesDb?: AbstractSublevel<
AbstractLevel<Uint8Array, string, Uint8Array>, AbstractLevel<Uint8Array, string, Uint8Array>,
Uint8Array, Uint8Array,
@ -45,7 +41,7 @@ export class P2PService {
private hashQueryRoutingTable: Map<Multihash, Set<NodeId>> = new Map(); private hashQueryRoutingTable: Map<Multihash, Set<NodeId>> = new Map();
constructor(node: S5Node) { constructor(node: S5Node) {
this.node = node; this._node = node;
this.networkId = node.config.p2p?.network; this.networkId = node.config.p2p?.network;
this.nodeKeyPair = node.config.keyPair; this.nodeKeyPair = node.config.keyPair;
this.logger = node.logger; this.logger = node.logger;
@ -53,6 +49,24 @@ export class P2PService {
node.config.services.p2p = this; node.config.services.p2p = this;
} }
private _node: S5Node;
get node(): S5Node {
return this._node;
}
private _reconnectDelay: Map<string, number> = new Map();
get reconnectDelay(): Map<string, number> {
return this._reconnectDelay;
}
private _selfConnectionUris: Array<URL> = [];
get selfConnectionUris(): Array<URL> {
return this._selfConnectionUris;
}
private _peers: Map<string, Peer> = new Map(); private _peers: Map<string, Peer> = new Map();
get peers(): Map<string, Peer> { get peers(): Map<string, Peer> {
@ -61,11 +75,11 @@ export class P2PService {
async init(): Promise<void> { async init(): Promise<void> {
this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor
this.nodesDb = this.node.db.sublevel<string, Uint8Array>("s5-nodes", {}); this.nodesDb = this._node.db.sublevel<string, Uint8Array>("s5-nodes", {});
} }
async start(): Promise<void> { async start(): Promise<void> {
const initialPeers = this.node.config?.p2p?.peers?.initial || []; const initialPeers = this._node.config?.p2p?.peers?.initial || [];
for (const p of initialPeers) { for (const p of initialPeers) {
this.connectToNode([new URL(p)]); this.connectToNode([new URL(p)]);
@ -84,196 +98,14 @@ export class P2PService {
const completer = defer<void>(); const completer = defer<void>();
const supportedFeatures = 3; // 0b00000011
peer.listenForMessages( peer.listenForMessages(
async (event: Uint8Array) => { async (event: Uint8Array) => {
let u = Unpacker.fromPacked(event); let u = Unpacker.fromPacked(event);
const method = u.unpackInt(); const method = u.unpackInt();
if (method === protocolMethodHandshakeOpen) {
const p = new Packer();
p.packInt(protocolMethodHandshakeDone);
p.packBinary(u.unpackBinary());
let peerNetworkId: string | null = null;
try {
peerNetworkId = u.unpackString();
} catch {}
if (this.networkId && peerNetworkId !== this.networkId) { if (method !== null && messages.has(method)) {
throw `Peer is in different network: ${peerNetworkId}`; await messages.get(method)?.(this.node, peer, u, event, verifyId);
} }
p.packInt(supportedFeatures);
p.packInt(this.selfConnectionUris.length);
for (const uri of this.selfConnectionUris) {
p.packString(uri.toString());
}
// TODO Protocol version
// p.packInt(protocolVersion);
peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
return;
} else if (method === recordTypeRegistryEntry) {
const sre =
this.node.services.registry.deserializeRegistryEntry(event);
await this.node.services.registry.set(sre, false, peer);
return;
} else if (method === recordTypeStorageLocation) {
const hash = new Multihash(event.subarray(1, 34));
const type = event[34];
const expiry = decodeEndian(event.subarray(35, 39));
const partCount = event[39];
const parts: string[] = [];
let cursor = 40;
for (let i = 0; i < partCount; i++) {
const length = decodeEndian(event.subarray(cursor, cursor + 2));
cursor += 2;
parts.push(
new TextDecoder().decode(event.subarray(cursor, cursor + length)),
);
cursor += length;
}
cursor++;
const publicKey = event.subarray(cursor, cursor + 33);
const signature = event.subarray(cursor + 33);
if (publicKey[0] !== mkeyEd25519) {
throw `Unsupported public key type ${mkeyEd25519}`;
}
if (
!ed25519.verify(
signature,
event.subarray(0, cursor),
publicKey.subarray(1),
)
) {
return;
}
const nodeId = new NodeId(publicKey);
await addStorageLocation({
hash,
nodeId,
location: new StorageLocation(type, parts, expiry),
message: event,
config: this.node.config,
});
const list =
this.hashQueryRoutingTable.get(hash) || new Set<NodeId>();
for (const peerId of list) {
if (peerId.equals(nodeId)) {
continue;
}
if (peerId.equals(peer.id)) {
continue;
}
if (this._peers.has(peerId.toString())) {
try {
this._peers.get(peerId.toString())?.sendMessage(event);
} catch (e) {
this.logger.catched(e);
}
}
}
this.hashQueryRoutingTable.delete(hash);
}
if (method === protocolMethodSignedMessage) {
const sm = await this.unpackAndVerifySignature(u);
u = Unpacker.fromPacked(sm.message);
const method2 = u.unpackInt();
if (method2 === protocolMethodHandshakeDone) {
const challenge = u.unpackBinary();
if (!equalBytes(peer.challenge, challenge)) {
throw "Invalid challenge";
}
const pId = sm.nodeId;
if (!verifyId) {
peer.id = pId;
} else {
if (!peer.id.equals(pId)) {
throw "Invalid peer id on initial list";
}
}
peer.isConnected = true;
const supportedFeatures = u.unpackInt();
if (supportedFeatures !== 3) {
throw "Remote node does not support required features";
}
this._peers.set(peer.id.toString(), peer);
this.reconnectDelay.set(peer.id.toString(), 1);
const connectionUrisCount = u.unpackInt() as number;
peer.connectionUris = [];
for (let i = 0; i < connectionUrisCount; i++) {
peer.connectionUris.push(new URL(u.unpackString() as string));
}
this.logger.info(
`[+] ${peer.id.toString()} (${peer
.renderLocationUri()
.toString()})`,
);
this.sendPublicPeersToPeer(peer, Array.from(this._peers.values()));
for (const p of this._peers.values()) {
if (p.id.equals(peer.id)) continue;
if (p.isConnected) {
this.sendPublicPeersToPeer(p, [peer]);
}
}
return;
} else if (method2 === protocolMethodAnnouncePeers) {
const length = u.unpackInt() as number;
for (let i = 0; i < length; i++) {
const peerIdBinary = u.unpackBinary();
const id = new NodeId(peerIdBinary);
const isConnected = u.unpackBool() as boolean;
const connectionUrisCount = u.unpackInt() as number;
const connectionUris: URL[] = [];
for (let i = 0; i < connectionUrisCount; i++) {
connectionUris.push(new URL(u.unpackString() as string));
}
if (connectionUris.length > 0) {
// TODO Fully support multiple connection uris
const uri = new URL(connectionUris[0].toString());
uri.username = id.toBase58();
if (
!this.reconnectDelay.has(
NodeId.decode(uri.username).toString(),
)
) {
this.connectToNode([uri]);
}
}
}
}
} /* else if (method === protocolMethodRegistryQuery) {
const pk = u.unpackBinary();
const sre = node.registry.getFromDB(pk);
if (sre !== null) {
peer.sendMessage(node.registry.serializeRegistryEntry(sre));
}
}*/
}, },
{ {
onDone: async () => { onDone: async () => {
@ -456,7 +288,7 @@ export class P2PService {
); );
} }
const protocol = connectionUri.protocol; const protocol = connectionUri.protocol.replace(":", "");
if (!connectionUri.username) { if (!connectionUri.username) {
throw new Error("Connection URI does not contain node id"); throw new Error("Connection URI does not contain node id");
@ -464,9 +296,9 @@ export class P2PService {
const id = NodeId.decode(connectionUri.username); const id = NodeId.decode(connectionUri.username);
this.reconnectDelay.set( this._reconnectDelay.set(
id.toString(), id.toString(),
this.reconnectDelay.get(id.toString()) || 1, this._reconnectDelay.get(id.toString()) || 1,
); );
if (id.equals(this.localNodeId)) { if (id.equals(this.localNodeId)) {
@ -475,21 +307,12 @@ export class P2PService {
try { try {
this.logger.verbose(`[connect] ${connectionUri}`); this.logger.verbose(`[connect] ${connectionUri}`);
if (protocol === "tcp:") {
const ip = connectionUri.hostname;
const port = parseInt(connectionUri.port);
const socket = await tcpConnect(port, ip);
const peer = new TcpPeer(socket, [connectionUri]); const socket = await createTransportSocket(protocol, connectionUri);
peer.id = id; await this.onNewPeer(
createTransportPeer(protocol, socket, [connectionUri]),
await this.onNewPeer(peer, true); true,
} else { );
const channel = await wsConnect(connectionUri.toString());
const peer = new WebSocketPeer(channel, [connectionUri]);
peer.id = id;
await this.onNewPeer(peer, true);
}
} catch (e) { } catch (e) {
if (retried) { if (retried) {
return; return;
@ -498,8 +321,8 @@ export class P2PService {
this.logger.catched(e); this.logger.catched(e);
const delay = this.reconnectDelay.get(id.toString())!; const delay = this._reconnectDelay.get(id.toString())!;
this.reconnectDelay.set(id.toString(), delay * 2); this._reconnectDelay.set(id.toString(), delay * 2);
await new Promise((resolve) => setTimeout(resolve, delay * 1000)); await new Promise((resolve) => setTimeout(resolve, delay * 1000));
await this.connectToNode(connectionUris, retried); await this.connectToNode(connectionUris, retried);

View File

@ -16,7 +16,7 @@ import { EventEmitter } from "events";
import KeyPairEd25519 from "#ed25519.js"; import KeyPairEd25519 from "#ed25519.js";
import { S5Node, stringifyBytes } from "#node.js"; import { S5Node, stringifyBytes } from "#node.js";
interface SignedRegistryEntry { export interface SignedRegistryEntry {
pk: Uint8Array; // public key with multicodec prefix pk: Uint8Array; // public key with multicodec prefix
revision: number; // revision number of this entry, maximum is (256^8)-1 revision: number; // revision number of this entry, maximum is (256^8)-1
data: Uint8Array; // data stored in this entry, can have a maximum length of 48 bytes data: Uint8Array; // data stored in this entry, can have a maximum length of 48 bytes
@ -68,7 +68,7 @@ export class RegistryService {
throw new Error("Data too long"); throw new Error("Data too long");
} }
const isValid = verifyRegistryEntry(sre); const isValid = this.verifyRegistryEntry(sre);
if (!isValid) { if (!isValid) {
throw new Error("Invalid signature found"); throw new Error("Invalid signature found");
} }
@ -81,7 +81,7 @@ export class RegistryService {
if (existingEntry.revision === sre.revision) { if (existingEntry.revision === sre.revision) {
return; return;
} else if (existingEntry.revision > sre.revision) { } else if (existingEntry.revision > sre.revision) {
const updateMessage = serializeRegistryEntry(existingEntry); const updateMessage = this.serializeRegistryEntry(existingEntry);
receivedFrom.sendMessage(updateMessage); receivedFrom.sendMessage(updateMessage);
return; return;
} }
@ -95,7 +95,7 @@ export class RegistryService {
const key = new Multihash(sre.pk); const key = new Multihash(sre.pk);
this.streams.get(key.toString())?.emit("event", sre); this.streams.get(key.toString())?.emit("event", sre);
this.db?.put(stringifyBytes(sre.pk), serializeRegistryEntry(sre)); this.db?.put(stringifyBytes(sre.pk), this.serializeRegistryEntry(sre));
this.broadcastEntry(sre, receivedFrom); this.broadcastEntry(sre, receivedFrom);
} }
@ -104,7 +104,7 @@ export class RegistryService {
// TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256) // TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256)
broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void { broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void {
this.logger.verbose("[registry] broadcastEntry"); this.logger.verbose("[registry] broadcastEntry");
const updateMessage = serializeRegistryEntry(sre); const updateMessage = this.serializeRegistryEntry(sre);
for (const p of Object.values(this.node.services.p2p.peers)) { for (const p of Object.values(this.node.services.p2p.peers)) {
if (receivedFrom == null || p.id !== receivedFrom.id) { if (receivedFrom == null || p.id !== receivedFrom.id) {
@ -216,9 +216,8 @@ export class RegistryService {
signature: event.slice(43 + dataLength), signature: event.slice(43 + dataLength),
}; };
} }
}
function verifyRegistryEntry(sre: SignedRegistryEntry): boolean { public verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
const list: Uint8Array = Uint8Array.from([ const list: Uint8Array = Uint8Array.from([
recordTypeRegistryEntry, recordTypeRegistryEntry,
...encodeEndian(sre.revision, 8), ...encodeEndian(sre.revision, 8),
@ -227,8 +226,8 @@ function verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
]); ]);
return ed25519.verify(list, sre.signature, sre.pk.slice(1)); return ed25519.verify(list, sre.signature, sre.pk.slice(1));
} }
function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array { public serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
return Uint8Array.from([ return Uint8Array.from([
recordTypeRegistryEntry, recordTypeRegistryEntry,
...sre.pk, ...sre.pk,
@ -237,4 +236,5 @@ function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
...sre.data, ...sre.data,
...sre.signature, ...sre.signature,
]); ]);
}
} }

42
src/transports/index.ts Normal file
View File

@ -0,0 +1,42 @@
import { URL } from "url";
import { TcpPeer } from "#transports/tcp.js";
import { WebSocketPeer } from "#transports/webSocket.js";
import { PeerStatic } from "#types.js";
const transports = new Map<string, PeerStatic>();
export function registerTransport(type: string, transport: PeerStatic) {
transports.set(type, transport);
}
export function isTransport(type: string) {
return transports.has(type);
}
export function createTransportSocket(type: string, uri: URL) {
if (!isTransport(type)) {
throw new Error(`transport ${type} does not exist`);
}
const transport = transports.get(type) as PeerStatic;
return transport.connect(uri);
}
export function createTransportPeer(
type: string,
socket: any,
connectionUris: URL[] = [],
) {
if (!isTransport(type)) {
throw new Error(`transport ${type} does not exist`);
}
const transport = transports.get(type) as PeerStatic;
return new transport(socket, connectionUris);
}
registerTransport("tcp", TcpPeer);
registerTransport("ws", WebSocketPeer);
registerTransport("wss", WebSocketPeer);

View File

@ -3,6 +3,7 @@ import NodeId from "../nodeId.js";
import * as net from "net"; import * as net from "net";
import { URL } from "url"; import { URL } from "url";
import { decodeEndian } from "../util.js"; import { decodeEndian } from "../util.js";
import * as console from "console";
export class TcpPeer implements Peer { export class TcpPeer implements Peer {
connectionUris: Array<URL>; connectionUris: Array<URL>;
@ -79,9 +80,10 @@ export class TcpPeer implements Peer {
this._socket.on("error", onError); this._socket.on("error", onError);
} }
} }
}
export async function connect(port: number, host: string): Promise<net.Socket> { public static async connect(uri: URL): Promise<net.Socket> {
const host = uri.hostname;
const port = parseInt(uri.port);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const socket = net.connect(port, host, () => { const socket = net.connect(port, host, () => {
resolve(socket); resolve(socket);
@ -90,4 +92,5 @@ export async function connect(port: number, host: string): Promise<net.Socket> {
reject(err); reject(err);
}); });
}); });
}
} }

View File

@ -56,8 +56,8 @@ export class WebSocketPeer implements Peer {
this._socket.addEventListener("error", onError); this._socket.addEventListener("error", onError);
} }
} }
}
export async function connect(uri: string): Promise<WebSocket> { public static async connect(uri: URL): Promise<WebSocket> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const socket = new WebSocket(uri); const socket = new WebSocket(uri);
socket.addEventListener("open", () => { socket.addEventListener("open", () => {
@ -67,4 +67,5 @@ export async function connect(uri: string): Promise<WebSocket> {
reject(err); reject(err);
}); });
}); });
}
} }

View File

@ -3,13 +3,14 @@ import KeyPairEd25519 from "#ed25519.js";
import { AbstractLevel } from "abstract-level"; import { AbstractLevel } from "abstract-level";
import { P2PService } from "./service/p2p.js"; import { P2PService } from "./service/p2p.js";
import { RegistryService } from "./service/registry.js"; import { RegistryService } from "./service/registry.js";
import { S5Node } from "#node.js";
import Unpacker from "#serialization/unpack.js";
export interface Peer { export interface Peer {
id: NodeId; id: NodeId;
connectionUris: Array<URL>; connectionUris: Array<URL>;
isConnected: boolean; isConnected: boolean;
challenge: Uint8Array; challenge: Uint8Array;
sendMessage(message: Uint8Array): void; sendMessage(message: Uint8Array): void;
listenForMessages( listenForMessages(
@ -28,6 +29,12 @@ export interface Peer {
renderLocationUri(): string; renderLocationUri(): string;
} }
// Define the static side of the class
export interface PeerStatic {
new (_socket: any, uri: URL[]): Peer;
connect(uri: URL): Promise<any>;
}
export interface Logger { export interface Logger {
info(s: string): void; info(s: string): void;
verbose(s: string): void; verbose(s: string): void;
@ -58,3 +65,19 @@ export interface SignedMessage {
nodeId: NodeId; nodeId: NodeId;
message: Uint8Array; message: Uint8Array;
} }
export type P2PMessageHandler = (
node: S5Node,
peer: Peer,
data: Unpacker,
rawData: Uint8Array,
verifyId: boolean,
) => Promise<void>;
export type P2PSignedMessageHandler = (
node: S5Node,
peer: Peer,
data: Unpacker,
message: SignedMessage,
verifyId: boolean,
) => Promise<void>;