From 8177f6dc667f2edca66073e433baf5cff36cda41 Mon Sep 17 00:00:00 2001 From: lolka1333 Date: Tue, 5 May 2026 18:27:49 +0300 Subject: [PATCH] ws/inbounds: realtime fixes + perf for 10k+ client inbounds (#4123) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ws/inbounds: realtime fixes + perf for 10k+ client inbounds - hub: dedup, throttle, panic-restart, deadlock fix, race tests - client: backoff cap + slow-retry instead of giving up - broadcast: delta-only payload, count-based invalidate fallback - filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound) - perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint - traffic: monotonic all_time + UI clamp + propagate in delta handler - session: persist on update/logout (fixes logout-after-password-change) - ui: protocol tags flex, traffic bar normalize * Remove hub_test.go file * fix: ws hub, inbound service, and frontend correctness - propagate DelInbound error on disable path in SetInboundEnable - skip empty emails in updateClientTraffics to avoid constraint violations - use consistent IN ? clause, drop redundant ErrRecordNotFound guards - Hub.Unregister: direct removeClient fallback when channel is full - applyClientStatsDelta: O(1) email lookup via per-inbound Map cache - WS payload size check: Blob.size instead of .length for real byte count * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: unify clientStats cache, throttle clarity, hub constants * fix(ui): align traffic/expiry cell columns across all rows * style(ui): redesign outbounds table for visual consistency * style(ui): redesign routing table for visual consistency * fix: * fix: * fix: * fix: * fix: * fix: font * refactor: simplify outbound tone functions for consistency and maintainability --------- Co-authored-by: lolka1333 --- web/assets/js/websocket.js | 296 ++++++---- web/controller/inbound.go | 76 ++- web/controller/index.go | 11 +- web/controller/setting.go | 4 +- web/controller/websocket.go | 137 ++--- web/html/component/aClientTable.html | 54 +- web/html/component/aTableSortable.html | 428 ++++++++------- web/html/inbounds.html | 412 +++++++++----- web/html/settings/xray/outbounds.html | 183 ++++--- web/html/settings/xray/routing.html | 296 ++++++---- web/html/xray.html | 718 ++++++++++++++++++------- web/job/xray_traffic_job.go | 85 ++- web/service/inbound.go | 381 ++++++++++--- web/session/session.go | 26 +- web/websocket/hub.go | 585 ++++++++++---------- web/websocket/notifier.go | 80 +-- 16 files changed, 2373 insertions(+), 1399 deletions(-) diff --git a/web/assets/js/websocket.js b/web/assets/js/websocket.js index ccafef87..64dd2769 100644 --- a/web/assets/js/websocket.js +++ b/web/assets/js/websocket.js @@ -1,150 +1,212 @@ /** - * WebSocket client for real-time updates + * WebSocket client for real-time panel updates. + * + * Public API (kept stable for index.html / inbounds.html / xray.html): + * - connect() — open the connection (idempotent) + * - disconnect() — close and stop reconnecting + * - on(event, callback) — subscribe to event + * - off(event, callback) — unsubscribe + * - send(data) — send JSON to the server + * - isConnected — boolean, current state + * - reconnectAttempts — number, attempts since last success + * - maxReconnectAttempts — number, give-up threshold + * + * Built-in events: + * 'connected', 'disconnected', 'error', 'message', + * plus any server-emitted message type (status, traffic, client_stats, ...). */ class WebSocketClient { + static #MAX_PAYLOAD_BYTES = 10 * 1024 * 1024; // 10 MB, mirrors hub maxMessageSize. + static #BASE_RECONNECT_MS = 1000; + static #MAX_RECONNECT_MS = 30_000; + // After exhausting maxReconnectAttempts we switch to a polite slow-retry + // cadence rather than giving up forever — a panel that recovers an hour + // later should reconnect without a manual page reload. + static #SLOW_RETRY_MS = 60_000; + constructor(basePath = '') { this.basePath = basePath; - this.ws = null; - this.reconnectAttempts = 0; this.maxReconnectAttempts = 10; - this.reconnectDelay = 1000; - this.listeners = new Map(); + this.reconnectAttempts = 0; this.isConnected = false; + + this.ws = null; this.shouldReconnect = true; + this.reconnectTimer = null; + this.listeners = new Map(); // event → Set } + // Open the connection. Safe to call repeatedly — no-op if already + // open/connecting. Re-enables reconnects if previously disabled. Cancels + // any pending reconnect timer so an external connect() can't race a + // delayed retry into spawning a second socket. connect() { if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) { return; } - this.shouldReconnect = true; - - const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; - // Ensure basePath ends with '/' for proper URL construction - let basePath = this.basePath || ''; - if (basePath && !basePath.endsWith('/')) { - basePath += '/'; - } - const wsUrl = `${protocol}//${window.location.host}${basePath}ws`; - - console.log('WebSocket connecting to:', wsUrl, 'basePath:', this.basePath); - - try { - this.ws = new WebSocket(wsUrl); - - this.ws.onopen = () => { - console.log('WebSocket connected'); - this.isConnected = true; - this.reconnectAttempts = 0; - this.emit('connected'); - }; - - this.ws.onmessage = (event) => { - try { - // Validate message size (prevent memory issues) - const maxMessageSize = 10 * 1024 * 1024; // 10MB - if (event.data && event.data.length > maxMessageSize) { - console.error('WebSocket message too large:', event.data.length, 'bytes'); - this.ws.close(); - return; - } - - const message = JSON.parse(event.data); - if (!message || typeof message !== 'object') { - console.error('Invalid WebSocket message format'); - return; - } - - this.handleMessage(message); - } catch (e) { - console.error('Failed to parse WebSocket message:', e); - } - }; - - this.ws.onerror = (error) => { - console.error('WebSocket error:', error); - this.emit('error', error); - }; - - this.ws.onclose = () => { - console.log('WebSocket disconnected'); - this.isConnected = false; - this.emit('disconnected'); - - if (this.shouldReconnect && this.reconnectAttempts < this.maxReconnectAttempts) { - this.reconnectAttempts++; - const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); - console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`); - setTimeout(() => this.connect(), delay); - } - }; - } catch (e) { - console.error('Failed to create WebSocket connection:', e); - this.emit('error', e); - } - } - - handleMessage(message) { - const { type, payload, time } = message; - - // Emit to specific type listeners - this.emit(type, payload, time); - - // Emit to all listeners - this.emit('message', { type, payload, time }); - } - - on(event, callback) { - if (!this.listeners.has(event)) { - this.listeners.set(event, []); - } - const callbacks = this.listeners.get(event); - if (!callbacks.includes(callback)) { - callbacks.push(callback); - } - } - - off(event, callback) { - if (!this.listeners.has(event)) { - return; - } - const callbacks = this.listeners.get(event); - const index = callbacks.indexOf(callback); - if (index > -1) { - callbacks.splice(index, 1); - } - } - - emit(event, ...args) { - if (this.listeners.has(event)) { - this.listeners.get(event).forEach(callback => { - try { - callback(...args); - } catch (e) { - console.error('Error in WebSocket event handler:', e); - } - }); - } + this.#cancelReconnect(); + this.#openSocket(); } + // Close the connection and stop any pending reconnect attempt. Resets the + // attempt counter so a future connect() starts fresh from the small backoff. disconnect() { this.shouldReconnect = false; + this.#cancelReconnect(); + this.reconnectAttempts = 0; if (this.ws) { - this.ws.close(); + try { this.ws.close(1000, 'client disconnect'); } catch { /* ignore */ } this.ws = null; } + this.isConnected = false; } + // Subscribe to an event. Re-subscribing the same callback is a no-op. + on(event, callback) { + if (typeof callback !== 'function') return; + let set = this.listeners.get(event); + if (!set) { + set = new Set(); + this.listeners.set(event, set); + } + set.add(callback); + } + + // Unsubscribe from an event. + off(event, callback) { + const set = this.listeners.get(event); + if (!set) return; + set.delete(callback); + if (set.size === 0) this.listeners.delete(event); + } + + // Send JSON to the server. Drops silently if not connected — callers + // should rely on connect()/server pushes rather than client-initiated sends. send(data) { if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(data)); + } + } + + // ───── internals ───── + + #openSocket() { + const url = this.#buildUrl(); + let socket; + try { + socket = new WebSocket(url); + } catch (err) { + console.error('WebSocket: failed to construct connection', err); + this.#emit('error', err); + this.#scheduleReconnect(); + return; + } + this.ws = socket; + + socket.addEventListener('open', () => { + this.isConnected = true; + this.reconnectAttempts = 0; + this.#emit('connected'); + }); + + socket.addEventListener('message', (event) => this.#onMessage(event)); + + socket.addEventListener('error', (event) => { + // Browsers fire 'error' before 'close' on failure. We surface it for + // consumers (so polling fallbacks can engage) but don't log every blip + // — bad networks would flood the console otherwise. + this.#emit('error', event); + }); + + socket.addEventListener('close', () => { + this.isConnected = false; + this.ws = null; + this.#emit('disconnected'); + if (this.shouldReconnect) this.#scheduleReconnect(); + }); + } + + #buildUrl() { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + let basePath = this.basePath || ''; + if (basePath && !basePath.endsWith('/')) basePath += '/'; + return `${protocol}//${window.location.host}${basePath}ws`; + } + + #onMessage(event) { + const data = event.data; + // Reject oversized payloads up front. We compare actual UTF-8 byte + // length (via Blob.size) against the limit — string.length counts + // UTF-16 code units, which can undercount real bytes by up to 4× for + // payloads with non-ASCII characters and bypass the cap. + if (typeof data === 'string') { + const byteLen = new Blob([data]).size; + if (byteLen > WebSocketClient.#MAX_PAYLOAD_BYTES) { + console.error(`WebSocket: payload too large (${byteLen} bytes), closing`); + try { this.ws?.close(1009, 'message too big'); } catch { /* ignore */ } + return; + } + } + let message; + try { + message = JSON.parse(data); + } catch (err) { + console.error('WebSocket: invalid JSON message', err); + return; + } + if (!message || typeof message !== 'object' || typeof message.type !== 'string') { + console.error('WebSocket: malformed message envelope'); + return; + } + this.#emit(message.type, message.payload, message.time); + this.#emit('message', message); + } + + #emit(event, ...args) { + const set = this.listeners.get(event); + if (!set) return; + for (const callback of set) { + try { + callback(...args); + } catch (err) { + console.error(`WebSocket: handler for "${event}" threw`, err); + } + } + } + + #scheduleReconnect() { + if (!this.shouldReconnect) return; + this.#cancelReconnect(); + + let base; + if (this.reconnectAttempts < this.maxReconnectAttempts) { + this.reconnectAttempts += 1; + // Exponential backoff inside the active window. + const exp = WebSocketClient.#BASE_RECONNECT_MS * 2 ** (this.reconnectAttempts - 1); + base = Math.min(WebSocketClient.#MAX_RECONNECT_MS, exp); } else { - console.warn('WebSocket is not connected'); + // Active window exhausted — keep trying once a minute. The page-level + // polling fallback runs in parallel; this just brings WS back when the + // network recovers. + base = WebSocketClient.#SLOW_RETRY_MS; + } + // ±25% jitter so reloads after a panel restart don't reconnect in lockstep. + const delay = base * (0.75 + Math.random() * 0.5); + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.#openSocket(); + }, delay); + } + + #cancelReconnect() { + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; } } } -// Create global WebSocket client instance -// Safely get basePath from global scope (defined in page.html) +// Global instance — basePath is set by page.html before this script loads. window.wsClient = new WebSocketClient(typeof basePath !== 'undefined' ? basePath : ''); diff --git a/web/controller/inbound.go b/web/controller/inbound.go index ee024cc6..5b8ad38b 100644 --- a/web/controller/inbound.go +++ b/web/controller/inbound.go @@ -27,6 +27,34 @@ func NewInboundController(g *gin.RouterGroup) *InboundController { return a } +// broadcastInboundsUpdateClientLimit is the threshold past which we skip the +// full-list push over WebSocket and signal the frontend to re-fetch via REST. +// Mirrors the same heuristic used by the periodic traffic job. +const broadcastInboundsUpdateClientLimit = 5000 + +// broadcastInboundsUpdate fetches and broadcasts the inbound list for userId. +// At scale (10k+ clients) the marshaled JSON exceeds the WS payload ceiling, +// so we send an invalidate signal instead — frontend re-fetches via REST. +// Skipped entirely when no WebSocket clients are connected. +func (a *InboundController) broadcastInboundsUpdate(userId int) { + if !websocket.HasClients() { + return + } + inbounds, err := a.inboundService.GetInbounds(userId) + if err != nil { + return + } + totalClients := 0 + for _, ib := range inbounds { + totalClients += len(ib.ClientStats) + } + if totalClients > broadcastInboundsUpdateClientLimit { + websocket.BroadcastInvalidate(websocket.MessageTypeInbounds) + return + } + websocket.BroadcastInbounds(inbounds) +} + // initRouter initializes the routes for inbound-related operations. func (a *InboundController) initRouter(g *gin.RouterGroup) { @@ -38,6 +66,7 @@ func (a *InboundController) initRouter(g *gin.RouterGroup) { g.POST("/add", a.addInbound) g.POST("/del/:id", a.delInbound) g.POST("/update/:id", a.updateInbound) + g.POST("/setEnable/:id", a.setInboundEnable) g.POST("/clientIps/:email", a.getClientIps) g.POST("/clearClientIps/:email", a.clearClientIps) g.POST("/addClient", a.addInboundClient) @@ -134,9 +163,7 @@ func (a *InboundController) addInbound(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } - // Broadcast inbounds update via WebSocket - inbounds, _ := a.inboundService.GetInbounds(user.Id) - websocket.BroadcastInbounds(inbounds) + a.broadcastInboundsUpdate(user.Id) } // delInbound deletes an inbound configuration by its ID. @@ -155,10 +182,8 @@ func (a *InboundController) delInbound(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } - // Broadcast inbounds update via WebSocket user := session.GetLoginUser(c) - inbounds, _ := a.inboundService.GetInbounds(user.Id) - websocket.BroadcastInbounds(inbounds) + a.broadcastInboundsUpdate(user.Id) } // updateInbound updates an existing inbound configuration. @@ -185,10 +210,43 @@ func (a *InboundController) updateInbound(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } - // Broadcast inbounds update via WebSocket user := session.GetLoginUser(c) - inbounds, _ := a.inboundService.GetInbounds(user.Id) - websocket.BroadcastInbounds(inbounds) + a.broadcastInboundsUpdate(user.Id) +} + +// setInboundEnable flips only the enable flag of an inbound. This is a +// dedicated endpoint because the regular update path serialises the entire +// settings JSON (every client) — far too heavy for an interactive switch +// on inbounds with thousands of clients. Frontend optimistically updates +// the UI; we just persist + sync xray + nudge other open admin sessions. +func (a *InboundController) setInboundEnable(c *gin.Context) { + id, err := strconv.Atoi(c.Param("id")) + if err != nil { + jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundUpdateSuccess"), err) + return + } + type form struct { + Enable bool `json:"enable" form:"enable"` + } + var f form + if err := c.ShouldBind(&f); err != nil { + jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) + return + } + needRestart, err := a.inboundService.SetInboundEnable(id, f.Enable) + if err != nil { + jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) + return + } + jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundUpdateSuccess"), nil) + if needRestart { + a.xrayService.SetToNeedRestart() + } + // Cross-admin sync: lightweight invalidate signal (a few hundred bytes) + // instead of fetching + serialising the whole inbound list. Other open + // sessions re-fetch via REST. The toggling admin's own UI already + // updated optimistically. + websocket.BroadcastInvalidate(websocket.MessageTypeInbounds) } // getClientIps retrieves the IP addresses associated with a client by email. diff --git a/web/controller/index.go b/web/controller/index.go index 1325bed5..14791543 100644 --- a/web/controller/index.go +++ b/web/controller/index.go @@ -10,7 +10,6 @@ import ( "github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/web/session" - "github.com/gin-contrib/sessions" "github.com/gin-gonic/gin" ) @@ -95,9 +94,8 @@ func (a *IndexController) login(c *gin.Context) { logger.Infof("%s logged in successfully, Ip Address: %s\n", safeUser, getRemoteIp(c)) a.tgbot.UserLoginNotify(safeUser, ``, getRemoteIp(c), timeStr, 1) - session.SetLoginUser(c, user) - if err := sessions.Default(c).Save(); err != nil { - logger.Warning("Unable to save session: ", err) + if err := session.SetLoginUser(c, user); err != nil { + logger.Warning("Unable to save session:", err) return } @@ -111,9 +109,8 @@ func (a *IndexController) logout(c *gin.Context) { if user != nil { logger.Infof("%s logged out successfully", user.Username) } - session.ClearSession(c) - if err := sessions.Default(c).Save(); err != nil { - logger.Warning("Unable to save session after clearing:", err) + if err := session.ClearSession(c); err != nil { + logger.Warning("Unable to clear session on logout:", err) } c.Redirect(http.StatusTemporaryRedirect, c.GetString("base_path")) } diff --git a/web/controller/setting.go b/web/controller/setting.go index fc5486bc..263dbe32 100644 --- a/web/controller/setting.go +++ b/web/controller/setting.go @@ -99,7 +99,9 @@ func (a *SettingController) updateUser(c *gin.Context) { if err == nil { user.Username = form.NewUsername user.Password, _ = crypto.HashPasswordAsBcrypt(form.NewPassword) - session.SetLoginUser(c, user) + if saveErr := session.SetLoginUser(c, user); saveErr != nil { + err = saveErr + } } jsonMsg(c, I18nWeb(c, "pages.settings.toasts.modifyUser"), err) } diff --git a/web/controller/websocket.go b/web/controller/websocket.go index dfb59709..2e9fbca0 100644 --- a/web/controller/websocket.go +++ b/web/controller/websocket.go @@ -1,7 +1,9 @@ package controller import ( + "net" "net/http" + "net/url" "strings" "time" @@ -16,105 +18,80 @@ import ( ) const ( - // Time allowed to write a message to the peer - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the peer - pongWait = 60 * time.Second - - // Send pings to peer with this period (must be less than pongWait) - pingPeriod = (pongWait * 9) / 10 - - // Maximum message size allowed from peer - maxMessageSize = 512 + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + clientReadLimit = 512 ) var upgrader = ws.Upgrader{ ReadBufferSize: 32768, WriteBufferSize: 32768, - EnableCompression: true, // Negotiate permessage-deflate compression if the client supports it - - CheckOrigin: func(r *http.Request) bool { - // Check origin for security - origin := r.Header.Get("Origin") - if origin == "" { - // Allow connections without Origin header (same-origin requests) - return true - } - // Get the host from the request - host := r.Host - // Extract scheme and host from origin - originURL := origin - // Simple check: origin should match the request host - // This prevents cross-origin WebSocket hijacking - if strings.HasPrefix(originURL, "http://") || strings.HasPrefix(originURL, "https://") { - // Extract host from origin - originHost := strings.TrimPrefix(strings.TrimPrefix(originURL, "http://"), "https://") - if idx := strings.Index(originHost, "/"); idx != -1 { - originHost = originHost[:idx] - } - if idx := strings.Index(originHost, ":"); idx != -1 { - originHost = originHost[:idx] - } - // Compare hosts (without port) - requestHost := host - if idx := strings.Index(requestHost, ":"); idx != -1 { - requestHost = requestHost[:idx] - } - return originHost == requestHost || originHost == "" || requestHost == "" - } - return false - }, + EnableCompression: true, + CheckOrigin: checkSameOrigin, } -// WebSocketController handles WebSocket connections for real-time updates +// checkSameOrigin allows requests with no Origin header (same-origin or non-browser +// clients) and otherwise requires the Origin hostname to match the request hostname. +// Comparison is case-insensitive (RFC 7230 §2.7.3) and ignores port differences +// (the panel often sits behind a reverse proxy on a different port). +func checkSameOrigin(r *http.Request) bool { + origin := r.Header.Get("Origin") + if origin == "" { + return true + } + u, err := url.Parse(origin) + if err != nil || u.Hostname() == "" { + return false + } + host, _, err := net.SplitHostPort(r.Host) + if err != nil { + // IPv6 literals without a port arrive as "[::1]"; net.SplitHostPort + // fails in that case while url.Hostname() returns the address without + // brackets. Strip them so same-origin checks pass for bare IPv6 hosts. + host = r.Host + if len(host) >= 2 && host[0] == '[' && host[len(host)-1] == ']' { + host = host[1 : len(host)-1] + } + } + return strings.EqualFold(u.Hostname(), host) +} + +// WebSocketController handles WebSocket connections for real-time updates. type WebSocketController struct { BaseController hub *websocket.Hub } -// NewWebSocketController creates a new WebSocket controller +// NewWebSocketController creates a new WebSocket controller. func NewWebSocketController(hub *websocket.Hub) *WebSocketController { - return &WebSocketController{ - hub: hub, - } + return &WebSocketController{hub: hub} } -// HandleWebSocket handles WebSocket connections +// HandleWebSocket upgrades the HTTP connection and starts the read/write pumps. func (w *WebSocketController) HandleWebSocket(c *gin.Context) { - // Check authentication if !session.IsLogin(c) { logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c)) c.AbortWithStatus(http.StatusUnauthorized) return } - // Upgrade connection to WebSocket conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { logger.Error("Failed to upgrade WebSocket connection:", err) return } - // Create client - clientID := uuid.New().String() - client := &websocket.Client{ - ID: clientID, - Hub: w.hub, - Send: make(chan []byte, 512), // Increased from 256 to 512 to prevent overflow - Topics: make(map[websocket.MessageType]bool), - } - - // Register client + client := websocket.NewClient(uuid.New().String()) w.hub.Register(client) - logger.Debugf("WebSocket client %s registered from %s", clientID, getRemoteIp(c)) + logger.Debugf("WebSocket client %s registered from %s", client.ID, getRemoteIp(c)) - // Start goroutines for reading and writing go w.writePump(client, conn) go w.readPump(client, conn) } -// readPump pumps messages from the WebSocket connection to the hub +// readPump consumes inbound frames so the gorilla deadline/pong machinery keeps +// running. Clients send no commands today; frames are discarded. func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) { defer func() { if r := common.Recover("WebSocket readPump panic"); r != nil { @@ -124,35 +101,23 @@ func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) conn.Close() }() + conn.SetReadLimit(clientReadLimit) conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetPongHandler(func(string) error { - conn.SetReadDeadline(time.Now().Add(pongWait)) - return nil + return conn.SetReadDeadline(time.Now().Add(pongWait)) }) - conn.SetReadLimit(maxMessageSize) for { - _, message, err := conn.ReadMessage() - if err != nil { + if _, _, err := conn.ReadMessage(); err != nil { if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) { logger.Debugf("WebSocket read error for client %s: %v", client.ID, err) } - break + return } - - // Validate message size - if len(message) > maxMessageSize { - logger.Warningf("WebSocket message from client %s exceeds max size: %d bytes", client.ID, len(message)) - continue - } - - // Handle incoming messages (e.g., subscription requests) - // For now, we'll just log them - logger.Debugf("Received WebSocket message from client %s: %s", client.ID, string(message)) } } -// writePump pumps messages from the hub to the WebSocket connection +// writePump pushes hub messages to the connection and emits keepalive pings. func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) { ticker := time.NewTicker(pingPeriod) defer func() { @@ -165,17 +130,13 @@ func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) for { select { - case message, ok := <-client.Send: + case msg, ok := <-client.Send: conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { - // Hub closed the channel conn.WriteMessage(ws.CloseMessage, []byte{}) return } - - // Send each message individually (no batching) - // This ensures each JSON message is sent separately and can be parsed correctly - if err := conn.WriteMessage(ws.TextMessage, message); err != nil { + if err := conn.WriteMessage(ws.TextMessage, msg); err != nil { logger.Debugf("WebSocket write error for client %s: %v", client.ID, err) return } diff --git a/web/html/component/aClientTable.html b/web/html/component/aClientTable.html index 6e525396..977638f2 100644 --- a/web/html/component/aClientTable.html +++ b/web/html/component/aClientTable.html @@ -93,27 +93,22 @@ - - - - - - - - -
[[ SizeFormatter.sizeFormat(getSumStats(record, client.email)) ]] - - - - - - - - -
+
+
[[ SizeFormatter.sizeFormat(getSumStats(record, client.email)) ]]
+
+ +
+
+ +
+
+ +
+
+ + +
+
@@ -127,16 +122,13 @@ {{ i18n "pages.client.delayedStart" }} [[ IntlUtil.formatDate(client.expiryTime) ]] - - - - - - -
[[ IntlUtil.formatRelativeTime(client.expiryTime) ]] - - [[ client.reset + "d" ]]
+
+
[[ IntlUtil.formatRelativeTime(client.expiryTime) ]]
+
+ +
+
[[ client.reset + "d" ]]
+