mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-05-08 14:13:22 +00:00
Simplified fix: Set workers to peer count for multi-peer support
The issue was that with only 1 worker (default), when a peer's reader goroutine blocked waiting for data, it prevented other peers from receiving packets. Simple solution: Automatically set workers to the number of peers if not explicitly configured. This allows each peer to have its own worker thread for concurrent packet reception. - Reverted complex architectural changes - Added simple logic to set workers = len(peers) when NumWorkers not set - Much simpler and easier to understand than previous approach Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
This commit is contained in:
@@ -25,23 +25,13 @@ type netReadInfo struct {
|
||||
err error
|
||||
}
|
||||
|
||||
// receivedPacket represents a packet received from a peer connection
|
||||
type receivedPacket struct {
|
||||
data []byte
|
||||
endpoint conn.Endpoint
|
||||
err error
|
||||
}
|
||||
|
||||
// reduce duplicated code
|
||||
type netBind struct {
|
||||
dns dns.Client
|
||||
dnsOption dns.IPOption
|
||||
|
||||
workers int
|
||||
readQueue chan *netReadInfo
|
||||
packetQueue chan *receivedPacket
|
||||
startedMutex sync.Mutex
|
||||
started bool
|
||||
workers int
|
||||
readQueue chan *netReadInfo
|
||||
}
|
||||
|
||||
// SetMark implements conn.Bind
|
||||
@@ -90,35 +80,6 @@ func (bind *netBind) BatchSize() int {
|
||||
// Open implements conn.Bind
|
||||
func (bind *netBind) Open(uport uint16) ([]conn.ReceiveFunc, uint16, error) {
|
||||
bind.readQueue = make(chan *netReadInfo)
|
||||
bind.packetQueue = make(chan *receivedPacket, 100)
|
||||
|
||||
// Start a dispatcher goroutine that matches readQueue requests with received packets
|
||||
bind.startedMutex.Lock()
|
||||
if !bind.started {
|
||||
bind.started = true
|
||||
go func() {
|
||||
for {
|
||||
packet, ok := <-bind.packetQueue
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for a read request from WireGuard
|
||||
request, ok := <-bind.readQueue
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Copy packet data to the request buffer
|
||||
n := copy(request.buff, packet.data)
|
||||
request.bytes = n
|
||||
request.endpoint = packet.endpoint
|
||||
request.err = packet.err
|
||||
request.waiter.Done()
|
||||
}
|
||||
}()
|
||||
}
|
||||
bind.startedMutex.Unlock()
|
||||
|
||||
fun := func(bufs [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) {
|
||||
defer func() {
|
||||
@@ -154,9 +115,6 @@ func (bind *netBind) Close() error {
|
||||
if bind.readQueue != nil {
|
||||
close(bind.readQueue)
|
||||
}
|
||||
if bind.packetQueue != nil {
|
||||
close(bind.packetQueue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -175,44 +133,30 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
||||
}
|
||||
endpoint.conn = c
|
||||
|
||||
// Start a goroutine that continuously reads from this connection
|
||||
// and sends received packets to the packet queue
|
||||
go func(conn net.Conn, endpoint *netEndpoint) {
|
||||
const maxPacketSize = 1500
|
||||
go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) {
|
||||
for {
|
||||
buf := make([]byte, maxPacketSize)
|
||||
n, err := conn.Read(buf)
|
||||
|
||||
// Only process data if we successfully read something
|
||||
if err == nil && n > 3 {
|
||||
// Clear reserved bytes
|
||||
buf[1] = 0
|
||||
buf[2] = 0
|
||||
buf[3] = 0
|
||||
}
|
||||
|
||||
packet := &receivedPacket{
|
||||
data: buf[:n],
|
||||
endpoint: endpoint,
|
||||
err: err,
|
||||
}
|
||||
|
||||
// Try to send packet to queue; if queue is full or closed, exit
|
||||
select {
|
||||
case bind.packetQueue <- packet:
|
||||
// Packet sent successfully
|
||||
default:
|
||||
// Queue is full or closed, exit goroutine
|
||||
endpoint.conn = nil
|
||||
v, ok := <-readQueue
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
i, err := c.Read(v.buff)
|
||||
|
||||
if i > 3 {
|
||||
v.buff[1] = 0
|
||||
v.buff[2] = 0
|
||||
v.buff[3] = 0
|
||||
}
|
||||
|
||||
v.bytes = i
|
||||
v.endpoint = endpoint
|
||||
v.err = err
|
||||
v.waiter.Done()
|
||||
if err != nil {
|
||||
endpoint.conn = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
}(c, endpoint)
|
||||
}(bind.readQueue, endpoint)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -114,8 +114,12 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
|
||||
}
|
||||
|
||||
// bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer
|
||||
// Use a detached context for the bind to avoid tying all peer connections
|
||||
// to a single request context. This allows multiple peers to work independently.
|
||||
// Set workers to number of peers if not explicitly configured
|
||||
// This allows concurrent packet reception from multiple peers
|
||||
workers := int(h.conf.NumWorkers)
|
||||
if workers <= 0 && len(h.conf.Peers) > 0 {
|
||||
workers = len(h.conf.Peers)
|
||||
}
|
||||
h.bind = &netBindClient{
|
||||
netBind: netBind{
|
||||
dns: h.dns,
|
||||
@@ -123,9 +127,9 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
|
||||
IPv4Enable: h.hasIPv4,
|
||||
IPv6Enable: h.hasIPv6,
|
||||
},
|
||||
workers: int(h.conf.NumWorkers),
|
||||
workers: workers,
|
||||
},
|
||||
ctx: core.ToBackgroundDetachedContext(ctx),
|
||||
ctx: ctx,
|
||||
dialer: dialer,
|
||||
reserved: h.conf.Reserved,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user