From eefbfa06d074149947a8488247ea8161612e0f7b Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 6 Jan 2024 10:54:03 -0500 Subject: [PATCH] feat: initial node scoring support --- protocol/protocol.go | 17 ++++++++- service/p2p.go | 91 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index c68b40c..786a29e 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,6 +1,9 @@ package protocol -import "crypto/rand" +import ( + "crypto/rand" + "math" +) func GenerateChallenge() []byte { challenge := make([]byte, 32) @@ -11,3 +14,15 @@ func GenerateChallenge() []byte { return challenge } + +func CalculateNodeScore(goodResponses, badResponses int) float64 { + totalVotes := goodResponses + badResponses + if totalVotes == 0 { + return 0.5 + } + + average := float64(goodResponses) / float64(totalVotes) + score := average - (average-0.5)*math.Pow(2, -math.Log(float64(totalVotes+1))) + + return score +} diff --git a/service/p2p.go b/service/p2p.go index 8682950..d80c2b3 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -13,10 +13,13 @@ import ( bolt "go.etcd.io/bbolt" "go.uber.org/zap" "net/url" + "sort" "time" ) var _ Service = (*P2P)(nil) +var _ msgpack.CustomEncoder = (*nodeVotes)(nil) +var _ msgpack.CustomDecoder = (*nodeVotes)(nil) var ( errUnsupportedProtocol = errors.New("unsupported protocol") @@ -37,6 +40,42 @@ type P2P struct { peers *structs.Map } +type nodeVotes struct { + good int + bad int +} + +func (n nodeVotes) EncodeMsgpack(enc *msgpack.Encoder) error { + err := enc.EncodeInt(int64(n.good)) + if err != nil { + return err + } + + err = enc.EncodeInt(int64(n.bad)) + if err != nil { + return err + } + + return nil +} + +func (n *nodeVotes) DecodeMsgpack(dec *msgpack.Decoder) error { + good, err := dec.DecodeInt() + if err != nil { + return err + } + + bad, err := dec.DecodeInt() + if err != nil { + return err + } + + n.good = good + n.bad = bad + + return nil +} + func NewP2P(node *libs5_go.Node) *P2P { service := &P2P{ logger: node.Logger(), @@ -246,7 +285,7 @@ func (p *P2P) onNewPeerListen(peer *net.Peer, verifyId bool) { handler, ok := protocol.GetMessageType(imsg.GetKind()) if ok { - + imsg.SetOriginal(message) handler.SetIncomingMessage(imsg) err := msgpack.Unmarshal(imsg.Data(), handler) @@ -265,4 +304,54 @@ func (p *P2P) onNewPeerListen(peer *net.Peer, verifyId bool) { OnError: &onError, Logger: p.logger, }) + +} + +func (p *P2P) readNodeScore(nodeId *encoding.NodeId) (nodeVotes, error) { + node := p.nodesBucket.Get(nodeId.Raw()) + if node == nil { + return nodeVotes{}, nil + } + + var score nodeVotes + err := msgpack.Unmarshal(node, &score) + if err != nil { + + } + + return score, nil +} + +func (p *P2P) getNodeScore(nodeId *encoding.NodeId) (float64, error) { + if nodeId.Equals(p.localNodeID) { + return 1, nil + } + + score, err := p.readNodeScore(nodeId) + if err != nil { + return 0.5, err + } + + return protocol.CalculateNodeScore(score.good, score.bad), nil + +} +func (p *P2P) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) { + scores := make(map[encoding.NodeIdCode]float64) + var errOccurred error + + for _, nodeId := range nodes { + score, err := p.getNodeScore(nodeId) + if err != nil { + errOccurred = err + scores[nodeId.HashCode()] = 0 // You may choose a different default value for error cases + } else { + scores[nodeId.HashCode()] = score + } + } + + sort.Slice(nodes, func(i, j int) bool { + return scores[nodes[i].HashCode()] > scores[nodes[j].HashCode()] + }) + + return nodes, errOccurred }