feat: add support for verifying data in bao service

This commit is contained in:
Derrick Hammer 2024-02-25 07:21:20 -05:00
parent ea8c50edc7
commit de3b226df5
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
5 changed files with 114 additions and 6 deletions

View File

@ -3,6 +3,8 @@ package bao
import ( import (
"context" "context"
"github.com/docker/go-units"
"git.lumeweb.com/LumeWeb/portal/bao/proto" "git.lumeweb.com/LumeWeb/portal/bao/proto"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hashicorp/go-plugin" "github.com/hashicorp/go-plugin"
@ -11,10 +13,13 @@ import (
var _ Bao = (*BaoGRPC)(nil) var _ Bao = (*BaoGRPC)(nil)
const VERIFY_CHUNK_SIZE = 256 * units.KiB
type Bao interface { type Bao interface {
NewHasher() uuid.UUID NewHasher() uuid.UUID
Hash(id uuid.UUID, data []byte) bool Hash(id uuid.UUID, data []byte) bool
Finish(id uuid.UUID) Result Finish(id uuid.UUID) Result
Verify(data []byte, offset uint64, proof []byte, hash []byte) bool
} }
type BaoPlugin struct { type BaoPlugin struct {
@ -73,3 +78,13 @@ func (b *BaoGRPC) Finish(id uuid.UUID) Result {
return Result{Hash: ret.Hash, Proof: ret.Proof} return Result{Hash: ret.Hash, Proof: ret.Proof}
} }
func (b *BaoGRPC) Verify(data []byte, offset uint64, proof []byte, hash []byte) bool {
ret, err := b.client.Verify(context.Background(), &proto.VerifyRequest{Data: data, Offset: offset, Proof: proof, Hash: hash})
if err != nil {
panic(err)
}
return ret.Status
}

View File

@ -30,9 +30,21 @@ message FinishResponse {
bytes proof = 2; bytes proof = 2;
} }
message VerifyRequest {
bytes data = 1;
uint64 offset = 2;
bytes proof = 3;
bytes hash = 4;
}
message VerifyResponse {
bool status = 1;
}
service Bao { service Bao {
rpc NewHasher(NewHasherRequest) returns (NewHasherResponse); rpc NewHasher(NewHasherRequest) returns (NewHasherResponse);
rpc Hash(HashRequest) returns (HashResponse); rpc Hash(HashRequest) returns (HashResponse);
rpc Finish(FinishRequest) returns (FinishResponse); rpc Finish(FinishRequest) returns (FinishResponse);
rpc Verify(VerifyRequest) returns (VerifyResponse);
} }

1
bao/rust/Cargo.lock generated
View File

@ -634,6 +634,7 @@ name = "rust"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"abao", "abao",
"anyhow",
"portpicker", "portpicker",
"prost", "prost",
"tokio", "tokio",

View File

@ -13,3 +13,4 @@ tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros","sign
portpicker = "0.1.1" portpicker = "0.1.1"
uuid = { version = "1.7.0", features = ["v4"] } uuid = { version = "1.7.0", features = ["v4"] }
abao = { git = "https://github.com/LumeWeb/abao.git", branch = "feature/inner_mut", default-features = false, features = ["group_size_256k", "tokio", "tokio_io"] } abao = { git = "https://github.com/LumeWeb/abao.git", branch = "feature/inner_mut", default-features = false, features = ["group_size_256k", "tokio", "tokio_io"] }
anyhow = "1.0.79"

View File

@ -1,5 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Cursor, Write}; use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -12,7 +12,7 @@ use uuid::Uuid;
use bao::bao_server::Bao; use bao::bao_server::Bao;
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use crate::bao::{bao_server, FinishRequest, FinishResponse, HashRequest, HashResponse, NewHasherRequest, NewHasherResponse}; use crate::bao::{bao_server, FinishRequest, FinishResponse, HashRequest, HashResponse, NewHasherRequest, NewHasherResponse, VerifyRequest, VerifyResponse};
#[path = "proto/bao.rs"] #[path = "proto/bao.rs"]
mod bao; mod bao;
@ -33,7 +33,7 @@ impl Bao for BaoService {
let id = Uuid::new_v4(); let id = Uuid::new_v4();
{ {
let mut state = self.state.write().await; let mut state = self.state.write().await;
state.hashers.insert(id,Arc::new( Mutex::new(encoder))); state.hashers.insert(id, Arc::new(Mutex::new(encoder)));
} }
Ok(Response::new(NewHasherResponse { Ok(Response::new(NewHasherResponse {
id: id.to_string(), id: id.to_string(),
@ -69,10 +69,89 @@ impl Bao for BaoService {
proof, proof,
})) }))
} }
async fn verify(&self, request: Request<VerifyRequest>) -> Result<Response<VerifyResponse>, Status> {
let req = request.get_ref();
let res = verify_internal(
req.data.clone(),
req.offset,
req.proof.clone(),
from_vec_to_array(req.hash.clone()
);
if res.is_err() {
Ok(Response::new(VerifyResponse {
status: false,
}))
} else {
Ok(Response::new(VerifyResponse {
status: true,
}))
}
}
}
fn verify_internal(
chunk_bytes: Vec<u8>,
offset: u64,
bao_outboard_bytes: Vec<u8>,
blake3_hash: [u8; 32],
) -> anyhow::Result<u8> {
let mut slice_stream = abao::encode::SliceExtractor::new_outboard(
FakeSeeker::new(&chunk_bytes[..]),
Cursor::new(&bao_outboard_bytes),
offset,
262144,
);
let mut decode_stream = abao::decode::SliceDecoder::new(
&mut slice_stream,
&abao::Hash::from(blake3_hash),
offset,
262144,
);
let mut decoded = Vec::new();
decode_stream.read_to_end(&mut decoded)?;
Ok(1)
}
fn from_vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
core::convert::TryInto::try_into(v)
.unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len()))
}
struct FakeSeeker<R: Read> {
reader: R,
bytes_read: u64,
}
impl<R: Read> FakeSeeker<R> {
fn new(reader: R) -> Self {
Self {
reader,
bytes_read: 0,
}
}
}
impl<R: Read> Read for FakeSeeker<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let n = self.reader.read(buf)?;
self.bytes_read += n as u64;
Ok(n)
}
}
impl<R: Read> Seek for FakeSeeker<R> {
fn seek(&mut self, _: SeekFrom) -> std::io::Result<u64> {
// Do nothing and return the current position.
Ok(self.bytes_read)
}
} }
impl BaoService { impl BaoService {
fn new(state: Arc<RwLock<GlobalState>>) -> Self { fn new(state: Arc<RwLock<GlobalState>>) -> Self {
BaoService { state } BaoService { state }
} }
} }
@ -106,11 +185,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sending a signal through the channel to initiate shutdown. // Sending a signal through the channel to initiate shutdown.
// If the receiver is dropped, we don't care about the error. // If the receiver is dropped, we don't care about the error.
let _ = tx.send(()); let _ = tx.send(());
}); });
Server::builder() Server::builder()
.max_frame_size( (1 << 24) - 1) .max_frame_size((1 << 24) - 1)
.add_service(bao_server::BaoServer::new(BaoService::new(global_state.clone()))) .add_service(bao_server::BaoServer::new(BaoService::new(global_state.clone())))
.add_service(health_reporter.1) .add_service(health_reporter.1)
.serve_with_shutdown(addr, async { .serve_with_shutdown(addr, async {