refactor: heavily simplify and consolidate implementations to just use light sync updates and not the optimistic_update endpoint, and use a generic callback interface

This commit is contained in:
Derrick Hammer 2023-07-12 17:40:59 -04:00
parent cc6f53e6e8
commit 6408098050
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
11 changed files with 277 additions and 421 deletions

View File

@ -6,14 +6,16 @@ import {
} from "@lodestar/light-client/utils";
import { init } from "@chainsafe/bls/switchable";
import { Mutex } from "async-mutex";
import { fromHexString } from "@chainsafe/ssz";
import { getDefaultClientConfig } from "#util.js";
import isNode from "detect-node";
import { fromHexString, toHexString } from "@chainsafe/ssz";
import { deserializePubkeys, getDefaultClientConfig } from "#util.js";
import { capella, LightClientUpdate } from "#types.js";
import { deserializeSyncCommittee } from "@lodestar/light-client/utils/index.js";
import bls from "@chainsafe/bls/switchable.js";
import { assertValidLightClientUpdate } from "@lodestar/light-client/validation.js";
export interface BaseClientOptions {
prover: IProver;
store?: IStore;
store: IStore;
}
export default abstract class BaseClient {
@ -24,10 +26,10 @@ export default abstract class BaseClient {
(pk) => fromHexString(pk),
);
protected genesisPeriod = computeSyncPeriodAtSlot(this.config.genesis.slot);
private genesisTime = this.config.genesis.time;
protected booted = false;
private syncMutex = new Mutex();
protected options: BaseClientOptions;
private genesisTime = this.config.genesis.time;
private syncMutex = new Mutex();
constructor(options: BaseClientOptions) {
this.options = options;
@ -43,6 +45,10 @@ export default abstract class BaseClient {
return this._latestPeriod === this.getCurrentPeriod();
}
public get store(): IStore {
return this.options.store as IStore;
}
public async sync(): Promise<void> {
await init("herumi");
@ -88,7 +94,6 @@ export default abstract class BaseClient {
protected async subscribe(callback?: (ei: ExecutionInfo) => void) {
setInterval(async () => {
try {
await this._sync();
const ei = await this.getLatestExecution();
if (ei && ei.blockHash !== this.latestBlockHash) {
this.latestBlockHash = ei.blockHash;
@ -100,16 +105,128 @@ export default abstract class BaseClient {
}, POLLING_DELAY);
}
public get store(): IStore {
return this.options.store as IStore;
protected async getLatestExecution(): Promise<ExecutionInfo | null> {
await this._sync();
const update = capella.ssz.LightClientUpdate.deserialize(
this.store.getUpdate(this.latestPeriod),
);
return {
blockHash: toHexString(update.attestedHeader.execution.blockHash),
blockNumber: update.attestedHeader.execution.blockNumber,
};
}
async syncProver(
startPeriod: number,
currentPeriod: number,
startCommittee: Uint8Array[],
): Promise<{ syncCommittee: Uint8Array[]; period: number }> {
for (let period = startPeriod; period < currentPeriod; period += 1) {
try {
const updates = await this.options.prover.getSyncUpdate(
period,
currentPeriod,
);
for (let i = 0; i < updates.length; i++) {
const curPeriod = period + i;
const update = updates[i];
const validOrCommittee = await this.syncUpdateVerifyGetCommittee(
startCommittee,
curPeriod,
update,
);
if (!(validOrCommittee as boolean)) {
console.log(`Found invalid update at period(${curPeriod})`);
return {
syncCommittee: startCommittee,
period: curPeriod,
};
}
await this.options.store.addUpdate(period, update);
startCommittee = validOrCommittee as Uint8Array[];
period = curPeriod;
}
} catch (e) {
console.error(`failed to fetch sync update for period(${period})`);
return {
syncCommittee: startCommittee,
period,
};
}
}
return {
syncCommittee: startCommittee,
period: currentPeriod,
};
}
// committee and prover index of the first honest prover
protected abstract syncFromGenesis(): Promise<Uint8Array[]>;
protected async syncUpdateVerifyGetCommittee(
prevCommittee: Uint8Array[],
period: number,
update: LightClientUpdate,
): Promise<false | Uint8Array[]> {
const updatePeriod = computeSyncPeriodAtSlot(
update.attestedHeader.beacon.slot,
);
if (period !== updatePeriod) {
console.error(
`Expected update with period ${period}, but received ${updatePeriod}`,
);
return false;
}
protected abstract syncFromLastUpdate(
startPeriod?: number,
): Promise<Uint8Array[]>;
const prevCommitteeFast = deserializeSyncCommittee({
pubkeys: prevCommittee,
aggregatePubkey: bls.PublicKey.aggregate(
deserializePubkeys(prevCommittee),
).toBytes(),
});
protected abstract getLatestExecution(): Promise<ExecutionInfo | null>;
try {
// check if the update has valid signatures
await assertValidLightClientUpdate(
this.config.chainConfig,
prevCommitteeFast,
update,
);
return update.nextSyncCommittee.pubkeys;
} catch (e) {
console.error(e);
return false;
}
}
protected async syncFromGenesis(): Promise<Uint8Array[]> {
return this.syncFromLastUpdate(this.genesisPeriod);
}
protected async syncFromLastUpdate(
startPeriod = this.latestPeriod,
): Promise<Uint8Array[]> {
const currentPeriod = this.getCurrentPeriod();
let startCommittee = this.genesisCommittee;
console.debug(
`Sync started from period(${startPeriod}) to period(${currentPeriod})`,
);
const { syncCommittee, period } = await this.syncProver(
startPeriod,
currentPeriod,
startCommittee,
);
if (period === currentPeriod) {
console.debug(
`Sync completed from period(${startPeriod}) to period(${currentPeriod})`,
);
return syncCommittee;
}
throw new Error("no honest prover found");
}
}

View File

@ -1,17 +1,9 @@
import BaseClient, { BaseClientOptions } from "#baseClient.js";
import { ExecutionInfo, IVerifyingProviderConstructor } from "#interfaces.js";
import { DEFAULT_BATCH_SIZE } from "#constants.js";
import { IClientProver } from "#client/prover.js";
import {
getCommitteeHash,
optimisticUpdateFromJSON,
optimisticUpdateVerify,
} from "#util.js";
import { equalBytes } from "@noble/curves/abstract/utils";
import { IProver, IVerifyingProviderConstructor } from "#interfaces.js";
import { IClientVerifyingProvider } from "#client/verifyingProvider.js";
interface Config extends BaseClientOptions {
prover: IClientProver;
prover: IProver;
provider: IVerifyingProviderConstructor<IClientVerifyingProvider>;
rpcHandler: Function;
}
@ -48,74 +40,6 @@ export default class Client extends BaseClient {
}
}
protected async getLatestExecution(): Promise<ExecutionInfo | null> {
const updateJSON = await this.options.prover.callback(
"consensus_optimistic_update",
);
const update = optimisticUpdateFromJSON(updateJSON);
const verify = await optimisticUpdateVerify(
this.latestCommittee as Uint8Array[],
update,
);
if (!verify.correct) {
console.error(`Invalid Optimistic Update: ${verify.reason}`);
return null;
}
console.log(
`Optimistic update verified for slot ${updateJSON.attested_header.beacon.slot}`,
);
return {
blockHash: updateJSON.attested_header.execution.block_hash,
blockNumber: updateJSON.attested_header.execution.block_number,
};
}
protected syncFromGenesis(): Promise<Uint8Array[]> {
return this.syncFromLastUpdate(0);
}
protected async syncFromLastUpdate(
startPeriod = this.latestPeriod,
): Promise<Uint8Array[]> {
const currentPeriod = this.getCurrentPeriod();
let lastCommitteeHash: Uint8Array = getCommitteeHash(this.genesisCommittee);
for (let period = startPeriod + 1; period <= currentPeriod; period++) {
try {
lastCommitteeHash = await this.options.prover.getCommitteeHash(
period,
currentPeriod,
DEFAULT_BATCH_SIZE,
);
} catch (e: any) {
throw new Error(
`failed to fetch committee hash for prover at period(${period}): ${e.meessage}`,
);
}
}
return this.getCommittee(currentPeriod, lastCommitteeHash);
}
private async getCommittee(
period: number,
expectedCommitteeHash: Uint8Array | null,
): Promise<Uint8Array[]> {
if (period === this.genesisPeriod) {
return this.genesisCommittee;
}
if (!expectedCommitteeHash) {
throw new Error("expectedCommitteeHash required");
}
const committee = await this.options.prover.getCommittee(period);
const committeeHash = getCommitteeHash(committee);
if (!equalBytes(committeeHash, expectedCommitteeHash as Uint8Array)) {
throw new Error("prover responded with an incorrect committee");
}
return committee;
}
public async rpcCall(method: string, params: any) {
return this.provider?.rpcMethod(method, params);
}

View File

@ -1,6 +1,7 @@
import Client from "./client.js";
import Prover, { IClientProver, ProverRequestCallback } from "./prover.js";
import Prover, { ProverRequestCallback } from "../prover.js";
import VerifyingProvider from "./verifyingProvider.js";
import Store from "#store.js";
function createDefaultClient(
proverHandler: ProverRequestCallback,
@ -8,6 +9,7 @@ function createDefaultClient(
): Client {
return new Client({
prover: new Prover(proverHandler),
store: new Store(60 * 60),
provider: VerifyingProvider,
rpcHandler,
});
@ -15,5 +17,5 @@ function createDefaultClient(
export { RPCRequest, RPCRequestRaw, RPCResponse } from "./rpc.js";
export { Client, Prover, VerifyingProvider, createDefaultClient };
export { IClientProver, ProverRequestCallback };
export { ProverRequestCallback };
export * from "#interfaces.js";

View File

@ -1,73 +0,0 @@
import {
ConsensusCommitteeHashesRequest,
ConsensusCommitteePeriodRequest,
IProver,
} from "#interfaces.js";
import { CommitteeSSZ, HashesSSZ } from "#ssz.js";
import { LightClientUpdate } from "#types.js";
import * as capella from "@lodestar/types/capella";
export type ProverRequestCallback = (
action: string,
args?: ConsensusCommitteeHashesRequest | ConsensusCommitteePeriodRequest,
) => Promise<any>;
export interface IClientProver extends IProver {
get callback(): ProverRequestCallback;
getCommittee(period: number | "latest"): Promise<Uint8Array[]>;
getSyncUpdate(period: number): Promise<LightClientUpdate>;
getCommitteeHash(
period: number,
currentPeriod: number,
cacheCount: number,
): Promise<Uint8Array>;
}
export default class Prover implements IClientProver {
cachedHashes: Map<number, Uint8Array> = new Map();
constructor(callback: ProverRequestCallback) {
this._callback = callback;
}
private _callback: ProverRequestCallback;
get callback(): ProverRequestCallback {
return this._callback;
}
async getCommittee(period: number | "latest"): Promise<Uint8Array[]> {
const res = await this.callback("consensus_committee_period", { period });
return CommitteeSSZ.deserialize(Uint8Array.from(Object.values(res)));
}
async getSyncUpdate(period: number): Promise<LightClientUpdate> {
const res = await this.callback("consensus_committee_period", { period });
return capella.ssz.LightClientUpdate.deserialize(
Uint8Array.from(Object.values(res)),
);
}
async _getHashes(startPeriod: number, count: number): Promise<Uint8Array[]> {
const res = await this.callback("consensus_committee_hashes", {
start: startPeriod,
count,
});
return HashesSSZ.deserialize(Uint8Array.from(Object.values(res)));
}
async getCommitteeHash(
period: number,
currentPeriod: number,
cacheCount: number,
): Promise<Uint8Array> {
const _count = Math.min(currentPeriod - period + 1, cacheCount);
if (!this.cachedHashes.has(period)) {
const vals = await this._getHashes(period, _count);
for (let i = 0; i < _count; i++) {
this.cachedHashes.set(period + i, vals[i]);
}
}
return this.cachedHashes.get(period)!;
}
}

View File

@ -1,24 +1,21 @@
import { BeaconConfig } from "@lodestar/config";
import { GenesisData, LightClientUpdate } from "#types.js";
import Provider from "#client/verifyingProvider.js";
import { ProverRequestCallback } from "#client/index.js";
import BaseClient from "#baseClient.js";
export interface IProver {
get callback(): ProverRequestCallback;
set client(value: BaseClient);
getSyncUpdate(
startPeriod: number,
period: number,
currentPeriod: number,
cacheCount: number,
): Promise<LightClientUpdate>;
): Promise<LightClientUpdate[]>;
}
export interface IStore {
addUpdate(period: number, update: LightClientUpdate): Promise<void>;
addUpdate(period: number, update: LightClientUpdate): void;
getUpdate(period: number): Uint8Array;
getCommittee(period: number): Uint8Array;
getCommitteeHashes(period: number, count: number): Uint8Array;
hasUpdate(period: number): boolean;
}
export interface IVerifyingProvider {
@ -41,15 +38,7 @@ export interface ExecutionInfo {
blockNumber: number;
}
export interface ConsensusCommitteeHashesRequest {
export interface ConsensusCommitteeUpdateRequest {
start: number;
count: number;
}
export interface ConsensusCommitteePeriodRequest {
period: number | "latest";
}
export interface ConsensusBlockRequest {
block: number;
}

View File

@ -42,144 +42,9 @@ export default class Client extends BaseClient {
this.http.defaults.baseURL = this.beaconUrl;
}
private _latestOptimisticUpdate: any;
get latestOptimisticUpdate(): any {
return this._latestOptimisticUpdate;
}
async sync(): Promise<void> {
await super.sync();
this.subscribe();
}
async syncProver(
startPeriod: number,
currentPeriod: number,
startCommittee: Uint8Array[],
): Promise<{ syncCommittee: Uint8Array[]; period: number }> {
for (let period = startPeriod; period < currentPeriod; period += 1) {
try {
const update = await this.options.prover.getSyncUpdate(
period,
currentPeriod,
DEFAULT_BATCH_SIZE,
);
const validOrCommittee = await this.syncUpdateVerifyGetCommittee(
startCommittee,
period,
update,
);
if (!(validOrCommittee as boolean)) {
console.log(`Found invalid update at period(${period})`);
return {
syncCommittee: startCommittee,
period,
};
}
await this.options.store?.addUpdate(period, update);
startCommittee = validOrCommittee as Uint8Array[];
} catch (e) {
console.error(`failed to fetch sync update for period(${period})`);
return {
syncCommittee: startCommittee,
period,
};
}
}
return {
syncCommittee: startCommittee,
period: currentPeriod,
};
}
protected async getLatestExecution(): Promise<ExecutionInfo | null> {
const updateJSON = await getConsensusOptimisticUpdate();
const update = optimisticUpdateFromJSON(updateJSON);
const verify = await optimisticUpdateVerify(
this.latestCommittee as Uint8Array[],
update,
);
if (!verify.correct) {
// @ts-ignore
console.error(`Invalid Optimistic Update: ${verify?.reason}`);
return null;
}
this._latestOptimisticUpdate = updateJSON;
return {
blockHash: toHexString(update.attestedHeader.execution.blockHash),
blockNumber: update.attestedHeader.execution.blockNumber,
};
}
protected async syncFromGenesis(): Promise<Uint8Array[]> {
return this.syncFromLastUpdate(this.genesisPeriod);
}
protected async syncFromLastUpdate(
startPeriod = this.latestPeriod,
): Promise<Uint8Array[]> {
const currentPeriod = this.getCurrentPeriod();
let startCommittee = this.genesisCommittee;
console.debug(
`Sync started from period(${startPeriod}) to period(${currentPeriod})`,
);
const { syncCommittee, period } = await this.syncProver(
startPeriod,
currentPeriod,
startCommittee,
);
if (period === currentPeriod) {
console.debug(
`Sync completed from period(${startPeriod}) to period(${currentPeriod})`,
);
return syncCommittee;
}
throw new Error("no honest prover found");
}
protected async syncUpdateVerifyGetCommittee(
prevCommittee: Uint8Array[],
period: number,
update: LightClientUpdate,
): Promise<false | Uint8Array[]> {
const updatePeriod = computeSyncPeriodAtSlot(
update.attestedHeader.beacon.slot,
);
if (period !== updatePeriod) {
console.error(
`Expected update with period ${period}, but received ${updatePeriod}`,
);
return false;
}
const prevCommitteeFast = deserializeSyncCommittee({
pubkeys: prevCommittee,
aggregatePubkey: bls.PublicKey.aggregate(
deserializePubkeys(prevCommittee),
).toBytes(),
});
try {
// check if the update has valid signatures
await assertValidLightClientUpdate(
this.config.chainConfig,
prevCommitteeFast,
update,
);
return update.nextSyncCommittee.pubkeys;
} catch (e) {
console.error(e);
return false;
}
}
}

View File

@ -1,11 +1,20 @@
import Client from "./client.js";
import Prover from "./prover.js";
import Store from "./store.js";
import Store from "../store.js";
import Prover from "#prover.js";
import * as capella from "@lodestar/types/capella";
import { consensusClient } from "#util.js";
function createDefaultClient(beaconUrl: string): Client {
return new Client({
store: new Store(),
prover: new Prover(),
prover: new Prover(async (args) => {
const res = await consensusClient.get(
`/eth/v1/beacon/light_client/updates?start_period=${args.start}&count=${args.count}`,
);
return res.data.map((u: any) =>
capella.ssz.LightClientUpdate.fromJson(u.data),
);
}),
beaconUrl,
});
}

View File

@ -1,37 +0,0 @@
import { IProver } from "#interfaces.js";
import { LightClientUpdate } from "#types.js";
import { consensusClient } from "#util.js";
import { AxiosInstance } from "axios";
import * as capella from "@lodestar/types/capella";
export default class Prover implements IProver {
cachedSyncUpdate: Map<number, LightClientUpdate> = new Map();
private http: AxiosInstance = consensusClient;
async _getSyncUpdates(
startPeriod: number,
maxCount: number,
): Promise<LightClientUpdate[]> {
const res = await this.http(
`/eth/v1/beacon/light_client/updates?start_period=${startPeriod}&count=${maxCount}`,
);
return res.data.map((u: any) =>
capella.ssz.LightClientUpdate.fromJson(u.data),
);
}
async getSyncUpdate(
period: number,
currentPeriod: number,
cacheCount: number,
): Promise<LightClientUpdate> {
const _cacheCount = Math.min(currentPeriod - period + 1, cacheCount);
if (!this.cachedSyncUpdate.has(period)) {
const vals = await this._getSyncUpdates(period, _cacheCount);
for (let i = 0; i < _cacheCount; i++) {
this.cachedSyncUpdate.set(period + i, vals[i]);
}
}
return this.cachedSyncUpdate.get(period)!;
}
}

View File

@ -1,57 +0,0 @@
import { digest } from "@chainsafe/as-sha256";
import { CommitteeSSZ, HashesSSZ } from "#ssz.js";
import { IStore } from "#interfaces.js";
import { concatBytes } from "@noble/hashes/utils";
import { LightClientUpdate } from "#types.js";
import * as capella from "@lodestar/types/capella";
export default class Store implements IStore {
store: {
[period: number]: {
update: Uint8Array;
nextCommittee: Uint8Array;
nextCommitteeHash: Uint8Array;
};
} = {};
async addUpdate(period: number, update: LightClientUpdate) {
try {
this.store[period] = {
update: capella.ssz.LightClientUpdate.serialize(update),
nextCommittee: CommitteeSSZ.serialize(update.nextSyncCommittee.pubkeys),
nextCommitteeHash: digest(
concatBytes(...update.nextSyncCommittee.pubkeys),
),
};
} catch (e) {
console.log(e);
}
}
getUpdate(period: number): Uint8Array {
if (period in this.store) return this.store[period].update;
throw new Error(`update unavailable for period ${period}`);
}
getCommittee(period: number): Uint8Array {
if (period < 1)
throw new Error("committee not unavailable for period less than 1");
const predPeriod = period - 1;
if (predPeriod in this.store) return this.store[predPeriod].nextCommittee;
throw new Error(`committee unavailable for period ${predPeriod}`);
}
getCommitteeHashes(period: number, count: number): Uint8Array {
if (period < 1)
throw new Error("committee not unavailable for period less than 1");
const predPeriod = period - 1;
const hashes = new Array(count).fill(0).map((_, i) => {
const p = predPeriod + i;
if (p in this.store) return this.store[p].nextCommitteeHash;
throw new Error(`committee unavailable for period ${p}`);
});
return HashesSSZ.serialize(hashes);
}
}

72
src/prover.ts Normal file
View File

@ -0,0 +1,72 @@
import { ConsensusCommitteeUpdateRequest, IProver } from "#interfaces.js";
import { LightClientUpdate } from "#types.js";
import * as capella from "@lodestar/types/capella";
import BaseClient from "#baseClient.js";
export type ProverRequestCallback = (
args: ConsensusCommitteeUpdateRequest,
) => Promise<any>;
export default class Prover implements IProver {
constructor(callback: ProverRequestCallback) {
this._callback = callback;
}
private _client?: BaseClient;
set client(value: BaseClient) {
this._client = value;
}
private _callback: ProverRequestCallback;
get callback(): ProverRequestCallback {
return this._callback;
}
async getSyncUpdate(
startPeriod: number,
count: number,
): Promise<LightClientUpdate[]> {
let end = startPeriod + count;
let hasStart = this.client.store.hasUpdate(startPeriod);
let hasEnd = this.client.store.hasUpdate(startPeriod + count);
let trueStart = startPeriod;
let trueCount = count;
if (hasStart && !hasEnd) {
for (let i = startPeriod; i <= end; i++) {
if (!this.client.store.hasUpdate(i)) {
trueStart = i;
trueCount = end - i;
}
}
}
const res = await this.callback({
start: trueStart,
count: trueCount,
});
const updates: LightClientUpdate[] = [];
if (trueStart != startPeriod) {
for (let i = 0; i < trueStart - startPeriod; i++) {
updates.push(
capella.ssz.LightClientUpdate.deserialize(
this.client.store.getUpdate(startPeriod + i),
),
);
}
}
for (let i = 0; i < trueCount; i++) {
updates.push(
capella.ssz.LightClientUpdate.deserialize(res[startPeriod + i]),
);
}
return updates;
}
}

45
src/store.ts Normal file
View File

@ -0,0 +1,45 @@
import { digest } from "@chainsafe/as-sha256";
import { CommitteeSSZ, HashesSSZ } from "#ssz.js";
import { IStore } from "#interfaces.js";
import { concatBytes } from "@noble/hashes/utils";
import { LightClientUpdate } from "#types.js";
import * as capella from "@lodestar/types/capella";
import NodeCache from "node-cache";
export interface StoreItem {
update: Uint8Array;
nextCommittee: Uint8Array;
nextCommitteeHash: Uint8Array;
}
export default class Store implements IStore {
private store = new NodeCache();
constructor(expire: number = 0) {
this.store.options.stdTTL = 0;
}
addUpdate(period: number, update: LightClientUpdate) {
try {
this.store.set(period, {
update: capella.ssz.LightClientUpdate.serialize(update),
nextCommittee: CommitteeSSZ.serialize(update.nextSyncCommittee.pubkeys),
nextCommitteeHash: digest(
concatBytes(...update.nextSyncCommittee.pubkeys),
),
});
} catch (e) {
console.log(e);
}
}
getUpdate(period: number): Uint8Array {
if (this.store.has(period)) {
return this.store.get<StoreItem>(period)?.update as Uint8Array;
}
throw new Error(`update unavailable for period ${period}`);
}
hasUpdate(period: number): boolean {
return this.store.has(period);
}
}