diff --git a/src/http/data_store.go b/src/http/data_store.go index 239956e..bd1b09e 100644 --- a/src/http/data_store.go +++ b/src/http/data_store.go @@ -9,127 +9,140 @@ import ( "os" "path" "sort" - "strings" + "sync" "time" ) +const defaultFilePerm = 0666 + type DataStore struct { dir string maxSize int64 + + // infoLocksLock locks the infosLocks map + infoLocksLock *sync.Mutex + // infoLocks locks the .info files + infoLocks map[string]*sync.RWMutex } func newDataStore(dir string, maxSize int64) *DataStore { - store := &DataStore{dir: dir, maxSize: maxSize} + store := &DataStore{ + dir: dir, + maxSize: maxSize, + infoLocksLock: &sync.Mutex{}, + infoLocks: make(map[string]*sync.RWMutex), + } go store.gcLoop() return store } -func (s *DataStore) CreateFile(id string, size int64, meta map[string]string) error { - file, err := os.OpenFile(s.filePath(id), os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return err - } - defer file.Close() +// infoLock returns the lock for the .info file of the given file id. +func (s *DataStore) infoLock(id string) *sync.RWMutex { + s.infoLocksLock.Lock() + defer s.infoLocksLock.Unlock() - // @TODO Refactor DataStore to support meta argument properly. Needs to be - // defined in tus protocol first. - entry := logEntry{Meta: &metaEntry{ - Size: size, - }} - return s.appendFileLog(id, entry) + lock := s.infoLocks[id] + if lock == nil { + lock = &sync.RWMutex{} + s.infoLocks[id] = lock + } + return lock } -func (s *DataStore) WriteFileChunk(id string, start int64, src io.Reader) error { - file, err := os.OpenFile(s.filePath(id), os.O_WRONLY, 0666) +func (s *DataStore) CreateFile(id string, finalLength int64, meta map[string]string) error { + file, err := os.OpenFile(s.filePath(id), os.O_CREATE|os.O_WRONLY, defaultFilePerm) if err != nil { return err } defer file.Close() - if n, err := file.Seek(start, os.SEEK_SET); err != nil { + s.infoLock(id).Lock() + defer s.infoLock(id).Unlock() + + return s.writeInfo(id, FileInfo{FinalLength: finalLength, Meta: meta}) +} + +func (s *DataStore) WriteFileChunk(id string, offset int64, src io.Reader) error { + file, err := os.OpenFile(s.filePath(id), os.O_WRONLY, defaultFilePerm) + if err != nil { return err - } else if n != start { + } + defer file.Close() + + if n, err := file.Seek(offset, os.SEEK_SET); err != nil { + return err + } else if n != offset { return errors.New("WriteFileChunk: seek failure") } n, err := io.Copy(file, src) if n > 0 { - entry := logEntry{Chunk: &chunkEntry{Start: start, End: start + n - 1}} - if err := s.appendFileLog(id, entry); err != nil { + if err := s.setOffset(id, offset+n); err != nil { return err } } return err } -func (s *DataStore) GetFileMeta(id string) (*fileMeta, error) { - // @TODO stream the file / limit log file size? - data, err := ioutil.ReadFile(s.logPath(id)) - if err != nil { - return nil, err - } - lines := strings.Split(string(data), "\n") - // last line is always empty, lets skip it - lines = lines[:len(lines)-1] - - meta := &fileMeta{ - Chunks: make(chunkSet, 0, len(lines)), - } - - for _, line := range lines { - entry := logEntry{} - if err := json.Unmarshal([]byte(line), &entry); err != nil { - return nil, err - } - - if entry.Chunk != nil { - meta.Chunks.Add(chunk{Start: entry.Chunk.Start, End: entry.Chunk.End}) - } - - if entry.Meta != nil { - meta.ContentType = entry.Meta.ContentType - meta.ContentDisposition = entry.Meta.ContentDisposition - meta.Size = entry.Meta.Size - } - } - - return meta, nil -} - func (s *DataStore) ReadFile(id string) (io.ReadCloser, error) { - file, err := os.Open(s.filePath(id)) - if err != nil { - return nil, err - } - - return file, nil + return os.Open(s.filePath(id)) } -func (s *DataStore) appendFileLog(id string, entry interface{}) error { - logPath := s.logPath(id) - logFile, err := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return err - } - defer logFile.Close() +func (s *DataStore) GetInfo(id string) (FileInfo, error) { + s.infoLock(id).RLock() + defer s.infoLock(id).RUnlock() - data, err := json.Marshal(entry) + return s.getInfo(id) +} + +// getInfo is the same as GetInfo, but does not apply any locks, requiring +// the caller to take care of this. +func (s *DataStore) getInfo(id string) (FileInfo, error) { + info := FileInfo{} + data, err := ioutil.ReadFile(s.jsonPath(id)) + if err != nil { + return info, err + } + + err = json.Unmarshal(data, &info) + return info, err +} + +func (s *DataStore) writeInfo(id string, info FileInfo) error { + data, err := json.Marshal(info) if err != nil { return err } - if _, err := logFile.WriteString(string(data) + "\n"); err != nil { + return ioutil.WriteFile(s.jsonPath(id), data, defaultFilePerm) +} + +// setOffset updates the offset of a file, unless the current offset on disk is +// already greater. +func (s *DataStore) setOffset(id string, offset int64) error { + s.infoLock(id).Lock() + defer s.infoLock(id).Unlock() + + info, err := s.getInfo(id) + if err != nil { return err } - return nil + + // never decrement the offset + if info.Offset >= offset { + return nil + } + + info.Offset = offset + return s.writeInfo(id, info) } func (s *DataStore) filePath(id string) string { return path.Join(s.dir, id) + ".bin" } -func (s *DataStore) logPath(id string) string { - return path.Join(s.dir, id) + ".log" +func (s *DataStore) jsonPath(id string) string { + return path.Join(s.dir, id) + ".json" } // TODO: This works for now, but it would be better if we would trigger gc() @@ -240,23 +253,8 @@ func (s sortableFiles) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -type fileMeta struct { - ContentType string - ContentDisposition string - Size int64 - Chunks chunkSet -} - -type logEntry struct { - Chunk *chunkEntry `json:",omitempty"` - Meta *metaEntry `json:",omitempty"` -} - -type chunkEntry struct { - Start, End int64 -} -type metaEntry struct { - Size int64 - ContentType string - ContentDisposition string +type FileInfo struct { + Offset int64 + FinalLength int64 + Meta map[string]string } diff --git a/src/http/handler.go b/src/http/handler.go index a898c42..360acb4 100644 --- a/src/http/handler.go +++ b/src/http/handler.go @@ -93,10 +93,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method == "PATCH" { h.patchFile(w, r, id) return + } else if r.Method == "HEAD" { + h.headFile(w, r, id) + return } // handle invalid method - allowed := "PATCH" + allowed := "HEAD,PATCH" w.Header().Set("Allow", allowed) err := errors.New(r.Method + " used against file creation url. Allowed: " + allowed) h.err(err, w, http.StatusMethodNotAllowed) @@ -136,13 +139,25 @@ func (h *Handler) patchFile(w http.ResponseWriter, r *http.Request, id string) { return } + // @TODO Reject if offset > current offset + // @TODO Test offset < current offset + err = h.store.WriteFileChunk(id, offset, r.Body) + if err != nil { + // @TODO handle 404 properly (goes for all h.err calls) + h.err(err, w, http.StatusInternalServerError) + return + } +} + +func (h *Handler) headFile(w http.ResponseWriter, r *http.Request, id string) { + info, err := h.store.GetInfo(id) if err != nil { h.err(err, w, http.StatusInternalServerError) return } - fmt.Printf("success\n") + w.Header().Set("Offset", fmt.Sprintf("%d", info.Offset)) } func getPositiveIntHeader(r *http.Request, key string) (int64, error) { diff --git a/src/http/handler_test.go b/src/http/handler_test.go index 3b0bf3d..4f87077 100644 --- a/src/http/handler_test.go +++ b/src/http/handler_test.go @@ -128,7 +128,7 @@ var Protocol_Core_Tests = []struct { { Method: "PUT", ExpectStatusCode: http.StatusMethodNotAllowed, - ExpectHeaders: map[string]string{"Allow": "PATCH"}, + ExpectHeaders: map[string]string{"Allow": "HEAD,PATCH"}, }, }, }, @@ -176,6 +176,31 @@ var Protocol_Core_Tests = []struct { }, }, }, + { + Description: "Resume", + FinalLength: 11, + ExpectFileContent: "hello world", + Requests: []TestRequest{ + { + Method: "PATCH", + Headers: map[string]string{"Offset": "0"}, + Body: "hello", + ExpectStatusCode: http.StatusOK, + }, + { + Method: "HEAD", + ExpectStatusCode: http.StatusOK, + ExpectHeaders: map[string]string{"Offset": "5"}, + }, + { + Method: "PATCH", + Headers: map[string]string{"Offset": "5"}, + Body: " world", + ExpectStatusCode: http.StatusOK, + }, + }, + }, + // @TODO Test applying PATCH at offset > current offset (error) } func TestProtocol_Core(t *testing.T) { @@ -187,7 +212,8 @@ Tests: t.Logf("test: %s", test.Description) location := createFile(setup, test.FinalLength) - for _, request := range test.Requests { + for i, request := range test.Requests { + t.Logf("- request #%d: %s", i+1, request.Method) request.Url = location if err := request.Do(); err != nil { t.Error(err)