libs5-go/net/ws.go

159 lines
3.0 KiB
Go
Raw Normal View History

2024-01-06 11:33:46 +00:00
package net
import (
"context"
2024-01-06 11:33:46 +00:00
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
2024-03-10 11:19:49 +00:00
"net"
"net/url"
"nhooyr.io/websocket"
2024-01-16 00:34:11 +00:00
"sync"
)
var (
_ PeerFactory = (*WebSocketPeer)(nil)
_ PeerStatic = (*WebSocketPeer)(nil)
_ Peer = (*WebSocketPeer)(nil)
2024-01-06 11:33:46 +00:00
)
type WebSocketPeer struct {
BasePeer
2024-01-07 11:47:19 +00:00
socket *websocket.Conn
2024-01-15 18:50:04 +00:00
abuser bool
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) Connect(uri *url.URL) (interface{}, error) {
dial, _, err := websocket.Dial(context.Background(), uri.String(), nil)
2024-01-06 11:33:46 +00:00
if err != nil {
return nil, err
2024-01-06 11:33:46 +00:00
}
return dial, nil
}
func (p *WebSocketPeer) NewPeer(options *TransportPeerConfig) (Peer, error) {
peer := &WebSocketPeer{
BasePeer: BasePeer{
connectionURIs: options.Uris,
socket: options.Socket,
},
2024-01-07 11:47:19 +00:00
socket: options.Socket.(*websocket.Conn),
}
return peer, nil
}
func (p *WebSocketPeer) SendMessage(message []byte) error {
2024-01-07 11:47:19 +00:00
err := p.socket.Write(context.Background(), websocket.MessageBinary, message)
if err != nil {
return err
}
return nil
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) RenderLocationURI() string {
return "WebSocket client"
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) ListenForMessages(callback EventCallback, options ListenerOptions) {
errChan := make(chan error, 10)
2024-01-16 00:34:11 +00:00
doneChan := make(chan struct{})
var wg sync.WaitGroup
2024-01-06 11:33:46 +00:00
for {
2024-01-07 11:47:19 +00:00
_, message, err := p.socket.Read(context.Background())
2024-01-06 11:33:46 +00:00
if err != nil {
if options.OnError != nil {
(*options.OnError)(err)
2024-01-06 11:33:46 +00:00
}
break
}
2024-01-16 00:34:11 +00:00
wg.Add(1)
// Process each message in a separate goroutine
go func(msg []byte) {
2024-01-16 00:34:11 +00:00
defer wg.Done()
// Call the callback and send any errors to the error channel
if err := callback(msg); err != nil {
2024-01-16 00:34:11 +00:00
select {
case errChan <- err:
case <-doneChan:
// Stop sending errors if doneChan is closed
}
}
}(message)
// Non-blocking error check
select {
case err := <-errChan:
if options.OnError != nil {
(*options.OnError)(err)
2024-01-06 11:33:46 +00:00
}
default:
2024-01-06 11:33:46 +00:00
}
}
if options.OnClose != nil {
(*options.OnClose)()
2024-01-06 11:33:46 +00:00
}
2024-01-16 00:34:11 +00:00
// Close doneChan and wait for all goroutines to finish
close(doneChan)
wg.Wait()
// Handle remaining errors
close(errChan)
for err := range errChan {
if options.OnError != nil {
(*options.OnError)(err)
}
}
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) End() error {
2024-01-07 11:47:19 +00:00
err := p.socket.Close(websocket.StatusNormalClosure, "")
2024-01-06 11:33:46 +00:00
if err != nil {
return err
2024-01-06 11:33:46 +00:00
}
return nil
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) EndForAbuse() error {
2024-01-15 18:50:04 +00:00
p.abuser = true
err := p.socket.Close(websocket.StatusPolicyViolation, "")
if err != nil {
return err
}
return nil
}
2024-01-06 11:33:46 +00:00
func (p *WebSocketPeer) SetId(id *encoding.NodeId) {
p.id = id
2024-01-06 11:33:46 +00:00
}
func (p *WebSocketPeer) SetChallenge(challenge []byte) {
p.challenge = challenge
}
func (p *WebSocketPeer) GetChallenge() []byte {
return p.challenge
}
2024-01-15 16:15:11 +00:00
2024-03-10 11:19:49 +00:00
func (p *WebSocketPeer) GetIP() net.Addr {
2024-01-15 16:15:11 +00:00
ctx, cancel := context.WithCancel(context.Background())
2024-03-10 11:19:49 +00:00
netConn := websocket.NetConn(ctx, p.socket, websocket.MessageBinary)
2024-01-15 16:15:11 +00:00
2024-03-10 11:19:49 +00:00
ipAddr := netConn.RemoteAddr()
2024-01-15 16:15:11 +00:00
cancel()
return ipAddr
}
2024-03-10 11:19:49 +00:00
func (b *WebSocketPeer) GetIPString() string {
return b.GetIP().String()
}
2024-01-15 18:50:04 +00:00
func (p *WebSocketPeer) Abuser() bool {
return p.abuser
}