mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-05-08 14:13:22 +00:00
Config: Parallel for for inbounds' clients (#6055)
https://github.com/XTLS/Xray-core/pull/6055#issuecomment-4360958652
This commit is contained in:
43
common/task/parallel.go
Normal file
43
common/task/parallel.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ParallelForN runs fn(0..n-1) in parallel across runtime.GOMAXPROCS(0) worker
|
||||
// goroutines. Indices are partitioned into contiguous chunks so the number of
|
||||
// spawned goroutines stays bounded regardless of n.
|
||||
//
|
||||
// fn must be safe to call concurrently from different goroutines (each call
|
||||
// receives its own unique index). Output collected by writing to indexed slots
|
||||
// in a pre-allocated slice is a common safe pattern.
|
||||
//
|
||||
// Returns the first non-nil error reported by fn; other workers may still be
|
||||
// finishing briefly afterwards.
|
||||
func ParallelForN(n int, fn func(i int) error) error {
|
||||
if n <= 0 {
|
||||
return nil
|
||||
}
|
||||
workers := max(runtime.GOMAXPROCS(0), 1)
|
||||
workers = min(workers, n)
|
||||
chunk := (n + workers - 1) / workers
|
||||
var eg errgroup.Group
|
||||
for w := range workers {
|
||||
start := w * chunk
|
||||
end := min(start+chunk, n)
|
||||
if start >= end {
|
||||
break
|
||||
}
|
||||
eg.Go(func() error {
|
||||
for i := start; i < end; i++ {
|
||||
if err := fn(i); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
50
common/task/parallel_test.go
Normal file
50
common/task/parallel_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package task_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/xtls/xray-core/common"
|
||||
. "github.com/xtls/xray-core/common/task"
|
||||
)
|
||||
|
||||
func TestParallelForN_Empty(t *testing.T) {
|
||||
called := false
|
||||
err := ParallelForN(0, func(i int) error {
|
||||
called = true
|
||||
return nil
|
||||
})
|
||||
common.Must(err)
|
||||
if called {
|
||||
t.Fatal("fn should not be called when n=0")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelForN_AllIndicesCovered(t *testing.T) {
|
||||
const N = 10000
|
||||
var seen [N]int32
|
||||
err := ParallelForN(N, func(i int) error {
|
||||
atomic.AddInt32(&seen[i], 1)
|
||||
return nil
|
||||
})
|
||||
common.Must(err)
|
||||
for i := 0; i < N; i++ {
|
||||
if seen[i] != 1 {
|
||||
t.Fatalf("index %d called %d times, expected 1", i, seen[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelForN_Error(t *testing.T) {
|
||||
boom := errors.New("boom")
|
||||
err := ParallelForN(1000, func(i int) error {
|
||||
if i == 42 {
|
||||
return boom
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != boom {
|
||||
t.Fatalf("expected %v, got %v", boom, err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user