fix: enhance WebSocket stability, resolve XHTTP configurations and fix UI loading shifts (#3997)

* feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system

* feat: add bulk client management support and improve inbound data handling

* Fix bug

* **Fixes & Changes:**
1. **Fixed XPadding Placement Dropdown**:
   - Added the missing `cookie` and `query` options to `xPaddingPlacement` (`stream_xhttp.html`).
   - *Why:* Previously, users wanting `cookie` obfuscation were forced to use the `header` placement string. This caused Xray-core to blindly intercept the entire monolithic HTTP Cookie header, failing internal padding-length validations and causing the inbound to silently drop the connection.
2. **Fixed Uplink Data Placement Validation**:
   - Replaced the unsupported `query` option with `cookie` in `uplinkDataPlacement`.
   - *Why:* Xray-core's `transport_internet.go` explicitly forbids `query` as an uplink placement option. Selecting it from the UI previously sent a payload that would cause Xray-core to instantly throw an `unsupported uplink data placement: query` panic. Adding `cookie` perfectly aligns the UI with Xray-core restrictions.
### Related Issues
- Resolves #3992

* This commit fixes structural payload issues preventing XHTTP from functioning correctly and eliminates WebSocket log spam.
- **[Fix X-Padding UI]** Added missing `cookie` and `query` options to X-Padding Placement. Fixes the issue where using Cookie fallback triggers whole HTTP Cookie header interception and silent drop in Xray-core. (Resolves [#3992](https://github.com/MHSanaei/3x-ui/issues/3992))
- **[Fix Uplink Data Options]** Replaced the invalid `query` option with `cookie` in Uplink Data Placement dropdown to prevent Xray-core backend panic `unsupported uplink data placement: query`.
- **[Fix WebSockets Spam]** Raised the `maxMessageSize` limit from 1MB to 10MB and gracefully handled oversized payloads via a lightweight `broadcastInvalidate` signal (the frontend re-fetches via REST) instead of dropping messages and spamming the log. (Resolves [#3984](https://github.com/MHSanaei/3x-ui/issues/3984))

* Fix

* gofmt

* fix(websocket): resolve channel race condition and graceful shutdown deadlock

* Fix: inbounds switch

* Change max quantity from 10000 to 500

* fix
This commit is contained in:
lolka1333
2026-04-19 22:01:00 +03:00
committed by GitHub
parent e02f78ac68
commit fec714a243
16 changed files with 291 additions and 132 deletions

View File

@@ -21,6 +21,7 @@ const (
MessageTypeNotification MessageType = "notification" // System notification
MessageTypeXrayState MessageType = "xray_state" // Xray state change
MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update
MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST
)
// Message represents a WebSocket message
@@ -32,10 +33,11 @@ type Message struct {
// Client represents a WebSocket client connection
type Client struct {
ID string
Send chan []byte
Hub *Hub
Topics map[MessageType]bool // Subscribed topics
ID string
Send chan []byte
Hub *Hub
Topics map[MessageType]bool // Subscribed topics
closeOnce sync.Once // Ensures Send channel is closed exactly once
}
// Hub maintains the set of active clients and broadcasts messages to them
@@ -61,7 +63,6 @@ type Hub struct {
// Worker pool for parallel broadcasting
workerPoolSize int
broadcastWg sync.WaitGroup
}
// NewHub creates a new WebSocket hub
@@ -104,20 +105,12 @@ func (h *Hub) Run() {
// Graceful shutdown: close all clients
h.mu.Lock()
for client := range h.clients {
// Safely close channel (avoid double close panic)
select {
case _, stillOpen := <-client.Send:
if stillOpen {
close(client.Send)
}
default:
client.closeOnce.Do(func() {
close(client.Send)
}
})
}
h.clients = make(map[*Client]bool)
h.mu.Unlock()
// Wait for all broadcast workers to finish
h.broadcastWg.Wait()
logger.Info("WebSocket hub stopped gracefully")
return
@@ -138,19 +131,9 @@ func (h *Hub) Run() {
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
// Safely close channel (avoid double close panic)
// Check if channel is already closed by trying to read from it
select {
case _, stillOpen := <-client.Send:
if stillOpen {
// Channel was open and had data, now it's empty, safe to close
close(client.Send)
}
// If stillOpen is false, channel was already closed, do nothing
default:
// Channel is empty and open, safe to close
client.closeOnce.Do(func() {
close(client.Send)
}
})
}
count := len(h.clients)
h.mu.Unlock()
@@ -220,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
close(clientChan)
// Start workers for parallel processing
h.broadcastWg.Add(h.workerPoolSize)
// Use a local WaitGroup to avoid blocking hub shutdown
var wg sync.WaitGroup
wg.Add(h.workerPoolSize)
for i := 0; i < h.workerPoolSize; i++ {
go func() {
defer h.broadcastWg.Done()
defer wg.Done()
for client := range clientChan {
func() {
defer func() {
@@ -246,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
// Wait for all workers to finish
h.broadcastWg.Wait()
wg.Wait()
}
// Broadcast sends a message to all connected clients
@@ -259,6 +243,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return
}
// Skip all work if no clients are connected
if h.GetClientCount() == 0 {
return
}
msg := Message{
Type: messageType,
Payload: payload,
@@ -271,10 +260,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return
}
// Limit message size to prevent memory issues
const maxMessageSize = 1024 * 1024 // 1MB
// If message exceeds size limit, send a lightweight invalidate notification
// instead of dropping it entirely — the frontend will re-fetch via REST API
const maxMessageSize = 10 * 1024 * 1024 // 10MB
if len(data) > maxMessageSize {
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
h.broadcastInvalidate(messageType)
return
}
@@ -298,6 +289,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return
}
// Skip all work if no clients are connected
if h.GetClientCount() == 0 {
return
}
msg := Message{
Type: messageType,
Payload: payload,
@@ -310,10 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return
}
// Limit message size to prevent memory issues
const maxMessageSize = 1024 * 1024 // 1MB
// If message exceeds size limit, send a lightweight invalidate notification
const maxMessageSize = 10 * 1024 * 1024 // 10MB
if len(data) > maxMessageSize {
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
h.broadcastInvalidate(messageType)
return
}
@@ -374,6 +371,31 @@ func (h *Hub) Stop() {
}
}
// broadcastInvalidate sends a lightweight invalidate message to all clients,
// telling them to re-fetch the specified data type via REST API.
// This is used when the full payload exceeds the WebSocket message size limit.
func (h *Hub) broadcastInvalidate(originalType MessageType) {
// Wrap the original message type in the payload so the frontend knows
// which dataset to re-fetch over REST.
msg := Message{
Type: MessageTypeInvalidate,
Payload: map[string]string{"type": string(originalType)},
Time: getCurrentTimestamp(),
}
data, err := json.Marshal(msg)
if err != nil {
logger.Error("Failed to marshal invalidate message:", err)
return
}
// Bounded-blocking send: wait up to 100ms for room on the broadcast
// channel, then drop the signal rather than stall the caller. Also bail
// out immediately if the hub context is cancelled (shutdown in progress).
select {
case h.broadcast <- data:
case <-time.After(100 * time.Millisecond):
logger.Warning("WebSocket broadcast channel is full, dropping invalidate message")
case <-h.ctx.Done():
// Hub is shutting down; discard silently — clients are being closed anyway.
}
}
// getCurrentTimestamp returns current Unix timestamp in milliseconds
func getCurrentTimestamp() int64 {
return time.Now().UnixMilli()