Compare commits

...

4 Commits

4 changed files with 81 additions and 34 deletions

6
go.mod
View File

@ -16,9 +16,9 @@ require (
github.com/swaggo/swag v1.16.1
github.com/tus/tusd v1.11.0
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2
go.sia.tech/jape v0.9.0
go.sia.tech/renterd v0.3.0-beta.0.20230520152334-e004ada9c4e9
go.sia.tech/core v0.1.12-0.20230525021639-f38630eccb9c
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb
go.sia.tech/renterd v0.3.0-beta.0.20230531134531-26641a0859c2
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca
go.sia.tech/web/renterd v0.17.0
go.uber.org/zap v1.24.0

10
go.sum
View File

@ -1200,14 +1200,24 @@ go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJP
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2 h1:a1UTWnPoH6Nf8Y4MrIf74jqCeMoPVmEJPoICOHqCA4o=
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2/go.mod h1:D17UWSn99SEfQnEaR9G9n6Kz9+BwqMoUgZ6Cl424LsQ=
go.sia.tech/core v0.1.12-0.20230525021639-f38630eccb9c h1:rMqNjT+imVQlTJaOdndiAmPWAv05ITq1trxno9xGx5M=
go.sia.tech/core v0.1.12-0.20230525021639-f38630eccb9c/go.mod h1:D17UWSn99SEfQnEaR9G9n6Kz9+BwqMoUgZ6Cl424LsQ=
go.sia.tech/jape v0.9.0 h1:kWgMFqALYhLMJYOwWBgJda5ko/fi4iZzRxHRP7pp8NY=
go.sia.tech/jape v0.9.0/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb h1:yLDEqkqC19E/HgBoq2Uhw9oH3SMNRyeRjZ7Ep4dPKR8=
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo=
go.sia.tech/renterd v0.3.0-beta.0.20230516200305-8097423dbe64 h1:qA+aREwc+i8Q56F9VL9wdxagmtoPKcAd6MVZKOt0JRI=
go.sia.tech/renterd v0.3.0-beta.0.20230516200305-8097423dbe64/go.mod h1:jFxggAqLQ9fs85iLpC7s2Xnit3rTx7AajW37LgR1vQs=
go.sia.tech/renterd v0.3.0-beta.0.20230520152334-e004ada9c4e9 h1:jfxmpr/8UG9IDoU/vA4Jqq2cqWT9SsDJv06uZ48W/qs=
go.sia.tech/renterd v0.3.0-beta.0.20230520152334-e004ada9c4e9/go.mod h1:ln0uIpeEvgd0lhDx1yB2+u6WXUMermu1QO6lI96xJKI=
go.sia.tech/renterd v0.3.0-beta.0.20230526064553-cce3ae57804b h1:QNzx8mGKuMb9ftIQSsLO8HF/8QlI5/tsay+dL4/Tw3U=
go.sia.tech/renterd v0.3.0-beta.0.20230526064553-cce3ae57804b/go.mod h1:KT/DwXfKeqa9gynCxssYRFsj4/PCAd0Lre1oDJZcf7I=
go.sia.tech/renterd v0.3.0-beta.0.20230530151818-141fd470cfdd h1:2MSN8Q84OIsCB061HF8dG7ZMla8Vg/tp6f75+ye8zMc=
go.sia.tech/renterd v0.3.0-beta.0.20230530151818-141fd470cfdd/go.mod h1:xD2ruYaPzSeB4buFZPDE/wTw6tmwSkzgvfQaJTO6rjc=
go.sia.tech/renterd v0.3.0-beta.0.20230531134531-26641a0859c2 h1:ULbNFpIOfIJfmo6HgzuEZNLJK/Mkl1UqNzPhBwxqSrQ=
go.sia.tech/renterd v0.3.0-beta.0.20230531134531-26641a0859c2/go.mod h1:xD2ruYaPzSeB4buFZPDE/wTw6tmwSkzgvfQaJTO6rjc=
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca h1:aZMg2AKevn7jKx+wlusWQfwSM5pNU9aGtRZme29q3O4=
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca/go.mod h1:h/1afFwpxzff6/gG5i1XdAgPK7dEY6FaibhK7N5F86Y=
go.sia.tech/web/renterd v0.14.0 h1:74WDPNYXk71d8uT86rkQAa7AlDp8+VDRsQ2oyhwPplg=

View File

@ -32,6 +32,7 @@ import (
type WorkerConfig struct {
ID string
AllowPrivateIPs bool
BusFlushInterval time.Duration
ContractLockTimeout time.Duration
SessionLockTimeout time.Duration
@ -50,7 +51,8 @@ type BusConfig struct {
Miner *Miner
PersistInterval time.Duration
DBDialector gorm.Dialector
DBLoggerConfig stores.LoggerConfig
DBDialector gorm.Dialector
}
type AutopilotConfig struct {
@ -96,6 +98,10 @@ func (cm chainManager) AcceptBlock(ctx context.Context, b types.Block) error {
return cm.cs.AcceptBlock(sb)
}
func (cm chainManager) LastBlockTime() time.Time {
return time.Unix(int64(cm.cs.CurrentBlock().Timestamp), 0)
}
func (cm chainManager) Synced(ctx context.Context) bool {
return cm.cs.Synced()
}
@ -227,19 +233,6 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
return nil, nil, err
}
walletDir := filepath.Join(dir, "wallet")
if err := os.MkdirAll(walletDir, 0700); err != nil {
return nil, nil, err
}
walletAddr := wallet.StandardAddress(seed.PublicKey())
ws, ccid, err := stores.NewJSONWalletStore(walletDir, walletAddr)
if err != nil {
return nil, nil, err
} else if err := cs.ConsensusSetSubscribe(ws, ccid, nil); err != nil {
return nil, nil, err
}
w := wallet.NewSingleAddressWallet(seed, ws)
// If no DB dialector was provided, use SQLite.
dbConn := cfg.DBDialector
if dbConn == nil {
@ -250,14 +243,17 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
dbConn = stores.NewSQLiteConnection(filepath.Join(dbDir, "db.sqlite"))
}
sqlLogger := stores.NewSQLLogger(l.Named("db"), nil)
sqlStore, ccid, err := stores.NewSQLStore(dbConn, true, cfg.PersistInterval, sqlLogger)
sqlLogger := stores.NewSQLLogger(l.Named("db"), cfg.DBLoggerConfig)
walletAddr := wallet.StandardAddress(seed.PublicKey())
sqlStore, ccid, err := stores.NewSQLStore(dbConn, true, cfg.PersistInterval, walletAddr, sqlLogger)
if err != nil {
return nil, nil, err
} else if err := cs.ConsensusSetSubscribe(sqlStore, ccid, nil); err != nil {
return nil, nil, err
}
w := wallet.NewSingleAddressWallet(seed, sqlStore)
if m := cfg.Miner; m != nil {
if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil {
return nil, nil, err
@ -284,7 +280,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
func NewWorker(cfg WorkerConfig, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.SessionLockTimeout, cfg.SessionReconnectTimeout, cfg.SessionTTL, cfg.BusFlushInterval, cfg.DownloadSectorTimeout, cfg.UploadSectorTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, l)
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.SessionLockTimeout, cfg.SessionReconnectTimeout, cfg.SessionTTL, cfg.BusFlushInterval, cfg.DownloadSectorTimeout, cfg.UploadSectorTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
if err != nil {
return nil, nil, err
}

View File

@ -151,16 +151,28 @@ func Main() {
apiPassword string
node.BusConfig
}
busCfg.PersistInterval = 10 * time.Minute
busCfg.DBDialector = getDBDialectorFromEnv()
busCfg.Network = build.ConsensusNetwork
var dbCfg struct {
uri string
user string
password string
database string
}
var dbLoggerCfg struct {
ignoreNotFoundError string
logLevel string
slowThreshold string
}
var workerCfg struct {
enabled bool
remoteAddrs string
apiPassword string
node.WorkerConfig
}
workerCfg.AllowPrivateIPs = false
workerCfg.ContractLockTimeout = 30 * time.Second
var autopilotCfg struct {
@ -172,32 +184,49 @@ func Main() {
tracingEnabled := flag.Bool("tracing-enabled", false, "Enables tracing through OpenTelemetry. If RENTERD_TRACING_ENABLED is set, it overwrites the CLI flag's value. Tracing can be configured using the standard OpenTelemetry environment variables. https://github.com/open-telemetry/opentelemetry-specification/blob/v1.8.0/specification/protocol/exporter.md")
tracingServiceInstanceId := flag.String("tracing-service-instance-id", "cluster", "ID of the service instance used for tracing. If RENTERD_TRACING_SERVICE_INSTANCE_ID is set, it overwrites the CLI flag's value.")
dir := flag.String("dir", ".", "directory to store node state in")
flag.StringVar(&busCfg.remoteAddr, "bus.remoteAddr", "", "URL of remote bus service - can be overwritten using RENTERD_BUS_REMOTE_ADDR environment variable")
flag.StringVar(&busCfg.apiPassword, "bus.apiPassword", "", "API password for remote bus service - can be overwritten using RENTERD_BUS_API_PASSWORD environment variable")
// db
flag.StringVar(&dbCfg.uri, "db.uri", "", "URI of the database to use for the bus - can be overwritten using RENTERD_DB_URI environment variable")
flag.StringVar(&dbCfg.user, "db.user", "", "username for the database to use for the bus - can be overwritten using RENTERD_DB_USER environment variable")
flag.StringVar(&dbCfg.password, "db.password", "", "password for the database to use for the bus - can be overwritten using RENTERD_DB_PASSWORD environment variable")
flag.StringVar(&dbCfg.database, "db.name", "", "name of the database to use for the bus - can be overwritten using RENTERD_DB_NAME environment variable")
// db logger
flag.StringVar(&dbLoggerCfg.ignoreNotFoundError, "db.logger.ignoreNotFoundError", "true", "ignore not found error for logger - can be overwritten using RENTERD_DB_LOGGER_IGNORE_NOT_FOUND_ERROR environment variable")
flag.StringVar(&dbLoggerCfg.logLevel, "db.logger.logLevel", "warn", "log level for logger - can be overwritten using RENTERD_DB_LOGGER_LOG_LEVEL environment variable")
flag.StringVar(&dbLoggerCfg.slowThreshold, "db.logger.slowThreshold", "500ms", "slow threshold for logger - can be overwritten using RENTERD_DB_LOGGER_SLOW_THRESHOLD environment variable")
// bus
flag.BoolVar(&busCfg.Bootstrap, "bus.bootstrap", true, "bootstrap the gateway and consensus modules")
flag.StringVar(&busCfg.GatewayAddr, "bus.gatewayAddr", build.DefaultGatewayAddress, "address to listen on for Sia peer connections - can be overwritten using RENTERD_BUS_GATEWAY_ADDR environment variable")
flag.BoolVar(&workerCfg.enabled, "worker.enabled", true, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable")
flag.DurationVar(&busCfg.PersistInterval, "bus.persistInterval", busCfg.PersistInterval, "interval at which to persist the consensus updates")
flag.StringVar(&busCfg.apiPassword, "bus.apiPassword", "", "API password for remote bus service - can be overwritten using RENTERD_BUS_API_PASSWORD environment variable")
flag.StringVar(&busCfg.remoteAddr, "bus.remoteAddr", "", "URL of remote bus service - can be overwritten using RENTERD_BUS_REMOTE_ADDR environment variable")
// worker
flag.DurationVar(&workerCfg.BusFlushInterval, "worker.busFlushInterval", 5*time.Second, "time after which the worker flushes buffered data to bus for persisting")
flag.Uint64Var(&workerCfg.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", 5, "maximum number of active overdrive workers when downloading a slab")
flag.StringVar(&workerCfg.WorkerConfig.ID, "worker.id", "worker", "unique identifier of worker used internally - can be overwritten using the RENTERD_WORKER_ID environment variable")
flag.StringVar(&workerCfg.remoteAddrs, "worker.remoteAddrs", "", "URL of remote worker service(s). Multiple addresses can be provided by separating them with a semicolon. Can be overwritten using RENTERD_WORKER_REMOTE_ADDRS environment variable")
flag.StringVar(&workerCfg.apiPassword, "worker.apiPassword", "", "API password for remote worker service")
flag.DurationVar(&workerCfg.SessionLockTimeout, "worker.sessionLockTimeout", 30*time.Second, "the maximum amount of time a host should wait on the lock when the lock RPC is called")
flag.DurationVar(&workerCfg.SessionReconnectTimeout, "worker.sessionReconnectTimeout", 10*time.Second, "the maximum amount of time reconnecting a session is allowed to take")
flag.DurationVar(&workerCfg.SessionTTL, "worker.sessionTTL", 2*time.Minute, "the time a host session is valid for before reconnecting")
flag.DurationVar(&workerCfg.DownloadSectorTimeout, "worker.downloadSectorTimeout", 3*time.Second, "timeout applied to sector downloads when downloading a slab")
flag.DurationVar(&workerCfg.UploadSectorTimeout, "worker.uploadSectorTimeout", 5*time.Second, "timeout applied to sector uploads when uploading a slab")
flag.Uint64Var(&workerCfg.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", 5, "maximum number of active overdrive workers when downloading a slab")
flag.Uint64Var(&workerCfg.UploadMaxOverdrive, "worker.uploadMaxOverdrive", 5, "maximum number of active overdrive workers when uploading a slab")
flag.DurationVar(&workerCfg.UploadSectorTimeout, "worker.uploadSectorTimeout", 5*time.Second, "timeout applied to sector uploads when uploading a slab")
flag.StringVar(&workerCfg.apiPassword, "worker.apiPassword", "", "API password for remote worker service")
flag.BoolVar(&workerCfg.enabled, "worker.enabled", true, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable")
flag.StringVar(&workerCfg.remoteAddrs, "worker.remoteAddrs", "", "URL of remote worker service(s). Multiple addresses can be provided by separating them with a semicolon. Can be overwritten using RENTERD_WORKER_REMOTE_ADDRS environment variable")
// autopilot
flag.DurationVar(&autopilotCfg.AccountsRefillInterval, "autopilot.accountRefillInterval", defaultAccountRefillInterval, "interval at which the autopilot checks the workers' accounts balance and refills them if necessary")
flag.BoolVar(&autopilotCfg.enabled, "autopilot.enabled", true, "enable/disable the autopilot - can be overwritten using the RENTERD_AUTOPILOT_ENABLED environment variable")
flag.DurationVar(&autopilotCfg.Heartbeat, "autopilot.heartbeat", 10*time.Minute, "interval at which autopilot loop runs")
flag.Float64Var(&autopilotCfg.MigrationHealthCutoff, "autopilot.migrationHealthCutoff", 0.75, "health threshold below which slabs are migrated to new hosts")
flag.DurationVar(&autopilotCfg.ScannerInterval, "autopilot.scannerInterval", 24*time.Hour, "interval at which hosts are scanned")
flag.Uint64Var(&autopilotCfg.ScannerBatchSize, "autopilot.scannerBatchSize", 1000, "size of the batch with which hosts are scanned")
flag.DurationVar(&autopilotCfg.ScannerInterval, "autopilot.scannerInterval", 24*time.Hour, "interval at which hosts are scanned")
flag.Uint64Var(&autopilotCfg.ScannerMinRecentFailures, "autopilot.scannerMinRecentFailures", 10, "minimum amount of consesutive failed scans a host must have before it is removed for exceeding the max downtime")
flag.Uint64Var(&autopilotCfg.ScannerNumThreads, "autopilot.scannerNumThreads", 100, "number of threads that scan hosts")
flag.BoolVar(&autopilotCfg.enabled, "autopilot.enabled", true, "enable/disable the autopilot - can be overwritten using the RENTERD_AUTOPILOT_ENABLED environment variable")
flag.DurationVar(&nodeCfg.shutdownTimeout, "node.shutdownTimeout", 5*time.Minute, "the timeout applied to the node shutdown")
flag.Parse()
log.Println("renterd v0.1.0")
@ -212,16 +241,28 @@ func Main() {
}
// Overwrite flags from environment if set.
parseEnvVar("RENTERD_TRACING_ENABLED", tracingEnabled)
parseEnvVar("RENTERD_TRACING_SERVICE_INSTANCE_ID", tracingServiceInstanceId)
parseEnvVar("RENTERD_BUS_REMOTE_ADDR", &busCfg.remoteAddr)
parseEnvVar("RENTERD_BUS_API_PASSWORD", &busCfg.apiPassword)
parseEnvVar("RENTERD_BUS_GATEWAY_ADDR", &busCfg.GatewayAddr)
parseEnvVar("RENTERD_DB_URI", &dbCfg.uri)
parseEnvVar("RENTERD_DB_USER", &dbCfg.user)
parseEnvVar("RENTERD_DB_PASSWORD", &dbCfg.password)
parseEnvVar("RENTERD_DB_NAME", &dbCfg.database)
parseEnvVar("RENTERD_DB_LOGGER_IGNORE_NOT_FOUND_ERROR", &dbLoggerCfg.ignoreNotFoundError)
parseEnvVar("RENTERD_DB_LOGGER_LOG_LEVEL", &dbLoggerCfg.logLevel)
parseEnvVar("RENTERD_DB_LOGGER_SLOW_THRESHOLD", &dbLoggerCfg.slowThreshold)
parseEnvVar("RENTERD_WORKER_REMOTE_ADDRS", &workerCfg.remoteAddrs)
parseEnvVar("RENTERD_WORKER_API_PASSWORD", &workerCfg.apiPassword)
parseEnvVar("RENTERD_WORKER_ENABLED", &workerCfg.enabled)
parseEnvVar("RENTERD_WORKER_ID", &workerCfg.ID)
parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &autopilotCfg.enabled)
parseEnvVar("RENTERD_TRACING_ENABLED", tracingEnabled)
parseEnvVar("RENTERD_TRACING_SERVICE_INSTANCE_ID", tracingServiceInstanceId)
var autopilotShutdownFn func(context.Context) error
var shutdownFns []func(context.Context) error