Refactor data store and implement HEAD Offset

This commit is contained in:
Felix Geisendörfer 2013-05-08 13:52:09 +02:00
parent 9deaf0fa5a
commit 575d0000e4
3 changed files with 134 additions and 95 deletions

View File

@ -9,127 +9,140 @@ import (
"os" "os"
"path" "path"
"sort" "sort"
"strings" "sync"
"time" "time"
) )
const defaultFilePerm = 0666
type DataStore struct { type DataStore struct {
dir string dir string
maxSize int64 maxSize int64
// infoLocksLock locks the infosLocks map
infoLocksLock *sync.Mutex
// infoLocks locks the .info files
infoLocks map[string]*sync.RWMutex
} }
func newDataStore(dir string, maxSize int64) *DataStore { func newDataStore(dir string, maxSize int64) *DataStore {
store := &DataStore{dir: dir, maxSize: maxSize} store := &DataStore{
dir: dir,
maxSize: maxSize,
infoLocksLock: &sync.Mutex{},
infoLocks: make(map[string]*sync.RWMutex),
}
go store.gcLoop() go store.gcLoop()
return store return store
} }
func (s *DataStore) CreateFile(id string, size int64, meta map[string]string) error { // infoLock returns the lock for the .info file of the given file id.
file, err := os.OpenFile(s.filePath(id), os.O_CREATE|os.O_WRONLY, 0666) func (s *DataStore) infoLock(id string) *sync.RWMutex {
if err != nil { s.infoLocksLock.Lock()
return err defer s.infoLocksLock.Unlock()
}
defer file.Close()
// @TODO Refactor DataStore to support meta argument properly. Needs to be lock := s.infoLocks[id]
// defined in tus protocol first. if lock == nil {
entry := logEntry{Meta: &metaEntry{ lock = &sync.RWMutex{}
Size: size, s.infoLocks[id] = lock
}} }
return s.appendFileLog(id, entry) return lock
} }
func (s *DataStore) WriteFileChunk(id string, start int64, src io.Reader) error { func (s *DataStore) CreateFile(id string, finalLength int64, meta map[string]string) error {
file, err := os.OpenFile(s.filePath(id), os.O_WRONLY, 0666) file, err := os.OpenFile(s.filePath(id), os.O_CREATE|os.O_WRONLY, defaultFilePerm)
if err != nil { if err != nil {
return err return err
} }
defer file.Close() defer file.Close()
if n, err := file.Seek(start, os.SEEK_SET); err != nil { s.infoLock(id).Lock()
defer s.infoLock(id).Unlock()
return s.writeInfo(id, FileInfo{FinalLength: finalLength, Meta: meta})
}
func (s *DataStore) WriteFileChunk(id string, offset int64, src io.Reader) error {
file, err := os.OpenFile(s.filePath(id), os.O_WRONLY, defaultFilePerm)
if err != nil {
return err return err
} else if n != start { }
defer file.Close()
if n, err := file.Seek(offset, os.SEEK_SET); err != nil {
return err
} else if n != offset {
return errors.New("WriteFileChunk: seek failure") return errors.New("WriteFileChunk: seek failure")
} }
n, err := io.Copy(file, src) n, err := io.Copy(file, src)
if n > 0 { if n > 0 {
entry := logEntry{Chunk: &chunkEntry{Start: start, End: start + n - 1}} if err := s.setOffset(id, offset+n); err != nil {
if err := s.appendFileLog(id, entry); err != nil {
return err return err
} }
} }
return err return err
} }
func (s *DataStore) GetFileMeta(id string) (*fileMeta, error) {
// @TODO stream the file / limit log file size?
data, err := ioutil.ReadFile(s.logPath(id))
if err != nil {
return nil, err
}
lines := strings.Split(string(data), "\n")
// last line is always empty, lets skip it
lines = lines[:len(lines)-1]
meta := &fileMeta{
Chunks: make(chunkSet, 0, len(lines)),
}
for _, line := range lines {
entry := logEntry{}
if err := json.Unmarshal([]byte(line), &entry); err != nil {
return nil, err
}
if entry.Chunk != nil {
meta.Chunks.Add(chunk{Start: entry.Chunk.Start, End: entry.Chunk.End})
}
if entry.Meta != nil {
meta.ContentType = entry.Meta.ContentType
meta.ContentDisposition = entry.Meta.ContentDisposition
meta.Size = entry.Meta.Size
}
}
return meta, nil
}
func (s *DataStore) ReadFile(id string) (io.ReadCloser, error) { func (s *DataStore) ReadFile(id string) (io.ReadCloser, error) {
file, err := os.Open(s.filePath(id)) return os.Open(s.filePath(id))
if err != nil {
return nil, err
}
return file, nil
} }
func (s *DataStore) appendFileLog(id string, entry interface{}) error { func (s *DataStore) GetInfo(id string) (FileInfo, error) {
logPath := s.logPath(id) s.infoLock(id).RLock()
logFile, err := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) defer s.infoLock(id).RUnlock()
if err != nil {
return err
}
defer logFile.Close()
data, err := json.Marshal(entry) return s.getInfo(id)
}
// getInfo is the same as GetInfo, but does not apply any locks, requiring
// the caller to take care of this.
func (s *DataStore) getInfo(id string) (FileInfo, error) {
info := FileInfo{}
data, err := ioutil.ReadFile(s.jsonPath(id))
if err != nil {
return info, err
}
err = json.Unmarshal(data, &info)
return info, err
}
func (s *DataStore) writeInfo(id string, info FileInfo) error {
data, err := json.Marshal(info)
if err != nil { if err != nil {
return err return err
} }
if _, err := logFile.WriteString(string(data) + "\n"); err != nil { return ioutil.WriteFile(s.jsonPath(id), data, defaultFilePerm)
}
// setOffset updates the offset of a file, unless the current offset on disk is
// already greater.
func (s *DataStore) setOffset(id string, offset int64) error {
s.infoLock(id).Lock()
defer s.infoLock(id).Unlock()
info, err := s.getInfo(id)
if err != nil {
return err return err
} }
return nil
// never decrement the offset
if info.Offset >= offset {
return nil
}
info.Offset = offset
return s.writeInfo(id, info)
} }
func (s *DataStore) filePath(id string) string { func (s *DataStore) filePath(id string) string {
return path.Join(s.dir, id) + ".bin" return path.Join(s.dir, id) + ".bin"
} }
func (s *DataStore) logPath(id string) string { func (s *DataStore) jsonPath(id string) string {
return path.Join(s.dir, id) + ".log" return path.Join(s.dir, id) + ".json"
} }
// TODO: This works for now, but it would be better if we would trigger gc() // TODO: This works for now, but it would be better if we would trigger gc()
@ -240,23 +253,8 @@ func (s sortableFiles) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
type fileMeta struct { type FileInfo struct {
ContentType string Offset int64
ContentDisposition string FinalLength int64
Size int64 Meta map[string]string
Chunks chunkSet
}
type logEntry struct {
Chunk *chunkEntry `json:",omitempty"`
Meta *metaEntry `json:",omitempty"`
}
type chunkEntry struct {
Start, End int64
}
type metaEntry struct {
Size int64
ContentType string
ContentDisposition string
} }

View File

@ -93,10 +93,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "PATCH" { if r.Method == "PATCH" {
h.patchFile(w, r, id) h.patchFile(w, r, id)
return return
} else if r.Method == "HEAD" {
h.headFile(w, r, id)
return
} }
// handle invalid method // handle invalid method
allowed := "PATCH" allowed := "HEAD,PATCH"
w.Header().Set("Allow", allowed) w.Header().Set("Allow", allowed)
err := errors.New(r.Method + " used against file creation url. Allowed: " + allowed) err := errors.New(r.Method + " used against file creation url. Allowed: " + allowed)
h.err(err, w, http.StatusMethodNotAllowed) h.err(err, w, http.StatusMethodNotAllowed)
@ -136,13 +139,25 @@ func (h *Handler) patchFile(w http.ResponseWriter, r *http.Request, id string) {
return return
} }
// @TODO Reject if offset > current offset
// @TODO Test offset < current offset
err = h.store.WriteFileChunk(id, offset, r.Body) err = h.store.WriteFileChunk(id, offset, r.Body)
if err != nil {
// @TODO handle 404 properly (goes for all h.err calls)
h.err(err, w, http.StatusInternalServerError)
return
}
}
func (h *Handler) headFile(w http.ResponseWriter, r *http.Request, id string) {
info, err := h.store.GetInfo(id)
if err != nil { if err != nil {
h.err(err, w, http.StatusInternalServerError) h.err(err, w, http.StatusInternalServerError)
return return
} }
fmt.Printf("success\n") w.Header().Set("Offset", fmt.Sprintf("%d", info.Offset))
} }
func getPositiveIntHeader(r *http.Request, key string) (int64, error) { func getPositiveIntHeader(r *http.Request, key string) (int64, error) {

View File

@ -128,7 +128,7 @@ var Protocol_Core_Tests = []struct {
{ {
Method: "PUT", Method: "PUT",
ExpectStatusCode: http.StatusMethodNotAllowed, ExpectStatusCode: http.StatusMethodNotAllowed,
ExpectHeaders: map[string]string{"Allow": "PATCH"}, ExpectHeaders: map[string]string{"Allow": "HEAD,PATCH"},
}, },
}, },
}, },
@ -176,6 +176,31 @@ var Protocol_Core_Tests = []struct {
}, },
}, },
}, },
{
Description: "Resume",
FinalLength: 11,
ExpectFileContent: "hello world",
Requests: []TestRequest{
{
Method: "PATCH",
Headers: map[string]string{"Offset": "0"},
Body: "hello",
ExpectStatusCode: http.StatusOK,
},
{
Method: "HEAD",
ExpectStatusCode: http.StatusOK,
ExpectHeaders: map[string]string{"Offset": "5"},
},
{
Method: "PATCH",
Headers: map[string]string{"Offset": "5"},
Body: " world",
ExpectStatusCode: http.StatusOK,
},
},
},
// @TODO Test applying PATCH at offset > current offset (error)
} }
func TestProtocol_Core(t *testing.T) { func TestProtocol_Core(t *testing.T) {
@ -187,7 +212,8 @@ Tests:
t.Logf("test: %s", test.Description) t.Logf("test: %s", test.Description)
location := createFile(setup, test.FinalLength) location := createFile(setup, test.FinalLength)
for _, request := range test.Requests { for i, request := range test.Requests {
t.Logf("- request #%d: %s", i+1, request.Method)
request.Url = location request.Url = location
if err := request.Do(); err != nil { if err := request.Do(); err != nil {
t.Error(err) t.Error(err)