parent
79db40bf51
commit
e378965e21
|
@ -5,17 +5,23 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DataStore struct {
|
type DataStore struct {
|
||||||
dir string
|
dir string
|
||||||
|
maxSize int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDataStore(dir string) *DataStore {
|
func NewDataStore(dir string, maxSize int64) *DataStore {
|
||||||
return &DataStore{dir: dir}
|
store := &DataStore{dir: dir, maxSize: maxSize}
|
||||||
|
go store.gcLoop()
|
||||||
|
return store
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DataStore) CreateFile(id string, size int64, contentType string, contentDisposition string) error {
|
func (s *DataStore) CreateFile(id string, size int64, contentType string, contentDisposition string) error {
|
||||||
|
@ -126,11 +132,115 @@ func (s *DataStore) appendFileLog(id string, entry interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DataStore) filePath(id string) string {
|
func (s *DataStore) filePath(id string) string {
|
||||||
return path.Join(s.dir, id)
|
return path.Join(s.dir, id)+".bin"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DataStore) logPath(id string) string {
|
func (s *DataStore) logPath(id string) string {
|
||||||
return s.filePath(id) + ".log"
|
return path.Join(s.dir, id)+".log"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *DataStore) gcLoop() {
|
||||||
|
for {
|
||||||
|
if before, after, err := s.gc(); err != nil {
|
||||||
|
log.Printf("DataStore: gc error: %s", err)
|
||||||
|
} else if before != after {
|
||||||
|
log.Printf("DataStore: gc before: %d, after: %d", before, after)
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BUG: gc could interfer with active uploads if storage pressure is high. To
|
||||||
|
// fix this we need a mechanism to detect this scenario and reject new storage
|
||||||
|
// ops if the current storage ops require all of the available dataStore size.
|
||||||
|
|
||||||
|
// gc shrinks the amount of bytes used by the DataStore to <= maxSize by
|
||||||
|
// deleting the oldest files according to their mtime.
|
||||||
|
func (s *DataStore) gc() (before int64, after int64, err error) {
|
||||||
|
dataDir, err := os.Open(s.dir)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer dataDir.Close()
|
||||||
|
|
||||||
|
stats, err := dataDir.Readdir(-1)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sortableStats := sortableFiles(stats)
|
||||||
|
sort.Sort(sortableStats)
|
||||||
|
|
||||||
|
deleted := make(map[string]bool, len(sortableStats))
|
||||||
|
|
||||||
|
// Delete enough files so that we are <= maxSize
|
||||||
|
for _, stat := range sortableStats {
|
||||||
|
size := stat.Size()
|
||||||
|
before += size
|
||||||
|
|
||||||
|
if before <= s.maxSize {
|
||||||
|
after += size
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
name := stat.Name()
|
||||||
|
fullPath := path.Join(s.dir, name)
|
||||||
|
if err = os.Remove(fullPath); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted[fullPath] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we did not delete a .log file but forgot the .bin or vice-versa.
|
||||||
|
for fullPath, _ := range deleted {
|
||||||
|
ext := path.Ext(fullPath)
|
||||||
|
base := fullPath[0:len(fullPath)-len(ext)]
|
||||||
|
|
||||||
|
counterPath := ""
|
||||||
|
if ext == ".bin" {
|
||||||
|
counterPath = base+".log"
|
||||||
|
} else if ext == ".log" {
|
||||||
|
counterPath = base+".bin"
|
||||||
|
}
|
||||||
|
|
||||||
|
if counterPath == "" || deleted[counterPath] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
stat, statErr := os.Stat(counterPath)
|
||||||
|
if statErr != nil {
|
||||||
|
if os.IsNotExist(statErr) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = statErr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = os.Remove(counterPath)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
after -= stat.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type sortableFiles []os.FileInfo
|
||||||
|
|
||||||
|
func (s sortableFiles) Len() int {
|
||||||
|
return len(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s sortableFiles) Less(i, j int) bool {
|
||||||
|
return s[i].ModTime().After(s[j].ModTime())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s sortableFiles) Swap(i, j int) {
|
||||||
|
s[i], s[j] = s[j], s[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
type fileMeta struct {
|
type fileMeta struct {
|
||||||
|
|
|
@ -11,6 +11,11 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// dataStoreSize limits the storage used by the data store. If exceeded, the
|
||||||
|
// data store will start garbage collection old files until enough storage is
|
||||||
|
// available again.
|
||||||
|
const dataStoreSize = 1024 * 1024 * 1024
|
||||||
|
|
||||||
// fileRoute matches /files/<id>. Go seems to use \r to terminate header
|
// fileRoute matches /files/<id>. Go seems to use \r to terminate header
|
||||||
// values, so to ease bash scripting, the route ignores a trailing \r in the
|
// values, so to ease bash scripting, the route ignores a trailing \r in the
|
||||||
// route. Better ideas are welcome.
|
// route. Better ideas are welcome.
|
||||||
|
@ -29,7 +34,7 @@ func init() {
|
||||||
if err := os.MkdirAll(dataDir, 0777); err != nil {
|
if err := os.MkdirAll(dataDir, 0777); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
dataStore = NewDataStore(dataDir)
|
dataStore = NewDataStore(dataDir, dataStoreSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func serveHttp() error {
|
func serveHttp() error {
|
||||||
|
|
Loading…
Reference in New Issue