From d0e59c87299d78cdae9277e44d26c30b635817a4 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Tue, 6 Jun 2023 16:32:07 -0400 Subject: [PATCH] refactor: no longer embed renterd --- main.go | 8 - renterd/internal/node/miner.go | 150 ----------- renterd/internal/node/node.go | 359 --------------------------- renterd/main.go | 439 --------------------------------- renterd/web.go | 52 ---- service/files/files.go | 6 +- 6 files changed, 3 insertions(+), 1011 deletions(-) delete mode 100644 renterd/internal/node/miner.go delete mode 100644 renterd/internal/node/node.go delete mode 100644 renterd/main.go delete mode 100644 renterd/web.go diff --git a/main.go b/main.go index c868889..63a5579 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "git.lumeweb.com/LumeWeb/portal/db" _ "git.lumeweb.com/LumeWeb/portal/docs" "git.lumeweb.com/LumeWeb/portal/logger" - "git.lumeweb.com/LumeWeb/portal/renterd" "git.lumeweb.com/LumeWeb/portal/service/files" "git.lumeweb.com/LumeWeb/portal/tus" "git.lumeweb.com/LumeWeb/portal/validator" @@ -45,11 +44,6 @@ func main() { // Initialize the database connection db.Init() - // Start the renterd process in a goroutine - go renterd.Main() - - renterd.Ready() - logger.Init() files.Init() @@ -111,6 +105,4 @@ func main() { if err != nil { logger.Get().Error("Failed starting webserver proof", zap.Error(err)) } - - renterd.ShutdownComplete() } diff --git a/renterd/internal/node/miner.go b/renterd/internal/node/miner.go deleted file mode 100644 index 003405b..0000000 --- a/renterd/internal/node/miner.go +++ /dev/null @@ -1,150 +0,0 @@ -// TODO: remove this file when we can import it from hostd -package node - -import ( - "bytes" - "context" - "encoding/binary" - "errors" - "fmt" - "sync" - - "go.sia.tech/core/types" - "go.sia.tech/siad/crypto" - "go.sia.tech/siad/modules" - stypes "go.sia.tech/siad/types" - "lukechampine.com/frand" -) - -const solveAttempts = 1e4 - -type ( - // Consensus defines a minimal interface needed by the miner to interact - // with the consensus set - Consensus interface { - AcceptBlock(context.Context, types.Block) error - } - - // A Miner is a CPU miner that can mine blocks, sending the reward to a - // specified address. - Miner struct { - consensus Consensus - - mu sync.Mutex - height stypes.BlockHeight - target stypes.Target - currentBlockID stypes.BlockID - txnsets map[modules.TransactionSetID][]stypes.TransactionID - transactions []stypes.Transaction - } -) - -var errFailedToSolve = errors.New("failed to solve block") - -// ProcessConsensusChange implements modules.ConsensusSetSubscriber. -func (m *Miner) ProcessConsensusChange(cc modules.ConsensusChange) { - m.mu.Lock() - defer m.mu.Unlock() - m.target = cc.ChildTarget - m.currentBlockID = cc.AppliedBlocks[len(cc.AppliedBlocks)-1].ID() - m.height = cc.BlockHeight -} - -// ReceiveUpdatedUnconfirmedTransactions implements modules.TransactionPoolSubscriber -func (m *Miner) ReceiveUpdatedUnconfirmedTransactions(diff *modules.TransactionPoolDiff) { - m.mu.Lock() - defer m.mu.Unlock() - - reverted := make(map[stypes.TransactionID]bool) - for _, setID := range diff.RevertedTransactions { - for _, txnID := range m.txnsets[setID] { - reverted[txnID] = true - } - } - - filtered := m.transactions[:0] - for _, txn := range m.transactions { - if reverted[txn.ID()] { - continue - } - filtered = append(filtered, txn) - } - - for _, txnset := range diff.AppliedTransactions { - m.txnsets[txnset.ID] = txnset.IDs - filtered = append(filtered, txnset.Transactions...) 
- } - m.transactions = filtered -} - -// mineBlock attempts to mine a block and add it to the consensus set. -func (m *Miner) mineBlock(addr stypes.UnlockHash) error { - m.mu.Lock() - block := stypes.Block{ - ParentID: m.currentBlockID, - Timestamp: stypes.CurrentTimestamp(), - } - - randBytes := frand.Bytes(stypes.SpecifierLen) - randTxn := stypes.Transaction{ - ArbitraryData: [][]byte{append(modules.PrefixNonSia[:], randBytes...)}, - } - block.Transactions = append([]stypes.Transaction{randTxn}, m.transactions...) - block.MinerPayouts = append(block.MinerPayouts, stypes.SiacoinOutput{ - Value: block.CalculateSubsidy(m.height + 1), - UnlockHash: addr, - }) - target := m.target - m.mu.Unlock() - - merkleRoot := block.MerkleRoot() - header := make([]byte, 80) - copy(header, block.ParentID[:]) - binary.LittleEndian.PutUint64(header[40:48], uint64(block.Timestamp)) - copy(header[48:], merkleRoot[:]) - - var nonce uint64 - var solved bool - for i := 0; i < solveAttempts; i++ { - id := crypto.HashBytes(header) - if bytes.Compare(target[:], id[:]) >= 0 { - block.Nonce = *(*stypes.BlockNonce)(header[32:40]) - solved = true - break - } - binary.LittleEndian.PutUint64(header[32:], nonce) - nonce += stypes.ASICHardforkFactor - } - if !solved { - return errFailedToSolve - } - - var b types.Block - convertToCore(&block, &b) - if err := m.consensus.AcceptBlock(context.Background(), b); err != nil { - return fmt.Errorf("failed to get block accepted: %w", err) - } - return nil -} - -// Mine mines n blocks, sending the reward to addr -func (m *Miner) Mine(addr types.Address, n int) error { - var err error - for mined := 1; mined <= n; { - // return the error only if the miner failed to solve the block, - // ignore any consensus related errors - if err = m.mineBlock(stypes.UnlockHash(addr)); errors.Is(err, errFailedToSolve) { - return fmt.Errorf("failed to mine block %v: %w", mined, errFailedToSolve) - } - mined++ - } - return nil -} - -// NewMiner initializes a new CPU miner -func NewMiner(consensus Consensus) *Miner { - return &Miner{ - consensus: consensus, - txnsets: make(map[modules.TransactionSetID][]stypes.TransactionID), - } -} diff --git a/renterd/internal/node/node.go b/renterd/internal/node/node.go deleted file mode 100644 index 8061ec1..0000000 --- a/renterd/internal/node/node.go +++ /dev/null @@ -1,359 +0,0 @@ -package node - -import ( - "bytes" - "context" - "errors" - "log" - "net/http" - "os" - "path/filepath" - "strings" - "time" - - "gitlab.com/NebulousLabs/encoding" - "go.sia.tech/core/consensus" - "go.sia.tech/core/types" - "go.sia.tech/renterd/autopilot" - "go.sia.tech/renterd/bus" - "go.sia.tech/renterd/stores" - "go.sia.tech/renterd/wallet" - "go.sia.tech/renterd/worker" - "go.sia.tech/siad/modules" - mconsensus "go.sia.tech/siad/modules/consensus" - "go.sia.tech/siad/modules/gateway" - "go.sia.tech/siad/modules/transactionpool" - stypes "go.sia.tech/siad/types" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "golang.org/x/crypto/blake2b" - "gorm.io/gorm" -) - -type WorkerConfig struct { - ID string - AllowPrivateIPs bool - BusFlushInterval time.Duration - ContractLockTimeout time.Duration - SessionLockTimeout time.Duration - SessionReconnectTimeout time.Duration - SessionTTL time.Duration - DownloadSectorTimeout time.Duration - UploadSectorTimeout time.Duration - DownloadMaxOverdrive uint64 - UploadMaxOverdrive uint64 -} - -type BusConfig struct { - Bootstrap bool - GatewayAddr string - Network *consensus.Network - Miner *Miner - PersistInterval time.Duration - - DBLoggerConfig 
stores.LoggerConfig - DBDialector gorm.Dialector -} - -type AutopilotConfig struct { - AccountsRefillInterval time.Duration - Heartbeat time.Duration - MigrationHealthCutoff float64 - ScannerInterval time.Duration - ScannerBatchSize uint64 - ScannerMinRecentFailures uint64 - ScannerNumThreads uint64 -} - -type ShutdownFn = func(context.Context) error - -func convertToSiad(core types.EncoderTo, siad encoding.SiaUnmarshaler) { - var buf bytes.Buffer - e := types.NewEncoder(&buf) - core.EncodeTo(e) - e.Flush() - if err := siad.UnmarshalSia(&buf); err != nil { - panic(err) - } -} - -func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) { - var buf bytes.Buffer - siad.MarshalSia(&buf) - d := types.NewBufDecoder(buf.Bytes()) - core.DecodeFrom(d) - if d.Err() != nil { - panic(d.Err()) - } -} - -type chainManager struct { - cs modules.ConsensusSet - network *consensus.Network -} - -func (cm chainManager) AcceptBlock(ctx context.Context, b types.Block) error { - var sb stypes.Block - convertToSiad(b, &sb) - return cm.cs.AcceptBlock(sb) -} - -func (cm chainManager) LastBlockTime() time.Time { - return time.Unix(int64(cm.cs.CurrentBlock().Timestamp), 0) -} - -func (cm chainManager) Synced(ctx context.Context) bool { - return cm.cs.Synced() -} - -func (cm chainManager) TipState(ctx context.Context) consensus.State { - return consensus.State{ - Network: cm.network, - Index: types.ChainIndex{ - Height: uint64(cm.cs.Height()), - ID: types.BlockID(cm.cs.CurrentBlock().ID()), - }, - } -} - -type syncer struct { - g modules.Gateway - tp modules.TransactionPool -} - -func (s syncer) Addr() string { - return string(s.g.Address()) -} - -func (s syncer) Peers() []string { - var peers []string - for _, p := range s.g.Peers() { - peers = append(peers, string(p.NetAddress)) - } - return peers -} - -func (s syncer) Connect(addr string) error { - return s.g.Connect(modules.NetAddress(addr)) -} - -func (s syncer) BroadcastTransaction(txn types.Transaction, dependsOn []types.Transaction) { - txnSet := make([]stypes.Transaction, len(dependsOn)+1) - for i, txn := range dependsOn { - convertToSiad(txn, &txnSet[i]) - } - convertToSiad(txn, &txnSet[len(txnSet)-1]) - s.tp.Broadcast(txnSet) -} - -func (s syncer) SyncerAddress(ctx context.Context) (string, error) { - return string(s.g.Address()), nil -} - -type txpool struct { - tp modules.TransactionPool -} - -func (tp txpool) RecommendedFee() (fee types.Currency) { - _, max := tp.tp.FeeEstimation() - convertToCore(&max, &fee) - return -} - -func (tp txpool) Transactions() []types.Transaction { - stxns := tp.tp.Transactions() - txns := make([]types.Transaction, len(stxns)) - for i := range txns { - convertToCore(&stxns[i], &txns[i]) - } - return txns -} - -func (tp txpool) AddTransactionSet(txns []types.Transaction) error { - stxns := make([]stypes.Transaction, len(txns)) - for i := range stxns { - convertToSiad(&txns[i], &stxns[i]) - } - return tp.tp.AcceptTransactionSet(stxns) -} - -func (tp txpool) UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error) { - pool := tp.Transactions() - outputToParent := make(map[types.SiacoinOutputID]*types.Transaction) - for i, txn := range pool { - for j := range txn.SiacoinOutputs { - outputToParent[txn.SiacoinOutputID(j)] = &pool[i] - } - } - var parents []types.Transaction - seen := make(map[types.TransactionID]bool) - for _, sci := range txn.SiacoinInputs { - if parent, ok := outputToParent[sci.ParentID]; ok { - if txid := parent.ID(); !seen[txid] { - seen[txid] = true - parents = append(parents, 
*parent) - } - } - } - return parents, nil -} - -func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { - gatewayDir := filepath.Join(dir, "gateway") - if err := os.MkdirAll(gatewayDir, 0700); err != nil { - return nil, nil, err - } - g, err := gateway.New(cfg.GatewayAddr, cfg.Bootstrap, gatewayDir) - if err != nil { - return nil, nil, err - } - consensusDir := filepath.Join(dir, "consensus") - if err := os.MkdirAll(consensusDir, 0700); err != nil { - return nil, nil, err - } - cs, errCh := mconsensus.New(g, cfg.Bootstrap, consensusDir) - select { - case err := <-errCh: - if err != nil { - return nil, nil, err - } - default: - go func() { - if err := <-errCh; err != nil { - log.Println("WARNING: consensus initialization returned an error:", err) - } - }() - } - tpoolDir := filepath.Join(dir, "transactionpool") - if err := os.MkdirAll(tpoolDir, 0700); err != nil { - return nil, nil, err - } - tp, err := transactionpool.New(cs, g, tpoolDir) - if err != nil { - return nil, nil, err - } - - // If no DB dialector was provided, use SQLite. - dbConn := cfg.DBDialector - if dbConn == nil { - dbDir := filepath.Join(dir, "db") - if err := os.MkdirAll(dbDir, 0700); err != nil { - return nil, nil, err - } - dbConn = stores.NewSQLiteConnection(filepath.Join(dbDir, "db.sqlite")) - } - - sqlLogger := stores.NewSQLLogger(l.Named("db"), cfg.DBLoggerConfig) - walletAddr := wallet.StandardAddress(seed.PublicKey()) - sqlStore, ccid, err := stores.NewSQLStore(dbConn, true, cfg.PersistInterval, walletAddr, sqlLogger) - if err != nil { - return nil, nil, err - } else if err := cs.ConsensusSetSubscribe(sqlStore, ccid, nil); err != nil { - return nil, nil, err - } - - w := wallet.NewSingleAddressWallet(seed, sqlStore) - - if m := cfg.Miner; m != nil { - if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil { - return nil, nil, err - } - tp.TransactionPoolSubscribe(m) - } - - b, err := bus.New(syncer{g, tp}, chainManager{cs: cs, network: cfg.Network}, txpool{tp}, w, sqlStore, sqlStore, sqlStore, sqlStore, l) - if err != nil { - return nil, nil, err - } - - shutdownFn := func(ctx context.Context) error { - return joinErrors([]error{ - g.Close(), - cs.Close(), - tp.Close(), - b.Shutdown(ctx), - sqlStore.Close(), - }) - } - return b.Handler(), shutdownFn, nil -} - -func NewWorker(cfg WorkerConfig, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { - workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) - w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.SessionLockTimeout, cfg.SessionReconnectTimeout, cfg.SessionTTL, cfg.BusFlushInterval, cfg.DownloadSectorTimeout, cfg.UploadSectorTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l) - if err != nil { - return nil, nil, err - } - - return w.Handler(), w.Shutdown, nil -} - -func NewAutopilot(cfg AutopilotConfig, s autopilot.Store, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, func() error, ShutdownFn, error) { - ap, err := autopilot.New(s, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerMinRecentFailures, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval) - if err != nil { - return nil, nil, nil, err - } - return ap.Handler(), ap.Run, ap.Shutdown, nil -} - -func NewLogger(path string) (*zap.Logger, func(context.Context) error, error) { - writer, closeFn, err := zap.Open(path) - if err != nil { - return nil, nil, err - } 
- - // console - config := zap.NewProductionEncoderConfig() - config.EncodeTime = zapcore.RFC3339TimeEncoder - config.EncodeLevel = zapcore.CapitalColorLevelEncoder - config.StacktraceKey = "" - consoleEncoder := zapcore.NewConsoleEncoder(config) - - // file - config = zap.NewProductionEncoderConfig() - config.EncodeTime = zapcore.RFC3339TimeEncoder - config.CallerKey = "" // hide - config.StacktraceKey = "" // hide - config.NameKey = "component" - config.TimeKey = "date" - fileEncoder := zapcore.NewJSONEncoder(config) - - core := zapcore.NewTee( - zapcore.NewCore(fileEncoder, writer, zapcore.DebugLevel), - zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), zapcore.DebugLevel), - ) - - logger := zap.New( - core, - zap.AddCaller(), - zap.AddStacktrace(zapcore.ErrorLevel), - ) - - return logger, func(_ context.Context) error { - _ = logger.Sync() // ignore Error - closeFn() - return nil - }, nil -} - -func joinErrors(errs []error) error { - filtered := errs[:0] - for _, err := range errs { - if err != nil { - filtered = append(filtered, err) - } - } - - switch len(filtered) { - case 0: - return nil - case 1: - return filtered[0] - default: - strs := make([]string, len(filtered)) - for i := range strs { - strs[i] = filtered[i].Error() - } - return errors.New(strings.Join(strs, ";")) - } -} diff --git a/renterd/main.go b/renterd/main.go deleted file mode 100644 index 972544d..0000000 --- a/renterd/main.go +++ /dev/null @@ -1,439 +0,0 @@ -package renterd - -import ( - "context" - "flag" - "fmt" - "log" - "net" - "net/http" - "os" - "os/signal" - "path/filepath" - "strings" - "syscall" - "time" - - "git.lumeweb.com/LumeWeb/portal/renterd/internal/node" - "go.sia.tech/core/types" - "go.sia.tech/jape" - "go.sia.tech/renterd/autopilot" - "go.sia.tech/renterd/build" - "go.sia.tech/renterd/bus" - "go.sia.tech/renterd/stores" - "go.sia.tech/renterd/tracing" - "go.sia.tech/renterd/wallet" - "go.sia.tech/renterd/worker" - "golang.org/x/term" - "gorm.io/gorm" -) - -const ( - // accountRefillInterval is the amount of time between refills of ephemeral - // accounts. If we conservatively assume that a good host charges 500 SC / - // TiB, we can pay for about 2.2 GiB with 1 SC. Since we want to refill - // ahead of time at 0.5 SC, that makes 1.1 GiB. Considering a 1 Gbps uplink - // that is shared across 30 uploads, we upload at around 33 Mbps to each - // host. That means uploading 1.1 GiB to drain 0.5 SC takes around 5 - // minutes. That's why we assume 10 seconds to be more than frequent enough - // to refill an account when it's due for another refill. - defaultAccountRefillInterval = 10 * time.Second -) - -var ( - // to be supplied at build time - githash = "?" - builddate = "?" 
- - // fetched once, then cached - apiPassword *string - apiAddr *string - seed *types.PrivateKey - ready = make(chan bool) - readyFired = false - shutdown = make(chan bool) - shutdownFired = false -) - -func check(context string, err error) { - if err != nil { - log.Fatalf("%v: %v", context, err) - } -} - -func GetAPIPassword() string { - if apiPassword == nil { - pw := os.Getenv("RENTERD_API_PASSWORD") - if pw != "" { - fmt.Println("Using RENTERD_API_PASSWORD environment variable.") - apiPassword = &pw - } else { - fmt.Print("Enter API password: ") - pw, err := term.ReadPassword(int(os.Stdin.Fd())) - fmt.Println() - if err != nil { - log.Fatal(err) - } - s := string(pw) - apiPassword = &s - } - } - return *apiPassword -} - -func GetSeed() types.PrivateKey { - if seed == nil { - phrase := os.Getenv("RENTERD_SEED") - if phrase != "" { - fmt.Println("Using RENTERD_SEED environment variable") - } else { - fmt.Print("Enter seed: ") - pw, err := term.ReadPassword(int(os.Stdin.Fd())) - check("Could not read seed phrase:", err) - fmt.Println() - phrase = string(pw) - } - key, err := wallet.KeyFromPhrase(phrase) - if err != nil { - log.Fatal(err) - } - seed = &key - } - return *seed -} - -type currencyVar types.Currency - -func newCurrencyVar(c *types.Currency, d types.Currency) *currencyVar { - *c = d - return (*currencyVar)(c) -} - -func (c *currencyVar) Set(s string) (err error) { - *(*types.Currency)(c), err = types.ParseCurrency(s) - return -} - -func (c *currencyVar) String() string { - return strings.Replace((*types.Currency)(c).String(), " ", "", -1) -} - -func flagCurrencyVar(c *types.Currency, name string, d types.Currency, usage string) { - flag.Var(newCurrencyVar(c, d), name, usage) -} - -func getDBDialectorFromEnv() gorm.Dialector { - uri, user, password, dbName := stores.DBConfigFromEnv() - if uri == "" { - return nil - } - return stores.NewMySQLConnection(user, password, uri, dbName) -} - -func parseEnvVar(s string, v interface{}) { - if env, ok := os.LookupEnv(s); ok { - if _, err := fmt.Sscan(env, v); err != nil { - log.Fatalf("failed to parse %s: %v", s, err) - } - fmt.Printf("Using %s environment variable\n", s) - } -} - -func Main() { - log.SetFlags(0) - - var nodeCfg struct { - shutdownTimeout time.Duration - } - - var busCfg struct { - remoteAddr string - apiPassword string - node.BusConfig - } - busCfg.Network = build.ConsensusNetwork - - var dbCfg struct { - uri string - user string - password string - database string - } - - var dbLoggerCfg struct { - ignoreNotFoundError string - logLevel string - slowThreshold string - } - - var workerCfg struct { - enabled bool - remoteAddrs string - apiPassword string - node.WorkerConfig - } - workerCfg.AllowPrivateIPs = false - workerCfg.ContractLockTimeout = 30 * time.Second - - var autopilotCfg struct { - enabled bool - node.AutopilotConfig - } - - apiAddr = flag.String("http", build.DefaultAPIAddress, "address to serve API on") - tracingEnabled := flag.Bool("tracing-enabled", false, "Enables tracing through OpenTelemetry. If RENTERD_TRACING_ENABLED is set, it overwrites the CLI flag's value. Tracing can be configured using the standard OpenTelemetry environment variables. https://github.com/open-telemetry/opentelemetry-specification/blob/v1.8.0/specification/protocol/exporter.md") - tracingServiceInstanceId := flag.String("tracing-service-instance-id", "cluster", "ID of the service instance used for tracing. 
If RENTERD_TRACING_SERVICE_INSTANCE_ID is set, it overwrites the CLI flag's value.") - dir := flag.String("dir", ".", "directory to store node state in") - - // db - flag.StringVar(&dbCfg.uri, "db.uri", "", "URI of the database to use for the bus - can be overwritten using RENTERD_DB_URI environment variable") - flag.StringVar(&dbCfg.user, "db.user", "", "username for the database to use for the bus - can be overwritten using RENTERD_DB_USER environment variable") - flag.StringVar(&dbCfg.password, "db.password", "", "password for the database to use for the bus - can be overwritten using RENTERD_DB_PASSWORD environment variable") - flag.StringVar(&dbCfg.database, "db.name", "", "name of the database to use for the bus - can be overwritten using RENTERD_DB_NAME environment variable") - - // db logger - flag.StringVar(&dbLoggerCfg.ignoreNotFoundError, "db.logger.ignoreNotFoundError", "true", "ignore not found error for logger - can be overwritten using RENTERD_DB_LOGGER_IGNORE_NOT_FOUND_ERROR environment variable") - flag.StringVar(&dbLoggerCfg.logLevel, "db.logger.logLevel", "warn", "log level for logger - can be overwritten using RENTERD_DB_LOGGER_LOG_LEVEL environment variable") - flag.StringVar(&dbLoggerCfg.slowThreshold, "db.logger.slowThreshold", "500ms", "slow threshold for logger - can be overwritten using RENTERD_DB_LOGGER_SLOW_THRESHOLD environment variable") - - // bus - flag.BoolVar(&busCfg.Bootstrap, "bus.bootstrap", true, "bootstrap the gateway and consensus modules") - flag.StringVar(&busCfg.GatewayAddr, "bus.gatewayAddr", build.DefaultGatewayAddress, "address to listen on for Sia peer connections - can be overwritten using RENTERD_BUS_GATEWAY_ADDR environment variable") - flag.DurationVar(&busCfg.PersistInterval, "bus.persistInterval", busCfg.PersistInterval, "interval at which to persist the consensus updates") - flag.StringVar(&busCfg.apiPassword, "bus.apiPassword", "", "API password for remote bus service - can be overwritten using RENTERD_BUS_API_PASSWORD environment variable") - flag.StringVar(&busCfg.remoteAddr, "bus.remoteAddr", "", "URL of remote bus service - can be overwritten using RENTERD_BUS_REMOTE_ADDR environment variable") - - // worker - flag.DurationVar(&workerCfg.BusFlushInterval, "worker.busFlushInterval", 5*time.Second, "time after which the worker flushes buffered data to bus for persisting") - flag.Uint64Var(&workerCfg.DownloadMaxOverdrive, "worker.downloadMaxOverdrive", 5, "maximum number of active overdrive workers when downloading a slab") - flag.StringVar(&workerCfg.WorkerConfig.ID, "worker.id", "worker", "unique identifier of worker used internally - can be overwritten using the RENTERD_WORKER_ID environment variable") - flag.DurationVar(&workerCfg.SessionLockTimeout, "worker.sessionLockTimeout", 30*time.Second, "the maximum amount of time a host should wait on the lock when the lock RPC is called") - flag.DurationVar(&workerCfg.SessionReconnectTimeout, "worker.sessionReconnectTimeout", 10*time.Second, "the maximum amount of time reconnecting a session is allowed to take") - flag.DurationVar(&workerCfg.SessionTTL, "worker.sessionTTL", 2*time.Minute, "the time a host session is valid for before reconnecting") - flag.DurationVar(&workerCfg.DownloadSectorTimeout, "worker.downloadSectorTimeout", 3*time.Second, "timeout applied to sector downloads when downloading a slab") - flag.Uint64Var(&workerCfg.UploadMaxOverdrive, "worker.uploadMaxOverdrive", 5, "maximum number of active overdrive workers when uploading a slab") - 
flag.DurationVar(&workerCfg.UploadSectorTimeout, "worker.uploadSectorTimeout", 5*time.Second, "timeout applied to sector uploads when uploading a slab") - flag.StringVar(&workerCfg.apiPassword, "worker.apiPassword", "", "API password for remote worker service") - flag.BoolVar(&workerCfg.enabled, "worker.enabled", true, "enable/disable creating a worker - can be overwritten using the RENTERD_WORKER_ENABLED environment variable") - flag.StringVar(&workerCfg.remoteAddrs, "worker.remoteAddrs", "", "URL of remote worker service(s). Multiple addresses can be provided by separating them with a semicolon. Can be overwritten using RENTERD_WORKER_REMOTE_ADDRS environment variable") - - // autopilot - flag.DurationVar(&autopilotCfg.AccountsRefillInterval, "autopilot.accountRefillInterval", defaultAccountRefillInterval, "interval at which the autopilot checks the workers' accounts balance and refills them if necessary") - flag.DurationVar(&autopilotCfg.Heartbeat, "autopilot.heartbeat", 10*time.Minute, "interval at which autopilot loop runs") - flag.Float64Var(&autopilotCfg.MigrationHealthCutoff, "autopilot.migrationHealthCutoff", 0.75, "health threshold below which slabs are migrated to new hosts") - flag.Uint64Var(&autopilotCfg.ScannerBatchSize, "autopilot.scannerBatchSize", 1000, "size of the batch with which hosts are scanned") - flag.DurationVar(&autopilotCfg.ScannerInterval, "autopilot.scannerInterval", 24*time.Hour, "interval at which hosts are scanned") - flag.Uint64Var(&autopilotCfg.ScannerMinRecentFailures, "autopilot.scannerMinRecentFailures", 10, "minimum amount of consesutive failed scans a host must have before it is removed for exceeding the max downtime") - flag.Uint64Var(&autopilotCfg.ScannerNumThreads, "autopilot.scannerNumThreads", 100, "number of threads that scan hosts") - flag.BoolVar(&autopilotCfg.enabled, "autopilot.enabled", true, "enable/disable the autopilot - can be overwritten using the RENTERD_AUTOPILOT_ENABLED environment variable") - flag.DurationVar(&nodeCfg.shutdownTimeout, "node.shutdownTimeout", 5*time.Minute, "the timeout applied to the node shutdown") - flag.Parse() - - log.Println("renterd v0.1.0") - log.Println("Network", build.ConsensusNetworkName) - if flag.Arg(0) == "version" { - log.Println("Commit:", githash) - log.Println("Build Date:", builddate) - return - } else if flag.Arg(0) == "seed" { - log.Println("Seed phrase:", wallet.NewSeedPhrase()) - return - } - - // Overwrite flags from environment if set. 
- parseEnvVar("RENTERD_TRACING_ENABLED", tracingEnabled) - parseEnvVar("RENTERD_TRACING_SERVICE_INSTANCE_ID", tracingServiceInstanceId) - - parseEnvVar("RENTERD_BUS_REMOTE_ADDR", &busCfg.remoteAddr) - parseEnvVar("RENTERD_BUS_API_PASSWORD", &busCfg.apiPassword) - parseEnvVar("RENTERD_BUS_GATEWAY_ADDR", &busCfg.GatewayAddr) - - parseEnvVar("RENTERD_DB_URI", &dbCfg.uri) - parseEnvVar("RENTERD_DB_USER", &dbCfg.user) - parseEnvVar("RENTERD_DB_PASSWORD", &dbCfg.password) - parseEnvVar("RENTERD_DB_NAME", &dbCfg.database) - - parseEnvVar("RENTERD_DB_LOGGER_IGNORE_NOT_FOUND_ERROR", &dbLoggerCfg.ignoreNotFoundError) - parseEnvVar("RENTERD_DB_LOGGER_LOG_LEVEL", &dbLoggerCfg.logLevel) - parseEnvVar("RENTERD_DB_LOGGER_SLOW_THRESHOLD", &dbLoggerCfg.slowThreshold) - - parseEnvVar("RENTERD_WORKER_REMOTE_ADDRS", &workerCfg.remoteAddrs) - parseEnvVar("RENTERD_WORKER_API_PASSWORD", &workerCfg.apiPassword) - parseEnvVar("RENTERD_WORKER_ENABLED", &workerCfg.enabled) - parseEnvVar("RENTERD_WORKER_ID", &workerCfg.ID) - - parseEnvVar("RENTERD_AUTOPILOT_ENABLED", &autopilotCfg.enabled) - - var autopilotShutdownFn func(context.Context) error - var shutdownFns []func(context.Context) error - - // Init tracing. - if *tracingEnabled { - shutdownFn, err := tracing.Init(*tracingServiceInstanceId) - if err != nil { - log.Fatal("failed to init tracing", err) - } - shutdownFns = append(shutdownFns, shutdownFn) - } - - if busCfg.remoteAddr != "" && workerCfg.remoteAddrs != "" && !autopilotCfg.enabled { - log.Fatal("remote bus, remote worker, and no autopilot -- nothing to do!") - } - if workerCfg.remoteAddrs == "" && !workerCfg.enabled && autopilotCfg.enabled { - log.Fatal("can't enable autopilot without providing either workers to connect to or creating a worker") - } - - // create listener first, so that we know the actual apiAddr if the user - // specifies port :0 - l, err := net.Listen("tcp", *apiAddr) - if err != nil { - log.Fatal("failed to create listener", err) - } - shutdownFns = append(shutdownFns, func(_ context.Context) error { - _ = l.Close() - return nil - }) - *apiAddr = "http://" + l.Addr().String() - - auth := jape.BasicAuth(GetAPIPassword()) - mux := treeMux{ - h: createUIHandler(), - sub: make(map[string]treeMux), - } - - // Create logger. 
- renterdLog := filepath.Join(*dir, "renterd.log") - logger, closeFn, err := node.NewLogger(renterdLog) - if err != nil { - log.Fatal("failed to create logger", err) - } - shutdownFns = append(shutdownFns, closeFn) - - busAddr, busPassword := busCfg.remoteAddr, busCfg.apiPassword - if busAddr == "" { - b, shutdownFn, err := node.NewBus(busCfg.BusConfig, *dir, GetSeed(), logger) - if err != nil { - log.Fatal("failed to create bus, err: ", err) - } - shutdownFns = append(shutdownFns, shutdownFn) - - mux.sub["/api/bus"] = treeMux{h: auth(b)} - busAddr = *apiAddr + "/api/bus" - busPassword = GetAPIPassword() - } else { - fmt.Println("connecting to remote bus at", busAddr) - } - bc := bus.NewClient(busAddr, busPassword) - - var workers []autopilot.Worker - workerAddrs, workerPassword := workerCfg.remoteAddrs, workerCfg.apiPassword - if workerAddrs == "" { - if workerCfg.enabled { - w, shutdownFn, err := node.NewWorker(workerCfg.WorkerConfig, bc, GetSeed(), logger) - if err != nil { - log.Fatal("failed to create worker", err) - } - shutdownFns = append(shutdownFns, shutdownFn) - - mux.sub["/api/worker"] = treeMux{h: auth(w)} - workerAddr := *apiAddr + "/api/worker" - workerPassword = GetAPIPassword() - workers = append(workers, worker.NewClient(workerAddr, workerPassword)) - } - } else { - // TODO: all workers use the same password. Figure out a nice way to - // have individual passwords. - workerAddrsSplit := strings.Split(workerAddrs, ";") - for _, workerAddr := range workerAddrsSplit { - workers = append(workers, worker.NewClient(workerAddr, workerPassword)) - fmt.Println("connecting to remote worker at", workerAddr) - } - } - - autopilotErr := make(chan error, 1) - if autopilotCfg.enabled { - autopilotDir := filepath.Join(*dir, "autopilot") - if err := os.MkdirAll(autopilotDir, 0700); err != nil { - log.Fatal("failed to create autopilot dir", err) - } - - s, err := stores.NewJSONAutopilotStore(autopilotDir) - if err != nil { - log.Fatal("failed to create JSON autopilot store", err) - } - - ap, runFn, shutdownFn, err := node.NewAutopilot(autopilotCfg.AutopilotConfig, s, bc, workers, logger) - if err != nil { - log.Fatal("failed to create autopilot", err) - } - // NOTE: the autopilot shutdown function is not added to the shutdown - // functions array because it needs to be called first - autopilotShutdownFn = shutdownFn - - go func() { autopilotErr <- runFn() }() - mux.sub["/api/autopilot"] = treeMux{h: auth(ap)} - } - - srv := &http.Server{Handler: mux} - go srv.Serve(l) - log.Println("api: Listening on", l.Addr()) - - syncerAddress, err := bc.SyncerAddress(context.Background()) - if err != nil { - log.Fatal("failed to fetch syncer address", err) - } - log.Println("bus: Listening on", syncerAddress) - - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - - ready <- true - - select { - case <-signalCh: - log.Println("Shutting down...") - shutdownFns = append(shutdownFns, srv.Shutdown) - case err := <-autopilotErr: - log.Fatalln("Fatal autopilot error:", err) - } - - // Shut down the autopilot first, then the rest of the services in reverse order. 
- ctx, cancel := context.WithTimeout(context.Background(), nodeCfg.shutdownTimeout) - defer cancel() - if autopilotShutdownFn != nil { - if err := autopilotShutdownFn(ctx); err != nil { - log.Fatalf("Failed to shut down autopilot: %v", err) - } - } - for i := len(shutdownFns) - 1; i >= 0; i-- { - if err := shutdownFns[i](ctx); err != nil { - log.Fatalf("Shutdown function %v failed: %v", i+1, err) - } - } - - shutdown <- true -} - -func GetApiAddr() string { - return *apiAddr -} - -func Ready() bool { - if readyFired { - return true - } - - readyFired = <-ready - - return true -} - -func ShutdownComplete() bool { - if shutdownFired { - return true - } - - shutdownFired = <-shutdown - - return true -} diff --git a/renterd/web.go b/renterd/web.go deleted file mode 100644 index 22fb25d..0000000 --- a/renterd/web.go +++ /dev/null @@ -1,52 +0,0 @@ -package renterd - -import ( - "errors" - "io/fs" - "net/http" - _ "net/http/pprof" - "strings" - - "go.sia.tech/web/renterd/ui" -) - -type clientRouterFS struct { - fs fs.FS -} - -func (cr *clientRouterFS) Open(name string) (fs.File, error) { - f, err := cr.fs.Open(name) - if errors.Is(err, fs.ErrNotExist) { - return cr.fs.Open("index.html") - } - return f, err -} - -func createUIHandler() http.Handler { - assets, err := fs.Sub(ui.Assets, "assets") - if err != nil { - panic(err) - } - return http.FileServer(http.FS(&clientRouterFS{fs: assets})) -} - -type treeMux struct { - h http.Handler - sub map[string]treeMux -} - -func (t treeMux) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if strings.HasPrefix(req.URL.Path, "/debug/pprof") { - http.DefaultServeMux.ServeHTTP(w, req) - return - } - - for prefix, c := range t.sub { - if strings.HasPrefix(req.URL.Path, prefix) { - req.URL.Path = strings.TrimPrefix(req.URL.Path, prefix) - c.ServeHTTP(w, req) - return - } - } - t.h.ServeHTTP(w, req) -} diff --git a/service/files/files.go b/service/files/files.go index 2371a47..f73303c 100644 --- a/service/files/files.go +++ b/service/files/files.go @@ -10,10 +10,10 @@ import ( "git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/logger" "git.lumeweb.com/LumeWeb/portal/model" - "git.lumeweb.com/LumeWeb/portal/renterd" "git.lumeweb.com/LumeWeb/portal/shared" "git.lumeweb.com/LumeWeb/portal/tusstore" "github.com/go-resty/resty/v2" + "github.com/spf13/viper" _ "github.com/tus/tusd/pkg/handler" "go.uber.org/zap" "io" @@ -24,8 +24,8 @@ var client *resty.Client func Init() { client = resty.New() - client.SetBaseURL(renterd.GetApiAddr() + "/api") - client.SetBasicAuth("", renterd.GetAPIPassword()) + client.SetBaseURL("http://localhost:9980/api") + client.SetBasicAuth("", viper.GetString("renterd-api-password")) client.SetDisableWarn(true) }
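
The files.go hunk above swaps the in-process renterd handles for a plain HTTP client: the base URL is hardcoded to http://localhost:9980/api and the API password now comes from the "renterd-api-password" viper key, so renterd is expected to run as its own process that the portal reaches over HTTP. The patch does not show where viper itself gets configured; the sketch below is one plausible wiring, a minimal assumption-laden example rather than code from this repository. Only the renterd-api-password key is taken from the diff; the config file name, search path, and environment prefix are hypothetical.

// Hypothetical bootstrap for the viper key read in files.Init().
// Only "renterd-api-password" comes from the patch; the config file
// name ("portal"), search path, and PORTAL_ env prefix are assumptions.
package main

import (
	"log"
	"strings"

	"github.com/spf13/viper"
)

func initConfig() {
	viper.SetConfigName("portal") // e.g. portal.yaml in the working directory (assumed name)
	viper.AddConfigPath(".")
	viper.SetEnvPrefix("PORTAL")
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) // maps the key to PORTAL_RENTERD_API_PASSWORD
	viper.AutomaticEnv()
	viper.SetDefault("renterd-api-password", "")

	if err := viper.ReadInConfig(); err != nil {
		// A missing config file is acceptable when the environment variable is set.
		log.Printf("no config file loaded: %v", err)
	}
}

With something like this called before files.Init(), viper.GetString("renterd-api-password") resolves from the config file or the environment. The renterd daemon is no longer started or shut down by the portal; per the deleted renterd/main.go it still reads RENTERD_API_PASSWORD and RENTERD_SEED from its own environment and serves its bus, worker, and autopilot APIs on the address given by -http, whose default matches the localhost:9980 base URL hardcoded in the hunk above.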