From a189fab1bec44b47b8e16341568a602cc7383c56 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 31 Aug 2023 03:29:28 -0400 Subject: [PATCH] refactor: use a simple S5Node object to coordinate everything with helper getters --- src/node.ts | 73 +++++++++++++++++++++++++++++++++++++- src/service/p2p.ts | 78 ++++++++++++++++++++--------------------- src/service/registry.ts | 16 ++++----- src/types.ts | 10 +++--- 4 files changed, 125 insertions(+), 52 deletions(-) diff --git a/src/node.ts b/src/node.ts index ac58042..1e94177 100644 --- a/src/node.ts +++ b/src/node.ts @@ -1,9 +1,80 @@ import { Multihash } from "./multihash.js"; import NodeId from "./nodeId.js"; -import { S5Config } from "./types.js"; +import { Logger, S5Config, S5Services } from "./types.js"; import Unpacker from "./serialization/unpack.js"; import Packer from "./serialization/pack.js"; import StorageLocation from "./storage.js"; +import KeyPairEd25519 from "#ed25519.js"; +import { AbstractLevel } from "abstract-level"; +import { P2PService } from "#service/p2p.js"; +import { RegistryService } from "#service/registry.js"; +const DEFAULT_LOGGER = { + info(s: any) { + console.info(s); + }, + verbose(s: any) { + console.log(s); + }, + warn(s: any) { + console.warn(s); + }, + error(s: any) { + console.error(s); + }, + catched(e: any, context?: string | null) { + console.error(e, context); + }, +}; + +interface S5NodeConfig { + p2p?: { + network: string; + peers?: { + initial?: string[]; + }; + }; + keyPair: KeyPairEd25519; + db: AbstractLevel; + logger?: Logger; +} + +export class S5Node { + private _nodeConfig: S5NodeConfig; + private _config?: S5Config; + constructor(config: S5NodeConfig) { + this._nodeConfig = config; + } + + public async start() { + this._config = { + keyPair: this._nodeConfig.keyPair, + db: this._nodeConfig.db, + logger: this._nodeConfig.logger ?? DEFAULT_LOGGER, + cacheDb: this._nodeConfig.db.sublevel("s5-object-cache", {}), + services: {} as any, + }; + + const p2p = new P2PService(this); + const registry = new RegistryService(this); + + await p2p.init(); + await registry.init(); + await p2p.start(); + } + + get services() { + return this._config?.services as S5Services; + } + get config() { + return this._config as S5Config; + } + get db() { + return this._config?.db as AbstractLevel; + } + get logger() { + return this._config?.logger as Logger; + } +} export async function readStorageLocationsFromDB({ hash, diff --git a/src/service/p2p.ts b/src/service/p2p.ts index 53eabac..2d05856 100644 --- a/src/service/p2p.ts +++ b/src/service/p2p.ts @@ -22,23 +22,18 @@ import Unpacker from "#serialization/unpack.js"; import { ed25519 } from "@noble/curves/ed25519"; import { AbstractLevel, AbstractSublevel } from "abstract-level"; import StorageLocation from "#storage.js"; -import { addStorageLocation, stringifyNode } from "#node.js"; +import { addStorageLocation, S5Node, stringifyNode } from "#node.js"; import { URL } from "url"; import { Buffer } from "buffer"; import { connect as tcpConnect, TcpPeer } from "../peer/tcp.js"; import { connect as wsConnect, WebSocketPeer } from "../peer/webSocket.js"; export class P2PService { - get peers(): Map { - return this._peers; - } - - private config: S5Config; + private node: S5Node; private logger: Logger; private nodeKeyPair: KeyPairEd25519; private localNodeId?: NodeId; private networkId?: string; - private _peers: Map = new Map(); private reconnectDelay: Map = new Map(); private selfConnectionUris: Array = []; private nodesDb?: AbstractSublevel< @@ -47,25 +42,30 @@ export class P2PService { string, Uint8Array >; - private hashQueryRoutingTable: Map> = new Map(); - constructor(config: S5Config) { - this.config = config; - this.networkId = config?.p2p?.network; - this.nodeKeyPair = config.keyPair; - this.logger = config.logger; + constructor(node: S5Node) { + this.node = node; + this.networkId = node.config.p2p?.network; + this.nodeKeyPair = node.config.keyPair; + this.logger = node.logger; - config.services.p2p = this; + node.config.services.p2p = this; + } + + private _peers: Map = new Map(); + + get peers(): Map { + return this._peers; } async init(): Promise { this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor - this.nodesDb = this.config.db.sublevel("s5-nodes", {}); + this.nodesDb = this.node.db.sublevel("s5-nodes", {}); } async start(): Promise { - const initialPeers = this.config?.p2p?.peers?.initial || []; + const initialPeers = this.node.config?.p2p?.peers?.initial || []; for (const p of initialPeers) { this.connectToNode([new URL(p)]); @@ -114,8 +114,8 @@ export class P2PService { return; } else if (method === recordTypeRegistryEntry) { const sre = - this.config.services.registry.deserializeRegistryEntry(event); - await this.config.services.registry.set(sre, false, peer); + this.node.services.registry.deserializeRegistryEntry(event); + await this.node.services.registry.set(sre, false, peer); return; } else if (method === recordTypeStorageLocation) { const hash = new Multihash(event.subarray(1, 34)); @@ -157,7 +157,7 @@ export class P2PService { nodeId, location: new StorageLocation(type, parts, expiry), message: event, - config: this.config, + config: this.node.config, }); const list = @@ -361,26 +361,6 @@ export class P2PService { return calculateScore(map.get(1), map.get(2)); } - private async _vote(nodeId: NodeId, upvote: boolean): Promise { - const node = await this.nodesDb?.get(stringifyNode(nodeId)); - const map = node - ? Unpacker.fromPacked(node).unpackMap() - : new Map( - Object.entries({ 1: 0, 2: 0 }).map(([k, v]) => [+k, v]), - ); - - if (upvote) { - map.set(1, (map.get(1) ?? 0) + 1); - } else { - map.set(2, (map.get(2) ?? 0) + 1); - } - - await this.nodesDb?.put( - stringifyNode(nodeId), - new Packer().pack(map).takeBytes(), - ); - } - async upvote(nodeId: NodeId): Promise { await this._vote(nodeId, true); } @@ -525,4 +505,24 @@ export class P2PService { await this.connectToNode(connectionUris, retried); } } + + private async _vote(nodeId: NodeId, upvote: boolean): Promise { + const node = await this.nodesDb?.get(stringifyNode(nodeId)); + const map = node + ? Unpacker.fromPacked(node).unpackMap() + : new Map( + Object.entries({ 1: 0, 2: 0 }).map(([k, v]) => [+k, v]), + ); + + if (upvote) { + map.set(1, (map.get(1) ?? 0) + 1); + } else { + map.set(2, (map.get(2) ?? 0) + 1); + } + + await this.nodesDb?.put( + stringifyNode(nodeId), + new Packer().pack(map).takeBytes(), + ); + } } diff --git a/src/service/registry.ts b/src/service/registry.ts index dcbf1f2..3708fb7 100644 --- a/src/service/registry.ts +++ b/src/service/registry.ts @@ -14,7 +14,7 @@ import Packer from "#serialization/pack.js"; import { Buffer } from "buffer"; import { EventEmitter } from "events"; import KeyPairEd25519 from "#ed25519.js"; -import { stringifyBytes } from "#node.js"; +import { S5Node, stringifyBytes } from "#node.js"; interface SignedRegistryEntry { pk: Uint8Array; // public key with multicodec prefix @@ -30,17 +30,17 @@ export class RegistryService { string, Uint8Array >; - private config: S5Config; + private node: S5Node; private logger: Logger; private streams: Map = new Map(); - constructor(config: S5Config) { - this.config = config; - this.logger = this.config.logger; + constructor(node: S5Node) { + this.node = node; + this.logger = this.node.logger; } async init(): Promise { - this.db = this.config.db.sublevel("s5-registry-db", {}); + this.db = this.node.db.sublevel("s5-registry-db", {}); } async set( @@ -106,7 +106,7 @@ export class RegistryService { this.logger.verbose("[registry] broadcastEntry"); const updateMessage = serializeRegistryEntry(sre); - for (const p of Object.values(this.config.services.p2p.peers)) { + for (const p of Object.values(this.node.services.p2p.peers)) { if (receivedFrom == null || p.id !== receivedFrom.id) { p.sendMessage(updateMessage); } @@ -122,7 +122,7 @@ export class RegistryService { const req = p.takeBytes(); // TODO: Use shard system if there are more than X peers - for (const peer of Object.values(this.config.services.p2p.peers)) { + for (const peer of Object.values(this.node.services.p2p.peers)) { peer.sendMessage(req); } } diff --git a/src/types.ts b/src/types.ts index cc3b290..f765b1c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -36,6 +36,11 @@ export interface Logger { catched(e: any, context?: string | null): void; } +export interface S5Services { + p2p: P2PService; + registry: RegistryService; +} + export interface S5Config { p2p?: { network: string; @@ -47,10 +52,7 @@ export interface S5Config { logger: Logger; db: AbstractLevel; cacheDb: AbstractLevel; - services: { - p2p: P2PService; - registry: RegistryService; - }; + services: S5Services; } export interface SignedMessage { nodeId: NodeId;