diff --git a/web/controller/login_limiter_test.go b/web/controller/login_limiter_test.go index cc9cd80e..c6f4ac6e 100644 --- a/web/controller/login_limiter_test.go +++ b/web/controller/login_limiter_test.go @@ -10,7 +10,7 @@ func TestLoginLimiterBlocksAfterConfiguredFailures(t *testing.T) { limiter := newLoginLimiter(5, 5*time.Minute, 15*time.Minute) limiter.now = func() time.Time { return now } - for i := 0; i < 4; i++ { + for i := range 4 { if _, blocked := limiter.registerFailure("192.0.2.10", "Admin"); blocked { t.Fatalf("failure %d should not block yet", i+1) } @@ -41,7 +41,7 @@ func TestLoginLimiterPrunesOldFailuresAndResetsOnSuccess(t *testing.T) { limiter := newLoginLimiter(5, 5*time.Minute, 15*time.Minute) limiter.now = func() time.Time { return now } - for i := 0; i < 4; i++ { + for range 4 { limiter.registerFailure("192.0.2.10", "admin") } now = now.Add(6 * time.Minute) @@ -50,7 +50,7 @@ func TestLoginLimiterPrunesOldFailuresAndResetsOnSuccess(t *testing.T) { } limiter.registerSuccess("192.0.2.10", "admin") - for i := 0; i < 4; i++ { + for i := range 4 { if _, blocked := limiter.registerFailure("192.0.2.10", "admin"); blocked { t.Fatalf("success should reset previous failures; failure %d blocked", i+1) } @@ -62,7 +62,7 @@ func TestLoginLimiterSeparatesIPAndUsername(t *testing.T) { limiter := newLoginLimiter(5, 5*time.Minute, 15*time.Minute) limiter.now = func() time.Time { return now } - for i := 0; i < 5; i++ { + for range 5 { limiter.registerFailure("192.0.2.10", "admin") } if _, ok := limiter.allow("192.0.2.11", "admin"); !ok { diff --git a/web/controller/websocket.go b/web/controller/websocket.go index 2e9fbca0..f2b4ee0e 100644 --- a/web/controller/websocket.go +++ b/web/controller/websocket.go @@ -5,25 +5,15 @@ import ( "net/http" "net/url" "strings" - "time" - "github.com/google/uuid" "github.com/mhsanaei/3x-ui/v2/logger" - "github.com/mhsanaei/3x-ui/v2/util/common" + "github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/web/session" - "github.com/mhsanaei/3x-ui/v2/web/websocket" "github.com/gin-gonic/gin" ws "github.com/gorilla/websocket" ) -const ( - writeWait = 10 * time.Second - pongWait = 60 * time.Second - pingPeriod = (pongWait * 9) / 10 - clientReadLimit = 512 -) - var upgrader = ws.Upgrader{ ReadBufferSize: 32768, WriteBufferSize: 32768, @@ -57,18 +47,21 @@ func checkSameOrigin(r *http.Request) bool { return strings.EqualFold(u.Hostname(), host) } -// WebSocketController handles WebSocket connections for real-time updates. +// WebSocketController handles the HTTP→WebSocket upgrade for real-time updates. +// All per-connection lifecycle (pumps, hub registration) lives in +// service.WebSocketService — this controller is HTTP-layer only. type WebSocketController struct { BaseController - hub *websocket.Hub + service *service.WebSocketService } -// NewWebSocketController creates a new WebSocket controller. -func NewWebSocketController(hub *websocket.Hub) *WebSocketController { - return &WebSocketController{hub: hub} +// NewWebSocketController creates a controller wired to the given service. +func NewWebSocketController(svc *service.WebSocketService) *WebSocketController { + return &WebSocketController{service: svc} } -// HandleWebSocket upgrades the HTTP connection and starts the read/write pumps. +// HandleWebSocket authenticates the request, upgrades the HTTP connection, and +// hands ownership of the connection off to the service. func (w *WebSocketController) HandleWebSocket(c *gin.Context) { if !session.IsLogin(c) { logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c)) @@ -82,71 +75,5 @@ func (w *WebSocketController) HandleWebSocket(c *gin.Context) { return } - client := websocket.NewClient(uuid.New().String()) - w.hub.Register(client) - logger.Debugf("WebSocket client %s registered from %s", client.ID, getRemoteIp(c)) - - go w.writePump(client, conn) - go w.readPump(client, conn) -} - -// readPump consumes inbound frames so the gorilla deadline/pong machinery keeps -// running. Clients send no commands today; frames are discarded. -func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) { - defer func() { - if r := common.Recover("WebSocket readPump panic"); r != nil { - logger.Error("WebSocket readPump panic recovered:", r) - } - w.hub.Unregister(client) - conn.Close() - }() - - conn.SetReadLimit(clientReadLimit) - conn.SetReadDeadline(time.Now().Add(pongWait)) - conn.SetPongHandler(func(string) error { - return conn.SetReadDeadline(time.Now().Add(pongWait)) - }) - - for { - if _, _, err := conn.ReadMessage(); err != nil { - if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) { - logger.Debugf("WebSocket read error for client %s: %v", client.ID, err) - } - return - } - } -} - -// writePump pushes hub messages to the connection and emits keepalive pings. -func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) { - ticker := time.NewTicker(pingPeriod) - defer func() { - if r := common.Recover("WebSocket writePump panic"); r != nil { - logger.Error("WebSocket writePump panic recovered:", r) - } - ticker.Stop() - conn.Close() - }() - - for { - select { - case msg, ok := <-client.Send: - conn.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - conn.WriteMessage(ws.CloseMessage, []byte{}) - return - } - if err := conn.WriteMessage(ws.TextMessage, msg); err != nil { - logger.Debugf("WebSocket write error for client %s: %v", client.ID, err) - return - } - - case <-ticker.C: - conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := conn.WriteMessage(ws.PingMessage, nil); err != nil { - logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err) - return - } - } - } + w.service.HandleConnection(conn, getRemoteIp(c)) } diff --git a/web/service/websocket.go b/web/service/websocket.go new file mode 100644 index 00000000..0f86322c --- /dev/null +++ b/web/service/websocket.go @@ -0,0 +1,115 @@ +// Package service: WebSocketService owns the per-connection pump goroutines +// and bridges the HTTP-layer controller to the broadcast hub. The controller +// handles the upgrade handshake and authentication, then hands the raw +// connection to this service which takes ownership of its lifecycle. +package service + +import ( + "time" + + "github.com/mhsanaei/3x-ui/v2/logger" + "github.com/mhsanaei/3x-ui/v2/util/common" + "github.com/mhsanaei/3x-ui/v2/web/websocket" + + "github.com/google/uuid" + ws "github.com/gorilla/websocket" +) + +const ( + wsWriteWait = 10 * time.Second + wsPongWait = 60 * time.Second + wsPingPeriod = (wsPongWait * 9) / 10 + wsClientReadLimit = 512 +) + +// WebSocketService manages WebSocket client connections. It owns the +// read/write pumps for each accepted connection and registers/unregisters +// clients with the hub. +type WebSocketService struct { + hub *websocket.Hub +} + +// NewWebSocketService creates a service backed by the given hub. +func NewWebSocketService(hub *websocket.Hub) *WebSocketService { + return &WebSocketService{hub: hub} +} + +// HandleConnection takes ownership of an upgraded WebSocket connection: +// registers a new client, starts the read/write pumps, and returns +// immediately. The connection is closed when both pumps exit. +func (s *WebSocketService) HandleConnection(conn *ws.Conn, remoteIP string) { + if s == nil || s.hub == nil || conn == nil { + if conn != nil { + conn.Close() + } + return + } + + client := websocket.NewClient(uuid.New().String()) + s.hub.Register(client) + logger.Debugf("WebSocket client %s registered from %s", client.ID, remoteIP) + + go s.writePump(client, conn) + go s.readPump(client, conn) +} + +// readPump consumes inbound frames so the gorilla deadline/pong machinery keeps +// running. Clients send no commands today; frames are discarded. +func (s *WebSocketService) readPump(client *websocket.Client, conn *ws.Conn) { + defer func() { + if r := common.Recover("WebSocket readPump panic"); r != nil { + logger.Error("WebSocket readPump panic recovered:", r) + } + s.hub.Unregister(client) + conn.Close() + }() + + conn.SetReadLimit(wsClientReadLimit) + conn.SetReadDeadline(time.Now().Add(wsPongWait)) + conn.SetPongHandler(func(string) error { + return conn.SetReadDeadline(time.Now().Add(wsPongWait)) + }) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) { + logger.Debugf("WebSocket read error for client %s: %v", client.ID, err) + } + return + } + } +} + +// writePump pushes hub messages to the connection and emits keepalive pings. +func (s *WebSocketService) writePump(client *websocket.Client, conn *ws.Conn) { + ticker := time.NewTicker(wsPingPeriod) + defer func() { + if r := common.Recover("WebSocket writePump panic"); r != nil { + logger.Error("WebSocket writePump panic recovered:", r) + } + ticker.Stop() + conn.Close() + }() + + for { + select { + case msg, ok := <-client.Send: + conn.SetWriteDeadline(time.Now().Add(wsWriteWait)) + if !ok { + conn.WriteMessage(ws.CloseMessage, []byte{}) + return + } + if err := conn.WriteMessage(ws.TextMessage, msg); err != nil { + logger.Debugf("WebSocket write error for client %s: %v", client.ID, err) + return + } + + case <-ticker.C: + conn.SetWriteDeadline(time.Now().Add(wsWriteWait)) + if err := conn.WriteMessage(ws.PingMessage, nil); err != nil { + logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err) + return + } + } + } +} diff --git a/web/web.go b/web/web.go index b1f55332..7d634e70 100644 --- a/web/web.go +++ b/web/web.go @@ -289,8 +289,9 @@ func (s *Server) initRouter() (*gin.Engine, error) { s.wsHub = websocket.NewHub() go s.wsHub.Run() - // Initialize WebSocket controller - s.ws = controller.NewWebSocketController(s.wsHub) + // Initialize WebSocket controller — service owns per-connection pumps, + // controller is HTTP-layer only (auth + upgrade). + s.ws = controller.NewWebSocketController(service.NewWebSocketService(s.wsHub)) // Register WebSocket route with basePath (g already has basePath prefix) g.GET("/ws", s.ws.HandleWebSocket)