From 2ebfcd13634931712231d44e90992c8bc3f7d013 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 5 Jan 2023 20:31:14 -0500 Subject: [PATCH] *Major refactor to support pluggable backends --- package.json | 5 +- rollup.config.js | 12 +-- src/index.ts | 158 +++++++++++++++++---------------- src/storage/backends/lmdb.ts | 25 ++++++ src/storage/backends/memory.ts | 15 ++++ src/storage/factory.ts | 18 ++++ src/storage/index.ts | 8 ++ src/types.ts | 7 ++ 8 files changed, 164 insertions(+), 84 deletions(-) create mode 100644 src/storage/backends/lmdb.ts create mode 100644 src/storage/backends/memory.ts create mode 100644 src/storage/factory.ts create mode 100644 src/storage/index.ts diff --git a/package.json b/package.json index 0805b70..0c55aad 100644 --- a/package.json +++ b/package.json @@ -11,14 +11,15 @@ "@lumeweb/relay-types": "https://git.lumeweb.com/LumeWeb/relay-types.git", "@noble/ed25519": "^1.7.1", "b4a": "^1.6.1", - "node-cache": "^5.1.2", - "object-merger": "^1.0.3" + "lmdb": "^2.7.3", + "node-cache": "^5.1.2" }, "devDependencies": { "@lumeweb/cfg": "https://git.lumeweb.com/LumeWeb/cfg.git", "@protobuf-ts/plugin": "^2.8.2", "@protobuf-ts/runtime": "^2.8.2", "@types/node": "^18.11.18", + "hyperswarm": "^4.3.5", "prettier": "^2.8.1" } } diff --git a/rollup.config.js b/rollup.config.js index 5746c1c..5620e3e 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -1,13 +1,13 @@ -import { defineConfig } from "rollup"; import preset from "@lumeweb/relay-plugin-rollup-preset"; -import merger from "object-merger"; - -export default defineConfig( - merger(preset(), { +export default preset( + { input: "src/index.ts", output: { file: "dist/registry.js", format: "cjs", + inlineDynamicImports: true, }, - }) + }, + {}, + { ignore: ["bun:ffi"], transformMixedEsModules: true } ); diff --git a/src/index.ts b/src/index.ts index 31ad686..a378e85 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,18 +1,18 @@ import type { Plugin, PluginAPI } from "@lumeweb/relay-types"; import DHTFlood from "@lumeweb/dht-flood"; -import { Message, Query, MessageType } from "./messages.js"; +import { Message, MessageType, Query } from "./messages.js"; import EventEmitter from "events"; -import NodeCache from "node-cache"; import b4a from "b4a"; -import { SignedRegistryEntry } from "./types.js"; +import { RegistryStorage, SignedRegistryEntry } from "./types.js"; import { verifyEntry } from "./utils.js"; +import { getStorage, hasStorage } from "./storage/index.js"; const PROTOCOL = "lumeweb.registry"; const events = new EventEmitter(); let messenger: DHTFlood; let api: PluginAPI; -let memStore: NodeCache; +let store: RegistryStorage; function setup() { messenger = new DHTFlood({ @@ -45,6 +45,11 @@ function setup() { }); } +function setupEventHandlers() { + events.on("create", handleCreate); + events.on("query", handleQuery); +} + function entryFromMessage(message: Message): SignedRegistryEntry { return { pk: message.pubkey, @@ -68,83 +73,25 @@ function sendDirectOrBroadcast(message: Message, pubkey: Buffer) { async function getEntry( pubkey: Uint8Array ): Promise { - let pubkeyHex = b4a.from(pubkey).toString("hex"); - if (memStore) { - return await memStore.get(pubkeyHex); - } - - return false; + return store.get(b4a.from(pubkey).toString("hex")); } -async function setEntry(entry: SignedRegistryEntry): Promise { - let pubkeyHex = b4a.from(entry.pk).toString("hex"); - if (memStore) { - return await memStore.set(pubkeyHex, entry); - } +async function handleCreate(message: Message, origin: Buffer) { + { + const setEntry = async (entry: SignedRegistryEntry): Promise => { + let pubkeyHex = b4a.from(entry.pk).toString("hex"); - return false; -} - -const plugin: Plugin = { - name: "registry", - async plugin(_api: PluginAPI): Promise { - api = _api; - setup(); - - if (api.pluginConfig.bool("store.memory")) { - memStore = new NodeCache(); - } - - events.on("create", async (message: Message, origin: Buffer) => { - let newEntry = entryFromMessage(message); - if (!newEntry.signature?.length) { - return; + return store.set(pubkeyHex, entry); + }; + const setAndRespond = async (entry: SignedRegistryEntry, set = true) => { + let ret = true; + if (set) { + ret = await setEntry(newEntry); } - if (!verifyEntry(newEntry)) { - return; - } - if (newEntry.data.length > 48) { - return; - } - let entry = (await getEntry(newEntry.pk)) as SignedRegistryEntry; - - async function setAndRespond(entry: SignedRegistryEntry, set = true) { - let ret = true; - if (set) { - ret = await setEntry(newEntry); - } - if (ret) { - sendDirectOrBroadcast( - Message.create({ - type: MessageType.CREATED, - pubkey: entry.pk, - revision: entry.revision, - signature: entry.signature, - data: entry.data, - }), - origin - ); - } - } - - if (entry) { - if (newEntry.revision <= entry.revision) { - setAndRespond(newEntry); - return; - } - setAndRespond(entry, false); - return; - } - setAndRespond(newEntry); - }); - - events.on("query", async (query: Query, origin: Buffer) => { - let entry = (await getEntry(query.pubkey)) as SignedRegistryEntry; - - if (entry) { + if (ret) { sendDirectOrBroadcast( Message.create({ - type: MessageType.RESPONSE, + type: MessageType.CREATED, pubkey: entry.pk, revision: entry.revision, signature: entry.signature, @@ -152,8 +99,67 @@ const plugin: Plugin = { }), origin ); + api.logger.info("added entry %s", b4a.from(entry.pk).toString("hex")); } - }); + }; + + let newEntry = entryFromMessage(message); + if (!newEntry.signature?.length) { + return; + } + if (!verifyEntry(newEntry)) { + return; + } + if (newEntry.data.length > 48) { + return; + } + let entry = (await getEntry(newEntry.pk)) as SignedRegistryEntry; + + if (entry) { + if (newEntry.revision <= entry.revision) { + setAndRespond(newEntry); + return; + } + setAndRespond(entry, false); + return; + } + setAndRespond(newEntry); + } +} + +async function handleQuery(query: Query, origin: Buffer) { + let entry = (await getEntry(query.pubkey)) as SignedRegistryEntry; + + if (entry) { + sendDirectOrBroadcast( + Message.create({ + type: MessageType.RESPONSE, + pubkey: entry.pk, + revision: entry.revision, + signature: entry.signature, + data: entry.data, + }), + origin + ); + } +} + +const plugin: Plugin = { + name: "registry", + async plugin(_api: PluginAPI): Promise { + api = _api; + + const storageType = api.pluginConfig.str("store.type"); + const storageOptions = api.pluginConfig.str("store.type", {}); + if (!hasStorage(storageType)) { + api.logger.error("Storage type %s does not exist", storageType); + return; + } + + store = getStorage(storageType, { ...storageOptions, api }); + + setup(); + setupEventHandlers(); }, }; diff --git a/src/storage/backends/lmdb.ts b/src/storage/backends/lmdb.ts new file mode 100644 index 0000000..abfcf1d --- /dev/null +++ b/src/storage/backends/lmdb.ts @@ -0,0 +1,25 @@ +import { SignedRegistryEntry, RegistryStorage } from "../../types.js"; +import { open, RootDatabase } from "lmdb"; +import { PluginAPI } from "@lumeweb/relay-types"; +import path from "node:path"; + +export default class Lmdb implements RegistryStorage { + private _database: RootDatabase; + constructor(opts: { api: PluginAPI }) { + const config = opts.api.config.str("configdir"); + this._database = open({ + path: path.join(path.dirname(config), "data", "registry"), + sharedStructuresKey: Symbol.for("structures"), + }); + } + + async get(key: string): Promise { + const ret = await this._database.get(key); + + return ret ?? false; + } + + async set(key: string, value: SignedRegistryEntry): Promise { + return await this._database.put(key, value); + } +} diff --git a/src/storage/backends/memory.ts b/src/storage/backends/memory.ts new file mode 100644 index 0000000..6596d3e --- /dev/null +++ b/src/storage/backends/memory.ts @@ -0,0 +1,15 @@ +import { SignedRegistryEntry, RegistryStorage } from "../../types.js"; +import NodeCache from "node-cache"; + +export default class Memory implements RegistryStorage { + private _storage = new NodeCache(); + async get(key: string): Promise { + const ret = await this._storage.get(key); + + return ret ?? false; + } + + async set(key: string, value: SignedRegistryEntry): Promise { + return await this._storage.set(key, value); + } +} diff --git a/src/storage/factory.ts b/src/storage/factory.ts new file mode 100644 index 0000000..e01fc3e --- /dev/null +++ b/src/storage/factory.ts @@ -0,0 +1,18 @@ +import { RegistryStorage, RegistryStorageConstructor } from "../types.js"; + +const backends = new Map(); + +export function register( + type: string, + instance: new (opts?: any) => RegistryStorage +): void { + backends.set(type, instance); +} + +export function get(type: string, opts = {}): RegistryStorage { + const ctor = backends.get(type); + return new ctor(opts); +} +export function has(type: string): boolean { + return backends.has(type); +} diff --git a/src/storage/index.ts b/src/storage/index.ts new file mode 100644 index 0000000..e963836 --- /dev/null +++ b/src/storage/index.ts @@ -0,0 +1,8 @@ +import { register } from "./factory.js"; +import Memory from "./backends/memory.js"; +import Lmdb from "./backends/lmdb.js"; + +register("memory", Memory); +register("lmdb", Lmdb); + +export { get as getStorage, has as hasStorage } from "./factory.js"; diff --git a/src/types.ts b/src/types.ts index 6d8e3a1..b559e92 100644 --- a/src/types.ts +++ b/src/types.ts @@ -10,3 +10,10 @@ export interface SignedRegistryEntry { // signature of this registry entry signature?: Uint8Array; } + +export type RegistryStorageConstructor = new (opts?: any) => RegistryStorage; + +export interface RegistryStorage { + get(key: string): Promise; + set(key: string, value: SignedRegistryEntry): Promise; +}