diff --git a/net/ws.go b/net/ws.go index bb8e167..7aa5328 100644 --- a/net/ws.go +++ b/net/ws.go @@ -5,6 +5,7 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/encoding" "net/url" "nhooyr.io/websocket" + "sync" ) var ( @@ -55,6 +56,8 @@ func (p *WebSocketPeer) RenderLocationURI() string { func (p *WebSocketPeer) ListenForMessages(callback EventCallback, options ListenerOptions) { errChan := make(chan error, 10) + doneChan := make(chan struct{}) + var wg sync.WaitGroup for { _, message, err := p.socket.Read(context.Background()) @@ -65,11 +68,17 @@ func (p *WebSocketPeer) ListenForMessages(callback EventCallback, options Listen break } + wg.Add(1) // Process each message in a separate goroutine go func(msg []byte) { + defer wg.Done() // Call the callback and send any errors to the error channel if err := callback(msg); err != nil { - errChan <- err + select { + case errChan <- err: + case <-doneChan: + // Stop sending errors if doneChan is closed + } } }(message) @@ -87,6 +96,9 @@ func (p *WebSocketPeer) ListenForMessages(callback EventCallback, options Listen (*options.OnClose)() } + // Close doneChan and wait for all goroutines to finish + close(doneChan) + wg.Wait() // Handle remaining errors close(errChan) for err := range errChan {