Compare commits

...

2 Commits

7 changed files with 102 additions and 13 deletions

View File

@ -23,6 +23,7 @@ type Bao interface {
Write(id uint32, data []byte) error
Finalize(id uint32) ([]byte, error)
Destroy(id uint32) error
ComputeFile(path string) ([]byte, error)
}
func init() {
@ -88,8 +89,7 @@ func init() {
}
func ComputeBaoTree(reader io.Reader) ([]byte, error) {
func ComputeTreeStreaming(reader io.Reader) ([]byte, error) {
instance, err := baoInstance.Init()
if err != nil {
return nil, err
@ -119,6 +119,15 @@ func ComputeBaoTree(reader io.Reader) ([]byte, error) {
}
}
func ComputeTreeFile(file *os.File) ([]byte, error) {
tree, err := baoInstance.ComputeFile(file.Name())
if err != nil {
return nil, err
}
return tree, nil
}
func write(instance uint32, bytes *[]byte) error {
err := baoInstance.Write(instance, *bytes)
if err != nil {

View File

@ -45,3 +45,11 @@ func (g *GRPCClient) Destroy(id uint32) error {
return nil
}
func (g *GRPCClient) ComputeFile(path string) ([]byte, error) {
tree, err := g.client.ComputeFile(context.Background(), &wrappers.StringValue{Value: path})
if err != nil {
return nil, err
}
return tree.Value, nil
}

View File

@ -17,4 +17,5 @@ service bao {
rpc Write(WriteRequest) returns (google.protobuf.Empty);
rpc Finalize (google.protobuf.UInt32Value) returns (google.protobuf.BytesValue);
rpc Destroy (google.protobuf.UInt32Value) returns (google.protobuf.Empty);
rpc ComputeFile (google.protobuf.StringValue) returns (google.protobuf.BytesValue);
}

View File

@ -1,8 +1,11 @@
#![feature(async_fn_in_trait)]
#![allow(incomplete_features)]
use io::Read;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs::{File};
use std::io;
use std::io::{Cursor, Write};
use std::sync::{Arc};
@ -16,7 +19,7 @@ use tonic_health::server::HealthReporter;
use crate::proto::bao::bao_server::{Bao, BaoServer};
use crate::proto::bao::WriteRequest;
use crate::proto::google::protobuf::{BytesValue, Empty, UInt32Value};
use crate::proto::google::protobuf::{BytesValue, Empty, StringValue, UInt32Value};
use crate::unique_port::UniquePort;
mod proto;
@ -64,7 +67,7 @@ impl Bao for BaoService {
let next_id = self.counter.inc() as u32;
let tree = Vec::new();
let cursor = Cursor::new(tree);
let encoder = Encoder::new(cursor);
let encoder = Encoder::new_outboard(cursor);
let mut req = self.requests.lock();
req.insert(next_id, encoder);
@ -113,4 +116,36 @@ impl Bao for BaoService {
Ok(Response::new(Empty::default()))
}
async fn compute_file(&self, request: Request<StringValue>) -> Result<Response<BytesValue>, Status> {
let r = request.into_inner();
let tree = Vec::new();
let cursor = Cursor::new(tree);
let mut encoder = Encoder::new_outboard(cursor);
let mut input = File::open(r.value)?;
copy_reader_to_writer(&mut input, &mut encoder)?;
let ret = encoder.finalize().unwrap();
let bytes = ret.as_bytes().to_vec();
Ok(Response::new(BytesValue { value: bytes }))
}
}
fn copy_reader_to_writer(
reader: &mut impl Read,
writer: &mut impl Write,
) -> io::Result<u64> {
// At least 16 KiB is necessary to use AVX-512 with BLAKE3.
let mut buf = [0; 65536];
let mut written = 0;
loop {
let len = match reader.read(&mut buf) {
Ok(0) => return Ok(written),
Ok(len) => len,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
writer.write_all(&buf[..len])?;
written += len as u64;
}
}

View File

@ -23,7 +23,7 @@ func (f *FilesController) PostUpload() {
return
}
upload, err := files.Upload(file)
upload, err := files.Upload(file, nil)
if internalError(ctx, err) {
return

View File

@ -12,6 +12,7 @@ import (
"github.com/go-resty/resty/v2"
"io"
"lukechampine.com/blake3"
"os"
)
var client *resty.Client
@ -23,12 +24,23 @@ func Init() {
client.SetDisableWarn(true)
}
func Upload(r io.ReadSeeker) (model.Upload, error) {
func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
var upload model.Upload
if r == nil && file == nil {
return upload, errors.New("invalid upload mode")
}
hasher := blake3.New(32, nil)
_, err := io.Copy(hasher, r)
var err error
if r != nil {
_, err = io.Copy(hasher, r)
} else {
_, err = io.Copy(hasher, file)
}
if err != nil {
return upload, err
}
@ -41,7 +53,12 @@ func Upload(r io.ReadSeeker) (model.Upload, error) {
return upload, err
}
_, err = r.Seek(0, io.SeekStart)
if r != nil {
_, err = r.Seek(0, io.SeekStart)
} else {
_, err = file.Seek(0, io.SeekStart)
}
if err != nil {
return upload, err
}
@ -64,18 +81,37 @@ func Upload(r io.ReadSeeker) (model.Upload, error) {
return upload, errors.New("file already exists in network, but missing in database")
}
tree, err := bao.ComputeBaoTree(bufio.NewReader(r))
var tree []byte
if r != nil {
tree, err = bao.ComputeTreeStreaming(bufio.NewReader(r))
} else {
tree, err = bao.ComputeTreeFile(file)
}
if err != nil {
return upload, err
}
_, err = r.Seek(0, io.SeekStart)
if r != nil {
_, err = r.Seek(0, io.SeekStart)
} else {
_, err = file.Seek(0, io.SeekStart)
}
if err != nil {
return upload, err
}
ret, err := client.R().SetBody(r).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
var body interface{}
if r != nil {
body = r
} else {
body = file
}
ret, err := client.R().SetBody(body).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
if ret.StatusCode() != 200 {
err = errors.New(string(ret.Body()))
return upload, err

4
tus.go
View File

@ -13,8 +13,8 @@ import (
"github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler"
"github.com/tus/tusd/pkg/memorylocker"
"io"
"log"
"os"
)
const TUS_API_PATH = "/files/tus"
@ -133,7 +133,7 @@ func tusWorker(upload *tusd.Upload) error {
return err
}
_, err = files.Upload(file.(io.ReadSeeker))
_, err = files.Upload(nil, file.(*os.File))
if err != nil {
log.Print(err)
return err