Compare commits
4 Commits
26042b62ac
...
8331136f7f
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | 8331136f7f | |
Derrick Hammer | 325ab7044f | |
Derrick Hammer | d1742265b6 | |
Derrick Hammer | 09cd274d29 |
6
go.mod
6
go.mod
|
@ -16,9 +16,9 @@ require (
|
|||
github.com/swaggo/swag v1.16.1
|
||||
github.com/tus/tusd v1.11.0
|
||||
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
|
||||
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2
|
||||
go.sia.tech/jape v0.9.0
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230520152334-e004ada9c4e9
|
||||
go.sia.tech/core v0.1.12-0.20230525021639-f38630eccb9c
|
||||
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230531134531-26641a0859c2
|
||||
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca
|
||||
go.sia.tech/web/renterd v0.17.0
|
||||
go.uber.org/zap v1.24.0
|
||||
|
|
10
go.sum
10
go.sum
|
@ -1200,14 +1200,24 @@ go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJP
|
|||
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
|
||||
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2 h1:a1UTWnPoH6Nf8Y4MrIf74jqCeMoPVmEJPoICOHqCA4o=
|
||||
go.sia.tech/core v0.1.12-0.20230503202148-581dd00ac1d2/go.mod h1:D17UWSn99SEfQnEaR9G9n6Kz9+BwqMoUgZ6Cl424LsQ=
|
||||
go.sia.tech/core v0.1.12-0.20230525021639-f38630eccb9c h1:rMqNjT+imVQlTJaOdndiAmPWAv05ITq1trxno9xGx5M=
|
||||
go.sia.tech/core v0.1.12-0.20230525021639-f38630eccb9c/go.mod h1:D17UWSn99SEfQnEaR9G9n6Kz9+BwqMoUgZ6Cl424LsQ=
|
||||
go.sia.tech/jape v0.9.0 h1:kWgMFqALYhLMJYOwWBgJda5ko/fi4iZzRxHRP7pp8NY=
|
||||
go.sia.tech/jape v0.9.0/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
|
||||
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb h1:yLDEqkqC19E/HgBoq2Uhw9oH3SMNRyeRjZ7Ep4dPKR8=
|
||||
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
|
||||
go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
|
||||
go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230516200305-8097423dbe64 h1:qA+aREwc+i8Q56F9VL9wdxagmtoPKcAd6MVZKOt0JRI=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230516200305-8097423dbe64/go.mod h1:jFxggAqLQ9fs85iLpC7s2Xnit3rTx7AajW37LgR1vQs=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230520152334-e004ada9c4e9 h1:jfxmpr/8UG9IDoU/vA4Jqq2cqWT9SsDJv06uZ48W/qs=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230520152334-e004ada9c4e9/go.mod h1:ln0uIpeEvgd0lhDx1yB2+u6WXUMermu1QO6lI96xJKI=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230526064553-cce3ae57804b h1:QNzx8mGKuMb9ftIQSsLO8HF/8QlI5/tsay+dL4/Tw3U=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230526064553-cce3ae57804b/go.mod h1:KT/DwXfKeqa9gynCxssYRFsj4/PCAd0Lre1oDJZcf7I=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230530151818-141fd470cfdd h1:2MSN8Q84OIsCB061HF8dG7ZMla8Vg/tp6f75+ye8zMc=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230530151818-141fd470cfdd/go.mod h1:xD2ruYaPzSeB4buFZPDE/wTw6tmwSkzgvfQaJTO6rjc=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230531134531-26641a0859c2 h1:ULbNFpIOfIJfmo6HgzuEZNLJK/Mkl1UqNzPhBwxqSrQ=
|
||||
go.sia.tech/renterd v0.3.0-beta.0.20230531134531-26641a0859c2/go.mod h1:xD2ruYaPzSeB4buFZPDE/wTw6tmwSkzgvfQaJTO6rjc=
|
||||
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca h1:aZMg2AKevn7jKx+wlusWQfwSM5pNU9aGtRZme29q3O4=
|
||||
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca/go.mod h1:h/1afFwpxzff6/gG5i1XdAgPK7dEY6FaibhK7N5F86Y=
|
||||
go.sia.tech/web/renterd v0.14.0 h1:74WDPNYXk71d8uT86rkQAa7AlDp8+VDRsQ2oyhwPplg=
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
|
||||
type WorkerConfig struct {
|
||||
ID string
|
||||
AllowPrivateIPs bool
|
||||
BusFlushInterval time.Duration
|
||||
ContractLockTimeout time.Duration
|
||||
SessionLockTimeout time.Duration
|
||||
|
@ -50,6 +51,7 @@ type BusConfig struct {
|
|||
Miner *Miner
|
||||
PersistInterval time.Duration
|
||||
|
||||
DBLoggerConfig stores.LoggerConfig
|
||||
DBDialector gorm.Dialector
|
||||
}
|
||||
|
||||
|
@ -96,6 +98,10 @@ func (cm chainManager) AcceptBlock(ctx context.Context, b types.Block) error {
|
|||
return cm.cs.AcceptBlock(sb)
|
||||
}
|
||||
|
||||
func (cm chainManager) LastBlockTime() time.Time {
|
||||
return time.Unix(int64(cm.cs.CurrentBlock().Timestamp), 0)
|
||||
}
|
||||
|
||||
func (cm chainManager) Synced(ctx context.Context) bool {
|
||||
return cm.cs.Synced()
|
||||
}
|
||||
|
@ -227,19 +233,6 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
walletDir := filepath.Join(dir, "wallet")
|
||||
if err := os.MkdirAll(walletDir, 0700); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
walletAddr := wallet.StandardAddress(seed.PublicKey())
|
||||
ws, ccid, err := stores.NewJSONWalletStore(walletDir, walletAddr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
} else if err := cs.ConsensusSetSubscribe(ws, ccid, nil); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
w := wallet.NewSingleAddressWallet(seed, ws)
|
||||
|
||||
// If no DB dialector was provided, use SQLite.
|
||||
dbConn := cfg.DBDialector
|
||||
if dbConn == nil {
|
||||
|
@ -250,14 +243,17 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
|
|||
dbConn = stores.NewSQLiteConnection(filepath.Join(dbDir, "db.sqlite"))
|
||||
}
|
||||
|
||||
sqlLogger := stores.NewSQLLogger(l.Named("db"), nil)
|
||||
sqlStore, ccid, err := stores.NewSQLStore(dbConn, true, cfg.PersistInterval, sqlLogger)
|
||||
sqlLogger := stores.NewSQLLogger(l.Named("db"), cfg.DBLoggerConfig)
|
||||
walletAddr := wallet.StandardAddress(seed.PublicKey())
|
||||
sqlStore, ccid, err := stores.NewSQLStore(dbConn, true, cfg.PersistInterval, walletAddr, sqlLogger)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
} else if err := cs.ConsensusSetSubscribe(sqlStore, ccid, nil); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
w := wallet.NewSingleAddressWallet(seed, sqlStore)
|
||||
|
||||
if m := cfg.Miner; m != nil {
|
||||
if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -284,7 +280,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
|
|||
|
||||
func NewWorker(cfg WorkerConfig, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
|
||||
workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
|
||||
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.SessionLockTimeout, cfg.SessionReconnectTimeout, cfg.SessionTTL, cfg.BusFlushInterval, cfg.DownloadSectorTimeout, cfg.UploadSectorTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, l)
|
||||
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.SessionLockTimeout, cfg.SessionReconnectTimeout, cfg.SessionTTL, cfg.BusFlushInterval, cfg.DownloadSectorTimeout, cfg.UploadSectorTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -151,16 +151,28 @@ func Main() {
|
|||
apiPassword string
|
||||
node.BusConfig
|
||||
}
|
||||
busCfg.PersistInterval = 10 * time.Minute
|
||||
busCfg.DBDialector = getDBDialectorFromEnv()
|
||||
busCfg.Network = build.ConsensusNetwork
|
||||
|
||||
var dbCfg struct {
|
||||
uri string
|
||||
user string
|
||||
password string
|
||||
database string
|
||||
}
|
||||
|
||||
var dbLoggerCfg struct {
|
||||
ignoreNotFoundError string
|
||||
logLevel string
|
||||
slowThreshold string
|
||||
}
|
||||
|
||||
var workerCfg struct {
|
||||
enabled bool
|
||||
remoteAddrs string
|
||||
apiPassword string
|
||||
node.WorkerConfig
|
||||
}
|
||||
workerCfg.AllowPrivateIPs = false
|
||||
workerCfg.ContractLockTimeout = 30 * time.Second
|
||||
|
||||
var autopilotCfg struct {
|
||||
|
@ -172,32 +184,49 @@ func Main() {
|
|||
tracingEnabled := flag.Bool("tracing-enabled", false, "Enables tracing through OpenTelemetry. If RENTERD_TRACING_ENABLED is set, it overwrites the CLI flag's value. Tracing can be configured using the standard OpenTelemetry environment variables. https://github.com/open-telemetry/opentelemetry-specification/blob/v1.8.0/specification/protocol/exporter.md")
|
||||
tracingServiceInstanceId := flag.String("tracing-service-instance-id", "cluster", "ID of the service instance used for tracing. If RENTERD_TRACING_SERVICE_INSTANCE_ID is set, it overwrites the CLI flag's value.")
|
||||
dir := flag.String("dir", ".", "directory to store node state in")
|
||||
flag.StringVar(&busCfg.remoteAddr, "bus.remoteAddr", "", "URL of remote bus service - can be overwritten using RENTERD_BUS_REMOTE_ADDR environment variable")
|
||||
flag.StringVar(&busCfg.apiPassword, "bus.apiPassword", "", "API password for remote bus service - can be overwritten using RENTERD_BUS_API_PASSWORD environment variable")
|
||||
|
||||
// db
|
||||
flag.StringVar(&dbCfg.uri, "db.uri", "", "URI of the database to use for the bus - can be overwritten using RENTERD_DB_URI environment variable")
|
||||
flag.StringVar(&dbCfg.user, "db.user", "", "username for the database to use for the bus - can be overwritten using RENTERD_DB_USER environment variable")
|
||||
flag.StringVar(&dbCfg.password, "db.password", "", "password for the database to use for the bus - can be overwritten using RENTERD_DB_PASSWORD environment variable")
|
||||
flag.StringVar(&dbCfg.database, "db.name", "", "name of the database to use for the bus - can be overwritten using RENTERD_DB_NAME environment variable")
|
||||
|
||||
// db logger
|
||||
flag.StringVar(&dbLoggerCfg.ignoreNotFoundError, "db.logger.ignoreNotFoundError", "true", "ignore not found error for logger - can be overwritten using RENTERD_DB_LOGGER_IGNORE_NOT_FOUND_ERROR environment variable")
|
||||
flag.StringVar(&dbLoggerCfg.logLevel, "db.logger.logLevel", "warn", "log level for logger - can be overwritten using RENTERD_DB_LOGGER_LOG_LEVEL environment variable")
|
||||
flag.StringVar(&dbLoggerCfg.slowThreshold, "db.logger.slowThreshold", "500ms", "slow threshold for logger - can be overwritten using RENTERD_DB_LOGGER_SLOW_THRESHOLD environment variable")
|
||||
|
||||
// bus
|
||||
flag.BoolVar(&busCfg.Bootstrap, "bus.bootstrap", true, "bootstrap the gateway and consensus modules")
|
||||
flag.StringVar(&busCfg.GatewayAddr, "bus.gatewayAddr", build.DefaultGatewayAddress, "address to listen on for Sia peer connections - can be overwritten using RENTERD_BUS_GATEWAY_ADDR environment variable")
|
||||
flag.BoolVar(&workerCfg.enabled, "worker.enabled", true, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable")
|
||||
flag.DurationVar(&busCfg.PersistInterval, "bus.persistInterval", busCfg.PersistInterval, "interval at which to persist the consensus updates")
|
||||
flag.StringVar(&busCfg.apiPassword, "bus.apiPassword", "", "API password for remote bus service - can be overwritten using RENTERD_BUS_API_PASSWORD environment variable")
|
||||
flag.StringVar(&busCfg.remoteAddr, "bus.remoteAddr", "", "URL of remote bus service - can be overwritten using RENTERD_BUS_REMOTE_ADDR environment variable")
|
||||
|
||||
// worker
|
||||
flag.DurationVar(&workerCfg.BusFlushInterval, "worker.busFlushInterval", 5*time.Second, "time after which the worker flushes buffered data to bus for persisting")
|
||||
flag.Uint64Var(&workerCfg.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", 5, "maximum number of active overdrive workers when downloading a slab")
|
||||
flag.StringVar(&workerCfg.WorkerConfig.ID, "worker.id", "worker", "unique identifier of worker used internally - can be overwritten using the RENTERD_WORKER_ID environment variable")
|
||||
flag.StringVar(&workerCfg.remoteAddrs, "worker.remoteAddrs", "", "URL of remote worker service(s). Multiple addresses can be provided by separating them with a semicolon. Can be overwritten using RENTERD_WORKER_REMOTE_ADDRS environment variable")
|
||||
flag.StringVar(&workerCfg.apiPassword, "worker.apiPassword", "", "API password for remote worker service")
|
||||
flag.DurationVar(&workerCfg.SessionLockTimeout, "worker.sessionLockTimeout", 30*time.Second, "the maximum amount of time a host should wait on the lock when the lock RPC is called")
|
||||
flag.DurationVar(&workerCfg.SessionReconnectTimeout, "worker.sessionReconnectTimeout", 10*time.Second, "the maximum amount of time reconnecting a session is allowed to take")
|
||||
flag.DurationVar(&workerCfg.SessionTTL, "worker.sessionTTL", 2*time.Minute, "the time a host session is valid for before reconnecting")
|
||||
flag.DurationVar(&workerCfg.DownloadSectorTimeout, "worker.downloadSectorTimeout", 3*time.Second, "timeout applied to sector downloads when downloading a slab")
|
||||
flag.DurationVar(&workerCfg.UploadSectorTimeout, "worker.uploadSectorTimeout", 5*time.Second, "timeout applied to sector uploads when uploading a slab")
|
||||
flag.Uint64Var(&workerCfg.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", 5, "maximum number of active overdrive workers when downloading a slab")
|
||||
flag.Uint64Var(&workerCfg.UploadMaxOverdrive, "worker.uploadMaxOverdrive", 5, "maximum number of active overdrive workers when uploading a slab")
|
||||
flag.DurationVar(&workerCfg.UploadSectorTimeout, "worker.uploadSectorTimeout", 5*time.Second, "timeout applied to sector uploads when uploading a slab")
|
||||
flag.StringVar(&workerCfg.apiPassword, "worker.apiPassword", "", "API password for remote worker service")
|
||||
flag.BoolVar(&workerCfg.enabled, "worker.enabled", true, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable")
|
||||
flag.StringVar(&workerCfg.remoteAddrs, "worker.remoteAddrs", "", "URL of remote worker service(s). Multiple addresses can be provided by separating them with a semicolon. Can be overwritten using RENTERD_WORKER_REMOTE_ADDRS environment variable")
|
||||
|
||||
// autopilot
|
||||
flag.DurationVar(&autopilotCfg.AccountsRefillInterval, "autopilot.accountRefillInterval", defaultAccountRefillInterval, "interval at which the autopilot checks the workers' accounts balance and refills them if necessary")
|
||||
flag.BoolVar(&autopilotCfg.enabled, "autopilot.enabled", true, "enable/disable the autopilot - can be overwritten using the RENTERD_AUTOPILOT_ENABLED environment variable")
|
||||
flag.DurationVar(&autopilotCfg.Heartbeat, "autopilot.heartbeat", 10*time.Minute, "interval at which autopilot loop runs")
|
||||
flag.Float64Var(&autopilotCfg.MigrationHealthCutoff, "autopilot.migrationHealthCutoff", 0.75, "health threshold below which slabs are migrated to new hosts")
|
||||
flag.DurationVar(&autopilotCfg.ScannerInterval, "autopilot.scannerInterval", 24*time.Hour, "interval at which hosts are scanned")
|
||||
flag.Uint64Var(&autopilotCfg.ScannerBatchSize, "autopilot.scannerBatchSize", 1000, "size of the batch with which hosts are scanned")
|
||||
flag.DurationVar(&autopilotCfg.ScannerInterval, "autopilot.scannerInterval", 24*time.Hour, "interval at which hosts are scanned")
|
||||
flag.Uint64Var(&autopilotCfg.ScannerMinRecentFailures, "autopilot.scannerMinRecentFailures", 10, "minimum amount of consesutive failed scans a host must have before it is removed for exceeding the max downtime")
|
||||
flag.Uint64Var(&autopilotCfg.ScannerNumThreads, "autopilot.scannerNumThreads", 100, "number of threads that scan hosts")
|
||||
flag.BoolVar(&autopilotCfg.enabled, "autopilot.enabled", true, "enable/disable the autopilot - can be overwritten using the RENTERD_AUTOPILOT_ENABLED environment variable")
|
||||
flag.DurationVar(&nodeCfg.shutdownTimeout, "node.shutdownTimeout", 5*time.Minute, "the timeout applied to the node shutdown")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
log.Println("renterd v0.1.0")
|
||||
|
@ -212,16 +241,28 @@ func Main() {
|
|||
}
|
||||
|
||||
// Overwrite flags from environment if set.
|
||||
parseEnvVar("RENTERD_TRACING_ENABLED", tracingEnabled)
|
||||
parseEnvVar("RENTERD_TRACING_SERVICE_INSTANCE_ID", tracingServiceInstanceId)
|
||||
|
||||
parseEnvVar("RENTERD_BUS_REMOTE_ADDR", &busCfg.remoteAddr)
|
||||
parseEnvVar("RENTERD_BUS_API_PASSWORD", &busCfg.apiPassword)
|
||||
parseEnvVar("RENTERD_BUS_GATEWAY_ADDR", &busCfg.GatewayAddr)
|
||||
|
||||
parseEnvVar("RENTERD_DB_URI", &dbCfg.uri)
|
||||
parseEnvVar("RENTERD_DB_USER", &dbCfg.user)
|
||||
parseEnvVar("RENTERD_DB_PASSWORD", &dbCfg.password)
|
||||
parseEnvVar("RENTERD_DB_NAME", &dbCfg.database)
|
||||
|
||||
parseEnvVar("RENTERD_DB_LOGGER_IGNORE_NOT_FOUND_ERROR", &dbLoggerCfg.ignoreNotFoundError)
|
||||
parseEnvVar("RENTERD_DB_LOGGER_LOG_LEVEL", &dbLoggerCfg.logLevel)
|
||||
parseEnvVar("RENTERD_DB_LOGGER_SLOW_THRESHOLD", &dbLoggerCfg.slowThreshold)
|
||||
|
||||
parseEnvVar("RENTERD_WORKER_REMOTE_ADDRS", &workerCfg.remoteAddrs)
|
||||
parseEnvVar("RENTERD_WORKER_API_PASSWORD", &workerCfg.apiPassword)
|
||||
parseEnvVar("RENTERD_WORKER_ENABLED", &workerCfg.enabled)
|
||||
parseEnvVar("RENTERD_WORKER_ID", &workerCfg.ID)
|
||||
|
||||
parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &autopilotCfg.enabled)
|
||||
parseEnvVar("RENTERD_TRACING_ENABLED", tracingEnabled)
|
||||
parseEnvVar("RENTERD_TRACING_SERVICE_INSTANCE_ID", tracingServiceInstanceId)
|
||||
|
||||
var autopilotShutdownFn func(context.Context) error
|
||||
var shutdownFns []func(context.Context) error
|
||||
|
|
Loading…
Reference in New Issue