libs5-go/net/ws.go

142 lines
2.7 KiB
Go
Raw Normal View History

2024-01-06 11:33:46 +00:00
package net
import (
"context"
2024-01-06 11:33:46 +00:00
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"net/url"
"nhooyr.io/websocket"
)
var (
_ PeerFactory = (*WebSocketPeer)(nil)
_ PeerStatic = (*WebSocketPeer)(nil)
_ Peer = (*WebSocketPeer)(nil)
2024-01-06 11:33:46 +00:00
)
type WebSocketPeer struct {
BasePeer
2024-01-07 11:47:19 +00:00
socket *websocket.Conn
abused bool
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) Connect(uri *url.URL) (interface{}, error) {
dial, _, err := websocket.Dial(context.Background(), uri.String(), nil)
2024-01-06 11:33:46 +00:00
if err != nil {
return nil, err
2024-01-06 11:33:46 +00:00
}
return dial, nil
}
func (p *WebSocketPeer) NewPeer(options *TransportPeerConfig) (Peer, error) {
peer := &WebSocketPeer{
BasePeer: BasePeer{
connectionURIs: options.Uris,
socket: options.Socket,
},
2024-01-07 11:47:19 +00:00
socket: options.Socket.(*websocket.Conn),
}
return peer, nil
}
func (p *WebSocketPeer) SendMessage(message []byte) error {
2024-01-07 11:47:19 +00:00
err := p.socket.Write(context.Background(), websocket.MessageBinary, message)
if err != nil {
return err
}
return nil
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) RenderLocationURI() string {
return "WebSocket client"
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) ListenForMessages(callback EventCallback, options ListenerOptions) {
errChan := make(chan error, 10)
2024-01-06 11:33:46 +00:00
for {
2024-01-07 11:47:19 +00:00
_, message, err := p.socket.Read(context.Background())
2024-01-06 11:33:46 +00:00
if err != nil {
if options.OnError != nil {
(*options.OnError)(err)
2024-01-06 11:33:46 +00:00
}
break
}
// Process each message in a separate goroutine
go func(msg []byte) {
// Call the callback and send any errors to the error channel
if err := callback(msg); err != nil {
errChan <- err
}
}(message)
// Non-blocking error check
select {
case err := <-errChan:
if options.OnError != nil {
(*options.OnError)(err)
2024-01-06 11:33:46 +00:00
}
default:
2024-01-06 11:33:46 +00:00
}
}
if options.OnClose != nil {
(*options.OnClose)()
2024-01-06 11:33:46 +00:00
}
// Handle remaining errors
close(errChan)
for err := range errChan {
if options.OnError != nil {
(*options.OnError)(err)
}
}
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) End() error {
2024-01-07 11:47:19 +00:00
err := p.socket.Close(websocket.StatusNormalClosure, "")
2024-01-06 11:33:46 +00:00
if err != nil {
return err
2024-01-06 11:33:46 +00:00
}
return nil
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) EndForAbuse() error {
err := p.socket.Close(websocket.StatusPolicyViolation, "")
if err != nil {
return err
}
p.abused = true
2024-01-06 11:33:46 +00:00
return nil
}
2024-01-06 11:33:46 +00:00
func (p *WebSocketPeer) SetId(id *encoding.NodeId) {
p.id = id
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) SetChallenge(challenge []byte) {
p.challenge = challenge
}
func (p *WebSocketPeer) GetChallenge() []byte {
return p.challenge
}
2024-01-15 16:15:11 +00:00
func (b *WebSocketPeer) GetIP() string {
ctx, cancel := context.WithCancel(context.Background())
netConn := websocket.NetConn(ctx, b.socket, websocket.MessageBinary)
ipAddr := netConn.RemoteAddr().String()
cancel()
return ipAddr
}
func (p *WebSocketPeer) Abused() bool {
return p.abused
}