2023-08-30 18:37:51 +00:00
|
|
|
import { Logger, Peer, S5Config } from "#types.js";
|
|
|
|
import { AbstractLevel, AbstractSublevel } from "abstract-level";
|
|
|
|
import {
|
|
|
|
mkeyEd25519,
|
|
|
|
protocolMethodRegistryQuery,
|
|
|
|
recordTypeRegistryEntry,
|
|
|
|
registryMaxDataSize,
|
|
|
|
} from "#constants.js";
|
|
|
|
import { Multihash } from "#multihash.js";
|
|
|
|
import { base64url } from "multiformats/bases/base64";
|
|
|
|
import { decodeEndian, encodeEndian } from "#util.js";
|
|
|
|
import { ed25519 } from "@noble/curves/ed25519";
|
|
|
|
import Packer from "#serialization/pack.js";
|
|
|
|
import { Buffer } from "buffer";
|
|
|
|
import { EventEmitter } from "events";
|
|
|
|
import KeyPairEd25519 from "#ed25519.js";
|
2023-08-31 07:29:28 +00:00
|
|
|
import { S5Node, stringifyBytes } from "#node.js";
|
2023-08-30 18:37:51 +00:00
|
|
|
|
|
|
|
/**
 * A registry entry together with the signature that authenticates it.
 */
interface SignedRegistryEntry {
  /** Public key of the entry's owner, including its multicodec prefix byte. */
  pk: Uint8Array;

  /**
   * Revision number of the entry. It is serialized into 8 bytes, so the
   * wire format can represent values up to (256^8)-1.
   */
  revision: number;

  /** Payload stored in the entry; limited to a maximum length of 48 bytes. */
  data: Uint8Array;

  /** Signature covering this registry entry. */
  signature: Uint8Array;
}
|
|
|
|
|
|
|
|
/**
 * Validates, stores and relays signed registry entries.
 *
 * Entries are persisted in the "s5-registry-db" sublevel of the node's
 * database (keyed by the stringified public key) and forwarded to the peers
 * known to the node's p2p service.
 */
export class RegistryService {
  private db?: AbstractSublevel<
    AbstractLevel<Uint8Array, string, Uint8Array>,
    Uint8Array,
    string,
    Uint8Array
  >;
  private node: S5Node;
  private logger: Logger;
  private streams: Map<string, EventEmitter> = new Map<string, EventEmitter>();

  /**
   * @param node - The node that owns this service; its logger, database and
   *   p2p service are used by the methods below.
   */
  constructor(node: S5Node) {
    this.node = node;
    this.logger = this.node.logger;
  }

  /** Opens the sublevel that backs the registry. */
  async init(): Promise<void> {
    this.db = this.node.db.sublevel<string, Uint8Array>("s5-registry-db", {});
  }

  /**
   * Validates (unless trusted), stores and broadcasts a registry entry.
   *
   * @param sre - The entry to store.
   * @param trusted - When true, structural and signature checks are skipped.
   * @param receivedFrom - The peer the entry came from, if any. That peer is
   *   excluded from the broadcast and, when it sent a stale revision, is
   *   answered with the newer stored entry instead.
   * @throws Error when an untrusted entry fails validation, or when a locally
   *   submitted entry does not have a higher revision than the stored one.
   */
  async set(
    sre: SignedRegistryEntry,
    trusted: boolean = false,
    receivedFrom?: Peer,
  ): Promise<void> {
    this.logger.verbose(
      `[registry] set ${base64url.encode(sre.pk)} ${sre.revision} (${
        receivedFrom?.id
      })`,
    );

    if (!trusted) {
      if (sre.pk.length !== 33) {
        throw new Error("Invalid pubkey");
      }
      if (sre.pk[0] !== mkeyEd25519) {
        throw new Error("Only ed25519 keys are supported");
      }
      // Number.isInteger also rejects NaN and fractional revisions, which the
      // plain range comparison would let through.
      if (
        !Number.isInteger(sre.revision) ||
        sre.revision < 0 ||
        sre.revision > 281474976710656
      ) {
        throw new Error("Invalid revision");
      }
      if (sre.data.length > registryMaxDataSize) {
        throw new Error("Data too long");
      }

      const isValid = verifyRegistryEntry(sre);
      if (!isValid) {
        throw new Error("Invalid signature found");
      }
    }

    const existingEntry = await this.getFromDB(sre.pk);

    if (existingEntry) {
      if (receivedFrom) {
        if (existingEntry.revision === sre.revision) {
          // The peer told us something we already know.
          return;
        } else if (existingEntry.revision > sre.revision) {
          // The peer is behind: send it the newer entry we hold.
          const updateMessage = serializeRegistryEntry(existingEntry);
          receivedFrom.sendMessage(updateMessage);
          return;
        }
      }

      if (existingEntry.revision >= sre.revision) {
        throw new Error("Revision number too low");
      }
    }

    const key = new Multihash(sre.pk);
    this.streams.get(key.toString())?.emit("event", sre);

    // Await the write so failures reject this call instead of surfacing as an
    // unhandled rejection, and so the entry is stored before it is relayed.
    await this.db?.put(stringifyBytes(sre.pk), serializeRegistryEntry(sre));

    this.broadcastEntry(sre, receivedFrom);
  }

  // TODO: Clean this table after some time
  // TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256)
  /**
   * Sends the serialized entry to every peer of the p2p service except the
   * one it was received from.
   */
  broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void {
    this.logger.verbose("[registry] broadcastEntry");
    const updateMessage = serializeRegistryEntry(sre);

    for (const p of Object.values(this.node.services.p2p.peers)) {
      if (receivedFrom == null || p.id !== receivedFrom.id) {
        p.sendMessage(updateMessage);
      }
    }
  }

  /** Sends a registry query for `pk` to every peer of the p2p service. */
  sendRegistryRequest(pk: Uint8Array): void {
    const p = new Packer();

    p.packInt(protocolMethodRegistryQuery);
    p.packBinary(Buffer.from(pk));

    const req = p.takeBytes();

    // TODO: Use shard system if there are more than X peers
    for (const peer of Object.values(this.node.services.p2p.peers)) {
      peer.sendMessage(req);
    }
  }

  /**
   * Looks up the entry for `pk`, querying peers and waiting briefly for an
   * answer to land in the local database.
   *
   * @returns The stored entry after the wait, or null when none is stored.
   */
  async get(pk: Uint8Array): Promise<SignedRegistryEntry | null> {
    const key = new Multihash(pk);
    const streamKey = key.toString();

    if (this.streams.has(streamKey)) {
      this.logger.verbose(`[registry] get (subbed) ${key}`);
      const stored = await this.getFromDB(pk);
      if (stored !== null) {
        return stored;
      }
      this.sendRegistryRequest(pk);
      await this.sleep(200);
      return this.getFromDB(pk);
    }

    this.sendRegistryRequest(pk);
    this.streams.set(streamKey, new EventEmitter());
    const stored = await this.getFromDB(pk);
    if (stored === null) {
      this.logger.verbose(`[registry] get (clean) ${key}`);
      // Poll every 10 ms, for at most 200 rounds, until an entry shows up.
      for (let i = 0; i < 200; i++) {
        await this.sleep(10);
        if ((await this.getFromDB(pk)) !== null) break;
      }
    } else {
      this.logger.verbose(`[registry] get (cached) ${key}`);
      // Give peers a moment to deliver a newer revision before re-reading.
      await this.sleep(200);
    }
    return this.getFromDB(pk);
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Signs `data` with `keyPair`, using the current Unix time in seconds as
   * the revision, and stores the result through {@link set}.
   */
  private async setEntryHelper(
    keyPair: KeyPairEd25519,
    data: Uint8Array,
  ): Promise<void> {
    const revision = Math.round(Date.now() / 1000);

    const sre: SignedRegistryEntry = await this.signRegistryEntry({
      kp: keyPair,
      data,
      revision,
    });

    await this.set(sre);
  }

  /**
   * Builds a signed registry entry.
   *
   * The signed message is: record type byte, 8-byte revision, 1-byte data
   * length, data.
   *
   * @returns The entry carrying the key pair's public key and the signature.
   */
  async signRegistryEntry({
    kp,
    data,
    revision,
  }: {
    kp: KeyPairEd25519;
    data: Uint8Array;
    revision: number;
  }): Promise<SignedRegistryEntry> {
    const list = new Uint8Array([
      recordTypeRegistryEntry,
      ...encodeEndian(revision, 8),
      data.length,
      ...data,
    ]);

    const signature = ed25519.sign(list, kp.extractBytes());

    return {
      pk: kp.publicKey,
      revision,
      data,
      signature: new Uint8Array(signature),
    };
  }

  /**
   * Reads the entry stored for `pk` from the local database.
   *
   * @returns The deserialized entry, or null when nothing is stored (or the
   *   database has not been initialised).
   * @throws Any database error other than "not found".
   */
  async getFromDB(pk: Uint8Array): Promise<SignedRegistryEntry | null> {
    let val: Uint8Array | undefined;
    try {
      val = await this.db?.get(stringifyBytes(pk));
    } catch (e: unknown) {
      // abstract-level v1 rejects with LEVEL_NOT_FOUND for a missing key
      // (later versions resolve to undefined); treat that as "no entry".
      if ((e as { code?: unknown } | null)?.code === "LEVEL_NOT_FOUND") {
        return null;
      }
      throw e;
    }

    if (val) {
      return this.deserializeRegistryEntry(val);
    }

    return null;
  }

  /**
   * Parses the byte layout written by `serializeRegistryEntry`:
   * byte 0 record type, bytes 1-33 public key, bytes 34-41 revision,
   * byte 42 data length, then the data, then the signature.
   *
   * @throws Error when `event` is shorter than its header and declared data.
   */
  public deserializeRegistryEntry(event: Uint8Array): SignedRegistryEntry {
    const headerLength = 43;
    if (event.length < headerLength) {
      throw new Error("Registry entry too short");
    }

    const dataLength = event[42];
    const dataEnd = headerLength + dataLength;
    if (event.length < dataEnd) {
      throw new Error("Registry entry too short");
    }

    return {
      pk: event.slice(1, 34),
      revision: decodeEndian(event.slice(34, 42)),
      data: event.slice(headerLength, dataEnd),
      signature: event.slice(dataEnd),
    };
  }
}
|
|
|
|
|
|
|
|
/**
 * Checks the ed25519 signature of a registry entry.
 *
 * The signed message is rebuilt the same way `signRegistryEntry` builds it:
 * record type byte, 8-byte revision, 1-byte data length, data.
 *
 * @param sre - The entry to verify; `pk` carries a one-byte prefix that is
 *   stripped before verification.
 * @returns true when the signature matches, false when it does not or when
 *   the signature/key bytes are malformed.
 */
function verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
  const message: Uint8Array = Uint8Array.from([
    recordTypeRegistryEntry,
    ...encodeEndian(sre.revision, 8),
    sre.data.length, // 1 byte
    ...sre.data,
  ]);

  try {
    // @noble/curves expects (signature, message, publicKey); the previous
    // call passed the message in the signature slot, so no entry could verify.
    return ed25519.verify(sre.signature, message, sre.pk.slice(1));
  } catch {
    // Malformed input (e.g. a signature of the wrong length) makes the
    // library throw; for a boolean predicate that simply means "not valid".
    return false;
  }
}
|
|
|
|
/**
 * Encodes a signed registry entry into its byte representation:
 * record type, public key, 8-byte revision, 1-byte data length, data,
 * signature — in that order.
 */
function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
  const { pk, revision, data, signature } = sre;

  const bytes: number[] = [recordTypeRegistryEntry];
  bytes.push(...pk);
  bytes.push(...encodeEndian(revision, 8));
  bytes.push(data.length);
  bytes.push(...data);
  bytes.push(...signature);

  return new Uint8Array(bytes);
}
|