2024-01-30 02:32:13 +00:00
package _default
import (
"bytes"
"context"
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/vmihailenco/msgpack/v5"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"net/url"
"sort"
"sync"
"time"
)
// Compile-time assertion that *P2PServiceDefault implements service.P2PService.
var _ service.P2PService = (*P2PServiceDefault)(nil)

// Sentinel errors returned by ConnectToNode.
var (
	// errUnsupportedProtocol is returned when none of the offered URIs uses a
	// ws, wss or tcp scheme.
	errUnsupportedProtocol = errors.New("unsupported protocol")

	// errConnectionIdMissingNodeID is returned when the selected URI carries no
	// user-info component from which a node id could be decoded.
	errConnectionIdMissingNodeID = errors.New("connection id missing node id")
)

// nodeBucketName is the name of the bbolt bucket that stores per-node votes.
const nodeBucketName = "nodes"
// P2PServiceDefault is the default service.P2PService implementation. It keeps
// track of connected and pending peers, blocklists, retry state and node votes.
type P2PServiceDefault struct {
	// Identity of the local node.
	nodeKeyPair *ed25519.KeyPairEd25519 // key pair taken from the configuration
	localNodeID *encoding.NodeId        // derived from nodeKeyPair's public key in Init
	networkID   string                  // network name sent in the handshake-open message

	// NOTE(review): nodesBucket is neither assigned nor read in the code
	// visible here; votes are accessed by bucket name inside transactions.
	nodesBucket *bolt.Bucket

	// inited is set once Init has completed successfully.
	inited bool

	// Peer bookkeeping. The maps are keyed by the string form of a node id.
	reconnectDelay     structs.Map // seconds to wait before the next dial retry
	peers              structs.Map // peers registered through AddPeer
	peersPending       structs.Map // peers dialled by ConnectToNode but not yet added
	selfConnectionUris []*url.URL  // URIs under which this node is reachable

	// Blocklists and failure accounting.
	outgoingPeerBlocklist   structs.Map // node ids that must not be dialled
	incomingPeerBlockList   structs.Map // node ids of peers blocked for abuse
	incomingIPBlocklist     structs.Map // IPs of peers blocked for abuse
	outgoingPeerFailures    structs.Map // failed-dial counters per node id
	maxOutgoingPeerFailures uint        // failures after which a node is blocklisted

	// connections is exposed via ConnectionTracker and awaited by
	// WaitOnConnectedPeers.
	connections sync.WaitGroup

	// hashQueryRoutingTable is exposed via HashQueryRoutingTable; its contents
	// are managed outside the code visible here.
	hashQueryRoutingTable structs.Map

	service.ServiceBase
}
// NewP2P builds a P2PServiceDefault from the given service parameters.
//
// The node's own connection URI is derived from the configured HTTP API domain
// and port ("wss://<domain>:<port>/s5/p2p"). If that URI cannot be parsed the
// logger's Fatal method is invoked.
func NewP2P(params service.ServiceParams) *P2PServiceDefault {
	selfURI, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", params.Config.HTTP.API.Domain, params.Config.HTTP.API.Port))
	if err != nil {
		params.Logger.Fatal("failed to parse HTTP API URL", zap.Error(err))
	}

	// inited is left at its zero value (false); Init flips it.
	return &P2PServiceDefault{
		nodeKeyPair:             params.Config.KeyPair,
		networkID:               params.Config.P2P.Network,
		reconnectDelay:          structs.NewMap(),
		peers:                   structs.NewMap(),
		peersPending:            structs.NewMap(),
		selfConnectionUris:      []*url.URL{selfURI},
		outgoingPeerBlocklist:   structs.NewMap(),
		incomingPeerBlockList:   structs.NewMap(),
		incomingIPBlocklist:     structs.NewMap(),
		outgoingPeerFailures:    structs.NewMap(),
		maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures,
		hashQueryRoutingTable:   structs.NewMap(),
		ServiceBase:             service.NewServiceBase(params.Logger, params.Config, params.Db),
	}
}
// SelfConnectionUris returns the URIs under which this node advertises itself.
// The returned slice is the service's own slice, not a copy.
func (p *P2PServiceDefault) SelfConnectionUris() []*url.URL {
	return p.selfConnectionUris
}
// Peers returns the map of peers registered through AddPeer, keyed by the
// string form of their node id.
func (p *P2PServiceDefault) Peers() structs.Map {
	return p.peers
}
2024-02-27 08:28:25 +00:00
func ( p * P2PServiceDefault ) Start ( ctx context . Context ) error {
2024-01-30 02:32:13 +00:00
config := p . Config ( )
if len ( config . P2P . Peers . Initial ) > 0 {
initialPeers := config . P2P . Peers . Initial
for _ , peer := range initialPeers {
u , err := url . Parse ( peer )
if err != nil {
return err
}
peer := peer
go func ( ) {
err := p . ConnectToNode ( [ ] * url . URL { u } , false , nil )
if err != nil {
p . Logger ( ) . Error ( "failed to connect to initial peer" , zap . Error ( err ) , zap . String ( "peer" , peer ) )
}
} ( )
}
}
return nil
}
2024-02-27 08:28:25 +00:00
func ( p * P2PServiceDefault ) Stop ( ctx context . Context ) error {
2024-01-30 02:32:13 +00:00
return nil
}
2024-02-27 08:28:25 +00:00
func ( p * P2PServiceDefault ) Init ( ctx context . Context ) error {
2024-01-30 02:32:13 +00:00
if p . inited {
return nil
}
p . localNodeID = encoding . NewNodeId ( p . nodeKeyPair . PublicKey ( ) )
err := utils . CreateBucket ( nodeBucketName , p . Db ( ) )
if err != nil {
return err
}
p . inited = true
return nil
}
func ( p * P2PServiceDefault ) ConnectToNode ( connectionUris [ ] * url . URL , retried bool , fromPeer net . Peer ) error {
if ! p . Services ( ) . IsStarted ( ) {
2024-02-27 09:10:16 +00:00
if ! p . Services ( ) . IsStarting ( ) {
return nil
}
2024-01-30 02:32:13 +00:00
}
unsupported , _ := url . Parse ( "http://0.0.0.0" )
unsupported . Scheme = "unsupported"
var connectionUri * url . URL
for _ , uri := range connectionUris {
if uri . Scheme == "ws" || uri . Scheme == "wss" {
connectionUri = uri
break
}
}
if connectionUri == nil {
for _ , uri := range connectionUris {
if uri . Scheme == "tcp" {
connectionUri = uri
break
}
}
}
if connectionUri == nil {
connectionUri = unsupported
}
if connectionUri . Scheme == "unsupported" {
return errUnsupportedProtocol
}
scheme := connectionUri . Scheme
if connectionUri . User == nil {
return errConnectionIdMissingNodeID
}
username := connectionUri . User . Username ( )
id , err := encoding . DecodeNodeId ( username )
if err != nil {
return err
}
idString , err := id . ToString ( )
if err != nil {
return err
}
if p . peersPending . Contains ( idString ) || p . peers . Contains ( idString ) {
p . Logger ( ) . Debug ( "already connected" , zap . String ( "node" , connectionUri . String ( ) ) )
return nil
}
if p . outgoingPeerBlocklist . Contains ( idString ) {
p . Logger ( ) . Debug ( "outgoing peer is on blocklist" , zap . String ( "node" , connectionUri . String ( ) ) )
var fromPeerId string
if fromPeer != nil {
blocked := false
if fromPeer . Id ( ) != nil {
fromPeerId , err = fromPeer . Id ( ) . ToString ( )
if err != nil {
return err
}
if ! p . incomingPeerBlockList . Contains ( fromPeerId ) {
p . incomingPeerBlockList . Put ( fromPeerId , true )
blocked = true
}
}
fromPeerIP := fromPeer . GetIP ( )
if ! p . incomingIPBlocklist . Contains ( fromPeerIP ) {
p . incomingIPBlocklist . Put ( fromPeerIP , true )
blocked = true
}
err = fromPeer . EndForAbuse ( )
if err != nil {
return err
}
if blocked {
p . Logger ( ) . Debug ( "blocking peer for sending peer on blocklist" , zap . String ( "node" , connectionUri . String ( ) ) , zap . String ( "peer" , fromPeerId ) , zap . String ( "ip" , fromPeerIP ) )
}
}
return nil
}
reconnectDelay := p . reconnectDelay . GetUInt ( idString )
if reconnectDelay == nil {
delay := uint ( 1 )
reconnectDelay = & delay
}
if id . Equals ( p . localNodeID ) {
return nil
}
p . Logger ( ) . Debug ( "connect" , zap . String ( "node" , connectionUri . String ( ) ) )
socket , err := net . CreateTransportSocket ( scheme , connectionUri )
if err != nil {
if retried {
p . Logger ( ) . Error ( "failed to connect, too many retries" , zap . String ( "node" , connectionUri . String ( ) ) , zap . Error ( err ) )
counter := uint ( 0 )
if p . outgoingPeerFailures . Contains ( idString ) {
tmp := * p . outgoingPeerFailures . GetUInt ( idString )
counter = tmp
}
counter ++
p . outgoingPeerFailures . PutUInt ( idString , counter )
if counter >= p . maxOutgoingPeerFailures {
if fromPeer != nil {
blocked := false
var fromPeerId string
if fromPeer . Id ( ) != nil {
fromPeerId , err = fromPeer . Id ( ) . ToString ( )
if err != nil {
return err
}
if ! p . incomingPeerBlockList . Contains ( fromPeerId ) {
p . incomingPeerBlockList . Put ( fromPeerId , true )
blocked = true
}
}
fromPeerIP := fromPeer . GetIP ( )
if ! p . incomingIPBlocklist . Contains ( fromPeerIP ) {
p . incomingIPBlocklist . Put ( fromPeerIP , true )
blocked = true
}
err = fromPeer . EndForAbuse ( )
if err != nil {
return err
}
if blocked {
p . Logger ( ) . Debug ( "blocking peer for sending peer on blocklist" , zap . String ( "node" , connectionUri . String ( ) ) , zap . String ( "peer" , fromPeerId ) , zap . String ( "ip" , fromPeerIP ) )
}
}
p . outgoingPeerBlocklist . Put ( idString , true )
p . Logger ( ) . Debug ( "blocking peer for too many failures" , zap . String ( "node" , connectionUri . String ( ) ) )
}
return nil
}
retried = true
p . Logger ( ) . Error ( "failed to connect" , zap . String ( "node" , connectionUri . String ( ) ) , zap . Error ( err ) )
delay := p . reconnectDelay . GetUInt ( idString )
if delay == nil {
tmp := uint ( 1 )
delay = & tmp
}
delayDeref := * delay
p . reconnectDelay . PutUInt ( idString , delayDeref * 2 )
time . Sleep ( time . Duration ( delayDeref ) * time . Second )
return p . ConnectToNode ( connectionUris , retried , fromPeer )
}
if p . outgoingPeerFailures . Contains ( idString ) {
p . outgoingPeerFailures . Remove ( idString )
}
peer , err := net . CreateTransportPeer ( scheme , & net . TransportPeerConfig {
Socket : socket ,
Uris : [ ] * url . URL { connectionUri } ,
} )
if err != nil {
return err
}
peer . SetId ( id )
p . Services ( ) . P2P ( ) . ConnectionTracker ( ) . Add ( 1 )
peerId , err := peer . Id ( ) . ToString ( )
if err != nil {
return err
}
p . peersPending . Put ( peerId , peer )
go func ( ) {
err := p . OnNewPeer ( peer , true )
if err != nil && ! peer . Abuser ( ) {
p . Logger ( ) . Error ( "peer error" , zap . Error ( err ) )
}
p . Services ( ) . P2P ( ) . ConnectionTracker ( ) . Done ( )
} ( )
return nil
}
// OnNewPeer runs the connection lifecycle for a freshly connected peer.
//
// A peer whose node id is on the incoming peer blocklist, or whose IP is on the
// incoming IP blocklist, is ended for abuse and nil (or the error from ending
// it) is returned. Otherwise a new challenge is stored on the peer, a listener
// goroutine is started via OnNewPeerListen, the handshake-open message is sent,
// and the call blocks until the listener returns.
//
// verifyId is passed through to OnNewPeerListen. Errors from encoding or
// sending the handshake are returned immediately, without waiting for the
// listener goroutine.
func (p *P2PServiceDefault) OnNewPeer(peer net.Peer, verifyId bool) error {
	var wg sync.WaitGroup

	pid := "unknown"
	if peer.Id() != nil {
		// The id string is only used for blocklist lookup and logging, so a
		// stringification error is deliberately ignored here.
		pid, _ = peer.Id().ToString()
	}

	pip := peer.GetIP()

	// Node ids live in incomingPeerBlockList and IPs in incomingIPBlocklist
	// (see where ConnectToNode fills them); look each key up in its own list.
	if p.incomingPeerBlockList.Contains(pid) {
		p.Logger().Error("peer is on identity blocklist", zap.String("peer", pid))
		return peer.EndForAbuse()
	}

	if p.incomingIPBlocklist.Contains(pip) {
		p.Logger().Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip))
		return peer.EndForAbuse()
	}

	p.Logger().Debug("OnNewPeer started", zap.String("peer", pid))

	challenge := protocol.GenerateChallenge()
	peer.SetChallenge(challenge)

	// Start listening before the handshake is sent so the reply is not missed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		p.OnNewPeerListen(peer, verifyId)
	}()

	handshakeOpenMsg, err := msgpack.Marshal(protocol.NewHandshakeOpen(challenge, p.networkID))
	if err != nil {
		return err
	}

	if err = peer.SendMessage(handshakeOpenMsg); err != nil {
		return err
	}
	p.Logger().Debug("OnNewPeer sent handshake", zap.String("peer", pid))

	p.Logger().Debug("OnNewPeer before Wait", zap.String("peer", pid))
	wg.Wait() // Wait for OnNewPeerListen goroutine to finish
	p.Logger().Debug("OnNewPeer ended", zap.String("peer", pid))

	return nil
}
func ( p * P2PServiceDefault ) OnNewPeerListen ( peer net . Peer , verifyId bool ) {
onDone := net . CloseCallback ( func ( ) {
if peer . Id ( ) != nil {
pid , err := peer . Id ( ) . ToString ( )
if err != nil {
p . Logger ( ) . Error ( "failed to get peer id" , zap . Error ( err ) )
return
}
// Handle closure of the connection
if p . peers . Contains ( pid ) {
p . peers . Remove ( pid )
}
if p . peersPending . Contains ( pid ) {
p . peersPending . Remove ( pid )
}
}
} )
onError := net . ErrorCallback ( func ( args ... interface { } ) {
if ! peer . Abuser ( ) {
p . Logger ( ) . Error ( "peer error" , zap . Any ( "args" , args ) )
}
} )
peer . ListenForMessages ( func ( message [ ] byte ) error {
2024-01-30 03:25:21 +00:00
var reader protocol . IncomingMessageReader
2024-01-30 02:32:13 +00:00
err := msgpack . Unmarshal ( message , & reader )
if err != nil {
p . Logger ( ) . Error ( "Error decoding basic message info" , zap . Error ( err ) )
return err
}
// Now, get the specific message handler based on the message kind
2024-01-30 03:38:52 +00:00
handler , ok := protocol . GetMessageType ( reader . Kind )
2024-01-30 02:32:13 +00:00
if ! ok {
p . Logger ( ) . Error ( "Unknown message type" , zap . Int ( "type" , reader . Kind ) )
return fmt . Errorf ( "unknown message type: %d" , reader . Kind )
}
2024-02-28 19:13:44 +00:00
if handler . RequiresHandshake ( ) && ! peer . IsHandshakeDone ( ) {
p . Logger ( ) . Debug ( "Peer is not handshake done, ignoring message" , zap . Any ( "type" , types . ProtocolMethodMap [ types . ProtocolMethod ( reader . Kind ) ] ) )
return nil
}
2024-01-30 03:25:21 +00:00
data := protocol . IncomingMessageData {
2024-01-30 02:32:13 +00:00
Original : message ,
Data : reader . Data ,
Ctx : context . Background ( ) ,
Peer : peer ,
VerifyId : verifyId ,
Config : p . Config ( ) ,
2024-01-30 21:54:13 +00:00
Logger : p . Logger ( ) ,
2024-01-30 05:31:31 +00:00
Mediator : NewMediator ( service . ServiceParams {
Logger : p . Logger ( ) ,
Config : p . Config ( ) ,
Db : p . Db ( ) ,
} ) ,
2024-01-30 02:32:13 +00:00
}
2024-01-30 21:07:22 +00:00
if mediator , ok := data . Mediator . ( service . ServicesSetter ) ; ok {
2024-01-30 21:00:41 +00:00
mediator . SetServices ( p . Services ( ) )
} else {
p . Logger ( ) . Fatal ( "failed to cast mediator to service.Service" )
}
2024-01-30 02:32:13 +00:00
dec := msgpack . NewDecoder ( bytes . NewReader ( reader . Data ) )
err = handler . DecodeMessage ( dec , data )
if err != nil {
p . Logger ( ) . Error ( "Error decoding message" , zap . Error ( err ) )
return err
}
// Directly decode and handle the specific message type
2024-01-30 22:09:05 +00:00
if err = handler . HandleMessage ( data ) ; err != nil {
2024-01-30 02:32:13 +00:00
p . Logger ( ) . Error ( "Error handling message" , zap . Error ( err ) )
return err
}
return nil
} , net . ListenerOptions {
OnClose : & onDone ,
OnError : & onError ,
Logger : p . Logger ( ) ,
} )
}
func ( p * P2PServiceDefault ) readNodeVotes ( nodeId * encoding . NodeId ) ( service . NodeVotes , error ) {
var value [ ] byte
var found bool
err := p . Db ( ) . View ( func ( tx * bolt . Tx ) error {
b := tx . Bucket ( [ ] byte ( nodeBucketName ) )
if b == nil {
return fmt . Errorf ( "Bucket %s not found" , nodeBucketName )
}
value = b . Get ( nodeId . Raw ( ) )
if value == nil {
return nil
}
found = true
return nil
} )
if err != nil {
return nil , err
}
if ! found {
return service . NewNodeVotes ( ) , nil
}
2024-02-29 17:50:20 +00:00
score := service . NewNodeVotes ( )
2024-01-30 02:32:13 +00:00
err = msgpack . Unmarshal ( value , & score )
if err != nil {
return nil , err
}
return score , nil
}
// saveNodeVotes encodes votes with msgpack and stores them under nodeId in the
// node bucket, creating the bucket if it does not exist yet.
//
// It returns any error from encoding, from creating the bucket or from the
// write transaction.
func (p *P2PServiceDefault) saveNodeVotes(nodeId *encoding.NodeId, votes service.NodeVotes) error {
	data, err := msgpack.Marshal(votes)
	if err != nil {
		return err
	}

	return p.Db().Update(func(tx *bolt.Tx) error {
		// Get or create the bucket; a plain lookup would yield nil for a
		// missing bucket and the Put below would panic.
		b, err := tx.CreateBucketIfNotExists([]byte(nodeBucketName))
		if err != nil {
			return err
		}

		return b.Put(nodeId.Raw(), data)
	})
}
// GetNodeScore returns the score of nodeId computed from its stored good and
// bad votes.
//
// The local node always scores 1. If the votes cannot be read, 0.5 is returned
// together with the error.
func (p *P2PServiceDefault) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {
	if nodeId.Equals(p.localNodeID) {
		return 1, nil
	}

	votes, err := p.readNodeVotes(nodeId)
	if err != nil {
		return 0.5, err
	}

	good, bad := votes.Good(), votes.Bad()
	return protocol.CalculateNodeScore(good, bad), nil
}
// SortNodesByScore sorts nodes in place from highest to lowest score and
// returns the same slice.
//
// A node whose score cannot be determined is ranked with a score of 0. Sorting
// still takes place in that case; the last scoring error encountered is
// returned alongside the sorted slice.
func (p *P2PServiceDefault) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) {
	ranking := make(map[encoding.NodeIdCode]float64)
	var lastErr error

	for _, candidate := range nodes {
		value, err := p.GetNodeScore(candidate)
		if err != nil {
			lastErr = err
			value = 0 // failed lookups sink to the bottom
		}
		ranking[candidate.HashCode()] = value
	}

	byScoreDesc := func(a, b int) bool {
		return ranking[nodes[a].HashCode()] > ranking[nodes[b].HashCode()]
	}
	sort.Slice(nodes, byScoreDesc)

	return nodes, lastErr
}
func ( p * P2PServiceDefault ) SignMessageSimple ( message [ ] byte ) ( [ ] byte , error ) {
2024-01-30 03:38:52 +00:00
signedMessage := protocol . NewSignedMessageRequest ( message )
2024-01-30 02:32:13 +00:00
signedMessage . SetNodeId ( p . localNodeID )
err := signedMessage . Sign ( p . Config ( ) )
if err != nil {
return nil , err
}
result , err := msgpack . Marshal ( signedMessage )
if err != nil {
return nil , err
}
return result , nil
}
// AddPeer registers peer as connected: it is stored in the peers map under its
// node id, its reconnect delay is reset to 1 and it is removed from the pending
// map.
//
// It returns an error if the peer has no node id or the id cannot be converted
// to a string; in that case no state is changed.
func (p *P2PServiceDefault) AddPeer(peer net.Peer) error {
	// Other code paths in this file treat a nil id as possible, so guard it
	// here rather than dereferencing it.
	id := peer.Id()
	if id == nil {
		return errors.New("cannot add peer without a node id")
	}

	peerId, err := id.ToString()
	if err != nil {
		return err
	}

	p.peers.Put(peerId, peer)
	p.reconnectDelay.PutUInt(peerId, 1)

	if p.peersPending.Contains(peerId) {
		p.peersPending.Remove(peerId)
	}

	return nil
}
func ( p * P2PServiceDefault ) SendPublicPeersToPeer ( peer net . Peer , peersToSend [ ] net . Peer ) error {
2024-01-30 03:38:52 +00:00
announceRequest := protocol . NewAnnounceRequest ( peer , peersToSend )
2024-01-30 02:32:13 +00:00
message , err := msgpack . Marshal ( announceRequest )
if err != nil {
return err
}
signedMessage , err := p . SignMessageSimple ( message )
if err != nil {
return err
}
err = peer . SendMessage ( signedMessage )
return nil
}
// SendHashRequest encodes a hash request for hash and kinds and sends it to
// every peer in the peers map.
//
// The broadcast is best-effort: a peer that cannot be reached is logged and
// skipped, and the remaining peers are still tried. Only a failure to encode
// the request is returned as an error.
func (p *P2PServiceDefault) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error {
	hashRequest := protocol.NewHashRequest(hash, kinds)

	message, err := msgpack.Marshal(hashRequest)
	if err != nil {
		return err
	}

	for _, peer := range p.peers.Values() {
		peerValue, ok := peer.(net.Peer)
		if !ok {
			p.Logger().Error("failed to cast peer to net.Peer")
			continue
		}

		// Surface per-peer send failures rather than silently overwriting them.
		if sendErr := peerValue.SendMessage(message); sendErr != nil {
			p.Logger().Error("failed to send hash request to peer", zap.Error(sendErr))
		}
	}

	return nil
}
// UpVote records one additional good vote for nodeId and persists the result.
func (p *P2PServiceDefault) UpVote(nodeId *encoding.NodeId) error {
	return p.vote(nodeId, true)
}
// DownVote records one additional bad vote for nodeId and persists the result.
func (p *P2PServiceDefault) DownVote(nodeId *encoding.NodeId) error {
	return p.vote(nodeId, false)
}
// vote loads the stored vote record for nodeId, applies an upvote when upvote
// is true or a downvote otherwise, and writes the record back.
//
// It returns any error from reading or saving the record.
func (p *P2PServiceDefault) vote(nodeId *encoding.NodeId, upvote bool) error {
	record, err := p.readNodeVotes(nodeId)
	if err != nil {
		return err
	}

	switch {
	case upvote:
		record.Upvote()
	default:
		record.Downvote()
	}

	return p.saveNodeVotes(nodeId, record)
}
// NodeId returns the local node's id. It is nil until Init has derived it from
// the node key pair.
func (p *P2PServiceDefault) NodeId() *encoding.NodeId {
	return p.localNodeID
}
// WaitOnConnectedPeers blocks until the connection WaitGroup's counter reaches
// zero.
func (p *P2PServiceDefault) WaitOnConnectedPeers() {
	p.connections.Wait()
}
// ConnectionTracker returns a pointer to the WaitGroup used to track running
// peer connections, so callers can Add and Done on the service's own counter.
func (p *P2PServiceDefault) ConnectionTracker() *sync.WaitGroup {
	return &p.connections
}
// NetworkId returns the P2P network name read from the service configuration.
func (p *P2PServiceDefault) NetworkId() string {
	return p.Config().P2P.Network
}
// HashQueryRoutingTable returns the service's hash-query routing table map.
func (p *P2PServiceDefault) HashQueryRoutingTable() structs.Map {
	return p.hashQueryRoutingTable
}