refactor: change how we manage peers, create getter/setters on Peer/BasePeer, and refactor WebSocketPeer to use new ws package and add Connect/NewPeer

This commit is contained in:
Derrick Hammer 2024-01-07 06:33:32 -05:00
parent 8435ce33de
commit 52b7426a7a
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
6 changed files with 95 additions and 36 deletions

View File

@ -25,8 +25,8 @@ var (
func init() { func init() {
transports = sync.Map{} transports = sync.Map{}
//RegisterPeerType("ws", WebSocketPeer) RegisterTransport("ws", WebSocketPeer{})
//RegisterPeerType("wss", WebSocketPeer) RegisterTransport("wss", WebSocketPeer{})
} }
func RegisterTransport(peerType string, factory interface{}) { func RegisterTransport(peerType string, factory interface{}) {
if _, ok := factory.(PeerFactory); !ok { if _, ok := factory.(PeerFactory); !ok {

View File

@ -9,15 +9,15 @@ import (
// EventCallback type for the callback function // EventCallback type for the callback function
type EventCallback func(event []byte) error type EventCallback func(event []byte) error
// DoneCallback type for the onDone callback // CloseCallback type for the OnClose callback
type DoneCallback func() type CloseCallback func()
// ErrorCallback type for the onError callback // ErrorCallback type for the onError callback
type ErrorCallback func(args ...interface{}) type ErrorCallback func(args ...interface{})
// ListenerOptions struct for options // ListenerOptions struct for options
type ListenerOptions struct { type ListenerOptions struct {
OnDone *DoneCallback OnClose *CloseCallback
OnError *ErrorCallback OnError *ErrorCallback
Logger *zap.Logger Logger *zap.Logger
} }
@ -28,15 +28,41 @@ type Peer interface {
ListenForMessages(callback EventCallback, options ListenerOptions) ListenForMessages(callback EventCallback, options ListenerOptions)
End() error End() error
SetId(id *encoding.NodeId) SetId(id *encoding.NodeId)
GetId() *encoding.NodeId Id() *encoding.NodeId
SetChallenge(challenge []byte) SetChallenge(challenge []byte)
GetChallenge() []byte Challenge() []byte
SetSocket(socket interface{})
Socket() interface{}
} }
type BasePeer struct { type BasePeer struct {
ConnectionURIs []url.URL connectionURIs []*url.URL
IsConnected bool isConnected bool
challenge []byte challenge []byte
Socket interface{} socket interface{}
Id *encoding.NodeId id *encoding.NodeId
}
func (b *BasePeer) Challenge() []byte {
return b.challenge
}
func (b *BasePeer) SetChallenge(challenge []byte) {
b.challenge = challenge
}
func (b *BasePeer) Socket() interface{} {
return b.socket
}
func (b *BasePeer) SetSocket(socket interface{}) {
b.socket = socket
}
func (b *BasePeer) Id() *encoding.NodeId {
return b.id
}
func (b *BasePeer) SetId(id *encoding.NodeId) {
b.id = id
} }

View File

@ -1,8 +1,16 @@
package net package net
import ( import (
"context"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"github.com/gorilla/websocket" "net/url"
"nhooyr.io/websocket"
)
var (
_ PeerFactory = (*WebSocketPeer)(nil)
_ PeerStatic = (*WebSocketPeer)(nil)
_ Peer = (*WebSocketPeer)(nil)
) )
type WebSocketPeer struct { type WebSocketPeer struct {
@ -10,49 +18,74 @@ type WebSocketPeer struct {
Socket *websocket.Conn Socket *websocket.Conn
} }
func (p *WebSocketPeer) SendMessage(message []byte) { func (p *WebSocketPeer) Connect(uri *url.URL) (interface{}, error) {
err := p.Socket.WriteMessage(websocket.BinaryMessage, message) dial, _, err := websocket.Dial(context.Background(), uri.String(), nil)
if err != nil { if err != nil {
return return nil, err
} }
return dial, nil
}
func (p *WebSocketPeer) NewPeer(options *TransportPeerConfig) (Peer, error) {
peer := &WebSocketPeer{
BasePeer: BasePeer{
connectionURIs: options.Uris,
socket: options.Socket,
},
Socket: options.Socket.(*websocket.Conn),
}
return peer, nil
}
func (p *WebSocketPeer) SendMessage(message []byte) error {
err := p.Socket.Write(context.Background(), websocket.MessageBinary, message)
if err != nil {
return err
}
return nil
} }
func (p *WebSocketPeer) RenderLocationURI() string { func (p *WebSocketPeer) RenderLocationURI() string {
return p.Socket.RemoteAddr().String() return "WebSocket client"
} }
func (p *WebSocketPeer) ListenForMessages(callback EventCallback, onClose func(), onError func(error)) { func (p *WebSocketPeer) ListenForMessages(callback EventCallback, options ListenerOptions) {
for { for {
_, message, err := p.Socket.ReadMessage() _, message, err := p.Socket.Read(context.Background())
if err != nil { if err != nil {
if onError != nil { if options.OnError != nil {
onError(err) (*options.OnError)(err)
} }
break break
} }
err = callback(message) err = callback(message)
if err != nil { if err != nil {
if onError != nil { if options.OnError != nil {
onError(err) (*options.OnError)(err)
} }
} }
} }
if onClose != nil { if options.OnClose != nil {
onClose() (*options.OnClose)()
} }
} }
func (p *WebSocketPeer) End() { func (p *WebSocketPeer) End() error {
err := p.Socket.Close() err := p.Socket.Close(websocket.StatusNormalClosure, "")
if err != nil { if err != nil {
return return err
} }
return nil
} }
func (p *WebSocketPeer) SetId(id *encoding.NodeId) { func (p *WebSocketPeer) SetId(id *encoding.NodeId) {
p.Id = id p.id = id
} }
func (p *WebSocketPeer) SetChallenge(challenge []byte) { func (p *WebSocketPeer) SetChallenge(challenge []byte) {

View File

@ -94,13 +94,13 @@ func (h *HashQuery) HandleMessage(node interfaces.Node, peer *net.Peer, verifyId
peers = peersVal.(*hashset.Set) peers = peersVal.(*hashset.Set)
if exists := peers.Contains((*peer).GetId()); !exists { if exists := peers.Contains((*peer).Id()); !exists {
peers.Add((*peer).GetId()) peers.Add((*peer).Id())
} }
for _, val := range node.Services().P2P().Peers().Values() { for _, val := range node.Services().P2P().Peers().Values() {
peerVal := val.(net.Peer) peerVal := val.(net.Peer)
if !peerVal.GetId().Equals((*peer).GetId()) { if !peerVal.Id().Equals((*peer).Id()) {
err := peerVal.SendMessage(h.IncomingMessageImpl.Original()) err := peerVal.SendMessage(h.IncomingMessageImpl.Original())
if err != nil { if err != nil {
node.Logger().Error("Failed to send message", zap.Error(err)) node.Logger().Error("Failed to send message", zap.Error(err))

View File

@ -40,7 +40,7 @@ func (h HandshakeDone) HandleMessage(node interfaces.Node, peer *net.Peer, verif
return nil return nil
} }
if !bytes.Equal((*peer).GetChallenge(), h.challenge) { if !bytes.Equal((*peer).Challenge(), h.challenge) {
return errors.New("Invalid challenge") return errors.New("Invalid challenge")
} }
/* /*
@ -52,7 +52,7 @@ func (h HandshakeDone) HandleMessage(node interfaces.Node, peer *net.Peer, verif
} }
} }
peer.IsConnected = true peer.isConnected = true
supportedFeatures := data.UnpackInt() supportedFeatures := data.UnpackInt()

View File

@ -219,8 +219,8 @@ func (p *P2PImpl) OnNewPeer(peer *net.Peer, verifyId bool) error {
return nil return nil
} }
func (p *P2PImpl) OnNewPeerListen(peer *net.Peer, verifyId bool) { func (p *P2PImpl) OnNewPeerListen(peer *net.Peer, verifyId bool) {
onDone := net.DoneCallback(func() { onDone := net.CloseCallback(func() {
peerId, err := (*peer).GetId().ToString() peerId, err := (*peer).Id().ToString()
if err != nil { if err != nil {
p.logger.Error("failed to get peer id", zap.Error(err)) p.logger.Error("failed to get peer id", zap.Error(err))
return return
@ -262,7 +262,7 @@ func (p *P2PImpl) OnNewPeerListen(peer *net.Peer, verifyId bool) {
return nil return nil
}, net.ListenerOptions{ }, net.ListenerOptions{
OnDone: &onDone, OnClose: &onDone,
OnError: &onError, OnError: &onError,
Logger: p.logger, Logger: p.logger,
}) })