Files
3x-ui/web/websocket/hub.go
lolka1333 8177f6dc66 ws/inbounds: realtime fixes + perf for 10k+ client inbounds (#4123)
* ws/inbounds: realtime fixes + perf for 10k+ client inbounds

- hub: dedup, throttle, panic-restart, deadlock fix, race tests
- client: backoff cap + slow-retry instead of giving up
- broadcast: delta-only payload, count-based invalidate fallback
- filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound)
- perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint
- traffic: monotonic all_time + UI clamp + propagate in delta handler
- session: persist on update/logout (fixes logout-after-password-change)
- ui: protocol tags flex, traffic bar normalize

* Remove hub_test.go file

* fix: ws hub, inbound service, and frontend correctness

- propagate DelInbound error on disable path in SetInboundEnable
- skip empty emails in updateClientTraffics to avoid constraint violations
- use consistent IN ? clause, drop redundant ErrRecordNotFound guards
- Hub.Unregister: direct removeClient fallback when channel is full
- applyClientStatsDelta: O(1) email lookup via per-inbound Map cache
- WS payload size check: Blob.size instead of .length for real byte count

* fix: chunk large IN ? queries and fix IPv6 same-origin check

* fix: chunk large IN ? queries and fix IPv6 same-origin check

* fix: unify clientStats cache, throttle clarity, hub constants

* fix(ui): align traffic/expiry cell columns across all rows

* style(ui): redesign outbounds table for visual consistency

* style(ui): redesign routing table for visual consistency

* fix:

* fix:

* fix:

* fix:

* fix:

* fix: font

* refactor: simplify outbound tone functions for consistency and maintainability

---------

Co-authored-by: lolka1333 <test123@gmail.com>
2026-05-05 17:27:49 +02:00

380 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package websocket provides a WebSocket hub for real-time updates and notifications.
package websocket
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/mhsanaei/3x-ui/v2/logger"
)
// MessageType is the discriminator carried in every WebSocket message so the
// receiver knows how to interpret the payload.
type MessageType string

// Message types understood by the frontend.
const (
	MessageTypeStatus       MessageType = "status"
	MessageTypeTraffic      MessageType = "traffic"
	MessageTypeInbounds     MessageType = "inbounds"
	MessageTypeOutbounds    MessageType = "outbounds"
	MessageTypeNotification MessageType = "notification"
	MessageTypeXrayState    MessageType = "xray_state"

	// MessageTypeClientStats carries absolute traffic counters for the clients
	// that had activity in the latest collection window. The frontend applies
	// them in place, which is far smaller than re-broadcasting the full inbound
	// list and scales to 10k+ clients without falling back to REST.
	MessageTypeClientStats MessageType = "client_stats"

	// MessageTypeInvalidate tells the frontend to re-fetch the named data via
	// REST. It is the last-resort path used when a payload is too large.
	MessageTypeInvalidate MessageType = "invalidate"
)

// Size and queue limits.
const (
	// maxMessageSize caps the serialized WebSocket payload. Anything larger is
	// replaced by a lightweight invalidate signal and the frontend re-fetches
	// via REST. 10MB lets typical 2k-8k-client deployments push directly over
	// the socket (low latency); larger installs fall back to invalidate.
	maxMessageSize = 10 << 20 // 10MB

	// clientSendQueue is the capacity of each client's Send channel. The
	// original author sizes it at roughly 50s of buffering for a momentarily
	// slow browser.
	clientSendQueue = 512

	// hubBroadcastQueue is the capacity of the hub's broadcast channel:
	// headroom for cron-storm and admin-mutation bursts.
	hubBroadcastQueue = 2048

	// hubControlQueue is the capacity of the register and unregister channels:
	// backlog for page reloads and disconnect storms.
	hubControlQueue = 64
)

// Timing and recovery limits.
const (
	// enqueueTimeout is how long a broadcast may wait for room in the hub's
	// broadcast channel before it is dropped.
	enqueueTimeout = 100 * time.Millisecond

	// minBroadcastInterval is the per-type rate limit that keeps cron storms
	// or rapid mutations from drowning the hub. A broadcast arriving inside
	// the interval is dropped, not coalesced; the next one outside the window
	// goes through. Only the types listed in throttledMessageTypes are gated;
	// status, traffic, client_stats, notification, xray_state and invalidate
	// bypass the limit so they are never delayed.
	minBroadcastInterval = 250 * time.Millisecond

	// hubRestartAttempts bounds the panic-recovery retries performed by Run.
	// Once the limit is reached the hub logs and stays down; the panel itself
	// keeps running and the operator can investigate.
	hubRestartAttempts = 3
)
// NewClient returns a Client carrying the given id and a buffered Send channel
// of capacity clientSendQueue, ready to be handed to Hub.Register.
func NewClient(id string) *Client {
	c := new(Client)
	c.ID = id
	c.Send = make(chan []byte, clientSendQueue)
	return c
}
// Message is the wire format sent to clients. Broadcast and
// broadcastInvalidate marshal one Message to JSON per outgoing broadcast.
type Message struct {
// Type tells the receiver how to interpret Payload.
Type MessageType `json:"type"`
// Payload is the message body, marshaled as-is by encoding/json.
Payload any `json:"payload"`
// Time is the send timestamp in Unix milliseconds.
Time int64 `json:"time"`
}
// Client represents a single WebSocket connection registered with the Hub.
type Client struct {
// ID identifies the client in the hub's log messages.
ID string
// Send receives the serialized messages fanned out by the hub. The hub
// closes it when the client is removed or the hub shuts down.
Send chan []byte
// closeOnce makes sure the hub closes Send at most once, whichever of the
// removal paths (unregister, eviction, shutdown) gets there first.
closeOnce sync.Once
}
// Hub fans out messages to all connected clients.
type Hub struct {
// clients is the set of registered clients; guarded by mu.
clients map[*Client]struct{}
// broadcast queues serialized messages for the event loop to fan out.
broadcast chan []byte
// register queues clients waiting to be added by the event loop.
register chan *Client
// unregister queues clients waiting to be removed by the event loop.
unregister chan *Client
// mu guards clients.
mu sync.RWMutex
// ctx is cancelled by Stop; the event loop and the blocking senders
// (Register, enqueue) watch it.
ctx context.Context
// cancel cancels ctx.
cancel context.CancelFunc
// throttleMu guards lastBroadcast.
throttleMu sync.Mutex
// lastBroadcast records, per throttled message type, when the last
// broadcast was let through by shouldThrottle.
lastBroadcast map[MessageType]time.Time
}
// NewHub allocates a hub with empty client registry, buffered control and
// broadcast channels, and a cancellable context. The hub does nothing until
// Run is started in its own goroutine.
func NewHub() *Hub {
	h := &Hub{
		clients:       make(map[*Client]struct{}),
		broadcast:     make(chan []byte, hubBroadcastQueue),
		register:      make(chan *Client, hubControlQueue),
		unregister:    make(chan *Client, hubControlQueue),
		lastBroadcast: make(map[MessageType]time.Time),
	}
	h.ctx, h.cancel = context.WithCancel(context.Background())
	return h
}
// throttledMessageTypes lists the message types that shouldThrottle rate-limits.
// It is deliberately an allow-list: any type not named here (status, traffic,
// client_stats, notification, xray_state, invalidate, and any type added in
// the future) bypasses the limit, because heartbeat- and signal-class messages
// must not be delayed.
var throttledMessageTypes = map[MessageType]struct{}{
	MessageTypeOutbounds: {},
	MessageTypeInbounds:  {},
}
// shouldThrottle reports whether a broadcast of msgType must be dropped
// because the previous broadcast of the same type was let through less than
// minBroadcastInterval ago. Types outside throttledMessageTypes are never
// throttled. When the broadcast is allowed, the current time is recorded as
// the new reference point for that type.
func (h *Hub) shouldThrottle(msgType MessageType) bool {
	_, limited := throttledMessageTypes[msgType]
	if !limited {
		return false
	}

	h.throttleMu.Lock()
	defer h.throttleMu.Unlock()

	current := time.Now()
	previous, seen := h.lastBroadcast[msgType]
	if seen && current.Sub(previous) < minBroadcastInterval {
		return true
	}
	h.lastBroadcast[msgType] = current
	return false
}
// Run drives the hub until it is stopped or gives up after repeated panics.
//
// The event loop (runOnce) executes inside a panic-recovery harness. When it
// crashes, Run waits with exponential backoff (1s, 2s, ...) and starts it
// again. After hubRestartAttempts consecutive crashes Run logs an error and
// returns, leaving the hub down. A run that stayed up for at least
// healthyRunWindow before crashing starts a new failure streak, so isolated
// panics spread over a long uptime do not add up to a permanent outage.
//
// If the hub is stopped while Run is backing off, the loop is entered once
// more so that runOnce observes the cancelled context and performs the regular
// shutdown (closing client send channels) inside the recovery harness.
func (h *Hub) Run() {
	// healthyRunWindow is how long the event loop must survive for an earlier
	// crash to stop counting towards the restart cap.
	const healthyRunWindow = time.Minute

	failures := 0
	for {
		started := time.Now()
		if h.runOnce() {
			return
		}
		if time.Since(started) >= healthyRunWindow {
			failures = 0
		}
		failures++
		if failures >= hubRestartAttempts {
			break
		}

		wait := time.Duration(1<<(failures-1)) * time.Second
		logger.Errorf("WebSocket hub crashed, restarting in %s (%d/%d)", wait, failures, hubRestartAttempts-1)
		select {
		case <-time.After(wait):
		case <-h.ctx.Done():
			// Do not return here: the next runOnce sees the cancelled context
			// and runs shutdown, so clients are closed as Stop promises.
		}
	}
	logger.Error("WebSocket hub stopped after exhausting restart attempts")
}
// runOnce runs the hub's event loop until the context is cancelled or a panic
// escapes one of the handlers. It returns true after a clean stop (shutdown
// has been performed) and false when a panic was recovered and logged, which
// lets Run decide whether to start the loop again.
func (h *Hub) runOnce() (stopped bool) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		logger.Errorf("WebSocket hub panic recovered: %v", r)
		stopped = false
	}()

	for {
		select {
		case <-h.ctx.Done():
			h.shutdown()
			return true

		case client := <-h.register:
			// A nil registration is ignored.
			if client != nil {
				h.mu.Lock()
				h.clients[client] = struct{}{}
				total := len(h.clients)
				h.mu.Unlock()
				logger.Debugf("WebSocket client connected: %s (total: %d)", client.ID, total)
			}

		case client := <-h.unregister:
			// A nil unregistration is ignored.
			if client != nil {
				h.removeClient(client)
			}

		case payload := <-h.broadcast:
			h.fanout(payload)
		}
	}
}
// shutdown closes every registered client's Send channel (at most once per
// client) and replaces the registry with an empty one.
//
// The registry lock is released through defer: shutdown runs inside runOnce's
// panic-recovery harness, and a panic while closing a channel (for example a
// nil Send, or one already closed outside closeOnce) must not leave h.mu
// locked, or the restarted loop and every GetClientCount caller would block
// forever.
func (h *Hub) shutdown() {
	func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		for c := range h.clients {
			c.closeOnce.Do(func() { close(c.Send) })
		}
		h.clients = make(map[*Client]struct{})
	}()
	// Logged after the lock is released, as before.
	logger.Info("WebSocket hub stopped")
}
// removeClient deletes c from the registry and closes its Send channel exactly
// once. If c is not registered the registry is left untouched. The
// disconnect is logged either way, together with the remaining client count.
//
// The registry lock is released through defer so that a panic while closing
// the channel (nil Send, or a channel closed outside closeOnce) cannot leave
// h.mu locked. removeClient is reached both from the recovered event loop and
// from Unregister's direct fallback, so a leaked lock would stall the hub and
// every caller of GetClientCount.
func (h *Hub) removeClient(c *Client) {
	remaining := func() int {
		h.mu.Lock()
		defer h.mu.Unlock()
		if _, ok := h.clients[c]; ok {
			delete(h.clients, c)
			c.closeOnce.Do(func() { close(c.Send) })
		}
		return len(h.clients)
	}()
	logger.Debugf("WebSocket client disconnected: %s (total: %d)", c.ID, remaining)
}
// fanout delivers msg to every registered client. A nil msg is ignored.
//
// The client set is snapshotted under the read lock and the sends happen
// without any lock held. Each send is non-blocking; a client whose buffer is
// full (or whose channel is closed) is collected and evicted directly at the
// end. Evictions are deliberately NOT routed through the unregister channel:
// the hub goroutine is that channel's only consumer, so filling it from here
// under burst load would self-deadlock.
//
// The eviction section releases h.mu through defer so that a panic while
// closing a channel or logging, which runOnce recovers from, cannot leave the
// registry lock held across the restart.
func (h *Hub) fanout(msg []byte) {
	if msg == nil {
		return
	}

	h.mu.RLock()
	if len(h.clients) == 0 {
		h.mu.RUnlock()
		return
	}
	targets := make([]*Client, 0, len(h.clients))
	for c := range h.clients {
		targets = append(targets, c)
	}
	h.mu.RUnlock()

	var dead []*Client
	for _, c := range targets {
		if !trySend(c, msg) {
			dead = append(dead, c)
		}
	}
	if len(dead) == 0 {
		return
	}

	h.mu.Lock()
	defer h.mu.Unlock()
	for _, c := range dead {
		// The client may have been removed by another path since the snapshot.
		if _, ok := h.clients[c]; !ok {
			continue
		}
		delete(h.clients, c)
		c.closeOnce.Do(func() { close(c.Send) })
		logger.Debugf("WebSocket client %s send buffer full, disconnected", c.ID)
	}
}
// trySend attempts to queue msg on the client's Send channel without blocking.
// It reports false when the client should be evicted: the buffer is full, or
// the send panicked. The recover covers the rare race in which the channel is
// closed concurrently, since a send on a closed channel panics even inside a
// select with a default case.
func trySend(c *Client, msg []byte) (ok bool) {
	defer func() {
		if recover() != nil {
			ok = false
		}
	}()

	select {
	case c.Send <- msg:
		ok = true
	default:
		// Buffer full: leave ok false.
	}
	return ok
}
// Broadcast wraps payload in a Message, serializes it to JSON, and queues it
// for delivery to all clients.
//
// Nothing is sent when the hub or payload is nil, when no client is
// connected, or when messageType is a throttled type (see
// throttledMessageTypes) that was already broadcast within
// minBroadcastInterval. If the serialized message is larger than
// maxMessageSize, an invalidate signal is queued in its place so the frontend
// re-fetches the data via REST. A marshal failure is logged and the message is
// dropped.
func (h *Hub) Broadcast(messageType MessageType, payload any) {
	switch {
	case h == nil, payload == nil:
		return
	case h.GetClientCount() == 0:
		return
	case h.shouldThrottle(messageType):
		return
	}

	envelope := Message{
		Type:    messageType,
		Payload: payload,
		Time:    time.Now().UnixMilli(),
	}
	encoded, err := json.Marshal(envelope)
	if err != nil {
		logger.Error("WebSocket marshal failed:", err)
		return
	}

	if size := len(encoded); size > maxMessageSize {
		logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", size, messageType)
		h.broadcastInvalidate(messageType)
		return
	}
	h.enqueue(encoded)
}
// broadcastInvalidate queues a small invalidate message whose payload names
// originalType, telling clients to re-fetch that data via REST. A marshal
// failure is logged and nothing is queued.
func (h *Hub) broadcastInvalidate(originalType MessageType) {
	signal := Message{
		Type:    MessageTypeInvalidate,
		Payload: map[string]string{"type": string(originalType)},
		Time:    time.Now().UnixMilli(),
	}
	encoded, err := json.Marshal(signal)
	if err == nil {
		h.enqueue(encoded)
		return
	}
	logger.Error("WebSocket invalidate marshal failed:", err)
}
// enqueue submits raw bytes to the broadcast channel. The message is dropped
// when the channel stays full for enqueueTimeout (a warning is logged) or when
// the hub's context is cancelled.
//
// A non-blocking attempt is made first so the common case, where the channel
// has room, does not allocate a timer. Only when that attempt would block is a
// timer created, and it is stopped as soon as enqueue returns.
func (h *Hub) enqueue(data []byte) {
	// Fast path: same ready cases as the timed wait below, minus the timer.
	select {
	case h.broadcast <- data:
		return
	case <-h.ctx.Done():
		return
	default:
	}

	timer := time.NewTimer(enqueueTimeout)
	defer timer.Stop()
	select {
	case h.broadcast <- data:
	case <-timer.C:
		logger.Warning("WebSocket broadcast channel full, dropping message")
	case <-h.ctx.Done():
	}
}
// GetClientCount reports how many clients are currently registered. A nil hub
// has zero clients.
func (h *Hub) GetClientCount() int {
	if h == nil {
		return 0
	}
	h.mu.RLock()
	count := len(h.clients)
	h.mu.RUnlock()
	return count
}
// Register queues c for addition to the hub by the event loop. It blocks while
// the register channel is full and gives up once the hub's context is
// cancelled. A nil hub or nil client is ignored.
func (h *Hub) Register(c *Client) {
	if h != nil && c != nil {
		select {
		case h.register <- c:
		case <-h.ctx.Done():
		}
	}
}
// Unregister removes c from the hub. A nil hub or nil client is ignored.
//
// Normally the request is queued for the event loop. When the unregister
// channel is saturated (for example during a disconnect storm) the client is
// removed directly under the registry write lock, so a dead client does not
// linger in the registry, receiving fanout work, until its Send buffer
// happens to fill up.
//
// The direct removal does not self-deadlock: removeClient takes h.mu itself,
// and fanout releases its read lock before performing the per-client sends.
func (h *Hub) Unregister(c *Client) {
	if h != nil && c != nil {
		select {
		case h.unregister <- c:
			// Queued for the hub goroutine.
		default:
			h.removeClient(c)
		}
	}
}
// Stop cancels the hub's context, which makes the event loop shut down and
// close all client send channels. It is a no-op on a nil hub or a hub without
// a cancel function.
func (h *Hub) Stop() {
	if h == nil || h.cancel == nil {
		return
	}
	h.cancel()
}