Compare commits
No commits in common. "673f7c6dfd4cee1386462bb3841ce2742fe96b15" and "35878a2427db1f9cd436dc5c54126b59aa88adc6" have entirely different histories.
673f7c6dfd
...
35878a2427
13
bao/bao.go
13
bao/bao.go
|
@ -23,7 +23,6 @@ type Bao interface {
|
||||||
Write(id uint32, data []byte) error
|
Write(id uint32, data []byte) error
|
||||||
Finalize(id uint32) ([]byte, error)
|
Finalize(id uint32) ([]byte, error)
|
||||||
Destroy(id uint32) error
|
Destroy(id uint32) error
|
||||||
ComputeFile(path string) ([]byte, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -89,7 +88,8 @@ func init() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComputeTreeStreaming(reader io.Reader) ([]byte, error) {
|
func ComputeBaoTree(reader io.Reader) ([]byte, error) {
|
||||||
|
|
||||||
instance, err := baoInstance.Init()
|
instance, err := baoInstance.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -119,15 +119,6 @@ func ComputeTreeStreaming(reader io.Reader) ([]byte, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComputeTreeFile(file *os.File) ([]byte, error) {
|
|
||||||
tree, err := baoInstance.ComputeFile(file.Name())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tree, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func write(instance uint32, bytes *[]byte) error {
|
func write(instance uint32, bytes *[]byte) error {
|
||||||
err := baoInstance.Write(instance, *bytes)
|
err := baoInstance.Write(instance, *bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -45,11 +45,3 @@ func (g *GRPCClient) Destroy(id uint32) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (g *GRPCClient) ComputeFile(path string) ([]byte, error) {
|
|
||||||
tree, err := g.client.ComputeFile(context.Background(), &wrappers.StringValue{Value: path})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tree.Value, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -17,5 +17,4 @@ service bao {
|
||||||
rpc Write(WriteRequest) returns (google.protobuf.Empty);
|
rpc Write(WriteRequest) returns (google.protobuf.Empty);
|
||||||
rpc Finalize (google.protobuf.UInt32Value) returns (google.protobuf.BytesValue);
|
rpc Finalize (google.protobuf.UInt32Value) returns (google.protobuf.BytesValue);
|
||||||
rpc Destroy (google.protobuf.UInt32Value) returns (google.protobuf.Empty);
|
rpc Destroy (google.protobuf.UInt32Value) returns (google.protobuf.Empty);
|
||||||
rpc ComputeFile (google.protobuf.StringValue) returns (google.protobuf.BytesValue);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,8 @@
|
||||||
#![feature(async_fn_in_trait)]
|
#![feature(async_fn_in_trait)]
|
||||||
#![allow(incomplete_features)]
|
#![allow(incomplete_features)]
|
||||||
|
|
||||||
use io::Read;
|
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs::{File};
|
|
||||||
use std::io;
|
|
||||||
use std::io::{Cursor, Write};
|
use std::io::{Cursor, Write};
|
||||||
use std::sync::{Arc};
|
use std::sync::{Arc};
|
||||||
|
|
||||||
|
@ -19,7 +16,7 @@ use tonic_health::server::HealthReporter;
|
||||||
|
|
||||||
use crate::proto::bao::bao_server::{Bao, BaoServer};
|
use crate::proto::bao::bao_server::{Bao, BaoServer};
|
||||||
use crate::proto::bao::WriteRequest;
|
use crate::proto::bao::WriteRequest;
|
||||||
use crate::proto::google::protobuf::{BytesValue, Empty, StringValue, UInt32Value};
|
use crate::proto::google::protobuf::{BytesValue, Empty, UInt32Value};
|
||||||
use crate::unique_port::UniquePort;
|
use crate::unique_port::UniquePort;
|
||||||
|
|
||||||
mod proto;
|
mod proto;
|
||||||
|
@ -67,7 +64,7 @@ impl Bao for BaoService {
|
||||||
let next_id = self.counter.inc() as u32;
|
let next_id = self.counter.inc() as u32;
|
||||||
let tree = Vec::new();
|
let tree = Vec::new();
|
||||||
let cursor = Cursor::new(tree);
|
let cursor = Cursor::new(tree);
|
||||||
let encoder = Encoder::new_outboard(cursor);
|
let encoder = Encoder::new(cursor);
|
||||||
|
|
||||||
let mut req = self.requests.lock();
|
let mut req = self.requests.lock();
|
||||||
req.insert(next_id, encoder);
|
req.insert(next_id, encoder);
|
||||||
|
@ -116,36 +113,4 @@ impl Bao for BaoService {
|
||||||
|
|
||||||
Ok(Response::new(Empty::default()))
|
Ok(Response::new(Empty::default()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn compute_file(&self, request: Request<StringValue>) -> Result<Response<BytesValue>, Status> {
|
|
||||||
let r = request.into_inner();
|
|
||||||
let tree = Vec::new();
|
|
||||||
let cursor = Cursor::new(tree);
|
|
||||||
let mut encoder = Encoder::new_outboard(cursor);
|
|
||||||
let mut input = File::open(r.value)?;
|
|
||||||
|
|
||||||
copy_reader_to_writer(&mut input, &mut encoder)?;
|
|
||||||
|
|
||||||
let ret = encoder.finalize().unwrap();
|
|
||||||
let bytes = ret.as_bytes().to_vec();
|
|
||||||
Ok(Response::new(BytesValue { value: bytes }))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn copy_reader_to_writer(
|
|
||||||
reader: &mut impl Read,
|
|
||||||
writer: &mut impl Write,
|
|
||||||
) -> io::Result<u64> {
|
|
||||||
// At least 16 KiB is necessary to use AVX-512 with BLAKE3.
|
|
||||||
let mut buf = [0; 65536];
|
|
||||||
let mut written = 0;
|
|
||||||
loop {
|
|
||||||
let len = match reader.read(&mut buf) {
|
|
||||||
Ok(0) => return Ok(written),
|
|
||||||
Ok(len) => len,
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
};
|
|
||||||
writer.write_all(&buf[..len])?;
|
|
||||||
written += len as u64;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ func (f *FilesController) PostUpload() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := files.Upload(file, nil)
|
upload, err := files.Upload(file)
|
||||||
|
|
||||||
if internalError(ctx, err) {
|
if internalError(ctx, err) {
|
||||||
return
|
return
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
"io"
|
"io"
|
||||||
"lukechampine.com/blake3"
|
"lukechampine.com/blake3"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var client *resty.Client
|
var client *resty.Client
|
||||||
|
@ -24,23 +23,12 @@ func Init() {
|
||||||
client.SetDisableWarn(true)
|
client.SetDisableWarn(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
|
func Upload(r io.ReadSeeker) (model.Upload, error) {
|
||||||
var upload model.Upload
|
var upload model.Upload
|
||||||
|
|
||||||
if r == nil && file == nil {
|
|
||||||
return upload, errors.New("invalid upload mode")
|
|
||||||
}
|
|
||||||
|
|
||||||
hasher := blake3.New(32, nil)
|
hasher := blake3.New(32, nil)
|
||||||
|
|
||||||
var err error
|
_, err := io.Copy(hasher, r)
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
_, err = io.Copy(hasher, r)
|
|
||||||
} else {
|
|
||||||
_, err = io.Copy(hasher, file)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
@ -53,12 +41,7 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
_, err = r.Seek(0, io.SeekStart)
|
_, err = r.Seek(0, io.SeekStart)
|
||||||
} else {
|
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
@ -81,37 +64,18 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
|
||||||
return upload, errors.New("file already exists in network, but missing in database")
|
return upload, errors.New("file already exists in network, but missing in database")
|
||||||
}
|
}
|
||||||
|
|
||||||
var tree []byte
|
tree, err := bao.ComputeBaoTree(bufio.NewReader(r))
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
tree, err = bao.ComputeTreeStreaming(bufio.NewReader(r))
|
|
||||||
} else {
|
|
||||||
tree, err = bao.ComputeTreeFile(file)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
_, err = r.Seek(0, io.SeekStart)
|
_, err = r.Seek(0, io.SeekStart)
|
||||||
} else {
|
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var body interface{}
|
ret, err := client.R().SetBody(r).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
body = r
|
|
||||||
} else {
|
|
||||||
body = file
|
|
||||||
}
|
|
||||||
|
|
||||||
ret, err := client.R().SetBody(body).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
|
|
||||||
if ret.StatusCode() != 200 {
|
if ret.StatusCode() != 200 {
|
||||||
err = errors.New(string(ret.Body()))
|
err = errors.New(string(ret.Body()))
|
||||||
return upload, err
|
return upload, err
|
||||||
|
|
4
tus.go
4
tus.go
|
@ -13,8 +13,8 @@ import (
|
||||||
"github.com/tus/tusd/pkg/filestore"
|
"github.com/tus/tusd/pkg/filestore"
|
||||||
tusd "github.com/tus/tusd/pkg/handler"
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
"github.com/tus/tusd/pkg/memorylocker"
|
"github.com/tus/tusd/pkg/memorylocker"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const TUS_API_PATH = "/files/tus"
|
const TUS_API_PATH = "/files/tus"
|
||||||
|
@ -133,7 +133,7 @@ func tusWorker(upload *tusd.Upload) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = files.Upload(nil, file.(*os.File))
|
_, err = files.Upload(file.(io.ReadSeeker))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue