feat: add incoming and outgoing peer blocking to handle abuse

This commit is contained in:
Derrick Hammer 2024-01-15 10:54:31 -05:00
parent d79455c68c
commit 883f50b198
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
5 changed files with 92 additions and 27 deletions

View File

@ -16,6 +16,7 @@ type NodeConfig struct {
type P2PConfig struct {
Network string
Peers PeersConfig
MaxOutgoingPeerFailures uint
}
type PeersConfig struct {

View File

@ -12,7 +12,7 @@ import (
type P2PService interface {
Peers() structs.Map
ConnectToNode(connectionUris []*url.URL, retried bool) error
ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error
OnNewPeer(peer net.Peer, verifyId bool) error
OnNewPeerListen(peer net.Peer, verifyId bool)
GetNodeScore(nodeId *encoding.NodeId) (float64, error)

View File

@ -45,6 +45,7 @@ type Peer interface {
ConnectionURIs() []*url.URL
IsHandshakeDone() bool
SetHandshakeDone(status bool)
GetIP() string
}
type BasePeer struct {
@ -79,6 +80,10 @@ func (b *BasePeer) ListenForMessages(callback EventCallback, options ListenerOpt
func (b *BasePeer) End() error {
panic("must implement in child class")
}
func (b *BasePeer) GetIP() string {
//TODO implement me
panic("must implement in child class")
}
func (b *BasePeer) Challenge() []byte {
return b.challenge

View File

@ -108,7 +108,7 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder) error {
func (a AnnouncePeers) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error {
if len(a.connectionUris) > 0 {
err := node.Services().P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false)
err := node.Services().P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer)
if err != nil {
return err
}

View File

@ -44,6 +44,11 @@ type P2PImpl struct {
peers structs.Map
peersPending structs.Map
selfConnectionUris []*url.URL
outgoingPeerBlocklist structs.Map
incomingPeerBlockList structs.Map
incomingIPBlocklist structs.Map
outgoingPeerFailures structs.Map
maxOutgoingPeerFailures uint
}
func NewP2P(node interfaces.Node) *P2PImpl {
@ -62,6 +67,10 @@ func NewP2P(node interfaces.Node) *P2PImpl {
peers: structs.NewMap(),
peersPending: structs.NewMap(),
selfConnectionUris: []*url.URL{uri},
outgoingPeerBlocklist: structs.NewMap(),
incomingPeerBlockList: structs.NewMap(),
incomingIPBlocklist: structs.NewMap(),
maxOutgoingPeerFailures: node.Config().P2P.MaxOutgoingPeerFailures,
}
return service
@ -92,7 +101,7 @@ func (p *P2PImpl) Start() error {
peer := peer
go func() {
err := p.ConnectToNode([]*url.URL{u}, false)
err := p.ConnectToNode([]*url.URL{u}, false, nil)
if err != nil {
p.logger.Error("failed to connect to initial peer", zap.Error(err), zap.String("peer", peer))
}
@ -123,7 +132,7 @@ func (p *P2PImpl) Init() error {
return nil
}
func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error {
if !p.Node().IsStarted() {
return nil
}
@ -179,6 +188,11 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
return nil
}
if p.outgoingPeerFailures.Contains(idString) {
p.logger.Error("outgoing peer is on blocklist", zap.String("node", connectionUri.String()))
return nil
}
reconnectDelay := p.reconnectDelay.GetInt(idString)
if reconnectDelay == nil {
delay := 1
@ -195,6 +209,28 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
if err != nil {
if retried {
p.logger.Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err))
counter := uint(0)
if p.outgoingPeerFailures.Contains(idString) {
tmp := *p.outgoingPeerFailures.GetInt(idString)
counter = uint(tmp)
}
counter++
p.outgoingPeerFailures.Put(idString, counter)
if counter >= p.maxOutgoingPeerFailures {
fromPeerId, err := fromPeer.Id().ToString()
if err != nil {
return err
}
p.incomingPeerBlockList.Put(idString, fromPeerId)
err = fromPeer.End()
if err != nil {
return err
}
}
return nil
}
retried = true
@ -211,7 +247,11 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
time.Sleep(time.Duration(delayDeref) * time.Second)
return p.ConnectToNode(connectionUris, retried)
return p.ConnectToNode(connectionUris, retried, fromPeer)
}
if p.outgoingPeerFailures.Contains(idString) {
p.outgoingPeerFailures.Remove(idString)
}
peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{
@ -256,6 +296,25 @@ func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error {
pid = "unknown"
}
pip := peer.GetIP()
if p.incomingIPBlocklist.Contains(pid) {
p.logger.Error("peer is on identity blocklist", zap.String("peer", pid))
err := peer.End()
if err != nil {
return err
}
return nil
}
if p.incomingPeerBlockList.Contains(pip) {
p.logger.Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip))
err := peer.End()
if err != nil {
return err
}
return nil
}
p.logger.Debug("OnNewPeer started", zap.String("peer", pid))
challenge := protocol.GenerateChallenge()