refactor: move to new golang bao implementation
This commit is contained in:
parent
2f514c02be
commit
aa702ffd02
File diff suppressed because it is too large
Load Diff
|
@ -1,28 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "bao"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
abao = { version = "0.2.0", features = ["group_size_256k", "tokio_io"], default-features = false }
|
|
||||||
anyhow = "1.0.71"
|
|
||||||
async-stream = "0.3.5"
|
|
||||||
async-trait = "0.1.68"
|
|
||||||
atomic-counter = "1.0.1"
|
|
||||||
futures = "0.3.28"
|
|
||||||
gag = "1.0.0"
|
|
||||||
hyper = "0.14.26"
|
|
||||||
log = "0.4.17"
|
|
||||||
parking_lot = "0.12.1"
|
|
||||||
portpicker = "0.1.1"
|
|
||||||
prost = "0.11.9"
|
|
||||||
serde = { version = "1.0.163", features = ["derive"] }
|
|
||||||
thiserror = "1.0.40"
|
|
||||||
tokio = { version = "1.28.1", features = ["rt", "rt-multi-thread"] }
|
|
||||||
tokio-stream = "0.1.14"
|
|
||||||
tonic = "0.9.2"
|
|
||||||
tonic-health = "0.9.2"
|
|
||||||
tower = "0.4.13"
|
|
||||||
|
|
||||||
[build-dependencies]
|
|
||||||
tonic-build = "0.9.2"
|
|
173
bao/bao.go
173
bao/bao.go
|
@ -1,165 +1,44 @@
|
||||||
package bao
|
package bao
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"github.com/hashicorp/go-plugin"
|
|
||||||
"io"
|
"io"
|
||||||
"io/fs"
|
"lukechampine.com/blake3"
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate protoc --proto_path=proto/ bao.proto --go_out=proto --go_opt=paths=source_relative --go-grpc_out=proto --go-grpc_opt=paths=source_relative
|
const (
|
||||||
|
chunkSize = 1024
|
||||||
|
)
|
||||||
|
|
||||||
//go:embed target/release/bao
|
func ComputeTree(reader io.Reader, size int64) ([]byte, [32]byte, error) {
|
||||||
var baoPlugin []byte
|
bufSize := baoOutboardSize(int(size))
|
||||||
var baoInstance Bao
|
buf := bufferAt{buf: make([]byte, bufSize)}
|
||||||
|
|
||||||
type Bao interface {
|
hash, err := blake3.BaoEncode(&buf, bufio.NewReader(reader), size, true)
|
||||||
Init() (uint32, error)
|
if err != nil {
|
||||||
Write(id uint32, data []byte) error
|
return nil, [32]byte{}, err
|
||||||
Finalize(id uint32) ([]byte, error)
|
}
|
||||||
Destroy(id uint32) error
|
|
||||||
ComputeFile(path string) ([]byte, error)
|
return buf.buf, hash, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func baoOutboardSize(dataLen int) int {
|
||||||
baoExec, err := os.CreateTemp("", "lumeportal")
|
if dataLen == 0 {
|
||||||
|
return 8
|
||||||
_, err = baoExec.Write(baoPlugin)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
}
|
||||||
|
chunks := (dataLen + chunkSize - 1) / chunkSize
|
||||||
err = baoExec.Sync()
|
cvs := 2*chunks - 2 // no I will not elaborate
|
||||||
if err != nil {
|
return 8 + cvs*32
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
err = baoExec.Chmod(fs.ModePerm)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
err = baoExec.Close()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
|
||||||
pluginMap := map[string]plugin.Plugin{
|
|
||||||
"bao": &BAOPlugin{},
|
|
||||||
}
|
|
||||||
client := plugin.NewClient(&plugin.ClientConfig{
|
|
||||||
HandshakeConfig: plugin.HandshakeConfig{
|
|
||||||
ProtocolVersion: 1,
|
|
||||||
MagicCookieKey: "foo",
|
|
||||||
MagicCookieValue: "bar",
|
|
||||||
},
|
|
||||||
Plugins: pluginMap,
|
|
||||||
Cmd: exec.Command("sh", "-c", baoExec.Name()),
|
|
||||||
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
|
|
||||||
})
|
|
||||||
|
|
||||||
// Connect via RPC
|
|
||||||
rpcClient, err := client.Client()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request the plugin
|
|
||||||
raw, err := rpcClient.Dispense("bao")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
baoInstance = raw.(Bao)
|
|
||||||
|
|
||||||
signalCh := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-signalCh
|
|
||||||
err := os.Remove(baoExec.Name())
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Error:", err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComputeTreeStreaming(reader io.Reader) ([]byte, error) {
|
type bufferAt struct {
|
||||||
instance, err := baoInstance.Init()
|
buf []byte
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
b := make([]byte, 512)
|
|
||||||
for {
|
|
||||||
n, err := reader.Read(b)
|
|
||||||
|
|
||||||
if n > 0 {
|
|
||||||
err := write(instance, &b)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
var result []byte
|
|
||||||
if err == io.EOF {
|
|
||||||
result, err = finalize(instance)
|
|
||||||
if err == nil {
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComputeTreeFile(file *os.File) ([]byte, error) {
|
func (b *bufferAt) WriteAt(p []byte, off int64) (int, error) {
|
||||||
tree, err := baoInstance.ComputeFile(file.Name())
|
if copy(b.buf[off:], p) != len(p) {
|
||||||
if err != nil {
|
panic("bad buffer size")
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
return len(p), nil
|
||||||
return tree, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func write(instance uint32, bytes *[]byte) error {
|
|
||||||
err := baoInstance.Write(instance, *bytes)
|
|
||||||
if err != nil {
|
|
||||||
derr := destroy(instance)
|
|
||||||
if derr != nil {
|
|
||||||
return derr
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
derr := destroy(instance)
|
|
||||||
if derr != nil {
|
|
||||||
return derr
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func finalize(instance uint32) ([]byte, error) {
|
|
||||||
result, err := baoInstance.Finalize(instance)
|
|
||||||
if err != nil {
|
|
||||||
derr := destroy(instance)
|
|
||||||
if derr != nil {
|
|
||||||
return nil, derr
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
func destroy(instance uint32) error {
|
|
||||||
return baoInstance.Destroy(instance)
|
|
||||||
}
|
}
|
||||||
|
|
15
bao/build.rs
15
bao/build.rs
|
@ -1,15 +0,0 @@
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
tonic_build::configure()
|
|
||||||
.build_server(true)
|
|
||||||
.out_dir("src/proto")
|
|
||||||
.compile_well_known_types(true)
|
|
||||||
.include_file("mod.rs")
|
|
||||||
.type_attribute(".", "#[derive(serde::Deserialize)]")
|
|
||||||
.type_attribute(".", "#[derive(serde::Serialize)]")
|
|
||||||
.compile(&[
|
|
||||||
"proto/grpc_stdio.proto",
|
|
||||||
"proto/bao.proto"
|
|
||||||
], &["bao"])
|
|
||||||
.unwrap();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
|
@ -1,55 +0,0 @@
|
||||||
package bao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/bao/proto"
|
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
|
||||||
"github.com/golang/protobuf/ptypes/wrappers"
|
|
||||||
)
|
|
||||||
|
|
||||||
// GRPCClient is an implementation of KV that talks over RPC.
|
|
||||||
type GRPCClient struct{ client proto.BaoClient }
|
|
||||||
|
|
||||||
func (g *GRPCClient) Init() (uint32, error) {
|
|
||||||
init, err := g.client.Init(context.Background(), &empty.Empty{})
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return init.Value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GRPCClient) Write(id uint32, data []byte) error {
|
|
||||||
_, err := g.client.Write(context.Background(), &proto.WriteRequest{Id: id, Data: data})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GRPCClient) Finalize(id uint32) ([]byte, error) {
|
|
||||||
tree, err := g.client.Finalize(context.Background(), &wrappers.UInt32Value{Value: id})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tree.Value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GRPCClient) Destroy(id uint32) error {
|
|
||||||
_, err := g.client.Destroy(context.Background(), &wrappers.UInt32Value{Value: id})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (g *GRPCClient) ComputeFile(path string) ([]byte, error) {
|
|
||||||
tree, err := g.client.ComputeFile(context.Background(), &wrappers.StringValue{Value: path})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tree.Value, nil
|
|
||||||
}
|
|
|
@ -1,21 +0,0 @@
|
||||||
package bao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/bao/proto"
|
|
||||||
"github.com/hashicorp/go-plugin"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type BAOPlugin struct {
|
|
||||||
plugin.Plugin
|
|
||||||
Impl Bao
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *BAOPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *BAOPlugin) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
|
|
||||||
return &GRPCClient{client: proto.NewBaoClient(c)}, nil
|
|
||||||
}
|
|
|
@ -1,21 +0,0 @@
|
||||||
syntax = "proto3";
|
|
||||||
|
|
||||||
import "google/protobuf/empty.proto";
|
|
||||||
import "google/protobuf/wrappers.proto";
|
|
||||||
|
|
||||||
option go_package = "git.lumeweb.com/LumeWeb/portal/bao/proto";
|
|
||||||
|
|
||||||
package bao;
|
|
||||||
|
|
||||||
message WriteRequest {
|
|
||||||
uint32 id = 1;
|
|
||||||
bytes data = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
service bao {
|
|
||||||
rpc Init (google.protobuf.Empty) returns (google.protobuf.UInt32Value);
|
|
||||||
rpc Write(WriteRequest) returns (google.protobuf.Empty);
|
|
||||||
rpc Finalize (google.protobuf.UInt32Value) returns (google.protobuf.BytesValue);
|
|
||||||
rpc Destroy (google.protobuf.UInt32Value) returns (google.protobuf.Empty);
|
|
||||||
rpc ComputeFile (google.protobuf.StringValue) returns (google.protobuf.BytesValue);
|
|
||||||
}
|
|
|
@ -1,33 +0,0 @@
|
||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
|
||||||
|
|
||||||
syntax = "proto3";
|
|
||||||
package plugin;
|
|
||||||
option go_package = "plugin";
|
|
||||||
|
|
||||||
import "google/protobuf/empty.proto";
|
|
||||||
|
|
||||||
// GRPCStdio is a service that is automatically run by the plugin process
|
|
||||||
// to stream any stdout/err data so that it can be mirrored on the plugin
|
|
||||||
// host side.
|
|
||||||
service GRPCStdio {
|
|
||||||
// StreamStdio returns a stream that contains all the stdout/stderr.
|
|
||||||
// This RPC endpoint must only be called ONCE. Once stdio data is consumed
|
|
||||||
// it is not sent again.
|
|
||||||
//
|
|
||||||
// Callers should connect early to prevent blocking on the plugin process.
|
|
||||||
rpc StreamStdio(google.protobuf.Empty) returns (stream StdioData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// StdioData is a single chunk of stdout or stderr data that is streamed
|
|
||||||
// from GRPCStdio.
|
|
||||||
message StdioData {
|
|
||||||
enum Channel {
|
|
||||||
INVALID = 0;
|
|
||||||
STDOUT = 1;
|
|
||||||
STDERR = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
Channel channel = 1;
|
|
||||||
bytes data = 2;
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
use hyper::http::uri::InvalidUri;
|
|
||||||
use thiserror::Error as ThisError;
|
|
||||||
use tokio::sync::mpsc::error::SendError;
|
|
||||||
use tonic::transport::Error as TonicError;
|
|
||||||
use std::fmt::{Debug};
|
|
||||||
|
|
||||||
pub fn into_status(err: Error) -> tonic::Status {
|
|
||||||
tonic::Status::unknown(format!("{}", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, ThisError)]
|
|
||||||
pub enum Error {
|
|
||||||
#[error("Error with IO: {0}")]
|
|
||||||
Io(#[from] std::io::Error),
|
|
||||||
#[error("Error with tonic (gRPC) transport: {0}")]
|
|
||||||
TonicTransport(#[from] TonicError),
|
|
||||||
#[error("Error parsing string into a network address: {0}")]
|
|
||||||
AddrParser(#[from] std::net::AddrParseError),
|
|
||||||
#[error("Error sending on a mpsc channel: {0}")]
|
|
||||||
Send(String),
|
|
||||||
#[error("Invalid Uri: {0}")]
|
|
||||||
InvalidUri(#[from] InvalidUri),
|
|
||||||
#[error(transparent)]
|
|
||||||
Other(#[from] anyhow::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> From<SendError<T>> for Error {
|
|
||||||
fn from(_err: SendError<T>) -> Self {
|
|
||||||
Self::Send(format!(
|
|
||||||
"unable to send {} on a mpsc channel",
|
|
||||||
std::any::type_name::<T>()
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,98 +0,0 @@
|
||||||
// Copied from: https://github.com/hashicorp/go-plugin/blob/master/grpc_controller.go
|
|
||||||
use anyhow::{Context, Result};
|
|
||||||
use async_stream::stream;
|
|
||||||
use futures::stream::Stream;
|
|
||||||
use gag::BufferRedirect;
|
|
||||||
use std::io::Read;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use tokio::time::{sleep, Duration};
|
|
||||||
use tokio_stream::StreamExt;
|
|
||||||
use tonic::{async_trait, Request, Response, Status};
|
|
||||||
use crate::proto::google::protobuf::Empty;
|
|
||||||
use crate::proto::plugin::grpc_stdio_server::{GrpcStdio, GrpcStdioServer};
|
|
||||||
use crate::proto::plugin::stdio_data::Channel;
|
|
||||||
use crate::proto::plugin::{StdioData};
|
|
||||||
use crate::grpc::error::into_status;
|
|
||||||
|
|
||||||
const CONSOLE_POLL_SLEEP_MILLIS: u64 = 500;
|
|
||||||
|
|
||||||
pub fn new_server() -> GrpcStdioServer<GrpcStdioImpl> {
|
|
||||||
GrpcStdioServer::new(GrpcStdioImpl {})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct GrpcStdioImpl {}
|
|
||||||
|
|
||||||
impl GrpcStdioImpl {
|
|
||||||
fn new_combined_stream() -> Result<<Self as GrpcStdio>::StreamStdioStream, Status> {
|
|
||||||
log::trace!("new_inner_stream called. Asked for a stream of stdout and stderr");
|
|
||||||
log::info!("Gagging stdout and stderr to a buffer for redirection to plugin's host.",);
|
|
||||||
|
|
||||||
let stdoutbuf = BufferRedirect::stdout()
|
|
||||||
.context("Failed to create a BufferRedirec from stdout")
|
|
||||||
.map_err(|e| e.into())
|
|
||||||
.map_err(into_status)?;
|
|
||||||
let stdout_stream = GrpcStdioImpl::new_stream("stdout", Channel::Stdout as i32, stdoutbuf);
|
|
||||||
|
|
||||||
let stderrbuf = BufferRedirect::stderr()
|
|
||||||
.context("Failed to create a BufferRedirec from stderr")
|
|
||||||
.map_err(|e| e.into())
|
|
||||||
.map_err(into_status)?;
|
|
||||||
let stderr_stream = GrpcStdioImpl::new_stream("stderr", Channel::Stderr as i32, stderrbuf);
|
|
||||||
|
|
||||||
let merged_stream = stdout_stream.merge(stderr_stream);
|
|
||||||
|
|
||||||
Ok(Box::pin(merged_stream))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_stream(
|
|
||||||
stream_name: &'static str,
|
|
||||||
channel: i32,
|
|
||||||
mut redirected_buf: BufferRedirect,
|
|
||||||
) -> impl Stream<Item = Result<StdioData, Status>> {
|
|
||||||
stream! {
|
|
||||||
loop {
|
|
||||||
log::trace!("beginning next iteration of {} reading and streaming...", stream_name);
|
|
||||||
let mut readbuf = String::new();
|
|
||||||
match redirected_buf.read_to_string(&mut readbuf) {
|
|
||||||
Ok(len) => match len{
|
|
||||||
0 => {
|
|
||||||
log::trace!("{} had zero bytes. Sleeping to avoid polling...", stream_name);
|
|
||||||
sleep(Duration::from_millis(CONSOLE_POLL_SLEEP_MILLIS)).await;
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
log::trace!("Sending {} {} bytes of data: {}", stream_name, len, readbuf);
|
|
||||||
yield Ok(StdioData{
|
|
||||||
channel,
|
|
||||||
data: readbuf.into_bytes(),
|
|
||||||
});
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
log::error!("Error reading {} data: {:?}", stream_name, e);
|
|
||||||
yield Err(Status::unknown(format!("Error reading from Stderr of plugin's process: {:?}", e)));
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl GrpcStdio for GrpcStdioImpl {
|
|
||||||
type StreamStdioStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<StdioData, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
async fn stream_stdio(
|
|
||||||
&self,
|
|
||||||
_req: Request<Empty>,
|
|
||||||
) -> Result<Response<Self::StreamStdioStream>, Status> {
|
|
||||||
log::trace!("stream_stdio called.");
|
|
||||||
|
|
||||||
let s = GrpcStdioImpl::new_combined_stream()?;
|
|
||||||
|
|
||||||
log::trace!("stream_stdio responding with a stream of StdioData.",);
|
|
||||||
|
|
||||||
Ok(Response::new(s))
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,2 +0,0 @@
|
||||||
pub mod grpc_stdio;
|
|
||||||
pub mod error;
|
|
151
bao/src/main.rs
151
bao/src/main.rs
|
@ -1,151 +0,0 @@
|
||||||
#![feature(async_fn_in_trait)]
|
|
||||||
#![allow(incomplete_features)]
|
|
||||||
|
|
||||||
use io::Read;
|
|
||||||
use std::collections::hash_map::Entry;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fs::{File};
|
|
||||||
use std::io;
|
|
||||||
use std::io::{Cursor, Write};
|
|
||||||
use std::sync::{Arc};
|
|
||||||
|
|
||||||
use abao::encode::Encoder;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use atomic_counter::{AtomicCounter, ConsistentCounter};
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
use tonic::{Request, Response, Status};
|
|
||||||
use tonic::transport::Server;
|
|
||||||
use tonic_health::server::HealthReporter;
|
|
||||||
|
|
||||||
use crate::proto::bao::bao_server::{Bao, BaoServer};
|
|
||||||
use crate::proto::bao::WriteRequest;
|
|
||||||
use crate::proto::google::protobuf::{BytesValue, Empty, StringValue, UInt32Value};
|
|
||||||
use crate::unique_port::UniquePort;
|
|
||||||
|
|
||||||
mod proto;
|
|
||||||
mod unique_port;
|
|
||||||
mod grpc;
|
|
||||||
|
|
||||||
async fn driver_service_status(mut reporter: HealthReporter) {
|
|
||||||
reporter.set_serving::<BaoServer<BaoService>>().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
let mut uport = UniquePort::default();
|
|
||||||
let port = uport.get_unused_port().expect("No ports free");
|
|
||||||
println!("{}", format!("1|1|tcp|127.0.0.1:{}|grpc", port));
|
|
||||||
|
|
||||||
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
|
|
||||||
|
|
||||||
health_reporter.set_serving::<BaoServer<BaoService>>().await;
|
|
||||||
|
|
||||||
tokio::spawn(driver_service_status(health_reporter.clone()));
|
|
||||||
|
|
||||||
let addr = format!("127.0.0.1:{}", port).parse().unwrap();
|
|
||||||
let bao_service = BaoService::default();
|
|
||||||
let server = BaoServer::new(bao_service);
|
|
||||||
Server::builder()
|
|
||||||
.add_service(health_service)
|
|
||||||
.add_service(server)
|
|
||||||
.add_service(grpc::grpc_stdio::new_server())
|
|
||||||
.serve(addr)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
|
||||||
pub struct BaoService {
|
|
||||||
requests: Arc<Mutex<HashMap<u32, Encoder<Cursor<Vec<u8>>>>>>,
|
|
||||||
counter: ConsistentCounter,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Bao for BaoService {
|
|
||||||
async fn init(&self, _request: Request<Empty>) -> Result<Response<UInt32Value>, Status> {
|
|
||||||
let next_id = self.counter.inc() as u32;
|
|
||||||
let tree = Vec::new();
|
|
||||||
let cursor = Cursor::new(tree);
|
|
||||||
let encoder = Encoder::new_outboard(cursor);
|
|
||||||
|
|
||||||
let mut req = self.requests.lock();
|
|
||||||
req.insert(next_id, encoder);
|
|
||||||
|
|
||||||
Ok(Response::new(UInt32Value { value: next_id }))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write(&self, request: Request<WriteRequest>) -> Result<Response<Empty>, Status> {
|
|
||||||
let r = request.into_inner();
|
|
||||||
let mut req = self.requests.lock();
|
|
||||||
if let Some(encoder) = req.get_mut(&r.id) {
|
|
||||||
encoder.write(&r.data)?;
|
|
||||||
} else {
|
|
||||||
return Err(Status::invalid_argument("invalid id"));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Response::new(Empty::default()))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn finalize(
|
|
||||||
&self,
|
|
||||||
request: Request<UInt32Value>,
|
|
||||||
) -> Result<Response<BytesValue>, Status> {
|
|
||||||
let r = request.into_inner();
|
|
||||||
let mut req = self.requests.lock();
|
|
||||||
match req.entry(r.value) {
|
|
||||||
Entry::Occupied(mut entry) => {
|
|
||||||
let encoder = entry.get_mut();
|
|
||||||
let ret = encoder.finalize().unwrap();
|
|
||||||
let bytes = ret.as_bytes().to_vec();
|
|
||||||
Ok(Response::new(BytesValue { value: bytes }))
|
|
||||||
}
|
|
||||||
Entry::Vacant(_) => {
|
|
||||||
Err(Status::invalid_argument("invalid id"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
async fn destroy(&self, request: Request<UInt32Value>) -> Result<Response<Empty>, Status> {
|
|
||||||
let r = request.into_inner();
|
|
||||||
let mut req = self.requests.lock();
|
|
||||||
if req.remove(&r.value).is_none() {
|
|
||||||
return Err(Status::invalid_argument("invalid id"));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Response::new(Empty::default()))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn compute_file(&self, request: Request<StringValue>) -> Result<Response<BytesValue>, Status> {
|
|
||||||
let r = request.into_inner();
|
|
||||||
let tree = Vec::new();
|
|
||||||
let cursor = Cursor::new(tree);
|
|
||||||
let mut encoder = Encoder::new_outboard(cursor);
|
|
||||||
let mut input = File::open(r.value)?;
|
|
||||||
|
|
||||||
copy_reader_to_writer(&mut input, &mut encoder)?;
|
|
||||||
|
|
||||||
let ret = encoder.finalize().unwrap();
|
|
||||||
let bytes = ret.as_bytes().to_vec();
|
|
||||||
Ok(Response::new(BytesValue { value: bytes }))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn copy_reader_to_writer(
|
|
||||||
reader: &mut impl Read,
|
|
||||||
writer: &mut impl Write,
|
|
||||||
) -> io::Result<u64> {
|
|
||||||
// At least 16 KiB is necessary to use AVX-512 with BLAKE3.
|
|
||||||
let mut buf = [0; 65536];
|
|
||||||
let mut written = 0;
|
|
||||||
loop {
|
|
||||||
let len = match reader.read(&mut buf) {
|
|
||||||
Ok(0) => return Ok(written),
|
|
||||||
Ok(len) => len,
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
};
|
|
||||||
writer.write_all(&buf[..len])?;
|
|
||||||
written += len as u64;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,45 +0,0 @@
|
||||||
use portpicker::Port;
|
|
||||||
|
|
||||||
pub struct UniquePort {
|
|
||||||
vended_ports: Vec<Port>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UniquePort {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
vended_ports: vec![],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_unused_port(&mut self) -> Option<Port> {
|
|
||||||
let mut counter = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
counter += 1;
|
|
||||||
if counter > 1000 {
|
|
||||||
// no luck in 1000 tries? Give up!
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
match portpicker::pick_unused_port() {
|
|
||||||
None => return None,
|
|
||||||
Some(p) => {
|
|
||||||
if self.vended_ports.contains(&p) {
|
|
||||||
log::trace!("Skipped port: {} because it is in the list of previously vended ports: {:?}", p, self.vended_ports);
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
log::trace!("Vending port: {}", p);
|
|
||||||
self.vended_ports.push(p);
|
|
||||||
return Some(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for UniquePort {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -23,7 +23,7 @@ func (f *FilesController) PostUpload() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := files.Upload(file, nil)
|
upload, err := files.Upload(file, meta.Size)
|
||||||
|
|
||||||
if internalError(ctx, err) {
|
if internalError(ctx, err) {
|
||||||
return
|
return
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -30,7 +30,7 @@ require (
|
||||||
gorm.io/driver/mysql v1.5.0
|
gorm.io/driver/mysql v1.5.0
|
||||||
gorm.io/driver/sqlite v1.5.0
|
gorm.io/driver/sqlite v1.5.0
|
||||||
gorm.io/gorm v1.25.0
|
gorm.io/gorm v1.25.0
|
||||||
lukechampine.com/blake3 v1.1.7
|
lukechampine.com/blake3 v1.2.0
|
||||||
lukechampine.com/frand v1.4.2
|
lukechampine.com/frand v1.4.2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -1949,6 +1949,10 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
|
||||||
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
|
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
|
||||||
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
|
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
|
||||||
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
|
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
|
||||||
|
lukechampine.com/blake3 v1.1.8-0.20230515191509-85fa20df84c1 h1:NTVGxhcZ8DAwAZnyd4Q0L1dpdnVU3B3PkplDgNd/Jek=
|
||||||
|
lukechampine.com/blake3 v1.1.8-0.20230515191509-85fa20df84c1/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
|
||||||
|
lukechampine.com/blake3 v1.2.0 h1:xBP4eLyBtfzoNTBaeECFxxoLyPX7hD1xntcqc1xywQE=
|
||||||
|
lukechampine.com/blake3 v1.2.0/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
|
||||||
lukechampine.com/frand v1.4.2 h1:RzFIpOvkMXuPMBb9maa4ND4wjBn71E1Jpf8BzJHMaVw=
|
lukechampine.com/frand v1.4.2 h1:RzFIpOvkMXuPMBb9maa4ND4wjBn71E1Jpf8BzJHMaVw=
|
||||||
lukechampine.com/frand v1.4.2/go.mod h1:4S/TM2ZgrKejMcKMbeLjISpJMO+/eZ1zu3vYX9dtj3s=
|
lukechampine.com/frand v1.4.2/go.mod h1:4S/TM2ZgrKejMcKMbeLjISpJMO+/eZ1zu3vYX9dtj3s=
|
||||||
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
|
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package files
|
package files
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -13,8 +12,6 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
"io"
|
"io"
|
||||||
"lukechampine.com/blake3"
|
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var client *resty.Client
|
var client *resty.Client
|
||||||
|
@ -26,40 +23,18 @@ func Init() {
|
||||||
client.SetDisableWarn(true)
|
client.SetDisableWarn(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
|
func Upload(r io.ReadSeeker, size int64) (model.Upload, error) {
|
||||||
var upload model.Upload
|
var upload model.Upload
|
||||||
|
|
||||||
if r == nil && file == nil {
|
tree, hashBytes, err := bao.ComputeTree(r, size)
|
||||||
return upload, errors.New("invalid upload mode")
|
|
||||||
}
|
|
||||||
|
|
||||||
hasher := blake3.New(32, nil)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
_, err = io.Copy(hasher, r)
|
|
||||||
} else {
|
|
||||||
_, err = io.Copy(hasher, file)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hashBytes := hasher.Sum(nil)
|
|
||||||
|
|
||||||
hashHex := hex.EncodeToString(hashBytes[:])
|
hashHex := hex.EncodeToString(hashBytes[:])
|
||||||
|
|
||||||
if err != nil {
|
_, err = r.Seek(0, io.SeekStart)
|
||||||
return upload, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
_, err = r.Seek(0, io.SeekStart)
|
|
||||||
} else {
|
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
|
@ -83,37 +58,11 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
|
||||||
return upload, errors.New("file already exists in network, but missing in database")
|
return upload, errors.New("file already exists in network, but missing in database")
|
||||||
}
|
}
|
||||||
|
|
||||||
var tree []byte
|
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
tree, err = bao.ComputeTreeStreaming(bufio.NewReader(r))
|
|
||||||
} else {
|
|
||||||
tree, err = bao.ComputeTreeFile(file)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if r != nil {
|
ret, err := client.R().SetBody(r).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
|
||||||
_, err = r.Seek(0, io.SeekStart)
|
|
||||||
} else {
|
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return upload, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var body interface{}
|
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
body = r
|
|
||||||
} else {
|
|
||||||
body = file
|
|
||||||
}
|
|
||||||
|
|
||||||
ret, err := client.R().SetBody(body).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
|
|
||||||
if ret.StatusCode() != 200 {
|
if ret.StatusCode() != 200 {
|
||||||
err = errors.New(string(ret.Body()))
|
err = errors.New(string(ret.Body()))
|
||||||
return upload, err
|
return upload, err
|
||||||
|
|
|
@ -14,8 +14,8 @@ import (
|
||||||
"github.com/tus/tusd/pkg/filestore"
|
"github.com/tus/tusd/pkg/filestore"
|
||||||
tusd "github.com/tus/tusd/pkg/handler"
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
"github.com/tus/tusd/pkg/memorylocker"
|
"github.com/tus/tusd/pkg/memorylocker"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const TUS_API_PATH = "/files/tus"
|
const TUS_API_PATH = "/files/tus"
|
||||||
|
@ -136,7 +136,7 @@ func tusWorker(upload *tusd.Upload) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = files.Upload(nil, file.(*os.File))
|
_, err = files.Upload(file.(io.ReadSeeker), info.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue