360 lines
9.3 KiB
Go
360 lines
9.3 KiB
Go
|
package node
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"errors"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"gitlab.com/NebulousLabs/encoding"
|
||
|
"go.sia.tech/core/consensus"
|
||
|
"go.sia.tech/core/types"
|
||
|
"go.sia.tech/renterd/autopilot"
|
||
|
"go.sia.tech/renterd/bus"
|
||
|
"go.sia.tech/renterd/stores"
|
||
|
"go.sia.tech/renterd/wallet"
|
||
|
"go.sia.tech/renterd/worker"
|
||
|
"go.sia.tech/siad/modules"
|
||
|
mconsensus "go.sia.tech/siad/modules/consensus"
|
||
|
"go.sia.tech/siad/modules/gateway"
|
||
|
"go.sia.tech/siad/modules/transactionpool"
|
||
|
stypes "go.sia.tech/siad/types"
|
||
|
"go.uber.org/zap"
|
||
|
"go.uber.org/zap/zapcore"
|
||
|
"golang.org/x/crypto/blake2b"
|
||
|
"gorm.io/gorm"
|
||
|
)
|
||
|
|
||
|
// WorkerConfig groups the settings that NewWorker forwards to worker.New.
type WorkerConfig struct {
	// ID is handed to worker.New together with the derived worker key.
	ID string

	// Durations forwarded unchanged to worker.New.
	BusFlushInterval        time.Duration
	ContractLockTimeout     time.Duration
	SessionLockTimeout      time.Duration
	SessionReconnectTimeout time.Duration
	SessionTTL              time.Duration
	DownloadSectorTimeout   time.Duration
	UploadSectorTimeout     time.Duration

	// Overdrive limits forwarded unchanged to worker.New.
	DownloadMaxOverdrive int
	UploadMaxOverdrive   int
}
|
||
|
|
||
|
type BusConfig struct {
|
||
|
Bootstrap bool
|
||
|
GatewayAddr string
|
||
|
Network *consensus.Network
|
||
|
Miner *Miner
|
||
|
PersistInterval time.Duration
|
||
|
|
||
|
DBDialector gorm.Dialector
|
||
|
}
|
||
|
|
||
|
// AutopilotConfig groups the settings that NewAutopilot forwards to
// autopilot.New.
type AutopilotConfig struct {
	AccountsRefillInterval time.Duration
	Heartbeat              time.Duration
	MigrationHealthCutoff  float64

	// Scanner settings, forwarded unchanged to autopilot.New.
	ScannerInterval          time.Duration
	ScannerBatchSize         uint64
	ScannerMinRecentFailures uint64
	ScannerNumThreads        uint64
}
|
||
|
|
||
|
// ShutdownFn is the signature of the shutdown callbacks returned by the
// constructors in this package: it takes a context and reports an error.
type ShutdownFn = func(ctx context.Context) error
|
||
|
|
||
|
func convertToSiad(core types.EncoderTo, siad encoding.SiaUnmarshaler) {
|
||
|
var buf bytes.Buffer
|
||
|
e := types.NewEncoder(&buf)
|
||
|
core.EncodeTo(e)
|
||
|
e.Flush()
|
||
|
if err := siad.UnmarshalSia(&buf); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) {
|
||
|
var buf bytes.Buffer
|
||
|
siad.MarshalSia(&buf)
|
||
|
d := types.NewBufDecoder(buf.Bytes())
|
||
|
core.DecodeFrom(d)
|
||
|
if d.Err() != nil {
|
||
|
panic(d.Err())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type chainManager struct {
|
||
|
cs modules.ConsensusSet
|
||
|
network *consensus.Network
|
||
|
}
|
||
|
|
||
|
func (cm chainManager) AcceptBlock(ctx context.Context, b types.Block) error {
|
||
|
var sb stypes.Block
|
||
|
convertToSiad(b, &sb)
|
||
|
return cm.cs.AcceptBlock(sb)
|
||
|
}
|
||
|
|
||
|
func (cm chainManager) Synced(ctx context.Context) bool {
|
||
|
return cm.cs.Synced()
|
||
|
}
|
||
|
|
||
|
func (cm chainManager) TipState(ctx context.Context) consensus.State {
|
||
|
return consensus.State{
|
||
|
Network: cm.network,
|
||
|
Index: types.ChainIndex{
|
||
|
Height: uint64(cm.cs.Height()),
|
||
|
ID: types.BlockID(cm.cs.CurrentBlock().ID()),
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type syncer struct {
|
||
|
g modules.Gateway
|
||
|
tp modules.TransactionPool
|
||
|
}
|
||
|
|
||
|
func (s syncer) Addr() string {
|
||
|
return string(s.g.Address())
|
||
|
}
|
||
|
|
||
|
func (s syncer) Peers() []string {
|
||
|
var peers []string
|
||
|
for _, p := range s.g.Peers() {
|
||
|
peers = append(peers, string(p.NetAddress))
|
||
|
}
|
||
|
return peers
|
||
|
}
|
||
|
|
||
|
func (s syncer) Connect(addr string) error {
|
||
|
return s.g.Connect(modules.NetAddress(addr))
|
||
|
}
|
||
|
|
||
|
func (s syncer) BroadcastTransaction(txn types.Transaction, dependsOn []types.Transaction) {
|
||
|
txnSet := make([]stypes.Transaction, len(dependsOn)+1)
|
||
|
for i, txn := range dependsOn {
|
||
|
convertToSiad(txn, &txnSet[i])
|
||
|
}
|
||
|
convertToSiad(txn, &txnSet[len(txnSet)-1])
|
||
|
s.tp.Broadcast(txnSet)
|
||
|
}
|
||
|
|
||
|
func (s syncer) SyncerAddress(ctx context.Context) (string, error) {
|
||
|
return string(s.g.Address()), nil
|
||
|
}
|
||
|
|
||
|
type txpool struct {
|
||
|
tp modules.TransactionPool
|
||
|
}
|
||
|
|
||
|
func (tp txpool) RecommendedFee() (fee types.Currency) {
|
||
|
_, max := tp.tp.FeeEstimation()
|
||
|
convertToCore(&max, &fee)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (tp txpool) Transactions() []types.Transaction {
|
||
|
stxns := tp.tp.Transactions()
|
||
|
txns := make([]types.Transaction, len(stxns))
|
||
|
for i := range txns {
|
||
|
convertToCore(&stxns[i], &txns[i])
|
||
|
}
|
||
|
return txns
|
||
|
}
|
||
|
|
||
|
func (tp txpool) AddTransactionSet(txns []types.Transaction) error {
|
||
|
stxns := make([]stypes.Transaction, len(txns))
|
||
|
for i := range stxns {
|
||
|
convertToSiad(&txns[i], &stxns[i])
|
||
|
}
|
||
|
return tp.tp.AcceptTransactionSet(stxns)
|
||
|
}
|
||
|
|
||
|
func (tp txpool) UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error) {
|
||
|
pool := tp.Transactions()
|
||
|
outputToParent := make(map[types.SiacoinOutputID]*types.Transaction)
|
||
|
for i, txn := range pool {
|
||
|
for j := range txn.SiacoinOutputs {
|
||
|
outputToParent[txn.SiacoinOutputID(j)] = &pool[i]
|
||
|
}
|
||
|
}
|
||
|
var parents []types.Transaction
|
||
|
seen := make(map[types.TransactionID]bool)
|
||
|
for _, sci := range txn.SiacoinInputs {
|
||
|
if parent, ok := outputToParent[sci.ParentID]; ok {
|
||
|
if txid := parent.ID(); !seen[txid] {
|
||
|
seen[txid] = true
|
||
|
parents = append(parents, *parent)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return parents, nil
|
||
|
}
|
||
|
|
||
|
func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
|
||
|
gatewayDir := filepath.Join(dir, "gateway")
|
||
|
if err := os.MkdirAll(gatewayDir, 0700); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
g, err := gateway.New(cfg.GatewayAddr, cfg.Bootstrap, gatewayDir)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
consensusDir := filepath.Join(dir, "consensus")
|
||
|
if err := os.MkdirAll(consensusDir, 0700); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
cs, errCh := mconsensus.New(g, cfg.Bootstrap, consensusDir)
|
||
|
select {
|
||
|
case err := <-errCh:
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
default:
|
||
|
go func() {
|
||
|
if err := <-errCh; err != nil {
|
||
|
log.Println("WARNING: consensus initialization returned an error:", err)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
tpoolDir := filepath.Join(dir, "transactionpool")
|
||
|
if err := os.MkdirAll(tpoolDir, 0700); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
tp, err := transactionpool.New(cs, g, tpoolDir)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
walletDir := filepath.Join(dir, "wallet")
|
||
|
if err := os.MkdirAll(walletDir, 0700); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
walletAddr := wallet.StandardAddress(seed.PublicKey())
|
||
|
ws, ccid, err := stores.NewJSONWalletStore(walletDir, walletAddr)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
} else if err := cs.ConsensusSetSubscribe(ws, ccid, nil); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
w := wallet.NewSingleAddressWallet(seed, ws)
|
||
|
|
||
|
// If no DB dialector was provided, use SQLite.
|
||
|
dbConn := cfg.DBDialector
|
||
|
if dbConn == nil {
|
||
|
dbDir := filepath.Join(dir, "db")
|
||
|
if err := os.MkdirAll(dbDir, 0700); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
dbConn = stores.NewSQLiteConnection(filepath.Join(dbDir, "db.sqlite"))
|
||
|
}
|
||
|
|
||
|
sqlLogger := stores.NewSQLLogger(l.Named("db"), nil)
|
||
|
sqlStore, ccid, err := stores.NewSQLStore(dbConn, true, cfg.PersistInterval, sqlLogger)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
} else if err := cs.ConsensusSetSubscribe(sqlStore, ccid, nil); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
if m := cfg.Miner; m != nil {
|
||
|
if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
tp.TransactionPoolSubscribe(m)
|
||
|
}
|
||
|
|
||
|
b, err := bus.New(syncer{g, tp}, chainManager{cs: cs, network: cfg.Network}, txpool{tp}, w, sqlStore, sqlStore, sqlStore, sqlStore, l)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
shutdownFn := func(ctx context.Context) error {
|
||
|
return joinErrors([]error{
|
||
|
g.Close(),
|
||
|
cs.Close(),
|
||
|
tp.Close(),
|
||
|
b.Shutdown(ctx),
|
||
|
sqlStore.Close(),
|
||
|
})
|
||
|
}
|
||
|
return b.Handler(), shutdownFn, nil
|
||
|
}
|
||
|
|
||
|
func NewWorker(cfg WorkerConfig, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
|
||
|
workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
|
||
|
w := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.SessionLockTimeout, cfg.SessionReconnectTimeout, cfg.SessionTTL, cfg.BusFlushInterval, cfg.DownloadSectorTimeout, cfg.UploadSectorTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, l)
|
||
|
return w.Handler(), w.Shutdown, nil
|
||
|
}
|
||
|
|
||
|
func NewAutopilot(cfg AutopilotConfig, s autopilot.Store, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, func() error, ShutdownFn, error) {
|
||
|
ap, err := autopilot.New(s, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerMinRecentFailures, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval)
|
||
|
if err != nil {
|
||
|
return nil, nil, nil, err
|
||
|
}
|
||
|
return ap.Handler(), ap.Run, ap.Shutdown, nil
|
||
|
}
|
||
|
|
||
|
func NewLogger(path string) (*zap.Logger, func(context.Context) error, error) {
|
||
|
writer, closeFn, err := zap.Open(path)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
// console
|
||
|
config := zap.NewProductionEncoderConfig()
|
||
|
config.EncodeTime = zapcore.RFC3339TimeEncoder
|
||
|
config.EncodeLevel = zapcore.CapitalColorLevelEncoder
|
||
|
config.StacktraceKey = ""
|
||
|
consoleEncoder := zapcore.NewConsoleEncoder(config)
|
||
|
|
||
|
// file
|
||
|
config = zap.NewProductionEncoderConfig()
|
||
|
config.EncodeTime = zapcore.RFC3339TimeEncoder
|
||
|
config.CallerKey = "" // hide
|
||
|
config.StacktraceKey = "" // hide
|
||
|
config.NameKey = "component"
|
||
|
config.TimeKey = "date"
|
||
|
fileEncoder := zapcore.NewJSONEncoder(config)
|
||
|
|
||
|
core := zapcore.NewTee(
|
||
|
zapcore.NewCore(fileEncoder, writer, zapcore.DebugLevel),
|
||
|
zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), zapcore.DebugLevel),
|
||
|
)
|
||
|
|
||
|
logger := zap.New(
|
||
|
core,
|
||
|
zap.AddCaller(),
|
||
|
zap.AddStacktrace(zapcore.ErrorLevel),
|
||
|
)
|
||
|
|
||
|
return logger, func(_ context.Context) error {
|
||
|
_ = logger.Sync() // ignore Error
|
||
|
closeFn()
|
||
|
return nil
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// joinErrors collapses errs into a single error, skipping nil entries.
//
// It returns nil when no non-nil error is present, the error itself when
// there is exactly one, and otherwise a new error whose message is the
// individual messages joined with ";" in their original order.
//
// errs is left untouched: the non-nil errors are gathered in a fresh slice
// instead of being filtered in place, which used to overwrite the leading
// elements of the caller's slice.
func joinErrors(errs []error) error {
	nonNil := make([]error, 0, len(errs))
	for _, err := range errs {
		if err != nil {
			nonNil = append(nonNil, err)
		}
	}

	switch len(nonNil) {
	case 0:
		return nil
	case 1:
		return nonNil[0]
	}

	msgs := make([]string, len(nonNil))
	for i, err := range nonNil {
		msgs[i] = err.Error()
	}
	return errors.New(strings.Join(msgs, ";"))
}
|