Compare commits

...

23 Commits

Author SHA1 Message Date
semantic-release-bot 3be3ae4de5 chore(release): 0.1.0-develop.1 [skip ci]
# [0.1.0-develop.1](https://git.lumeweb.com/LumeWeb/libs5/compare/v0.0.1...v0.1.0-develop.1) (2023-08-31)

### Bug Fixes

* _newBuf needs to reset offset to 0 ([31e63f6](31e63f6c63))
* add registry to services object in interface ([22e486e](22e486ea18))
* add some exports ([5a1dca9](5a1dca9775))
* check protocol with colon ([a4b692b](a4b692b28f))
* do a truthy check on networkId ([68b7ffa](68b7ffa855))
* fix retry logic ([1221d7d](1221d7de63))
* import websocket ([132f43c](132f43c34b))
* need a getter for id ([fbffb1d](fbffb1da72))
* pass the event data, not the event itself ([42cd101](42cd101fb2))
* remove unneeded getPublicKey call ([ae40d52](ae40d52f9e))
* unsupported url needs to be a real but dummy one ([17ff5fd](17ff5fd96b))
* update import ([91a15bd](91a15bd428))
* update level imports ([68e6c3a](68e6c3a682))
* we dont need to strip out auth ([a021243](a021243c89))

### Features

* initial version ([2f2ae2f](2f2ae2f4fc))
2023-08-31 06:44:42 +00:00
Derrick Hammer 03e7d9ba04
ci: setup 2023-08-31 02:43:14 -04:00
Derrick Hammer 5a1dca9775
fix: add some exports 2023-08-31 02:37:32 -04:00
Derrick Hammer 05cbd60373
refactor: make deserializeRegistryEntry a method 2023-08-31 02:37:06 -04:00
Derrick Hammer 1221d7de63
fix: fix retry logic 2023-08-31 02:36:32 -04:00
Derrick Hammer a021243c89
fix: we dont need to strip out auth 2023-08-31 02:35:29 -04:00
Derrick Hammer a4b692b28f
fix: check protocol with colon 2023-08-31 02:34:59 -04:00
Derrick Hammer 17ff5fd96b
fix: unsupported url needs to be a real but dummy one 2023-08-31 02:34:13 -04:00
Derrick Hammer 2ef91a4d9c
refactor: need to use a query chain access 2023-08-31 02:32:59 -04:00
Derrick Hammer 91034708bc
refactor: reconnectDelay and _peers need to store by the string version of peer id 2023-08-31 02:32:04 -04:00
Derrick Hammer 56bb5007f6
refactor: add registry message handling back in 2023-08-31 02:30:19 -04:00
Derrick Hammer 68b7ffa855
fix: do a truthy check on networkId 2023-08-31 02:29:32 -04:00
Derrick Hammer 68e6c3a682
fix: update level imports 2023-08-31 02:29:09 -04:00
Derrick Hammer 91a15bd428
fix: update import 2023-08-31 02:28:33 -04:00
Derrick Hammer ae40d52f9e
fix: remove unneeded getPublicKey call 2023-08-31 02:28:24 -04:00
Derrick Hammer 22e486ea18
fix: add registry to services object in interface 2023-08-31 02:28:01 -04:00
Derrick Hammer eaf35bcd2e
refactor: update db store structure to create sublevel inside p2p service and not pass it to it 2023-08-31 02:27:29 -04:00
Derrick Hammer 42cd101fb2
fix: pass the event data, not the event itself 2023-08-31 02:25:52 -04:00
Derrick Hammer 132f43c34b
fix: import websocket 2023-08-31 02:25:37 -04:00
Derrick Hammer fbffb1da72
fix: need a getter for id 2023-08-31 02:25:27 -04:00
Derrick Hammer 31e63f6c63
fix: _newBuf needs to reset offset to 0 2023-08-31 02:23:49 -04:00
Derrick Hammer b1f4ab93d8
dep: add ws 2023-08-31 02:23:27 -04:00
Derrick Hammer 2f2ae2f4fc
feat: initial version 2023-08-30 14:38:53 -04:00
23 changed files with 21418 additions and 1 deletions

13
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,13 @@
name: Build/Publish
on:
push:
branches:
- master
- develop
- develop-*
jobs:
main:
uses: lumeweb/github-node-deploy-workflow/.github/workflows/main.yml@master
secrets: inherit

5
.presetterrc.json Normal file
View File

@ -0,0 +1,5 @@
{
"preset": [
"@lumeweb/node-library-preset"
]
}

24
CHANGELOG.md Normal file
View File

@ -0,0 +1,24 @@
# [0.1.0-develop.1](https://git.lumeweb.com/LumeWeb/libs5/compare/v0.0.1...v0.1.0-develop.1) (2023-08-31)
### Bug Fixes
* _newBuf needs to reset offset to 0 ([31e63f6](https://git.lumeweb.com/LumeWeb/libs5/commit/31e63f6c63d785ba3456709bcea91760e8a6c00b))
* add registry to services object in interface ([22e486e](https://git.lumeweb.com/LumeWeb/libs5/commit/22e486ea18bed2255fc3a32eb062a8eb07fef3fc))
* add some exports ([5a1dca9](https://git.lumeweb.com/LumeWeb/libs5/commit/5a1dca97756d57a469184ca54ca3d4617e3369fc))
* check protocol with colon ([a4b692b](https://git.lumeweb.com/LumeWeb/libs5/commit/a4b692b28f2bd62d3546d666c4318e20a89e049a))
* do a truthy check on networkId ([68b7ffa](https://git.lumeweb.com/LumeWeb/libs5/commit/68b7ffa855dd4ae3178b4472837d533b5145b5bb))
* fix retry logic ([1221d7d](https://git.lumeweb.com/LumeWeb/libs5/commit/1221d7de63633f05e10eab5ca092134549199964))
* import websocket ([132f43c](https://git.lumeweb.com/LumeWeb/libs5/commit/132f43c34ba85b82db47f42e7049bfa1938055a1))
* need a getter for id ([fbffb1d](https://git.lumeweb.com/LumeWeb/libs5/commit/fbffb1da72f55567af45420dc54abe230ff8b062))
* pass the event data, not the event itself ([42cd101](https://git.lumeweb.com/LumeWeb/libs5/commit/42cd101fb29a4a1f7451ab6251e9bd89e7a51145))
* remove unneeded getPublicKey call ([ae40d52](https://git.lumeweb.com/LumeWeb/libs5/commit/ae40d52f9e6d44e9dce2ead1c97a0d7b4f772e50))
* unsupported url needs to be a real but dummy one ([17ff5fd](https://git.lumeweb.com/LumeWeb/libs5/commit/17ff5fd96b1a1e773e4b09254181dfe7180e0a7f))
* update import ([91a15bd](https://git.lumeweb.com/LumeWeb/libs5/commit/91a15bd42849e540c56edef20e1508acd8fd32c6))
* update level imports ([68e6c3a](https://git.lumeweb.com/LumeWeb/libs5/commit/68e6c3a682acc6b6541bfc871f80150c11a495b4))
* we dont need to strip out auth ([a021243](https://git.lumeweb.com/LumeWeb/libs5/commit/a021243c8954ddedcd99c8bd32901055dbba49a7))
### Features
* initial version ([2f2ae2f](https://git.lumeweb.com/LumeWeb/libs5/commit/2f2ae2f4fca7174b289d658b387a941e0d6fc120))

View File

@ -1,6 +1,6 @@
MIT License MIT License
Copyright (c) 2023 LumeWeb Copyright (c) 2023 Hammer Technologies LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View File

@ -1,2 +1,5 @@
# libs5 # libs5
`src/serialization` is licensed under BSD 3-Clause and original author is Nail Gilaziev. It is a port of `messagepack` from dart. See LICENSE at https://github.com/nailgilaziev/messagepack/blob/9fc7d685ac8519c2c02feed604b4de342ef96764/LICENSE.
This library is a port of libs5 and s5 combined from `https://github.com/s5-dev` under MIT license by redsolver. License at https://github.com/s5-dev/lib5/blob/1a4fafbac105a948e5c4ca2befacd2f731d72672/LICENSE

19438
npm-shrinkwrap.json generated Normal file

File diff suppressed because it is too large Load Diff

30
package.json Normal file
View File

@ -0,0 +1,30 @@
{
"name": "@lumeweb/libs5",
"version": "0.1.0-develop.1",
"type": "module",
"repository": {
"type": "git",
"url": "gitea@git.lumeweb.com:LumeWeb/libs5.git"
},
"devDependencies": {
"@lumeweb/node-library-preset": "^0.2.7",
"presetter": "*"
},
"readme": "ERROR: No README data found!",
"scripts": {
"prepare": "presetter bootstrap",
"build": "run build",
"semantic-release": "semantic-release"
},
"dependencies": {
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"level": "^8.0.0",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"ws": "^8.13.0"
},
"publishConfig": {
"access": "public"
}
}

135
src/constants.ts Normal file
View File

@ -0,0 +1,135 @@
export const cidTypeRaw = 0x26;
export const cidTypeMetadataMedia = 0xc5;
// const cidTypeMetadataFile = 0xc6;
export const cidTypeMetadataWebApp = 0x59;
export const cidTypeResolver = 0x25;
export const cidTypeUserIdentity = 0x77;
export const cidTypeBridge = 0x3a;
// format for dynamic encrypted CID
// type algo key resolver_type mkey_ed255 pubkey
// in entry: encrypt(RAW CID or MEDIA or SOMETHING)
/// Used for immutable encrypted files and metadata formats, key can never be re-used
///
/// Used for file versions in Vup
export const cidTypeEncryptedStatic = 0xae;
/// Used for encrypted files with update support
///
/// can point to resolver CID, Stream CID, Directory Metadata or Media Metadata object
export const cidTypeEncryptedDynamic = 0xad;
export const registryS5CIDByte = 0x5a;
export const registryS5EncryptedByte = 0x5e;
// ! some multicodec bytes
// BLAKE3 with default output size of 256 bits
export const mhashBlake3Default = 0x1f;
export const mkeyEd25519 = 0xed;
export const encryptionAlgorithmXChaCha20Poly1305 = 0xa6;
export const encryptionAlgorithmXChaCha20Poly1305NonceSize = 24;
export const contentPackFileHeader = Uint8Array.from([0x5f, 0x26, 0x73, 0x35]);
// ! metadata files
// used as the first byte of metadata files
export const metadataMagicByte = 0x5f;
// types for metadata files
export const metadataTypeMedia = 0x02;
export const metadataTypeWebApp = 0x03;
export const metadataTypeDirectory = 0x04;
export const metadataTypeProofs = 0x05;
export const metadataTypeUserIdentity = 0x07;
export const parentLinkTypeUserIdentity = 1;
export const parentLinkTypeBoard = 5;
export const parentLinkTypeBridgeUser = 10;
export const registryMaxDataSize = 64;
// ! user identity
export const authPayloadVersion1 = 0x01;
export const userIdentityLinkProfile = 0x00;
export const userIdentityLinkPublicFileSystem = 0x01;
// ! p2p protocol message types
export const protocolMethodHandshakeOpen = 1;
export const protocolMethodHandshakeDone = 2;
export const protocolMethodSignedMessage = 10;
export const protocolMethodHashQuery = 4;
export const protocolMethodAnnouncePeers = 8;
export const protocolMethodRegistryQuery = 13;
export const recordTypeStorageLocation = 0x05; // cache
export const recordTypeRegistryEntry = 0x07; // permanent
export const recordTypeStreamEvent = 0x09; // temporary, delete after time X (like storage locations)
// ! Some optional metadata extensions (same for files, media files and directories)
// List<String>, license identifier from https://spdx.org/licenses/
export const metadataExtensionLicenses = 11;
// List<Uint8List>, multicoded pubkey that references a registry entry that contains donation links and addresses
export const metadataExtensionDonationKeys = 12;
// map string->map, external ids of this object by their wikidata property id.
export const metadataExtensionWikidataClaims = 13;
// List<String>, for example [en, de, de-DE]
export const metadataExtensionLanguages = 14;
// List<String>,
export const metadataExtensionSourceUris = 15;
// Resolver CID, can be used to update this post. can also be used to "delete" a post.
export const metadataExtensionUpdateCID = 16;
// List<CID>, lists previous versions of this post
export const metadataExtensionPreviousVersions = 17;
// unix timestamp in milliseconds
export const metadataExtensionTimestamp = 18;
export const metadataExtensionTags = 19;
export const metadataExtensionCategories = 20;
// video, podcast, book, audio, music, ...
export const metadataExtensionViewTypes = 21;
export const metadataExtensionBasicMediaMetadata = 22;
export const metadataExtensionBridge = 23;
export const metadataExtensionOriginalTimestamp = 24;
// List<Uint8List>
export const metadataExtensionRoutingHints = 25;
// TODO comment to / reply to (use parents)
// TODO mentions (use new extension field)
// TODO Reposts (just link the original item)
// ! media details
export const metadataMediaDetailsDuration = 10;
export const metadataMediaDetailsIsLive = 11;
// ! metadata proofs
export const metadataProofTypeSignature = 1;
export const metadataProofTypeTimestamp = 2;
// ! storage locations
export const storageLocationTypeArchive = 0;
export const storageLocationTypeFile = 3;
export const storageLocationTypeFull = 5;
export const storageLocationTypeBridge = 7;

22
src/ed25519.ts Normal file
View File

@ -0,0 +1,22 @@
import { ed25519 } from "@noble/curves/ed25519";
import { concatBytes } from "@noble/curves/abstract/utils";
import { mkeyEd25519 } from "./constants.js";
export default class KeyPairEd25519 {
private _bytes: Uint8Array;
constructor(bytes: Uint8Array) {
this._bytes = bytes;
}
public get publicKey(): Uint8Array {
return concatBytes(
Uint8Array.from([mkeyEd25519]),
ed25519.getPublicKey(this._bytes),
);
}
public extractBytes(): Uint8Array {
return this._bytes;
}
}

3
src/index.ts Normal file
View File

@ -0,0 +1,3 @@
export * from "./types.js";
export * from "./service/p2p.js";
export * from "./service/registry.js";

55
src/multibase.ts Normal file
View File

@ -0,0 +1,55 @@
import { base58btc } from "multiformats/bases/base58";
import { bytesToHex, hexToBytes, utf8ToBytes } from "@noble/hashes/utils";
import { base32 } from "multiformats/bases/base32";
import { base64, base64url } from "multiformats/bases/base64";
export default abstract class Multibase {
abstract toBytes(): Uint8Array;
static decodeString(data: string): Uint8Array {
let bytes: Uint8Array;
if (data[0] === "z") {
bytes = base58btc.decode(data);
} else if (data[0] === "f") {
bytes = Uint8Array.from(hexToBytes(data.substring(1)));
} else if (data[0] === "b") {
let str = data.substring(1).toUpperCase();
while (str.length % 4 !== 0) {
str += "=";
}
bytes = base32.decode(str);
} else if (data[0] === "u") {
let str = data.substring(1);
while (str.length % 4 !== 0) {
str += "=";
}
bytes = base64url.decode(str);
} else if (data[0] === ":") {
bytes = utf8ToBytes(data);
} else {
throw new Error(`Multibase encoding ${data[0]} not supported`);
}
return bytes;
}
toHex(): string {
return `f${bytesToHex(this.toBytes())}`;
}
toBase32(): string {
return `b${base32.encode(this.toBytes()).replace(/=/g, "").toLowerCase()}`;
}
toBase64Url(): string {
return `u${base64.encode(this.toBytes())}`;
}
toBase58(): string {
return `z${base58btc.encode(this.toBytes())}`;
}
toString(): string {
return this.toBase58();
}
}

58
src/multihash.ts Normal file
View File

@ -0,0 +1,58 @@
import { base64url } from "multiformats/bases/base64";
import { base32 } from "multiformats/bases/base32";
import { equalBytes } from "@noble/curves/abstract/utils";
import { cidTypeBridge } from "#constants.js";
export class Multihash {
fullBytes: Uint8Array;
constructor(fullBytes: Uint8Array) {
this.fullBytes = fullBytes;
}
get functionType(): number {
return this.fullBytes[0];
}
get hashBytes(): Uint8Array {
return this.fullBytes.subarray(1);
}
static fromBase64Url(hash: string): Multihash {
while (hash.length % 4 !== 0) {
hash += "=";
}
const bytes = base64url.decode(hash);
return new Multihash(new Uint8Array(bytes));
}
toBase64Url(): string {
return base64url.encode(this.fullBytes);
}
toBase32(): string {
return base32.encode(this.fullBytes).replace(/=/g, "").toLowerCase();
}
toString(): string {
return this.functionType === cidTypeBridge
? new TextDecoder().decode(this.fullBytes)
: this.toBase64Url();
}
equals(other: any): boolean {
if (!(other instanceof Multihash)) {
return false;
}
return equalBytes(this.fullBytes, other.fullBytes);
}
get hashCode(): number {
return (
this.fullBytes[0] +
this.fullBytes[1] * 256 +
this.fullBytes[2] * 256 * 256 +
this.fullBytes[3] * 256 * 256 * 256
);
}
}

76
src/node.ts Normal file
View File

@ -0,0 +1,76 @@
import { Multihash } from "./multihash.js";
import NodeId from "./nodeId.js";
import { S5Config } from "./types.js";
import Unpacker from "./serialization/unpack.js";
import Packer from "./serialization/pack.js";
import StorageLocation from "./storage.js";
export async function readStorageLocationsFromDB({
hash,
config,
}: {
hash: Multihash;
config: S5Config;
}): Promise<Map<number, Map<NodeId, Map<number, any>>>> {
const map = new Map<number, Map<NodeId, Map<number, any>>>();
const bytes = await config.db.get(stringifyHash(hash));
if (bytes === null) {
return map;
}
const unpacker = Unpacker.fromPacked(bytes);
const mapLength = unpacker.unpackMapLength();
for (let i = 0; i < mapLength; i++) {
const type = unpacker.unpackInt() as number;
const innerMap = new Map<NodeId, Map<number, any>>();
map.set(type, innerMap);
const innerMapLength = unpacker.unpackMapLength();
for (let j = 0; j < innerMapLength; j++) {
const nodeId = new NodeId(unpacker.unpackBinary());
innerMap.set(nodeId, new Map(unpacker.unpackMap() as [number, any][]));
}
}
return map;
}
export async function addStorageLocation({
hash,
nodeId,
location,
message,
config,
}: {
hash: Multihash;
nodeId: NodeId;
location: StorageLocation;
message?: Uint8Array;
config: S5Config;
}) {
const map = this.readStorageLocationsFromDB(hash);
const innerMap =
map.get(location.type) || new Map<NodeId, Map<number, any>>();
map.set(location.type, innerMap);
const locationMap = new Map<number, any>([
[1, location.parts],
// [2, location.binaryParts],
[3, location.expiry],
[4, message],
]);
innerMap.set(nodeId, locationMap);
await config.cacheDb.put(
stringifyHash(hash),
new Packer().pack(map).takeBytes(),
);
}
export function stringifyBytes(data: Uint8Array) {
return String.fromCharCode(...data);
}
function stringifyHash(hash: Multihash) {
return stringifyBytes(hash.fullBytes);
}
export function stringifyNode(node: NodeId) {
return stringifyBytes(node.bytes);
}

38
src/nodeId.ts Normal file
View File

@ -0,0 +1,38 @@
import { base58btc } from "multiformats/bases/base58";
import { equalBytes } from "@noble/curves/abstract/utils";
export default class NodeId {
bytes: Uint8Array;
constructor(bytes: Uint8Array) {
this.bytes = bytes;
}
static decode(nodeId: string): NodeId {
return new NodeId(base58btc.decode(nodeId));
}
equals(other: any): boolean {
if (!(other instanceof NodeId)) {
return false;
}
return equalBytes(this.bytes, other.bytes);
}
get hashCode(): number {
return (
this.bytes[0] +
this.bytes[1] * 256 +
this.bytes[2] * 256 * 256 +
this.bytes[3] * 256 * 256 * 256
);
}
toBase58(): string {
return base58btc.encode(this.bytes);
}
toString(): string {
return this.toBase58();
}
}

93
src/peer/tcp.ts Normal file
View File

@ -0,0 +1,93 @@
import { Logger, Peer } from "../types.js";
import NodeId from "../nodeId.js";
import * as net from "net";
import { URL } from "url";
import { decodeEndian } from "../util.js";
export class TcpPeer implements Peer {
connectionUris: Array<URL>;
isConnected: boolean = false;
challenge: Uint8Array;
private _socket: net.Socket;
constructor(_socket: net.Socket, connectionUris: URL[]) {
this.connectionUris = connectionUris.map((uri) => new URL(uri.toString()));
this.challenge = new Uint8Array();
this._socket = _socket;
}
private _id?: NodeId;
get id(): NodeId {
return this._id as NodeId;
}
set id(value: NodeId) {
this._id = value;
}
sendMessage(message: Uint8Array): void {
this._socket.write(message);
}
renderLocationUri(): string {
return this.connectionUris.length === 0
? (this._socket.remoteAddress as string)
: this.connectionUris[0].toString();
}
listenForMessages(
callback: (event: any) => Promise<void>,
{
onDone,
onError,
logger,
}: { onDone?: any; onError?: (...args: any[]) => void; logger: Logger },
): void {
const listener = (data: Uint8Array) => {
let pos = 0;
while (pos < data.length) {
const lengthBuffer = data.slice(pos, pos + 4);
const length = decodeEndian(lengthBuffer);
if (data.length < pos + 4 + length) {
console.log(`Ignore message, invalid length (from ${this.id})`);
return;
}
try {
const message = data.slice(pos + 4, pos + 4 + length);
callback(message).catch((e) => {
logger.catched(`Error in callback: ${e}`, this.id.toBase58());
});
} catch (e) {
logger.catched(`Caught an exception: ${e}`, this.id.toBase58());
}
pos += length + 4;
}
};
this._socket.on("data", listener);
if (onDone) {
this._socket.on("end", onDone);
}
if (onError) {
this._socket.on("error", onError);
}
}
}
export async function connect(port: number, host: string): Promise<net.Socket> {
return new Promise((resolve, reject) => {
const socket = net.connect(port, host, () => {
resolve(socket);
});
socket.on("error", (err) => {
reject(err);
});
});
}

70
src/peer/webSocket.ts Normal file
View File

@ -0,0 +1,70 @@
import { Logger, Peer } from "../types.js";
import NodeId from "../nodeId.js";
import { URL } from "url";
import { WebSocket } from "ws";
export class WebSocketPeer implements Peer {
connectionUris: Array<URL>;
isConnected: boolean = false;
challenge: Uint8Array;
private _socket: WebSocket;
constructor(_socket: WebSocket, connectionUris: URL[]) {
this.connectionUris = connectionUris.map((uri) => new URL(uri.toString()));
this.challenge = new Uint8Array(); // Initialize as needed
this._socket = _socket;
}
private _id?: NodeId;
get id(): NodeId {
return this._id as NodeId;
}
set id(value: NodeId) {
this._id = value;
}
sendMessage(message: Uint8Array): void {
this._socket.send(message);
}
renderLocationUri(): string {
return "WebSocket client";
}
listenForMessages(
callback: (event: any) => Promise<void>,
{
onDone,
onError,
logger,
}: { onDone?: any; onError?: (...args: any[]) => void; logger: Logger },
): void {
this._socket.addEventListener(
"message",
async (event: MessageEvent<any>) => {
await callback(event.data);
},
);
if (onDone) {
this._socket.addEventListener("close", onDone);
}
if (onError) {
this._socket.addEventListener("error", onError);
}
}
}
export async function connect(uri: string): Promise<WebSocket> {
return new Promise((resolve, reject) => {
const socket = new WebSocket(uri);
socket.addEventListener("open", () => {
resolve(socket);
});
socket.addEventListener("error", (err) => {
reject(err);
});
});
}

273
src/serialization/pack.ts Normal file
View File

@ -0,0 +1,273 @@
import NodeId from "../nodeId.js";
export default class Packer {
private _bufSize: number;
// @ts-ignore
private _buf: Buffer;
// @ts-ignore
private _d: DataView;
private _offset: number = 0;
private _builder: Buffer[] = [];
private _strCodec = new TextEncoder(); // UTF-8 TextEncoder
constructor(bufSize: number = 64) {
this._bufSize = bufSize;
this._newBuf(this._bufSize);
}
private _newBuf(size: number) {
this._buf = Buffer.alloc(size);
this._d = new DataView(this._buf.buffer, this._buf.byteOffset);
this._offset = 0;
}
private _nextBuf() {
this._flushBuf();
this._bufSize *= 2;
this._newBuf(this._bufSize);
}
private _flushBuf() {
this._builder.push(this._buf.slice(0, this._offset));
}
private _putBytes(bytes: Buffer | Uint8Array) {
const length = bytes.length;
if (this._buf.length - this._offset < length) {
this._nextBuf();
}
if (this._offset === 0) {
this._builder.push(Buffer.from(bytes));
} else {
this._buf.set(bytes, this._offset);
this._offset += length;
}
}
public packNull() {
if (this._buf.length - this._offset < 1) {
this._nextBuf();
}
this._d.setUint8(this._offset++, 0xc0);
}
public packBool(v: boolean | null) {
if (this._buf.length - this._offset < 1) {
this._nextBuf();
}
if (v === null) {
this._d.setUint8(this._offset++, 0xc0);
} else {
this._d.setUint8(this._offset++, v ? 0xc3 : 0xc2);
}
}
public packInt(v: number | null) {
if (this._buf.length - this._offset < 9) {
this._nextBuf();
}
if (v === null) {
this._d.setUint8(this._offset++, 0xc0);
} else if (v >= 0) {
if (v <= 127) {
this._d.setUint8(this._offset++, v);
} else if (v <= 0xff) {
this._d.setUint8(this._offset++, 0xcc);
this._d.setUint8(this._offset++, v);
} else if (v <= 0xffff) {
this._d.setUint8(this._offset++, 0xcd);
this._d.setUint16(this._offset, v);
this._offset += 2;
} else if (v <= 0xffffffff) {
this._d.setUint8(this._offset++, 0xce);
this._d.setUint32(this._offset, v);
this._offset += 4;
} else {
this._d.setUint8(this._offset++, 0xcf);
this._d.setBigUint64(this._offset, BigInt(v));
this._offset += 8;
}
} else {
if (v >= -32) {
this._d.setInt8(this._offset++, v);
} else if (v >= -128) {
this._d.setUint8(this._offset++, 0xd0);
this._d.setInt8(this._offset++, v);
} else if (v >= -32768) {
this._d.setUint8(this._offset++, 0xd1);
this._d.setInt16(this._offset, v);
this._offset += 2;
} else if (v >= -2147483648) {
this._d.setUint8(this._offset++, 0xd2);
this._d.setInt32(this._offset, v);
this._offset += 4;
} else {
this._d.setUint8(this._offset++, 0xd3);
this._d.setBigInt64(this._offset, BigInt(v));
this._offset += 8;
}
}
}
public packDouble(v: number | null) {
if (this._buf.length - this._offset < 9) {
this._nextBuf();
}
if (v === null) {
this._d.setUint8(this._offset++, 0xc0);
return;
}
this._d.setUint8(this._offset++, 0xcb);
this._d.setFloat64(this._offset, v);
this._offset += 8;
}
public packString(v: string | null) {
if (this._buf.length - this._offset < 5) {
this._nextBuf();
}
if (v === null) {
this._d.setUint8(this._offset++, 0xc0);
return;
}
const encoded = this._strCodec.encode(v);
const length = encoded.length;
if (length <= 31) {
this._d.setUint8(this._offset++, 0xa0 | length);
} else if (length <= 0xff) {
this._d.setUint8(this._offset++, 0xd9);
this._d.setUint8(this._offset++, length);
} else if (length <= 0xffff) {
this._d.setUint8(this._offset++, 0xda);
this._d.setUint16(this._offset, length);
this._offset += 2;
} else if (length <= 0xffffffff) {
this._d.setUint8(this._offset++, 0xdb);
this._d.setUint32(this._offset, length);
this._offset += 4;
} else {
throw new Error("Max String length is 0xFFFFFFFF");
}
this._putBytes(Buffer.from(encoded));
}
public packStringEmptyIsNull(v: string | null) {
if (v === null || v === "") {
this.packNull();
} else {
this.packString(v);
}
}
public packBinary(buffer: Buffer | null) {
if (this._buf.length - this._offset < 5) {
this._nextBuf();
}
if (buffer === null) {
this._d.setUint8(this._offset++, 0xc0);
return;
}
const length = buffer.length;
if (length <= 0xff) {
this._d.setUint8(this._offset++, 0xc4);
this._d.setUint8(this._offset++, length);
} else if (length <= 0xffff) {
this._d.setUint8(this._offset++, 0xc5);
this._d.setUint16(this._offset, length);
this._offset += 2;
} else if (length <= 0xffffffff) {
this._d.setUint8(this._offset++, 0xc6);
this._d.setUint32(this._offset, length);
this._offset += 4;
} else {
throw new Error("Max binary length is 0xFFFFFFFF");
}
this._putBytes(buffer);
}
public packListLength(length: number | null) {
if (this._buf.length - this._offset < 5) {
this._nextBuf();
}
if (length === null) {
this._d.setUint8(this._offset++, 0xc0);
} else if (length <= 0xf) {
this._d.setUint8(this._offset++, 0x90 | length);
} else if (length <= 0xffff) {
this._d.setUint8(this._offset++, 0xdc);
this._d.setUint16(this._offset, length);
this._offset += 2;
} else if (length <= 0xffffffff) {
this._d.setUint8(this._offset++, 0xdd);
this._d.setUint32(this._offset, length);
this._offset += 4;
} else {
throw new Error("Max list length is 0xFFFFFFFF");
}
}
public packMapLength(length: number | null) {
if (this._buf.length - this._offset < 5) {
this._nextBuf();
}
if (length === null) {
this._d.setUint8(this._offset++, 0xc0);
} else if (length <= 0xf) {
this._d.setUint8(this._offset++, 0x80 | length);
} else if (length <= 0xffff) {
this._d.setUint8(this._offset++, 0xde);
this._d.setUint16(this._offset, length);
this._offset += 2;
} else if (length <= 0xffffffff) {
this._d.setUint8(this._offset++, 0xdf);
this._d.setUint32(this._offset, length);
this._offset += 4;
} else {
throw new Error("Max map length is 0xFFFFFFFF");
}
}
public takeBytes(): Buffer {
if (this._builder.length === 0) {
return this._buf.slice(0, this._offset);
}
this._flushBuf();
return Buffer.concat(this._builder);
}
public pack(v: any) {
if (v === null) {
this.packNull();
} else if (typeof v === "number") {
if (Number.isInteger(v)) {
this.packInt(v);
} else {
this.packDouble(v);
}
} else if (typeof v === "boolean") {
this.packBool(v);
} else if (typeof v === "string") {
this.packString(v);
} else if (v instanceof Uint8Array) {
this.packBinary(Buffer.from(v));
} else if (Array.isArray(v)) {
this.packListLength(v.length);
for (const item of v) {
this.pack(item);
}
} else if (v instanceof Map) {
this.packMapLength(v.size);
for (const [key, value] of v.entries()) {
this.pack(key);
this.pack(value);
}
} else if (v instanceof NodeId) {
this.pack(v.bytes);
} else {
throw new Error(`Could not pack ${v.constructor.name}`);
}
return this;
}
}

194
src/serialization/unpack.ts Normal file
View File

@ -0,0 +1,194 @@
export default class Unpacker {
private _list: Buffer;
private _offset: number = 0;
private _d: DataView;
constructor(list: Buffer) {
this._list = list;
this._d = new DataView(list.buffer, list.byteOffset);
}
public static fromPacked(data: Uint8Array) {
return new Unpacker(Buffer.from(data));
}
public unpackBool(): boolean | null {
const b = this._d.getUint8(this._offset++);
if (b === 0xc2) return false;
if (b === 0xc3) return true;
if (b === 0xc0) return null;
throw this._formatException("bool", b);
}
public unpackInt(): number | null {
const b = this._d.getUint8(this._offset++);
if (b <= 0x7f || (b >= 0xe0 && b <= 0xff)) {
return b;
} else if (b === 0xcc) {
return this._d.getUint8(this._offset++);
} else if (b === 0xcd) {
this._offset += 2;
return this._d.getUint16(this._offset - 2);
} else if (b === 0xce) {
this._offset += 4;
return this._d.getUint32(this._offset - 4);
} else if (b === 0xcf) {
this._offset += 8;
const high = this._d.getUint32(this._offset - 8);
const low = this._d.getUint32(this._offset - 4);
return high * 0x100000000 + low;
} else if (b === 0xd0) {
return this._d.getInt8(this._offset++);
} else if (b === 0xd1) {
this._offset += 2;
return this._d.getInt16(this._offset - 2);
} else if (b === 0xd2) {
this._offset += 4;
return this._d.getInt32(this._offset - 4);
} else if (b === 0xd3) {
this._offset += 8;
const high = this._d.getInt32(this._offset - 8);
const low = this._d.getUint32(this._offset - 4);
return high * 0x100000000 + low;
} else if (b === 0xc0) {
return null;
} else {
throw this._formatException("integer", b);
}
}
public unpackDouble(): number | null {
const b = this._d.getUint8(this._offset++);
if (b === 0xca) {
this._offset += 4;
return this._d.getFloat32(this._offset - 4);
} else if (b === 0xcb) {
this._offset += 8;
return this._d.getFloat64(this._offset - 8);
} else if (b === 0xc0) {
return null;
} else {
throw this._formatException("double", b);
}
}
public unpackString(): string | null {
const b = this._d.getUint8(this._offset++);
let len: number;
if ((b & 0xe0) === 0xa0) {
len = b & 0x1f;
} else if (b === 0xd9) {
len = this._d.getUint8(this._offset++);
} else if (b === 0xda) {
this._offset += 2;
len = this._d.getUint16(this._offset - 2);
} else if (b === 0xdb) {
this._offset += 4;
len = this._d.getUint32(this._offset - 4);
} else if (b === 0xc0) {
return null;
} else {
throw this._formatException("String", b);
}
const str = this._list.toString("utf-8", this._offset, this._offset + len);
this._offset += len;
return str;
}
public unpackBinary(): Buffer {
const b = this._d.getUint8(this._offset++);
let len: number;
if (b === 0xc4) {
len = this._d.getUint8(this._offset++);
} else if (b === 0xc5) {
this._offset += 2;
len = this._d.getUint16(this._offset - 2);
} else if (b === 0xc6) {
this._offset += 4;
len = this._d.getUint32(this._offset - 4);
} else if (b === 0xc0) {
len = 0;
} else {
throw this._formatException("Binary", b);
}
const data = this._list.slice(this._offset, this._offset + len);
this._offset += len;
return data;
}
public unpackList(): any[] {
const length = this.unpackListLength();
return Array.from({ length }, () => this._unpack());
}
public unpackMap(): { [key: string]: any } {
const length = this.unpackMapLength();
const obj: { [key: string]: any } = {};
for (let i = 0; i < length; i++) {
const key = this._unpack();
obj[key as string] = this._unpack();
}
return obj;
}
public unpackListLength(): number {
const b = this._d.getUint8(this._offset++);
if ((b & 0xf0) === 0x90) {
return b & 0xf;
} else if (b === 0xdc) {
this._offset += 2;
return this._d.getUint16(this._offset - 2);
} else if (b === 0xdd) {
this._offset += 4;
return this._d.getUint32(this._offset - 4);
} else if (b === 0xc0) {
return 0;
} else {
throw this._formatException("List length", b);
}
}
public unpackMapLength(): number {
const b = this._d.getUint8(this._offset++);
if ((b & 0xf0) === 0x80) {
return b & 0xf;
} else if (b === 0xde) {
this._offset += 2;
return this._d.getUint16(this._offset - 2);
} else if (b === 0xdf) {
this._offset += 4;
return this._d.getUint32(this._offset - 4);
} else if (b === 0xc0) {
return 0;
} else {
throw this._formatException("Map length", b);
}
}
private _unpack(): any {
const b = this._d.getUint8(this._offset);
if (b <= 0x7f || (b >= 0xe0 && b <= 0xff)) {
return this.unpackInt();
} else if (b === 0xc2 || b === 0xc3 || b === 0xc0) {
return this.unpackBool();
} else if (b === 0xca || b === 0xcb) {
return this.unpackDouble();
} else if ((b & 0xe0) === 0xa0 || b === 0xd9 || b === 0xda || b === 0xdb) {
return this.unpackString();
} else if (b === 0xc4 || b === 0xc5 || b === 0xc6) {
return this.unpackBinary();
} else if ((b & 0xf0) === 0x90 || b === 0xdc || b === 0xdd) {
return this.unpackList();
} else if ((b & 0xf0) === 0x80 || b === 0xde || b === 0xdf) {
return this.unpackMap();
} else {
throw this._formatException("Unknown", b);
}
}
// Implement other methods here, following the same pattern as unpackBool and unpackInt
private _formatException(type: string, b: number) {
return new Error(
`Try to unpack ${type} value, but it's not a ${type}, byte = ${b}`,
);
}
}

528
src/service/p2p.ts Normal file
View File

@ -0,0 +1,528 @@
import { Multihash } from "../multihash.js";
import NodeId from "../nodeId.js";
import { equalBytes } from "@noble/curves/abstract/utils";
import { Logger, Peer, S5Config, SignedMessage } from "../types.js";
import KeyPairEd25519 from "../ed25519.js";
import * as crypto from "crypto";
import {
mkeyEd25519,
protocolMethodAnnouncePeers,
protocolMethodHandshakeDone,
protocolMethodHandshakeOpen,
protocolMethodHashQuery,
protocolMethodSignedMessage,
recordTypeRegistryEntry,
recordTypeStorageLocation,
storageLocationTypeFull,
} from "../constants.js";
import defer from "p-defer";
import { calculateScore, decodeEndian, encodeEndian } from "#util.js";
import Packer from "#serialization/pack.js";
import Unpacker from "#serialization/unpack.js";
import { ed25519 } from "@noble/curves/ed25519";
import { AbstractLevel, AbstractSublevel } from "abstract-level";
import StorageLocation from "#storage.js";
import { addStorageLocation, stringifyNode } from "#node.js";
import { URL } from "url";
import { Buffer } from "buffer";
import { connect as tcpConnect, TcpPeer } from "../peer/tcp.js";
import { connect as wsConnect, WebSocketPeer } from "../peer/webSocket.js";
export class P2PService {
get peers(): Map<string, Peer> {
return this._peers;
}
private config: S5Config;
private logger: Logger;
private nodeKeyPair: KeyPairEd25519;
private localNodeId?: NodeId;
private networkId?: string;
private _peers: Map<string, Peer> = new Map();
private reconnectDelay: Map<string, number> = new Map();
private selfConnectionUris: Array<URL> = [];
private nodesDb?: AbstractSublevel<
AbstractLevel<Uint8Array, string, Uint8Array>,
Uint8Array,
string,
Uint8Array
>;
private hashQueryRoutingTable: Map<Multihash, Set<NodeId>> = new Map();
constructor(config: S5Config) {
this.config = config;
this.networkId = config?.p2p?.network;
this.nodeKeyPair = config.keyPair;
this.logger = config.logger;
config.services.p2p = this;
}
async init(): Promise<void> {
this.localNodeId = new NodeId(this.nodeKeyPair.publicKey); // Define the NodeId constructor
this.nodesDb = this.config.db.sublevel<string, Uint8Array>("s5-nodes", {});
}
async start(): Promise<void> {
const initialPeers = this.config?.p2p?.peers?.initial || [];
for (const p of initialPeers) {
this.connectToNode([new URL(p)]);
}
}
async onNewPeer(peer: Peer, verifyId: boolean): Promise<void> {
peer.challenge = crypto.randomBytes(32);
const initialAuthPayloadPacker = new Packer();
initialAuthPayloadPacker.packInt(protocolMethodHandshakeOpen);
initialAuthPayloadPacker.packBinary(Buffer.from(peer.challenge));
if (this.networkId) {
initialAuthPayloadPacker.packString(this.networkId);
}
const completer = defer<void>();
const supportedFeatures = 3; // 0b00000011
peer.listenForMessages(
async (event: Uint8Array) => {
let u = Unpacker.fromPacked(event);
const method = u.unpackInt();
if (method === protocolMethodHandshakeOpen) {
const p = new Packer();
p.packInt(protocolMethodHandshakeDone);
p.packBinary(u.unpackBinary());
let peerNetworkId: string | null = null;
try {
peerNetworkId = u.unpackString();
} catch {}
if (this.networkId && peerNetworkId !== this.networkId) {
throw `Peer is in different network: ${peerNetworkId}`;
}
p.packInt(supportedFeatures);
p.packInt(this.selfConnectionUris.length);
for (const uri of this.selfConnectionUris) {
p.packString(uri.toString());
}
// TODO Protocol version
// p.packInt(protocolVersion);
peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
return;
} else if (method === recordTypeRegistryEntry) {
const sre =
this.config.services.registry.deserializeRegistryEntry(event);
await this.config.services.registry.set(sre, false, peer);
return;
} else if (method === recordTypeStorageLocation) {
const hash = new Multihash(event.subarray(1, 34));
const type = event[34];
const expiry = decodeEndian(event.subarray(35, 39));
const partCount = event[39];
const parts: string[] = [];
let cursor = 40;
for (let i = 0; i < partCount; i++) {
const length = decodeEndian(event.subarray(cursor, cursor + 2));
cursor += 2;
parts.push(
new TextDecoder().decode(event.subarray(cursor, cursor + length)),
);
cursor += length;
}
cursor++;
const publicKey = event.subarray(cursor, cursor + 33);
const signature = event.subarray(cursor + 33);
if (publicKey[0] !== mkeyEd25519) {
throw `Unsupported public key type ${mkeyEd25519}`;
}
if (
!ed25519.verify(
signature,
event.subarray(0, cursor),
publicKey.subarray(1),
)
) {
return;
}
const nodeId = new NodeId(publicKey);
await addStorageLocation({
hash,
nodeId,
location: new StorageLocation(type, parts, expiry),
message: event,
config: this.config,
});
const list =
this.hashQueryRoutingTable.get(hash) || new Set<NodeId>();
for (const peerId of list) {
if (peerId.equals(nodeId)) {
continue;
}
if (peerId.equals(peer.id)) {
continue;
}
if (this._peers.has(peerId.toString())) {
try {
this._peers.get(peerId.toString())?.sendMessage(event);
} catch (e) {
this.logger.catched(e);
}
}
}
this.hashQueryRoutingTable.delete(hash);
}
if (method === protocolMethodSignedMessage) {
const sm = await this.unpackAndVerifySignature(u);
u = Unpacker.fromPacked(sm.message);
const method2 = u.unpackInt();
if (method2 === protocolMethodHandshakeDone) {
const challenge = u.unpackBinary();
if (!equalBytes(peer.challenge, challenge)) {
throw "Invalid challenge";
}
const pId = sm.nodeId;
if (!verifyId) {
peer.id = pId;
} else {
if (!peer.id.equals(pId)) {
throw "Invalid peer id on initial list";
}
}
peer.isConnected = true;
const supportedFeatures = u.unpackInt();
if (supportedFeatures !== 3) {
throw "Remote node does not support required features";
}
this._peers.set(peer.id.toString(), peer);
this.reconnectDelay.set(peer.id.toString(), 1);
const connectionUrisCount = u.unpackInt() as number;
peer.connectionUris = [];
for (let i = 0; i < connectionUrisCount; i++) {
peer.connectionUris.push(new URL(u.unpackString() as string));
}
this.logger.info(
`[+] ${peer.id.toString()} (${peer
.renderLocationUri()
.toString()})`,
);
this.sendPublicPeersToPeer(peer, Array.from(this._peers.values()));
for (const p of this._peers.values()) {
if (p.id.equals(peer.id)) continue;
if (p.isConnected) {
this.sendPublicPeersToPeer(p, [peer]);
}
}
return;
} else if (method2 === protocolMethodAnnouncePeers) {
const length = u.unpackInt() as number;
for (let i = 0; i < length; i++) {
const peerIdBinary = u.unpackBinary();
const id = new NodeId(peerIdBinary);
const isConnected = u.unpackBool() as boolean;
const connectionUrisCount = u.unpackInt() as number;
const connectionUris: URL[] = [];
for (let i = 0; i < connectionUrisCount; i++) {
connectionUris.push(new URL(u.unpackString() as string));
}
if (connectionUris.length > 0) {
// TODO Fully support multiple connection uris
const uri = new URL(connectionUris[0].toString());
uri.username = id.toBase58();
if (
!this.reconnectDelay.has(
NodeId.decode(uri.username).toString(),
)
) {
this.connectToNode([uri]);
}
}
}
}
} /* else if (method === protocolMethodRegistryQuery) {
const pk = u.unpackBinary();
const sre = node.registry.getFromDB(pk);
if (sre !== null) {
peer.sendMessage(node.registry.serializeRegistryEntry(sre));
}
}*/
},
{
onDone: async () => {
try {
if (this._peers.has(peer.id.toString())) {
this._peers.delete(peer.id.toString());
this.logger.info(
`[-] ${peer.id.toString()} (${peer
.renderLocationUri()
.toString()})`,
);
}
} catch (_) {
this.logger.info(`[-] ${peer.renderLocationUri()}`);
}
completer.reject("onDone");
},
onError: (e) => {
this.logger.warn(`${peer.id}: ${e}`);
},
logger: this.logger,
},
);
peer.sendMessage(initialAuthPayloadPacker.takeBytes());
return completer.promise;
}
async prepareProvideMessage(
hash: Multihash,
location: StorageLocation,
): Promise<Uint8Array> {
const list: number[] = [
recordTypeStorageLocation,
...hash.fullBytes,
location.type,
...encodeEndian(location.expiry, 4),
location.parts.length,
];
for (const part of location.parts) {
const bytes = new TextEncoder().encode(part);
list.push(...encodeEndian(bytes.length, 2));
list.push(...Array.from(bytes));
}
list.push(0);
const signature = ed25519.sign(
new Uint8Array(list),
this.nodeKeyPair.extractBytes(),
);
return new Uint8Array([
...list,
...Array.from(this.nodeKeyPair.publicKey),
...Array.from(signature),
]);
}
async sendPublicPeersToPeer(peer: Peer, peersToSend: Peer[]): Promise<void> {
const p = new Packer();
p.packInt(protocolMethodAnnouncePeers);
p.packInt(peersToSend.length);
for (const pts of peersToSend) {
p.packBinary(Buffer.from(pts.id.bytes));
p.packBool(pts.isConnected);
p.packInt(pts.connectionUris.length);
for (const uri of pts.connectionUris) {
p.packString(uri.toString());
}
}
peer.sendMessage(await this.signMessageSimple(p.takeBytes()));
}
async getNodeScore(nodeId: NodeId): Promise<number> {
if (nodeId.equals(this.localNodeId)) {
return 1;
}
const node = await this.nodesDb?.get(stringifyNode(nodeId));
if (!node) {
return 0.5;
}
const map = Unpacker.fromPacked(node).unpackMap();
return calculateScore(map.get(1), map.get(2));
}
private async _vote(nodeId: NodeId, upvote: boolean): Promise<void> {
const node = await this.nodesDb?.get(stringifyNode(nodeId));
const map = node
? Unpacker.fromPacked(node).unpackMap()
: new Map<number, number>(
Object.entries({ 1: 0, 2: 0 }).map(([k, v]) => [+k, v]),
);
if (upvote) {
map.set(1, (map.get(1) ?? 0) + 1);
} else {
map.set(2, (map.get(2) ?? 0) + 1);
}
await this.nodesDb?.put(
stringifyNode(nodeId),
new Packer().pack(map).takeBytes(),
);
}
async upvote(nodeId: NodeId): Promise<void> {
await this._vote(nodeId, true);
}
async downvote(nodeId: NodeId): Promise<void> {
await this._vote(nodeId, false);
}
// TODO add a bit of randomness with multiple options
async sortNodesByScore(nodes: NodeId[]): Promise<NodeId[]> {
const nodePromises = nodes.map(
(item): [NodeId, Promise<number> | number] => [
item,
this.getNodeScore(item),
],
);
await Promise.all(nodePromises.map((item) => item[1]));
for (let i = 0; i < nodePromises.length; i++) {
nodePromises[i][1] = await nodePromises[i][1];
}
return nodePromises
.sort((a: [NodeId, any], b: [NodeId, any]) => b[1] - a[1])
.map((item) => item[0]);
}
async signMessageSimple(message: Uint8Array): Promise<Uint8Array> {
const packer = new Packer();
const signature = ed25519.sign(this.nodeKeyPair.extractBytes(), message);
packer.packInt(protocolMethodSignedMessage);
packer.packBinary(Buffer.from(this.localNodeId!.bytes));
packer.packBinary(Buffer.from(signature));
packer.packBinary(Buffer.from(message));
return packer.takeBytes();
}
async unpackAndVerifySignature(u: Unpacker): Promise<SignedMessage> {
const nodeId = new NodeId(u.unpackBinary());
const signature = u.unpackBinary();
const message = u.unpackBinary();
const isValid = ed25519.verify(
signature,
message,
nodeId.bytes.subarray(1),
);
if (!isValid) {
throw new Error("Invalid signature found");
}
return {
nodeId: nodeId,
message: message,
};
}
sendHashRequest(
hash: Multihash,
types: number[] = [storageLocationTypeFull],
): void {
const p = new Packer();
p.packInt(protocolMethodHashQuery);
p.packBinary(Buffer.from(hash.fullBytes));
p.pack(types);
// TODO Maybe add int for hop count (or not because privacy concerns)
const req = p.takeBytes();
for (const peer of this._peers.values()) {
peer.sendMessage(req);
}
}
async connectToNode(connectionUris: URL[], retried = false): Promise<void> {
const unsupported = new URL("http://0.0.0.0");
unsupported.protocol = "unsupported";
const connectionUri =
connectionUris.find((uri) => ["ws:", "wss:"].includes(uri.protocol)) ||
connectionUris.find((uri) => uri.protocol === "tcp:") ||
unsupported;
if (connectionUri.protocol === "unsupported") {
throw new Error(
`None of the available connection URIs are supported (${connectionUris})`,
);
}
const protocol = connectionUri.protocol;
if (!connectionUri.username) {
throw new Error("Connection URI does not contain node id");
}
const id = NodeId.decode(connectionUri.username);
this.reconnectDelay.set(
id.toString(),
this.reconnectDelay.get(id.toString()) || 1,
);
if (id.equals(this.localNodeId)) {
return;
}
try {
this.logger.verbose(`[connect] ${connectionUri}`);
if (protocol === "tcp:") {
const ip = connectionUri.hostname;
const port = parseInt(connectionUri.port);
const socket = await tcpConnect(port, ip);
const peer = new TcpPeer(socket, [connectionUri]);
peer.id = id;
await this.onNewPeer(peer, true);
} else {
const channel = await wsConnect(connectionUri.toString());
const peer = new WebSocketPeer(channel, [connectionUri]);
peer.id = id;
await this.onNewPeer(peer, true);
}
} catch (e) {
if (retried) {
return;
}
retried = true;
this.logger.catched(e);
const delay = this.reconnectDelay.get(id.toString())!;
this.reconnectDelay.set(id.toString(), delay * 2);
await new Promise((resolve) => setTimeout(resolve, delay * 1000));
await this.connectToNode(connectionUris, retried);
}
}
}

240
src/service/registry.ts Normal file
View File

@ -0,0 +1,240 @@
import { Logger, Peer, S5Config } from "#types.js";
import { AbstractLevel, AbstractSublevel } from "abstract-level";
import {
mkeyEd25519,
protocolMethodRegistryQuery,
recordTypeRegistryEntry,
registryMaxDataSize,
} from "#constants.js";
import { Multihash } from "#multihash.js";
import { base64url } from "multiformats/bases/base64";
import { decodeEndian, encodeEndian } from "#util.js";
import { ed25519 } from "@noble/curves/ed25519";
import Packer from "#serialization/pack.js";
import { Buffer } from "buffer";
import { EventEmitter } from "events";
import KeyPairEd25519 from "#ed25519.js";
import { stringifyBytes } from "#node.js";
interface SignedRegistryEntry {
pk: Uint8Array; // public key with multicodec prefix
revision: number; // revision number of this entry, maximum is (256^8)-1
data: Uint8Array; // data stored in this entry, can have a maximum length of 48 bytes
signature: Uint8Array; // signature of this registry entry
}
export class RegistryService {
private db?: AbstractSublevel<
AbstractLevel<Uint8Array, string, Uint8Array>,
Uint8Array,
string,
Uint8Array
>;
private config: S5Config;
private logger: Logger;
private streams: Map<string, EventEmitter> = new Map<string, EventEmitter>();
constructor(config: S5Config) {
this.config = config;
this.logger = this.config.logger;
}
async init(): Promise<void> {
this.db = this.config.db.sublevel<string, Uint8Array>("s5-registry-db", {});
}
async set(
sre: SignedRegistryEntry,
trusted: boolean = false,
receivedFrom?: Peer,
): Promise<void> {
this.logger.verbose(
`[registry] set ${base64url.encode(sre.pk)} ${sre.revision} (${
receivedFrom?.id
})`,
);
if (!trusted) {
if (sre.pk.length !== 33) {
throw new Error("Invalid pubkey");
}
if (sre.pk[0] !== mkeyEd25519) {
throw new Error("Only ed25519 keys are supported");
}
if (sre.revision < 0 || sre.revision > 281474976710656) {
throw new Error("Invalid revision");
}
if (sre.data.length > registryMaxDataSize) {
throw new Error("Data too long");
}
const isValid = verifyRegistryEntry(sre);
if (!isValid) {
throw new Error("Invalid signature found");
}
}
const existingEntry = await this.getFromDB(sre.pk);
if (existingEntry) {
if (receivedFrom) {
if (existingEntry.revision === sre.revision) {
return;
} else if (existingEntry.revision > sre.revision) {
const updateMessage = serializeRegistryEntry(existingEntry);
receivedFrom.sendMessage(updateMessage);
return;
}
}
if (existingEntry.revision >= sre.revision) {
throw new Error("Revision number too low");
}
}
const key = new Multihash(sre.pk);
this.streams.get(key.toString())?.emit("event", sre);
this.db?.put(stringifyBytes(sre.pk), serializeRegistryEntry(sre));
this.broadcastEntry(sre, receivedFrom);
}
// TODO: Clean this table after some time
// TODO: If there are more than X peers, only broadcast to subscribed nodes (routing table) and shard-nodes (256)
broadcastEntry(sre: SignedRegistryEntry, receivedFrom?: Peer): void {
this.logger.verbose("[registry] broadcastEntry");
const updateMessage = serializeRegistryEntry(sre);
for (const p of Object.values(this.config.services.p2p.peers)) {
if (receivedFrom == null || p.id !== receivedFrom.id) {
p.sendMessage(updateMessage);
}
}
}
sendRegistryRequest(pk: Uint8Array): void {
const p = new Packer();
p.packInt(protocolMethodRegistryQuery);
p.packBinary(Buffer.from(pk));
const req = p.takeBytes();
// TODO: Use shard system if there are more than X peers
for (const peer of Object.values(this.config.services.p2p.peers)) {
peer.sendMessage(req);
}
}
async get(pk: Uint8Array): Promise<SignedRegistryEntry | null> {
const key = new Multihash(pk);
if (this.streams.has(key.toString())) {
this.logger.verbose(`[registry] get (subbed) ${key}`);
let res = await this.getFromDB(pk);
if (res !== null) {
return res;
}
this.sendRegistryRequest(pk);
await new Promise((resolve) => setTimeout(resolve, 200));
return this.getFromDB(pk);
} else {
this.sendRegistryRequest(pk);
this.streams.set(key.toString(), new EventEmitter());
let res = await this.getFromDB(pk);
if (res === null) {
this.logger.verbose(`[registry] get (clean) ${key}`);
for (let i = 0; i < 200; i++) {
await new Promise((resolve) => setTimeout(resolve, 10));
if ((await this.getFromDB(pk)) !== null) break;
}
} else {
this.logger.verbose(`[registry] get (cached) ${key}`);
await new Promise((resolve) => setTimeout(resolve, 200));
}
return this.getFromDB(pk);
}
}
private async setEntryHelper(
keyPair: KeyPairEd25519,
data: Uint8Array,
): Promise<void> {
const revision = Math.round(Date.now() / 1000);
const sre: SignedRegistryEntry = await this.signRegistryEntry({
kp: keyPair,
data,
revision,
});
await this.set(sre);
}
async signRegistryEntry({
kp,
data,
revision,
}: {
kp: KeyPairEd25519;
data: Uint8Array;
revision: number;
}): Promise<SignedRegistryEntry> {
const list = new Uint8Array([
recordTypeRegistryEntry,
...encodeEndian(revision, 8),
data.length,
...data,
]);
const signature = ed25519.sign(list, kp.extractBytes());
return {
pk: kp.publicKey,
revision,
data,
signature: new Uint8Array(signature),
};
}
async getFromDB(pk: Uint8Array): Promise<SignedRegistryEntry | null> {
const val = await this.db?.get(stringifyBytes(pk));
if (val) {
return this.deserializeRegistryEntry(val);
}
return null;
}
public deserializeRegistryEntry(event: Uint8Array): SignedRegistryEntry {
const dataLength = event[42];
return {
pk: event.slice(1, 34),
revision: decodeEndian(event.slice(34, 42)),
data: event.slice(43, 43 + dataLength),
signature: event.slice(43 + dataLength),
};
}
}
function verifyRegistryEntry(sre: SignedRegistryEntry): boolean {
const list: Uint8Array = Uint8Array.from([
recordTypeRegistryEntry,
...encodeEndian(sre.revision, 8),
sre.data.length, // 1 byte
...sre.data,
]);
return ed25519.verify(list, sre.signature, sre.pk.slice(1));
}
function serializeRegistryEntry(sre: SignedRegistryEntry): Uint8Array {
return Uint8Array.from([
recordTypeRegistryEntry,
...sre.pk,
...encodeEndian(sre.revision, 8),
sre.data.length,
...sre.data,
...sre.signature,
]);
}

31
src/storage.ts Normal file
View File

@ -0,0 +1,31 @@
export default class StorageLocation {
type: number;
parts: string[];
binaryParts: Uint8Array[] = [];
expiry: number; // Unix timestamp in seconds
providerMessage?: Uint8Array; // Made optional, similar to `late` in Dart
constructor(type: number, parts: string[], expiry: number) {
this.type = type;
this.parts = parts;
this.expiry = expiry;
}
get bytesUrl(): string {
return this.parts[0];
}
get outboardBytesUrl(): string {
if (this.parts.length === 1) {
return `${this.parts[0]}.obao`;
}
return this.parts[1];
}
toString(): string {
const expiryDate = new Date(this.expiry * 1000);
return `StorageLocation(${this.type}, ${
this.parts
}, expiry: ${expiryDate.toISOString()})`;
}
}

58
src/types.ts Normal file
View File

@ -0,0 +1,58 @@
import NodeId from "./nodeId.js";
import KeyPairEd25519 from "#ed25519.js";
import { AbstractLevel } from "abstract-level";
import { P2PService } from "./service/p2p.js";
import { RegistryService } from "./service/registry.js";
export interface Peer {
id: NodeId;
connectionUris: Array<URL>;
isConnected: boolean;
challenge: Uint8Array;
sendMessage(message: Uint8Array): void;
listenForMessages(
callback: (event: any) => Promise<void>,
{
onDone,
onError,
logger,
}: {
onDone?: any;
onError?: (...args: any[]) => void;
logger: Logger;
},
): void;
renderLocationUri(): string;
}
export interface Logger {
info(s: string): void;
verbose(s: string): void;
warn(s: string): void;
error(s: string): void;
catched(e: any, context?: string | null): void;
}
export interface S5Config {
p2p?: {
network: string;
peers?: {
initial?: string[];
};
};
keyPair: KeyPairEd25519;
logger: Logger;
db: AbstractLevel<Uint8Array, string, Uint8Array>;
cacheDb: AbstractLevel<Uint8Array, string, Uint8Array>;
services: {
p2p: P2PService;
registry: RegistryService;
};
}
export interface SignedMessage {
nodeId: NodeId;
message: Uint8Array;
}

30
src/util.ts Normal file
View File

@ -0,0 +1,30 @@
export function decodeEndian(bytes: Uint8Array): number {
let total = 0;
for (let i = 0; i < bytes.length; i++) {
total += bytes[i] * Math.pow(256, i);
}
return total;
}
export function encodeEndian(value: number, length: number): Uint8Array {
const res = new Uint8Array(length);
for (let i = 0; i < length; i++) {
res[i] = value & 0xff;
value = value >> 8;
}
return res;
}
export function calculateScore(
goodResponses: number,
badResponses: number,
): number {
const totalVotes = goodResponses + badResponses;
if (totalVotes === 0) return 0.5;
const average = goodResponses / totalVotes;
return average - (average - 0.5) * Math.pow(2, -Math.log(totalVotes + 1));
}