Compare commits
3 Commits
435445dda5
...
35878a2427
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | 35878a2427 | |
Derrick Hammer | 3005be6fec | |
Derrick Hammer | a8d2ad3393 |
File diff suppressed because it is too large
Load Diff
|
@ -5,10 +5,24 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
abao = { version = "0.2.0", features = ["group_size_256k", "tokio_io"], default-features = false }
|
abao = { version = "0.2.0", features = ["group_size_256k", "tokio_io"], default-features = false }
|
||||||
wasmedge-bindgen = "0.4.1"
|
anyhow = "1.0.71"
|
||||||
wasmedge-bindgen-macro = "0.4.1"
|
async-stream = "0.3.5"
|
||||||
|
async-trait = "0.1.68"
|
||||||
|
atomic-counter = "1.0.1"
|
||||||
|
futures = "0.3.28"
|
||||||
|
gag = "1.0.0"
|
||||||
|
hyper = "0.14.26"
|
||||||
|
log = "0.4.17"
|
||||||
|
parking_lot = "0.12.1"
|
||||||
|
portpicker = "0.1.1"
|
||||||
|
prost = "0.11.9"
|
||||||
|
serde = { version = "1.0.163", features = ["derive"] }
|
||||||
|
thiserror = "1.0.40"
|
||||||
|
tokio = { version = "1.28.1", features = ["rt", "rt-multi-thread"] }
|
||||||
|
tokio-stream = "0.1.14"
|
||||||
|
tonic = "0.9.2"
|
||||||
|
tonic-health = "0.9.2"
|
||||||
|
tower = "0.4.13"
|
||||||
|
|
||||||
[lib]
|
[build-dependencies]
|
||||||
name = "bao"
|
tonic-build = "0.9.2"
|
||||||
path = "src/lib.rs"
|
|
||||||
crate-type = ["cdylib"]
|
|
||||||
|
|
153
bao/bao.go
153
bao/bao.go
|
@ -2,46 +2,96 @@ package bao
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"errors"
|
"github.com/hashicorp/go-plugin"
|
||||||
"github.com/second-state/WasmEdge-go/wasmedge"
|
|
||||||
bindgen "github.com/second-state/wasmedge-bindgen/host/go"
|
|
||||||
"io"
|
"io"
|
||||||
|
"io/fs"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed target/wasm32-wasi/release/bao.wasm
|
//go:generate protoc --proto_path=proto/ bao.proto --go_out=proto --go_opt=paths=source_relative --go-grpc_out=proto --go-grpc_opt=paths=source_relative
|
||||||
var wasm []byte
|
|
||||||
|
|
||||||
var conf *wasmedge.Configure
|
//go:embed target/release/bao
|
||||||
|
var baoPlugin []byte
|
||||||
|
var baoInstance Bao
|
||||||
|
|
||||||
|
type Bao interface {
|
||||||
|
Init() (uint32, error)
|
||||||
|
Write(id uint32, data []byte) error
|
||||||
|
Finalize(id uint32) ([]byte, error)
|
||||||
|
Destroy(id uint32) error
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
wasmedge.SetLogErrorLevel()
|
baoExec, err := os.CreateTemp("", "lumeportal")
|
||||||
conf = wasmedge.NewConfigure(wasmedge.WASI)
|
|
||||||
|
_, err = baoExec.Write(baoPlugin)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = baoExec.Sync()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = baoExec.Chmod(fs.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = baoExec.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
pluginMap := map[string]plugin.Plugin{
|
||||||
|
"bao": &BAOPlugin{},
|
||||||
|
}
|
||||||
|
client := plugin.NewClient(&plugin.ClientConfig{
|
||||||
|
HandshakeConfig: plugin.HandshakeConfig{
|
||||||
|
ProtocolVersion: 1,
|
||||||
|
MagicCookieKey: "foo",
|
||||||
|
MagicCookieValue: "bar",
|
||||||
|
},
|
||||||
|
Plugins: pluginMap,
|
||||||
|
Cmd: exec.Command("sh", "-c", baoExec.Name()),
|
||||||
|
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Connect via RPC
|
||||||
|
rpcClient, err := client.Client()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request the plugin
|
||||||
|
raw, err := rpcClient.Dispense("bao")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
baoInstance = raw.(Bao)
|
||||||
|
|
||||||
|
signalCh := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-signalCh
|
||||||
|
err := os.Remove(baoExec.Name())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error:", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComputeBaoTree(reader io.Reader) ([]byte, error) {
|
func ComputeBaoTree(reader io.Reader) ([]byte, error) {
|
||||||
var vm = wasmedge.NewVMWithConfig(conf)
|
|
||||||
var wasi = vm.GetImportModule(wasmedge.WASI)
|
|
||||||
wasi.InitWasi(
|
|
||||||
os.Args[1:], // The args
|
|
||||||
os.Environ(), // The envs
|
|
||||||
[]string{".:."}, // The mapping preopens
|
|
||||||
)
|
|
||||||
err := vm.LoadWasmBuffer(wasm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = vm.Validate()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
bg := bindgen.New(vm)
|
instance, err := baoInstance.Init()
|
||||||
bg.Instantiate()
|
|
||||||
|
|
||||||
_, _, err = bg.Execute("init")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bg.Release()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +100,7 @@ func ComputeBaoTree(reader io.Reader) ([]byte, error) {
|
||||||
n, err := reader.Read(b)
|
n, err := reader.Read(b)
|
||||||
|
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
err := write(*bg, &b)
|
err := write(instance, &b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -59,7 +109,7 @@ func ComputeBaoTree(reader io.Reader) ([]byte, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var result []byte
|
var result []byte
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
result, err = finalize(*bg)
|
result, err = finalize(instance)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
@ -69,37 +119,38 @@ func ComputeBaoTree(reader io.Reader) ([]byte, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func write(bg bindgen.Bindgen, bytes *[]byte) error {
|
func write(instance uint32, bytes *[]byte) error {
|
||||||
_, _, err := bg.Execute("write", *bytes)
|
err := baoInstance.Write(instance, *bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bg.Release()
|
derr := destroy(instance)
|
||||||
|
if derr != nil {
|
||||||
|
return derr
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
derr := destroy(instance)
|
||||||
|
if derr != nil {
|
||||||
|
return derr
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func finalize(bg bindgen.Bindgen) ([]byte, error) {
|
func finalize(instance uint32) ([]byte, error) {
|
||||||
var byteResult []byte
|
result, err := baoInstance.Finalize(instance)
|
||||||
|
|
||||||
result, _, err := bg.Execute("finalize")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bg.Release()
|
derr := destroy(instance)
|
||||||
|
if derr != nil {
|
||||||
|
return nil, derr
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over each element in the result slice
|
return result, nil
|
||||||
for _, elem := range result {
|
|
||||||
// Type assert the element to []byte
|
|
||||||
byteSlice, ok := elem.([]byte)
|
|
||||||
if !ok {
|
|
||||||
// If the element is not a byte slice, return an error
|
|
||||||
return nil, errors.New("result element is not a byte slice")
|
|
||||||
}
|
}
|
||||||
|
func destroy(instance uint32) error {
|
||||||
// Concatenate the byte slice to the byteResult slice
|
return baoInstance.Destroy(instance)
|
||||||
byteResult = append(byteResult, byteSlice...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return byteResult, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
tonic_build::configure()
|
||||||
|
.build_server(true)
|
||||||
|
.out_dir("src/proto")
|
||||||
|
.compile_well_known_types(true)
|
||||||
|
.include_file("mod.rs")
|
||||||
|
.type_attribute(".", "#[derive(serde::Deserialize)]")
|
||||||
|
.type_attribute(".", "#[derive(serde::Serialize)]")
|
||||||
|
.compile(&[
|
||||||
|
"proto/grpc_stdio.proto",
|
||||||
|
"proto/bao.proto"
|
||||||
|
], &["bao"])
|
||||||
|
.unwrap();
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
package bao
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/bao/proto"
|
||||||
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
|
"github.com/golang/protobuf/ptypes/wrappers"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GRPCClient is an implementation of KV that talks over RPC.
|
||||||
|
type GRPCClient struct{ client proto.BaoClient }
|
||||||
|
|
||||||
|
func (g *GRPCClient) Init() (uint32, error) {
|
||||||
|
init, err := g.client.Init(context.Background(), &empty.Empty{})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return init.Value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GRPCClient) Write(id uint32, data []byte) error {
|
||||||
|
_, err := g.client.Write(context.Background(), &proto.WriteRequest{Id: id, Data: data})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GRPCClient) Finalize(id uint32) ([]byte, error) {
|
||||||
|
tree, err := g.client.Finalize(context.Background(), &wrappers.UInt32Value{Value: id})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tree.Value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GRPCClient) Destroy(id uint32) error {
|
||||||
|
_, err := g.client.Destroy(context.Background(), &wrappers.UInt32Value{Value: id})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package bao
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/bao/proto"
|
||||||
|
"github.com/hashicorp/go-plugin"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BAOPlugin struct {
|
||||||
|
plugin.Plugin
|
||||||
|
Impl Bao
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *BAOPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BAOPlugin) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
|
||||||
|
return &GRPCClient{client: proto.NewBaoClient(c)}, nil
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
import "google/protobuf/empty.proto";
|
||||||
|
import "google/protobuf/wrappers.proto";
|
||||||
|
|
||||||
|
option go_package = "git.lumeweb.com/LumeWeb/portal/bao/proto";
|
||||||
|
|
||||||
|
package bao;
|
||||||
|
|
||||||
|
message WriteRequest {
|
||||||
|
uint32 id = 1;
|
||||||
|
bytes data = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
service bao {
|
||||||
|
rpc Init (google.protobuf.Empty) returns (google.protobuf.UInt32Value);
|
||||||
|
rpc Write(WriteRequest) returns (google.protobuf.Empty);
|
||||||
|
rpc Finalize (google.protobuf.UInt32Value) returns (google.protobuf.BytesValue);
|
||||||
|
rpc Destroy (google.protobuf.UInt32Value) returns (google.protobuf.Empty);
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
syntax = "proto3";
|
||||||
|
package plugin;
|
||||||
|
option go_package = "plugin";
|
||||||
|
|
||||||
|
import "google/protobuf/empty.proto";
|
||||||
|
|
||||||
|
// GRPCStdio is a service that is automatically run by the plugin process
|
||||||
|
// to stream any stdout/err data so that it can be mirrored on the plugin
|
||||||
|
// host side.
|
||||||
|
service GRPCStdio {
|
||||||
|
// StreamStdio returns a stream that contains all the stdout/stderr.
|
||||||
|
// This RPC endpoint must only be called ONCE. Once stdio data is consumed
|
||||||
|
// it is not sent again.
|
||||||
|
//
|
||||||
|
// Callers should connect early to prevent blocking on the plugin process.
|
||||||
|
rpc StreamStdio(google.protobuf.Empty) returns (stream StdioData);
|
||||||
|
}
|
||||||
|
|
||||||
|
// StdioData is a single chunk of stdout or stderr data that is streamed
|
||||||
|
// from GRPCStdio.
|
||||||
|
message StdioData {
|
||||||
|
enum Channel {
|
||||||
|
INVALID = 0;
|
||||||
|
STDOUT = 1;
|
||||||
|
STDERR = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
Channel channel = 1;
|
||||||
|
bytes data = 2;
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
use hyper::http::uri::InvalidUri;
|
||||||
|
use thiserror::Error as ThisError;
|
||||||
|
use tokio::sync::mpsc::error::SendError;
|
||||||
|
use tonic::transport::Error as TonicError;
|
||||||
|
use std::fmt::{Debug};
|
||||||
|
|
||||||
|
pub fn into_status(err: Error) -> tonic::Status {
|
||||||
|
tonic::Status::unknown(format!("{}", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, ThisError)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("Error with IO: {0}")]
|
||||||
|
Io(#[from] std::io::Error),
|
||||||
|
#[error("Error with tonic (gRPC) transport: {0}")]
|
||||||
|
TonicTransport(#[from] TonicError),
|
||||||
|
#[error("Error parsing string into a network address: {0}")]
|
||||||
|
AddrParser(#[from] std::net::AddrParseError),
|
||||||
|
#[error("Error sending on a mpsc channel: {0}")]
|
||||||
|
Send(String),
|
||||||
|
#[error("Invalid Uri: {0}")]
|
||||||
|
InvalidUri(#[from] InvalidUri),
|
||||||
|
#[error(transparent)]
|
||||||
|
Other(#[from] anyhow::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<SendError<T>> for Error {
|
||||||
|
fn from(_err: SendError<T>) -> Self {
|
||||||
|
Self::Send(format!(
|
||||||
|
"unable to send {} on a mpsc channel",
|
||||||
|
std::any::type_name::<T>()
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
// Copied from: https://github.com/hashicorp/go-plugin/blob/master/grpc_controller.go
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use async_stream::stream;
|
||||||
|
use futures::stream::Stream;
|
||||||
|
use gag::BufferRedirect;
|
||||||
|
use std::io::Read;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tonic::{async_trait, Request, Response, Status};
|
||||||
|
use crate::proto::google::protobuf::Empty;
|
||||||
|
use crate::proto::plugin::grpc_stdio_server::{GrpcStdio, GrpcStdioServer};
|
||||||
|
use crate::proto::plugin::stdio_data::Channel;
|
||||||
|
use crate::proto::plugin::{StdioData};
|
||||||
|
use crate::grpc::error::into_status;
|
||||||
|
|
||||||
|
const CONSOLE_POLL_SLEEP_MILLIS: u64 = 500;
|
||||||
|
|
||||||
|
pub fn new_server() -> GrpcStdioServer<GrpcStdioImpl> {
|
||||||
|
GrpcStdioServer::new(GrpcStdioImpl {})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct GrpcStdioImpl {}
|
||||||
|
|
||||||
|
impl GrpcStdioImpl {
|
||||||
|
fn new_combined_stream() -> Result<<Self as GrpcStdio>::StreamStdioStream, Status> {
|
||||||
|
log::trace!("new_inner_stream called. Asked for a stream of stdout and stderr");
|
||||||
|
log::info!("Gagging stdout and stderr to a buffer for redirection to plugin's host.",);
|
||||||
|
|
||||||
|
let stdoutbuf = BufferRedirect::stdout()
|
||||||
|
.context("Failed to create a BufferRedirec from stdout")
|
||||||
|
.map_err(|e| e.into())
|
||||||
|
.map_err(into_status)?;
|
||||||
|
let stdout_stream = GrpcStdioImpl::new_stream("stdout", Channel::Stdout as i32, stdoutbuf);
|
||||||
|
|
||||||
|
let stderrbuf = BufferRedirect::stderr()
|
||||||
|
.context("Failed to create a BufferRedirec from stderr")
|
||||||
|
.map_err(|e| e.into())
|
||||||
|
.map_err(into_status)?;
|
||||||
|
let stderr_stream = GrpcStdioImpl::new_stream("stderr", Channel::Stderr as i32, stderrbuf);
|
||||||
|
|
||||||
|
let merged_stream = stdout_stream.merge(stderr_stream);
|
||||||
|
|
||||||
|
Ok(Box::pin(merged_stream))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_stream(
|
||||||
|
stream_name: &'static str,
|
||||||
|
channel: i32,
|
||||||
|
mut redirected_buf: BufferRedirect,
|
||||||
|
) -> impl Stream<Item = Result<StdioData, Status>> {
|
||||||
|
stream! {
|
||||||
|
loop {
|
||||||
|
log::trace!("beginning next iteration of {} reading and streaming...", stream_name);
|
||||||
|
let mut readbuf = String::new();
|
||||||
|
match redirected_buf.read_to_string(&mut readbuf) {
|
||||||
|
Ok(len) => match len{
|
||||||
|
0 => {
|
||||||
|
log::trace!("{} had zero bytes. Sleeping to avoid polling...", stream_name);
|
||||||
|
sleep(Duration::from_millis(CONSOLE_POLL_SLEEP_MILLIS)).await;
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
log::trace!("Sending {} {} bytes of data: {}", stream_name, len, readbuf);
|
||||||
|
yield Ok(StdioData{
|
||||||
|
channel,
|
||||||
|
data: readbuf.into_bytes(),
|
||||||
|
});
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Error reading {} data: {:?}", stream_name, e);
|
||||||
|
yield Err(Status::unknown(format!("Error reading from Stderr of plugin's process: {:?}", e)));
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl GrpcStdio for GrpcStdioImpl {
|
||||||
|
type StreamStdioStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<StdioData, Status>> + Send + 'static>>;
|
||||||
|
|
||||||
|
async fn stream_stdio(
|
||||||
|
&self,
|
||||||
|
_req: Request<Empty>,
|
||||||
|
) -> Result<Response<Self::StreamStdioStream>, Status> {
|
||||||
|
log::trace!("stream_stdio called.");
|
||||||
|
|
||||||
|
let s = GrpcStdioImpl::new_combined_stream()?;
|
||||||
|
|
||||||
|
log::trace!("stream_stdio responding with a stream of StdioData.",);
|
||||||
|
|
||||||
|
Ok(Response::new(s))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
pub mod grpc_stdio;
|
||||||
|
pub mod error;
|
|
@ -1,32 +0,0 @@
|
||||||
use abao::encode::Encoder;
|
|
||||||
use std::io::{Cursor, Write};
|
|
||||||
#[allow(unused_imports)]
|
|
||||||
use wasmedge_bindgen::*;
|
|
||||||
use wasmedge_bindgen_macro::*;
|
|
||||||
|
|
||||||
static mut TREE: Option<Vec<u8>> = None;
|
|
||||||
static mut CURSOR: Option<Cursor<Vec<u8>>> = None;
|
|
||||||
static mut ENCODER: Option<Encoder<Cursor<Vec<u8>>>> = None;
|
|
||||||
|
|
||||||
#[wasmedge_bindgen]
|
|
||||||
pub unsafe fn init() {
|
|
||||||
TREE = Option::Some(Vec::new());
|
|
||||||
CURSOR = Option::Some(Cursor::new(TREE.take().unwrap()));
|
|
||||||
ENCODER = Option::Some(Encoder::new_outboard(CURSOR.take().unwrap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasmedge_bindgen]
|
|
||||||
pub unsafe fn write(v: Vec<u8>) -> Result<u64, String> {
|
|
||||||
let encoder = ENCODER.take().unwrap();
|
|
||||||
let bytes_written = encoder.to_owned().write(&v).map_err(|e| e.to_string())?;
|
|
||||||
ENCODER = Some(encoder); // Restore the value
|
|
||||||
Ok(bytes_written as u64)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasmedge_bindgen]
|
|
||||||
pub unsafe fn finalize() -> Vec<u8> {
|
|
||||||
let mut encoder = ENCODER.take().unwrap();
|
|
||||||
let bytes = encoder.finalize().unwrap().as_bytes().to_vec();
|
|
||||||
ENCODER = Some(encoder); // Restore the value
|
|
||||||
bytes
|
|
||||||
}
|
|
|
@ -0,0 +1,116 @@
|
||||||
|
#![feature(async_fn_in_trait)]
|
||||||
|
#![allow(incomplete_features)]
|
||||||
|
|
||||||
|
use std::collections::hash_map::Entry;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::io::{Cursor, Write};
|
||||||
|
use std::sync::{Arc};
|
||||||
|
|
||||||
|
use abao::encode::Encoder;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use atomic_counter::{AtomicCounter, ConsistentCounter};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
use tonic::transport::Server;
|
||||||
|
use tonic_health::server::HealthReporter;
|
||||||
|
|
||||||
|
use crate::proto::bao::bao_server::{Bao, BaoServer};
|
||||||
|
use crate::proto::bao::WriteRequest;
|
||||||
|
use crate::proto::google::protobuf::{BytesValue, Empty, UInt32Value};
|
||||||
|
use crate::unique_port::UniquePort;
|
||||||
|
|
||||||
|
mod proto;
|
||||||
|
mod unique_port;
|
||||||
|
mod grpc;
|
||||||
|
|
||||||
|
async fn driver_service_status(mut reporter: HealthReporter) {
|
||||||
|
reporter.set_serving::<BaoServer<BaoService>>().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let mut uport = UniquePort::default();
|
||||||
|
let port = uport.get_unused_port().expect("No ports free");
|
||||||
|
println!("{}", format!("1|1|tcp|127.0.0.1:{}|grpc", port));
|
||||||
|
|
||||||
|
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
|
||||||
|
|
||||||
|
health_reporter.set_serving::<BaoServer<BaoService>>().await;
|
||||||
|
|
||||||
|
tokio::spawn(driver_service_status(health_reporter.clone()));
|
||||||
|
|
||||||
|
let addr = format!("127.0.0.1:{}", port).parse().unwrap();
|
||||||
|
let bao_service = BaoService::default();
|
||||||
|
let server = BaoServer::new(bao_service);
|
||||||
|
Server::builder()
|
||||||
|
.add_service(health_service)
|
||||||
|
.add_service(server)
|
||||||
|
.add_service(grpc::grpc_stdio::new_server())
|
||||||
|
.serve(addr)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct BaoService {
|
||||||
|
requests: Arc<Mutex<HashMap<u32, Encoder<Cursor<Vec<u8>>>>>>,
|
||||||
|
counter: ConsistentCounter,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Bao for BaoService {
|
||||||
|
async fn init(&self, _request: Request<Empty>) -> Result<Response<UInt32Value>, Status> {
|
||||||
|
let next_id = self.counter.inc() as u32;
|
||||||
|
let tree = Vec::new();
|
||||||
|
let cursor = Cursor::new(tree);
|
||||||
|
let encoder = Encoder::new(cursor);
|
||||||
|
|
||||||
|
let mut req = self.requests.lock();
|
||||||
|
req.insert(next_id, encoder);
|
||||||
|
|
||||||
|
Ok(Response::new(UInt32Value { value: next_id }))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write(&self, request: Request<WriteRequest>) -> Result<Response<Empty>, Status> {
|
||||||
|
let r = request.into_inner();
|
||||||
|
let mut req = self.requests.lock();
|
||||||
|
if let Some(encoder) = req.get_mut(&r.id) {
|
||||||
|
encoder.write(&r.data)?;
|
||||||
|
} else {
|
||||||
|
return Err(Status::invalid_argument("invalid id"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Response::new(Empty::default()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn finalize(
|
||||||
|
&self,
|
||||||
|
request: Request<UInt32Value>,
|
||||||
|
) -> Result<Response<BytesValue>, Status> {
|
||||||
|
let r = request.into_inner();
|
||||||
|
let mut req = self.requests.lock();
|
||||||
|
match req.entry(r.value) {
|
||||||
|
Entry::Occupied(mut entry) => {
|
||||||
|
let encoder = entry.get_mut();
|
||||||
|
let ret = encoder.finalize().unwrap();
|
||||||
|
let bytes = ret.as_bytes().to_vec();
|
||||||
|
Ok(Response::new(BytesValue { value: bytes }))
|
||||||
|
}
|
||||||
|
Entry::Vacant(_) => {
|
||||||
|
Err(Status::invalid_argument("invalid id"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async fn destroy(&self, request: Request<UInt32Value>) -> Result<Response<Empty>, Status> {
|
||||||
|
let r = request.into_inner();
|
||||||
|
let mut req = self.requests.lock();
|
||||||
|
if req.remove(&r.value).is_none() {
|
||||||
|
return Err(Status::invalid_argument("invalid id"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Response::new(Empty::default()))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
use portpicker::Port;
|
||||||
|
|
||||||
|
pub struct UniquePort {
|
||||||
|
vended_ports: Vec<Port>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UniquePort {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
vended_ports: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_unused_port(&mut self) -> Option<Port> {
|
||||||
|
let mut counter = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
counter += 1;
|
||||||
|
if counter > 1000 {
|
||||||
|
// no luck in 1000 tries? Give up!
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
match portpicker::pick_unused_port() {
|
||||||
|
None => return None,
|
||||||
|
Some(p) => {
|
||||||
|
if self.vended_ports.contains(&p) {
|
||||||
|
log::trace!("Skipped port: {} because it is in the list of previously vended ports: {:?}", p, self.vended_ports);
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
log::trace!("Vending port: {}", p);
|
||||||
|
self.vended_ports.push(p);
|
||||||
|
return Some(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for UniquePort {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
2
db/db.go
2
db/db.go
|
@ -53,7 +53,7 @@ func Init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Automatically migrate the database schema based on the model definitions.
|
// Automatically migrate the database schema based on the model definitions.
|
||||||
err = db.Migrator().AutoMigrate(&model.Account{}, &model.Key{}, &model.KeyChallenge{}, &model.LoginSession{}, &model.Upload{}, &model.Pin{})
|
err = db.Migrator().AutoMigrate(&model.Account{}, &model.Key{}, &model.KeyChallenge{}, &model.LoginSession{}, &model.Upload{}, &model.Pin{}, &model.Tus{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Database setup failed database type: %s \n", err))
|
panic(fmt.Errorf("Database setup failed database type: %s \n", err))
|
||||||
}
|
}
|
||||||
|
|
18
go.mod
18
go.mod
|
@ -5,16 +5,18 @@ go 1.18
|
||||||
require (
|
require (
|
||||||
github.com/go-playground/validator/v10 v10.13.0
|
github.com/go-playground/validator/v10 v10.13.0
|
||||||
github.com/go-resty/resty/v2 v2.7.0
|
github.com/go-resty/resty/v2 v2.7.0
|
||||||
|
github.com/golang-queue/queue v0.1.3
|
||||||
|
github.com/golang/protobuf v1.5.3
|
||||||
|
github.com/hashicorp/go-plugin v1.4.9
|
||||||
github.com/iris-contrib/swagger v0.0.0-20230311205341-32127a753a68
|
github.com/iris-contrib/swagger v0.0.0-20230311205341-32127a753a68
|
||||||
github.com/joomcode/errorx v1.1.0
|
github.com/joomcode/errorx v1.1.0
|
||||||
github.com/kataras/iris/v12 v12.2.0
|
github.com/kataras/iris/v12 v12.2.0
|
||||||
github.com/kataras/jwt v0.1.8
|
github.com/kataras/jwt v0.1.8
|
||||||
github.com/multiformats/go-multibase v0.2.0
|
github.com/multiformats/go-multibase v0.2.0
|
||||||
github.com/second-state/WasmEdge-go v0.12.0
|
|
||||||
github.com/second-state/wasmedge-bindgen v0.4.1
|
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/spf13/viper v1.15.0
|
github.com/spf13/viper v1.15.0
|
||||||
github.com/swaggo/swag v1.16.1
|
github.com/swaggo/swag v1.16.1
|
||||||
|
github.com/tus/tusd v1.11.0
|
||||||
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
|
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
|
||||||
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2
|
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2
|
||||||
go.sia.tech/jape v0.9.0
|
go.sia.tech/jape v0.9.0
|
||||||
|
@ -24,6 +26,7 @@ require (
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
golang.org/x/crypto v0.8.0
|
golang.org/x/crypto v0.8.0
|
||||||
golang.org/x/term v0.8.0
|
golang.org/x/term v0.8.0
|
||||||
|
google.golang.org/protobuf v1.30.0
|
||||||
gorm.io/driver/mysql v1.5.0
|
gorm.io/driver/mysql v1.5.0
|
||||||
gorm.io/driver/sqlite v1.5.0
|
gorm.io/driver/sqlite v1.5.0
|
||||||
gorm.io/gorm v1.25.0
|
gorm.io/gorm v1.25.0
|
||||||
|
@ -42,9 +45,11 @@ require (
|
||||||
github.com/andybalholm/brotli v1.0.5 // indirect
|
github.com/andybalholm/brotli v1.0.5 // indirect
|
||||||
github.com/aymerick/douceur v0.2.0 // indirect
|
github.com/aymerick/douceur v0.2.0 // indirect
|
||||||
github.com/blang/semver/v4 v4.0.0 // indirect
|
github.com/blang/semver/v4 v4.0.0 // indirect
|
||||||
|
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40 // indirect
|
||||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||||
github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf // indirect
|
github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf // indirect
|
||||||
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
|
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
|
||||||
|
github.com/fatih/color v1.13.0 // indirect
|
||||||
github.com/fatih/structs v1.1.0 // indirect
|
github.com/fatih/structs v1.1.0 // indirect
|
||||||
github.com/felixge/httpsnoop v1.0.3 // indirect
|
github.com/felixge/httpsnoop v1.0.3 // indirect
|
||||||
github.com/flosch/pongo2/v4 v4.0.2 // indirect
|
github.com/flosch/pongo2/v4 v4.0.2 // indirect
|
||||||
|
@ -61,14 +66,16 @@ require (
|
||||||
github.com/gobwas/httphead v0.1.0 // indirect
|
github.com/gobwas/httphead v0.1.0 // indirect
|
||||||
github.com/gobwas/pool v0.2.1 // indirect
|
github.com/gobwas/pool v0.2.1 // indirect
|
||||||
github.com/gobwas/ws v1.2.0 // indirect
|
github.com/gobwas/ws v1.2.0 // indirect
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
github.com/goccy/go-json v0.9.11 // indirect
|
||||||
github.com/golang/snappy v0.0.4 // indirect
|
github.com/golang/snappy v0.0.4 // indirect
|
||||||
github.com/google/uuid v1.3.0 // indirect
|
github.com/google/uuid v1.3.0 // indirect
|
||||||
github.com/gorilla/css v1.0.0 // indirect
|
github.com/gorilla/css v1.0.0 // indirect
|
||||||
github.com/gorilla/websocket v1.5.0 // indirect
|
github.com/gorilla/websocket v1.5.0 // indirect
|
||||||
github.com/gotd/contrib v0.17.0 // indirect
|
github.com/gotd/contrib v0.17.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
|
||||||
|
github.com/hashicorp/go-hclog v1.2.0 // indirect
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||||
|
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
|
||||||
github.com/iris-contrib/go.uuid v2.0.0+incompatible // indirect
|
github.com/iris-contrib/go.uuid v2.0.0+incompatible // indirect
|
||||||
github.com/iris-contrib/schema v0.0.6 // indirect
|
github.com/iris-contrib/schema v0.0.6 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
|
@ -88,9 +95,12 @@ require (
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/magiconair/properties v1.8.7 // indirect
|
||||||
github.com/mailgun/raymond/v2 v2.0.48 // indirect
|
github.com/mailgun/raymond/v2 v2.0.48 // indirect
|
||||||
github.com/mailru/easyjson v0.7.7 // indirect
|
github.com/mailru/easyjson v0.7.7 // indirect
|
||||||
|
github.com/mattn/go-colorable v0.1.12 // indirect
|
||||||
|
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||||
github.com/mattn/go-sqlite3 v1.14.16 // indirect
|
github.com/mattn/go-sqlite3 v1.14.16 // indirect
|
||||||
github.com/mediocregopher/radix/v3 v3.8.1 // indirect
|
github.com/mediocregopher/radix/v3 v3.8.1 // indirect
|
||||||
github.com/microcosm-cc/bluemonday v1.0.23 // indirect
|
github.com/microcosm-cc/bluemonday v1.0.23 // indirect
|
||||||
|
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 // indirect
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||||
github.com/mr-tron/base58 v1.2.0 // indirect
|
github.com/mr-tron/base58 v1.2.0 // indirect
|
||||||
github.com/multiformats/go-base32 v0.1.0 // indirect
|
github.com/multiformats/go-base32 v0.1.0 // indirect
|
||||||
|
@ -98,6 +108,7 @@ require (
|
||||||
github.com/nats-io/nats.go v1.25.0 // indirect
|
github.com/nats-io/nats.go v1.25.0 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.4 // indirect
|
github.com/nats-io/nkeys v0.4.4 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
github.com/oklog/run v1.0.0 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
|
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||||
|
@ -147,7 +158,6 @@ require (
|
||||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||||
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
||||||
google.golang.org/grpc v1.55.0 // indirect
|
google.golang.org/grpc v1.55.0 // indirect
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
3
main.go
3
main.go
|
@ -78,7 +78,8 @@ func main() {
|
||||||
|
|
||||||
tus := initTus()
|
tus := initTus()
|
||||||
|
|
||||||
app.Any(API_PATH+"{fileparam:path}", iris.FromStd(http.StripPrefix(API_PATH, tus)))
|
v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus)))
|
||||||
|
v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus)))
|
||||||
|
|
||||||
swaggerConfig := swagger.Config{
|
swaggerConfig := swagger.Config{
|
||||||
// The url pointing to API definition.
|
// The url pointing to API definition.
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Tus struct {
|
||||||
|
gorm.Model
|
||||||
|
UploadID string `gorm:"primaryKey"`
|
||||||
|
Path string
|
||||||
|
Hash string
|
||||||
|
}
|
|
@ -0,0 +1,165 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/cid"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
|
"github.com/golang-queue/queue"
|
||||||
|
"github.com/tus/tusd/pkg/filestore"
|
||||||
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
|
"github.com/tus/tusd/pkg/memorylocker"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
const TUS_API_PATH = "/files/tus"
|
||||||
|
|
||||||
|
const HASH_META_HEADER = "blake3-hash"
|
||||||
|
|
||||||
|
var tusQueue *queue.Queue
|
||||||
|
var store *filestore.FileStore
|
||||||
|
var composer *tusd.StoreComposer
|
||||||
|
|
||||||
|
func initTus() *tusd.Handler {
|
||||||
|
store = &filestore.FileStore{
|
||||||
|
Path: "/tmp",
|
||||||
|
}
|
||||||
|
|
||||||
|
composer := tusd.NewStoreComposer()
|
||||||
|
composer.UseCore(store)
|
||||||
|
composer.UseConcater(store)
|
||||||
|
composer.UseLocker(memorylocker.New())
|
||||||
|
composer.UseTerminater(store)
|
||||||
|
|
||||||
|
handler, err := tusd.NewHandler(tusd.Config{
|
||||||
|
BasePath: "/api/v1" + TUS_API_PATH,
|
||||||
|
StoreComposer: composer,
|
||||||
|
PreUploadCreateCallback: func(hook tusd.HookEvent) error {
|
||||||
|
hash := hook.Upload.MetaData[HASH_META_HEADER]
|
||||||
|
|
||||||
|
if len(hash) == 0 {
|
||||||
|
return errors.New("missing blake3-hash metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
var upload model.Upload
|
||||||
|
result := db.Get().Where(&model.Upload{Hash: hash}).First(&upload)
|
||||||
|
if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 {
|
||||||
|
hashBytes, err := hex.DecodeString(hash)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cidString, err := cid.Encode(hashBytes, uint64(hook.Upload.Size))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := json.Marshal(UploadResponse{Cid: cidString})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tusd.NewHTTPError(errors.New(string(resp)), 304)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
|
||||||
|
tusEntry := &model.Tus{
|
||||||
|
Path: hook.Upload.Storage["Path"],
|
||||||
|
Hash: hook.Upload.MetaData[HASH_META_HEADER],
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.Get().Create(tusEntry).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
||||||
|
upload, err := store.GetUpload(nil, hook.Upload.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tusWorker(&upload)
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
tusQueue = queue.NewPool(5)
|
||||||
|
|
||||||
|
go tusStartup()
|
||||||
|
|
||||||
|
return handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func tusStartup() {
|
||||||
|
result := map[int]model.Tus{}
|
||||||
|
db.Get().Table("tus").Take(&result)
|
||||||
|
|
||||||
|
for _, item := range result {
|
||||||
|
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
||||||
|
upload, err := store.GetUpload(nil, item.UploadID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tusWorker(&upload)
|
||||||
|
}); err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func tusWorker(upload *tusd.Upload) error {
|
||||||
|
info, err := (*upload).GetInfo(nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
file, err := (*upload).GetReader(nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = files.Upload(file.(io.ReadSeeker))
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := info.MetaData[HASH_META_HEADER]
|
||||||
|
|
||||||
|
var tusUpload model.Tus
|
||||||
|
ret := db.Get().Where(&model.Tus{Hash: hash}).First(&tusUpload)
|
||||||
|
if ret.Error != nil && ret.Error.Error() != "record not found" {
|
||||||
|
log.Print(ret.Error)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = db.Get().Delete(&tusUpload)
|
||||||
|
|
||||||
|
err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type UploadResponse struct {
|
||||||
|
Cid string `json:"cid"`
|
||||||
|
}
|
Loading…
Reference in New Issue