tusd/pkg/azurestore/azurestore.go

237 lines
5.4 KiB
Go
Raw Normal View History

package azurestore
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"strings"
"github.com/tus/tusd/internal/uid"
"github.com/tus/tusd/pkg/handler"
)
type AzureStore struct {
Service AzService
ObjectPrefix string
Container string
}
type AzUpload struct {
ID string
InfoBlob AzBlob
BlockBlob AzBlob
InfoHandler *handler.FileInfo
}
func New(service AzService) *AzureStore {
return &AzureStore{
Service: service,
}
}
// UseIn sets this store as the core data store in the passed composer and adds
// all possible extension to it.
func (store AzureStore) UseIn(composer *handler.StoreComposer) {
composer.UseCore(store)
composer.UseTerminater(store)
composer.UseLengthDeferrer(store)
}
func (store AzureStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
if info.ID == "" {
info.ID = uid.Uid()
}
if info.Size > MaxBlockBlobSize {
return nil, fmt.Errorf("azurestore: max upload of %v bytes exceeded MaxBlockBlobSize of %v bytes",
info.Size, MaxBlockBlobSize)
}
blockBlob, err := store.Service.NewBlob(ctx, store.keyWithPrefix(info.ID))
if err != nil {
return nil, err
}
infoFile := store.keyWithPrefix(store.infoPath(info.ID))
infoBlob, err := store.Service.NewBlob(ctx, infoFile)
if err != nil {
return nil, err
}
info.Storage = map[string]string{
"Type": "azurestore",
"Container": store.Container,
"Key": store.keyWithPrefix(info.ID),
}
azUpload := &AzUpload{
ID: info.ID,
InfoHandler: &info,
InfoBlob: infoBlob,
BlockBlob: blockBlob,
}
err = azUpload.writeInfo(ctx)
if err != nil {
return nil, fmt.Errorf("azurestore: unable to create InfoHandler file:\n%s", err)
}
return azUpload, nil
}
func (store AzureStore) GetUpload(ctx context.Context, id string) (handler.Upload, error) {
info := handler.FileInfo{}
infoFile := store.keyWithPrefix(store.infoPath(id))
infoBlob, err := store.Service.NewBlob(ctx, infoFile)
if err != nil {
return nil, err
}
// Download the info file from Azure Storage
data, err := infoBlob.Download(ctx)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &info); err != nil {
return nil, err
}
if info.Size > MaxBlockBlobSize {
return nil, fmt.Errorf("azurestore: max upload of %v bytes exceeded MaxBlockBlobSize of %v bytes",
info.Size, MaxBlockBlobSize)
}
blockBlob, err := store.Service.NewBlob(ctx, store.keyWithPrefix(info.ID))
if err != nil {
return nil, err
}
offset, err := blockBlob.GetOffset(ctx)
if err != nil {
// Unpack the error and see if it is a handler.ErrNotFound by comparing the
// error code. If it matches, we ignore the error, otherwise we return the error.
if handlerErr, ok := err.(handler.Error); !ok || handlerErr.ErrorCode != handler.ErrNotFound.ErrorCode {
return nil, err
}
}
info.Offset = offset
return &AzUpload{
ID: id,
InfoHandler: &info,
InfoBlob: infoBlob,
BlockBlob: blockBlob,
}, nil
}
func (store AzureStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
return upload.(*AzUpload)
}
func (store AzureStore) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
return upload.(*AzUpload)
}
func (upload *AzUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
r := bufio.NewReader(src)
buf := new(bytes.Buffer)
n, err := r.WriteTo(buf)
if err != nil {
return 0, err
}
chunkSize := int64(binary.Size(buf.Bytes()))
if chunkSize > MaxBlockBlobChunkSize {
return 0, fmt.Errorf("azurestore: Chunk of size %v too large. Max chunk size is %v", chunkSize, MaxBlockBlobChunkSize)
}
re := bytes.NewReader(buf.Bytes())
err = upload.BlockBlob.Upload(ctx, re)
if err != nil {
return 0, err
}
upload.InfoHandler.Offset += n
return n, nil
}
func (upload *AzUpload) GetInfo(ctx context.Context) (handler.FileInfo, error) {
info := handler.FileInfo{}
if upload.InfoHandler != nil {
return *upload.InfoHandler, nil
}
data, err := upload.InfoBlob.Download(ctx)
if err != nil {
return info, err
}
if err := json.Unmarshal(data, &info); err != nil {
return info, err
}
upload.InfoHandler = &info
return info, nil
}
// Get the uploaded file from the Azure storage
func (upload *AzUpload) GetReader(ctx context.Context) (io.Reader, error) {
b, err := upload.BlockBlob.Download(ctx)
if err != nil {
return nil, err
}
return bytes.NewReader(b), nil
}
// Finish the file upload and commit the block list
func (upload *AzUpload) FinishUpload(ctx context.Context) error {
return upload.BlockBlob.Commit(ctx)
}
func (upload *AzUpload) Terminate(ctx context.Context) error {
// Delete info file
err := upload.InfoBlob.Delete(ctx)
if err != nil {
return err
}
// Delete file
return upload.BlockBlob.Delete(ctx)
}
func (upload *AzUpload) DeclareLength(ctx context.Context, length int64) error {
upload.InfoHandler.Size = length
upload.InfoHandler.SizeIsDeferred = false
return upload.writeInfo(ctx)
}
func (store AzureStore) infoPath(id string) string {
return id + InfoBlobSuffix
}
func (upload *AzUpload) writeInfo(ctx context.Context) error {
data, err := json.Marshal(upload.InfoHandler)
if err != nil {
return err
}
reader := bytes.NewReader(data)
return upload.InfoBlob.Upload(ctx, reader)
}
func (store *AzureStore) keyWithPrefix(key string) string {
prefix := store.ObjectPrefix
if prefix != "" && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
return prefix + key
}