feat: add ImportReader class to wrap and track the reading of the import and update at every read, and support a stage number to act as an offset so it can be used for both s3 upload and sia upload stages
This commit is contained in:
parent
0e3a25aa8a
commit
5523d5e60d
|
@ -15,7 +15,7 @@ import (
|
||||||
var ErrNotFound = gorm.ErrRecordNotFound
|
var ErrNotFound = gorm.ErrRecordNotFound
|
||||||
|
|
||||||
var _ ImportService = (*ImportServiceDefault)(nil)
|
var _ ImportService = (*ImportServiceDefault)(nil)
|
||||||
var _ io.Reader = (*ImportReader)(nil)
|
var _ io.ReadSeekCloser = (*ImportReader)(nil)
|
||||||
|
|
||||||
type ImportMetadata struct {
|
type ImportMetadata struct {
|
||||||
ID uint
|
ID uint
|
||||||
|
@ -180,3 +180,90 @@ func NewImportService(params ImportServiceParams) *ImportServiceDefault {
|
||||||
db: params.Db,
|
db: params.Db,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ImportReader struct {
|
||||||
|
service ImportService
|
||||||
|
meta ImportMetadata
|
||||||
|
reader io.Reader
|
||||||
|
size uint64
|
||||||
|
stage int
|
||||||
|
totalStages int
|
||||||
|
bytesRead uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ImportReader) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
if seeker, ok := i.reader.(io.Seeker); ok {
|
||||||
|
// If seeking to the start, reset progress based on recorded bytes
|
||||||
|
if whence == io.SeekStart && offset == 0 {
|
||||||
|
i.bytesRead = 0
|
||||||
|
i.meta.Progress = 0
|
||||||
|
if err := i.service.SaveImport(context.Background(), i.meta, false); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return seeker.Seek(offset, whence)
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, errors.New("Seek not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ImportReader) Close() error {
|
||||||
|
if closer, ok := i.reader.(io.Closer); ok {
|
||||||
|
return closer.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ImportReader) Read(p []byte) (n int, err error) {
|
||||||
|
n, err = i.reader.Read(p)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update cumulative bytes read
|
||||||
|
i.bytesRead += uint64(n)
|
||||||
|
|
||||||
|
err = i.ReadBytes(n)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ImportReader) ReadBytes(n int) (err error) {
|
||||||
|
stageProgress := float64(100) / float64(i.totalStages)
|
||||||
|
|
||||||
|
// Calculate progress based on bytes read
|
||||||
|
i.meta.Progress = float64(i.bytesRead) / float64(i.size) * 100.0
|
||||||
|
|
||||||
|
// Adjust progress for current stage
|
||||||
|
if i.stage > 1 {
|
||||||
|
i.meta.Progress += float64(i.stage-1) * stageProgress
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure progress doesn't exceed 100%
|
||||||
|
if i.meta.Progress > 100 {
|
||||||
|
i.meta.Progress = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save import progress
|
||||||
|
err = i.service.SaveImport(context.Background(), i.meta, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewImportReader(service ImportService, meta ImportMetadata, reader io.Reader, size uint64, stage, totalStages int) *ImportReader {
|
||||||
|
return &ImportReader{
|
||||||
|
service: service,
|
||||||
|
meta: meta,
|
||||||
|
reader: reader,
|
||||||
|
size: size,
|
||||||
|
stage: stage,
|
||||||
|
totalStages: totalStages,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue