diff --git a/config/config.go b/config/config.go index 3b90c15..cf1949f 100644 --- a/config/config.go +++ b/config/config.go @@ -14,8 +14,9 @@ type NodeConfig struct { HTTP HTTPConfig } type P2PConfig struct { - Network string - Peers PeersConfig + Network string + Peers PeersConfig + MaxOutgoingPeerFailures uint } type PeersConfig struct { diff --git a/interfaces/p2p.go b/interfaces/p2p.go index 63d5863..e389d09 100644 --- a/interfaces/p2p.go +++ b/interfaces/p2p.go @@ -12,7 +12,7 @@ import ( type P2PService interface { Peers() structs.Map - ConnectToNode(connectionUris []*url.URL, retried bool) error + ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error OnNewPeer(peer net.Peer, verifyId bool) error OnNewPeerListen(peer net.Peer, verifyId bool) GetNodeScore(nodeId *encoding.NodeId) (float64, error) diff --git a/net/peer.go b/net/peer.go index b9efc4a..f592509 100644 --- a/net/peer.go +++ b/net/peer.go @@ -45,6 +45,7 @@ type Peer interface { ConnectionURIs() []*url.URL IsHandshakeDone() bool SetHandshakeDone(status bool) + GetIP() string } type BasePeer struct { @@ -79,6 +80,10 @@ func (b *BasePeer) ListenForMessages(callback EventCallback, options ListenerOpt func (b *BasePeer) End() error { panic("must implement in child class") } +func (b *BasePeer) GetIP() string { + //TODO implement me + panic("must implement in child class") +} func (b *BasePeer) Challenge() []byte { return b.challenge diff --git a/protocol/signed/accounce_peers.go b/protocol/signed/accounce_peers.go index 6c84872..4115dea 100644 --- a/protocol/signed/accounce_peers.go +++ b/protocol/signed/accounce_peers.go @@ -108,7 +108,7 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder) error { func (a AnnouncePeers) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error { if len(a.connectionUris) > 0 { - err := node.Services().P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false) + err := node.Services().P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer) if err != nil { return err } diff --git a/service/p2p.go b/service/p2p.go index 01df1db..074ed6d 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -33,17 +33,22 @@ var ( const nodeBucketName = "nodes" type P2PImpl struct { - logger *zap.Logger - nodeKeyPair *ed25519.KeyPairEd25519 - localNodeID *encoding.NodeId - networkID string - nodesBucket *bolt.Bucket - node interfaces.Node - inited bool - reconnectDelay structs.Map - peers structs.Map - peersPending structs.Map - selfConnectionUris []*url.URL + logger *zap.Logger + nodeKeyPair *ed25519.KeyPairEd25519 + localNodeID *encoding.NodeId + networkID string + nodesBucket *bolt.Bucket + node interfaces.Node + inited bool + reconnectDelay structs.Map + peers structs.Map + peersPending structs.Map + selfConnectionUris []*url.URL + outgoingPeerBlocklist structs.Map + incomingPeerBlockList structs.Map + incomingIPBlocklist structs.Map + outgoingPeerFailures structs.Map + maxOutgoingPeerFailures uint } func NewP2P(node interfaces.Node) *P2PImpl { @@ -53,15 +58,19 @@ func NewP2P(node interfaces.Node) *P2PImpl { } service := &P2PImpl{ - logger: node.Logger(), - nodeKeyPair: node.Config().KeyPair, - networkID: node.Config().P2P.Network, - node: node, - inited: false, - reconnectDelay: structs.NewMap(), - peers: structs.NewMap(), - peersPending: structs.NewMap(), - selfConnectionUris: []*url.URL{uri}, + logger: node.Logger(), + nodeKeyPair: node.Config().KeyPair, + networkID: node.Config().P2P.Network, + node: node, + inited: false, + reconnectDelay: structs.NewMap(), + peers: structs.NewMap(), + peersPending: structs.NewMap(), + selfConnectionUris: []*url.URL{uri}, + outgoingPeerBlocklist: structs.NewMap(), + incomingPeerBlockList: structs.NewMap(), + incomingIPBlocklist: structs.NewMap(), + maxOutgoingPeerFailures: node.Config().P2P.MaxOutgoingPeerFailures, } return service @@ -92,7 +101,7 @@ func (p *P2PImpl) Start() error { peer := peer go func() { - err := p.ConnectToNode([]*url.URL{u}, false) + err := p.ConnectToNode([]*url.URL{u}, false, nil) if err != nil { p.logger.Error("failed to connect to initial peer", zap.Error(err), zap.String("peer", peer)) } @@ -123,7 +132,7 @@ func (p *P2PImpl) Init() error { return nil } -func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error { +func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { if !p.Node().IsStarted() { return nil } @@ -179,6 +188,11 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error { return nil } + if p.outgoingPeerFailures.Contains(idString) { + p.logger.Error("outgoing peer is on blocklist", zap.String("node", connectionUri.String())) + return nil + } + reconnectDelay := p.reconnectDelay.GetInt(idString) if reconnectDelay == nil { delay := 1 @@ -195,6 +209,28 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error { if err != nil { if retried { p.logger.Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err)) + counter := uint(0) + if p.outgoingPeerFailures.Contains(idString) { + tmp := *p.outgoingPeerFailures.GetInt(idString) + counter = uint(tmp) + } + + counter++ + + p.outgoingPeerFailures.Put(idString, counter) + + if counter >= p.maxOutgoingPeerFailures { + fromPeerId, err := fromPeer.Id().ToString() + if err != nil { + return err + } + p.incomingPeerBlockList.Put(idString, fromPeerId) + err = fromPeer.End() + if err != nil { + return err + } + } + return nil } retried = true @@ -211,7 +247,11 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error { time.Sleep(time.Duration(delayDeref) * time.Second) - return p.ConnectToNode(connectionUris, retried) + return p.ConnectToNode(connectionUris, retried, fromPeer) + } + + if p.outgoingPeerFailures.Contains(idString) { + p.outgoingPeerFailures.Remove(idString) } peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{ @@ -256,6 +296,25 @@ func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error { pid = "unknown" } + pip := peer.GetIP() + + if p.incomingIPBlocklist.Contains(pid) { + p.logger.Error("peer is on identity blocklist", zap.String("peer", pid)) + err := peer.End() + if err != nil { + return err + } + return nil + } + if p.incomingPeerBlockList.Contains(pip) { + p.logger.Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip)) + err := peer.End() + if err != nil { + return err + } + return nil + } + p.logger.Debug("OnNewPeer started", zap.String("peer", pid)) challenge := protocol.GenerateChallenge()