From d416966760cb5008020dc53e4128c3017b336c26 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 30 Dec 2022 04:40:14 -0500 Subject: [PATCH] *Initial version with memory only store --- LICENSE | 2 +- messages.proto | 19 +++++ package.json | 24 ++++++ pkg/load-registry.json | 5 ++ rollup.config.js | 13 +++ src/index.ts | 150 ++++++++++++++++++++++++++++++++ src/messages.ts | 190 +++++++++++++++++++++++++++++++++++++++++ src/types.ts | 12 +++ src/utils.ts | 33 +++++++ tsconfig.json | 13 +++ 10 files changed, 460 insertions(+), 1 deletion(-) create mode 100644 messages.proto create mode 100644 package.json create mode 100644 pkg/load-registry.json create mode 100644 rollup.config.js create mode 100644 src/index.ts create mode 100644 src/messages.ts create mode 100644 src/types.ts create mode 100644 src/utils.ts create mode 100644 tsconfig.json diff --git a/LICENSE b/LICENSE index 2071b23..4fad34f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) +Copyright (c) 2022 Hammer Technologies LLC Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/messages.proto b/messages.proto new file mode 100644 index 0000000..13f34a7 --- /dev/null +++ b/messages.proto @@ -0,0 +1,19 @@ +syntax = "proto2"; + +message Query { + required bytes pubkey = 1; +} + +enum MessageType { + CREATE = 1; + CREATED = 2; + RESPONSE = 3; +} + +message Message { + required MessageType type = 1; + required bytes pubkey = 2; + required uint32 revision = 3; + required bytes data = 4; + required bytes signature = 5; +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..0805b70 --- /dev/null +++ b/package.json @@ -0,0 +1,24 @@ +{ + "name": "@lumeweb/relay-plugin-registry", + "type": "module", + "version": "0.1.0", + "scripts": { + "build": "rollup -c rollup.config.js" + }, + "dependencies": { + "@lumeweb/dht-flood": "https://git.lumeweb.com/LumeWeb/dht-flood.git", + "@lumeweb/relay-plugin-rollup-preset": "https://git.lumeweb.com/LumeWeb/relay-plugin-rollup-preset.git", + "@lumeweb/relay-types": "https://git.lumeweb.com/LumeWeb/relay-types.git", + "@noble/ed25519": "^1.7.1", + "b4a": "^1.6.1", + "node-cache": "^5.1.2", + "object-merger": "^1.0.3" + }, + "devDependencies": { + "@lumeweb/cfg": "https://git.lumeweb.com/LumeWeb/cfg.git", + "@protobuf-ts/plugin": "^2.8.2", + "@protobuf-ts/runtime": "^2.8.2", + "@types/node": "^18.11.18", + "prettier": "^2.8.1" + } +} diff --git a/pkg/load-registry.json b/pkg/load-registry.json new file mode 100644 index 0000000..c7074a3 --- /dev/null +++ b/pkg/load-registry.json @@ -0,0 +1,5 @@ +{ + "plugins": [ + "registry" + ] +} diff --git a/rollup.config.js b/rollup.config.js new file mode 100644 index 0000000..5746c1c --- /dev/null +++ b/rollup.config.js @@ -0,0 +1,13 @@ +import { defineConfig } from "rollup"; +import preset from "@lumeweb/relay-plugin-rollup-preset"; +import merger from "object-merger"; + +export default defineConfig( + merger(preset(), { + input: "src/index.ts", + output: { + file: "dist/registry.js", + format: "cjs", + }, + }) +); diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..f9be057 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,150 @@ +import type { Plugin, PluginAPI } from "@lumeweb/relay-types"; +import DHTFlood from "@lumeweb/dht-flood"; +import { Message, Query, MessageType } from "./messages.js"; +import EventEmitter from "events"; +import NodeCache from "node-cache"; +import b4a from "b4a"; +import { SignedRegistryEntry } from "./types.js"; +import { verifyEntry } from "./utils.js"; + +const PROTOCOL = "lumeweb.registry"; + +const events = new EventEmitter(); +let messenger: DHTFlood; +let api: PluginAPI; +let memStore: NodeCache; + +function setup() { + messenger = new DHTFlood({ + swarm: api.swarm, + protocol: PROTOCOL, + id: api.identity.publicKeyRaw as Buffer, + }); + + messenger.on("message", (data: Buffer, origin: Buffer) => { + try { + let response = Message.fromBinary(data); + switch (response.type) { + case MessageType.CREATE: + events.emit("create", response, origin); + break; + case MessageType.CREATED: + events.emit("created", response, origin); + break; + case MessageType.RESPONSE: + events.emit("response", response, origin); + break; + } + return; + } catch {} + + try { + let query = Query.fromBinary(data); + events.emit("response", query, origin); + } catch {} + }); +} + +function entryFromMessage(message: Message): SignedRegistryEntry { + return { + pk: message.pubkey, + data: message.data, + revision: message.revision, + signature: message.signature, + }; +} + +function sendDirectOrBroadcast(message: Message, pubkey: Buffer) { + let peer = api.swarm._allConnections.get(pubkey); + let data = Message.toBinary(message); + if (peer) { + messenger.send(peer, data, 0); + return; + } + + messenger.broadcast(data); +} + +async function getEntry( + pubkey: Uint8Array +): Promise { + let pubkeyHex = b4a.from(pubkey).toString("hex"); + if (memStore) { + return await memStore.get(pubkeyHex); + } + + return false; +} + +async function setEntry(entry: SignedRegistryEntry): Promise { + let pubkeyHex = b4a.from(entry.pk).toString("hex"); + if (memStore) { + return await memStore.set(pubkeyHex, entry); + } + + return false; +} + +const plugin: Plugin = { + name: "registry", + async plugin(_api: PluginAPI): Promise { + api = _api; + setup(); + + if ( + api.pluginConfig.bool("storememory") && + !api.pluginConfig.bool("store") + ) { + memStore = new NodeCache(); + } + + events.on("create", async (message: Message, origin: Buffer) => { + let newEntry = entryFromMessage(message); + if (!verifyEntry(newEntry)) { + return; + } + let entry = (await getEntry(newEntry.pk)) as SignedRegistryEntry; + + async function setAndRespond() { + await setEntry(newEntry); + sendDirectOrBroadcast( + Message.create({ + type: MessageType.CREATED, + pubkey: newEntry.pk, + revision: newEntry.revision, + signature: newEntry.signature, + data: newEntry.data, + }), + origin + ); + } + + if (entry) { + if (newEntry.revision <= entry.revision) { + setAndRespond(); + } + } else { + setAndRespond(); + } + }); + + events.on("query", async (query: Query, origin: Buffer) => { + let entry = (await getEntry(query.pubkey)) as SignedRegistryEntry; + + if (entry) { + sendDirectOrBroadcast( + Message.create({ + type: MessageType.RESPONSE, + pubkey: entry.pk, + revision: entry.revision, + signature: entry.signature, + data: entry.data, + }), + origin + ); + } + }); + }, +}; + +export default plugin; diff --git a/src/messages.ts b/src/messages.ts new file mode 100644 index 0000000..4391239 --- /dev/null +++ b/src/messages.ts @@ -0,0 +1,190 @@ +// @generated by protobuf-ts 2.8.2 +// @generated from protobuf file "messages.proto" (syntax proto2) +// tslint:disable +import type { BinaryWriteOptions } from "@protobuf-ts/runtime"; +import type { IBinaryWriter } from "@protobuf-ts/runtime"; +import { WireType } from "@protobuf-ts/runtime"; +import type { BinaryReadOptions } from "@protobuf-ts/runtime"; +import type { IBinaryReader } from "@protobuf-ts/runtime"; +import { UnknownFieldHandler } from "@protobuf-ts/runtime"; +import type { PartialMessage } from "@protobuf-ts/runtime"; +import { reflectionMergePartial } from "@protobuf-ts/runtime"; +import { MESSAGE_TYPE } from "@protobuf-ts/runtime"; +import { MessageType as MessageType$ } from "@protobuf-ts/runtime"; +/** + * @generated from protobuf message Query + */ +export interface Query { + /** + * @generated from protobuf field: bytes pubkey = 1; + */ + pubkey: Uint8Array; +} +/** + * @generated from protobuf message Message + */ +export interface Message { + /** + * @generated from protobuf field: MessageType type = 1; + */ + type: MessageType; + /** + * @generated from protobuf field: bytes pubkey = 2; + */ + pubkey: Uint8Array; + /** + * @generated from protobuf field: uint32 revision = 3; + */ + revision: number; + /** + * @generated from protobuf field: bytes data = 4; + */ + data: Uint8Array; + /** + * @generated from protobuf field: bytes signature = 5; + */ + signature: Uint8Array; +} +/** + * @generated from protobuf enum MessageType + */ +export enum MessageType { + /** + * @generated synthetic value - protobuf-ts requires all enums to have a 0 value + */ + UNSPECIFIED$ = 0, + /** + * @generated from protobuf enum value: CREATE = 1; + */ + CREATE = 1, + /** + * @generated from protobuf enum value: CREATED = 2; + */ + CREATED = 2, + /** + * @generated from protobuf enum value: RESPONSE = 3; + */ + RESPONSE = 3 +} +// @generated message type with reflection information, may provide speed optimized methods +class Query$Type extends MessageType$ { + constructor() { + super("Query", [ + { no: 1, name: "pubkey", kind: "scalar", T: 12 /*ScalarType.BYTES*/ } + ]); + } + create(value?: PartialMessage): Query { + const message = { pubkey: new Uint8Array(0) }; + globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this }); + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Query): Query { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* bytes pubkey */ 1: + message.pubkey = reader.bytes(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: Query, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* bytes pubkey = 1; */ + if (message.pubkey.length) + writer.tag(1, WireType.LengthDelimited).bytes(message.pubkey); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message Query + */ +export const Query = new Query$Type(); +// @generated message type with reflection information, may provide speed optimized methods +class Message$Type extends MessageType$ { + constructor() { + super("Message", [ + { no: 1, name: "type", kind: "enum", T: () => ["MessageType", MessageType] }, + { no: 2, name: "pubkey", kind: "scalar", T: 12 /*ScalarType.BYTES*/ }, + { no: 3, name: "revision", kind: "scalar", T: 13 /*ScalarType.UINT32*/ }, + { no: 4, name: "data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ }, + { no: 5, name: "signature", kind: "scalar", T: 12 /*ScalarType.BYTES*/ } + ]); + } + create(value?: PartialMessage): Message { + const message = { type: 0, pubkey: new Uint8Array(0), revision: 0, data: new Uint8Array(0), signature: new Uint8Array(0) }; + globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this }); + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* MessageType type */ 1: + message.type = reader.int32(); + break; + case /* bytes pubkey */ 2: + message.pubkey = reader.bytes(); + break; + case /* uint32 revision */ 3: + message.revision = reader.uint32(); + break; + case /* bytes data */ 4: + message.data = reader.bytes(); + break; + case /* bytes signature */ 5: + message.signature = reader.bytes(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* MessageType type = 1; */ + if (message.type !== 0) + writer.tag(1, WireType.Varint).int32(message.type); + /* bytes pubkey = 2; */ + if (message.pubkey.length) + writer.tag(2, WireType.LengthDelimited).bytes(message.pubkey); + /* uint32 revision = 3; */ + if (message.revision !== 0) + writer.tag(3, WireType.Varint).uint32(message.revision); + /* bytes data = 4; */ + if (message.data.length) + writer.tag(4, WireType.LengthDelimited).bytes(message.data); + /* bytes signature = 5; */ + if (message.signature.length) + writer.tag(5, WireType.LengthDelimited).bytes(message.signature); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message Message + */ +export const Message = new Message$Type(); diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..6d8e3a1 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,12 @@ +export interface SignedRegistryEntry { + pk: Uint8Array; + + // revision number of this entry, maximum is (256^8)-1 + revision: number; + + // data stored in this entry, can have a maximum length of 48 bytes + data: Uint8Array; + + // signature of this registry entry + signature?: Uint8Array; +} diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..f5a3c4c --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,33 @@ +import { SignedRegistryEntry } from "./types.js"; +import * as ed from "@noble/ed25519"; +import b4a from "b4a"; + +export function verifyEntry(entry: SignedRegistryEntry) { + return ed.sync.verify(entry.signature, createSignatureData(entry), entry.pk); +} + +export function signEntry( + entry: SignedRegistryEntry, + privateKey: Uint8Array +): Uint8Array { + return ed.sync.sign(createSignatureData(entry), privateKey); +} + +export function createSignatureData(entry: SignedRegistryEntry): Uint8Array { + return b4a.concat([ + encodeEndian(entry.revision, 8), + entry.data.length, + entry.data, + ]); +} + +export function encodeEndian(value: number, length: number) { + let res = new Uint8Array(length); + + for (let i = 0; i < length; i++) { + res[i] = value & 0xff; + value = value >> 8; + } + + return res; +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..09c3093 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "module": "esnext", + "target": "esnext", + "esModuleInterop": true, + "outDir": "dist", + "moduleResolution": "node" + }, + "include": ["src"], + "exclude": [ + "node_modules" + ] +}