diff --git a/bao/plugin.go b/bao/plugin.go index 1f89c6f..d24c0a3 100644 --- a/bao/plugin.go +++ b/bao/plugin.go @@ -3,6 +3,8 @@ package bao import ( "context" + "github.com/docker/go-units" + "git.lumeweb.com/LumeWeb/portal/bao/proto" "github.com/google/uuid" "github.com/hashicorp/go-plugin" @@ -11,10 +13,13 @@ import ( var _ Bao = (*BaoGRPC)(nil) +const VERIFY_CHUNK_SIZE = 256 * units.KiB + type Bao interface { NewHasher() uuid.UUID Hash(id uuid.UUID, data []byte) bool Finish(id uuid.UUID) Result + Verify(data []byte, offset uint64, proof []byte, hash []byte) bool } type BaoPlugin struct { @@ -73,3 +78,13 @@ func (b *BaoGRPC) Finish(id uuid.UUID) Result { return Result{Hash: ret.Hash, Proof: ret.Proof} } + +func (b *BaoGRPC) Verify(data []byte, offset uint64, proof []byte, hash []byte) bool { + ret, err := b.client.Verify(context.Background(), &proto.VerifyRequest{Data: data, Offset: offset, Proof: proof, Hash: hash}) + + if err != nil { + panic(err) + } + + return ret.Status +} diff --git a/bao/proto/bao.proto b/bao/proto/bao.proto index 89221fa..fc0641a 100644 --- a/bao/proto/bao.proto +++ b/bao/proto/bao.proto @@ -30,9 +30,21 @@ message FinishResponse { bytes proof = 2; } +message VerifyRequest { + bytes data = 1; + uint64 offset = 2; + bytes proof = 3; + bytes hash = 4; +} + +message VerifyResponse { + bool status = 1; +} + service Bao { rpc NewHasher(NewHasherRequest) returns (NewHasherResponse); rpc Hash(HashRequest) returns (HashResponse); rpc Finish(FinishRequest) returns (FinishResponse); + rpc Verify(VerifyRequest) returns (VerifyResponse); } diff --git a/bao/rust/Cargo.lock b/bao/rust/Cargo.lock index f6e678c..d5d88da 100644 --- a/bao/rust/Cargo.lock +++ b/bao/rust/Cargo.lock @@ -634,6 +634,7 @@ name = "rust" version = "0.1.0" dependencies = [ "abao", + "anyhow", "portpicker", "prost", "tokio", diff --git a/bao/rust/Cargo.toml b/bao/rust/Cargo.toml index 86142c4..04d0e5c 100644 --- a/bao/rust/Cargo.toml +++ b/bao/rust/Cargo.toml @@ -13,3 +13,4 @@ tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros","sign portpicker = "0.1.1" uuid = { version = "1.7.0", features = ["v4"] } abao = { git = "https://github.com/LumeWeb/abao.git", branch = "feature/inner_mut", default-features = false, features = ["group_size_256k", "tokio", "tokio_io"] } +anyhow = "1.0.79" diff --git a/bao/rust/src/main.rs b/bao/rust/src/main.rs index 0117ef3..c6db817 100644 --- a/bao/rust/src/main.rs +++ b/bao/rust/src/main.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::io::{Cursor, Write}; +use std::io::{Cursor, Read, Seek, SeekFrom, Write}; use std::net::SocketAddr; use std::sync::Arc; @@ -12,7 +12,7 @@ use uuid::Uuid; use bao::bao_server::Bao; use tokio::signal::unix::{signal, SignalKind}; -use crate::bao::{bao_server, FinishRequest, FinishResponse, HashRequest, HashResponse, NewHasherRequest, NewHasherResponse}; +use crate::bao::{bao_server, FinishRequest, FinishResponse, HashRequest, HashResponse, NewHasherRequest, NewHasherResponse, VerifyRequest, VerifyResponse}; #[path = "proto/bao.rs"] mod bao; @@ -33,7 +33,7 @@ impl Bao for BaoService { let id = Uuid::new_v4(); { let mut state = self.state.write().await; - state.hashers.insert(id,Arc::new( Mutex::new(encoder))); + state.hashers.insert(id, Arc::new(Mutex::new(encoder))); } Ok(Response::new(NewHasherResponse { id: id.to_string(), @@ -69,10 +69,89 @@ impl Bao for BaoService { proof, })) } + + async fn verify(&self, request: Request) -> Result, Status> { + let req = request.get_ref(); + let res = verify_internal( + req.data.clone(), + req.offset, + req.proof.clone(), + from_vec_to_array(req.hash.clone() + ); + + if res.is_err() { + Ok(Response::new(VerifyResponse { + status: false, + })) + } else { + Ok(Response::new(VerifyResponse { + status: true, + })) + } + } +} + +fn verify_internal( + chunk_bytes: Vec, + offset: u64, + bao_outboard_bytes: Vec, + blake3_hash: [u8; 32], +) -> anyhow::Result { + let mut slice_stream = abao::encode::SliceExtractor::new_outboard( + FakeSeeker::new(&chunk_bytes[..]), + Cursor::new(&bao_outboard_bytes), + offset, + 262144, + ); + + let mut decode_stream = abao::decode::SliceDecoder::new( + &mut slice_stream, + &abao::Hash::from(blake3_hash), + offset, + 262144, + ); + let mut decoded = Vec::new(); + decode_stream.read_to_end(&mut decoded)?; + + Ok(1) +} + +fn from_vec_to_array(v: Vec) -> [T; N] { + core::convert::TryInto::try_into(v) + .unwrap_or_else(|v: Vec| panic!("Expected a Vec of length {} but it was {}", N, v.len())) +} + +struct FakeSeeker { + reader: R, + bytes_read: u64, +} + +impl FakeSeeker { + fn new(reader: R) -> Self { + Self { + reader, + bytes_read: 0, + } + } +} + +impl Read for FakeSeeker { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let n = self.reader.read(buf)?; + self.bytes_read += n as u64; + Ok(n) + } +} + +impl Seek for FakeSeeker { + fn seek(&mut self, _: SeekFrom) -> std::io::Result { + // Do nothing and return the current position. + Ok(self.bytes_read) + } } impl BaoService { - fn new(state: Arc>) -> Self { + fn new(state: Arc>) -> Self { BaoService { state } } } @@ -106,11 +185,11 @@ async fn main() -> Result<(), Box> { // Sending a signal through the channel to initiate shutdown. // If the receiver is dropped, we don't care about the error. - let _ = tx.send(()); + let _ = tx.send(()); }); Server::builder() - .max_frame_size( (1 << 24) - 1) + .max_frame_size((1 << 24) - 1) .add_service(bao_server::BaoServer::new(BaoService::new(global_state.clone()))) .add_service(health_reporter.1) .serve_with_shutdown(addr, async {