From 1ead940a71f61171755db0908234e10071ac0f15 Mon Sep 17 00:00:00 2001 From: Yury Kastov Date: Sat, 2 May 2026 16:32:59 +0300 Subject: [PATCH] Config: Parallel `for` for inbounds' `clients` (#6055) https://github.com/XTLS/Xray-core/pull/6055#issuecomment-4360958652 --- common/task/parallel.go | 43 +++++++++++++++++++++++++++++ common/task/parallel_test.go | 50 ++++++++++++++++++++++++++++++++++ infra/conf/hysteria.go | 19 +++++++++---- infra/conf/shadowsocks.go | 53 +++++++++++++++++++++++------------- infra/conf/trojan.go | 10 +++++-- infra/conf/vless.go | 23 ++++++++++------ infra/conf/vmess.go | 14 +++++++--- 7 files changed, 173 insertions(+), 39 deletions(-) create mode 100644 common/task/parallel.go create mode 100644 common/task/parallel_test.go diff --git a/common/task/parallel.go b/common/task/parallel.go new file mode 100644 index 00000000..b20d5285 --- /dev/null +++ b/common/task/parallel.go @@ -0,0 +1,43 @@ +package task + +import ( + "runtime" + + "golang.org/x/sync/errgroup" +) + +// ParallelForN runs fn(0..n-1) in parallel across runtime.GOMAXPROCS(0) worker +// goroutines. Indices are partitioned into contiguous chunks so the number of +// spawned goroutines stays bounded regardless of n. +// +// fn must be safe to call concurrently from different goroutines (each call +// receives its own unique index). Output collected by writing to indexed slots +// in a pre-allocated slice is a common safe pattern. +// +// Returns the first non-nil error reported by fn; other workers may still be +// finishing briefly afterwards. +func ParallelForN(n int, fn func(i int) error) error { + if n <= 0 { + return nil + } + workers := max(runtime.GOMAXPROCS(0), 1) + workers = min(workers, n) + chunk := (n + workers - 1) / workers + var eg errgroup.Group + for w := range workers { + start := w * chunk + end := min(start+chunk, n) + if start >= end { + break + } + eg.Go(func() error { + for i := start; i < end; i++ { + if err := fn(i); err != nil { + return err + } + } + return nil + }) + } + return eg.Wait() +} diff --git a/common/task/parallel_test.go b/common/task/parallel_test.go new file mode 100644 index 00000000..82354f0d --- /dev/null +++ b/common/task/parallel_test.go @@ -0,0 +1,50 @@ +package task_test + +import ( + "errors" + "sync/atomic" + "testing" + + "github.com/xtls/xray-core/common" + . "github.com/xtls/xray-core/common/task" +) + +func TestParallelForN_Empty(t *testing.T) { + called := false + err := ParallelForN(0, func(i int) error { + called = true + return nil + }) + common.Must(err) + if called { + t.Fatal("fn should not be called when n=0") + } +} + +func TestParallelForN_AllIndicesCovered(t *testing.T) { + const N = 10000 + var seen [N]int32 + err := ParallelForN(N, func(i int) error { + atomic.AddInt32(&seen[i], 1) + return nil + }) + common.Must(err) + for i := 0; i < N; i++ { + if seen[i] != 1 { + t.Fatalf("index %d called %d times, expected 1", i, seen[i]) + } + } +} + +func TestParallelForN_Error(t *testing.T) { + boom := errors.New("boom") + err := ParallelForN(1000, func(i int) error { + if i == 42 { + return boom + } + return nil + }) + if err != boom { + t.Fatalf("expected %v, got %v", boom, err) + } +} diff --git a/infra/conf/hysteria.go b/infra/conf/hysteria.go index 3574811c..2c2bede5 100644 --- a/infra/conf/hysteria.go +++ b/infra/conf/hysteria.go @@ -4,6 +4,7 @@ import ( "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/proxy/hysteria" "github.com/xtls/xray-core/proxy/hysteria/account" "google.golang.org/protobuf/proto" @@ -44,16 +45,22 @@ type HysteriaServerConfig struct { func (c *HysteriaServerConfig) Build() (proto.Message, error) { config := new(hysteria.ServerConfig) - if c.Users != nil { - for _, user := range c.Users { - account := &account.Account{ + if len(c.Users) > 0 { + config.Users = make([]*protocol.User, len(c.Users)) + processUser := func(idx int) error { + user := c.Users[idx] + acc := &account.Account{ Auth: user.Auth, } - config.Users = append(config.Users, &protocol.User{ + config.Users[idx] = &protocol.User{ Email: user.Email, Level: user.Level, - Account: serial.ToTypedMessage(account), - }) + Account: serial.ToTypedMessage(acc), + } + return nil + } + if err := task.ParallelForN(len(c.Users), processUser); err != nil { + return nil, err } } diff --git a/infra/conf/shadowsocks.go b/infra/conf/shadowsocks.go index 490c2997..3b542303 100644 --- a/infra/conf/shadowsocks.go +++ b/infra/conf/shadowsocks.go @@ -8,6 +8,7 @@ import ( "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/proxy/shadowsocks" "github.com/xtls/xray-core/proxy/shadowsocks_2022" "google.golang.org/protobuf/proto" @@ -59,23 +60,31 @@ func (v *ShadowsocksServerConfig) Build() (proto.Message, error) { config.Network = v.NetworkList.Build() if v.Users != nil { - for _, user := range v.Users { - account := &shadowsocks.Account{ - Password: user.Password, - CipherType: cipherFromString(user.Cipher), + if len(v.Users) > 0 { + config.Users = make([]*protocol.User, len(v.Users)) + processUser := func(idx int) error { + user := v.Users[idx] + account := &shadowsocks.Account{ + Password: user.Password, + CipherType: cipherFromString(user.Cipher), + } + if account.Password == "" { + return errors.New("Shadowsocks password is not specified.") + } + if account.CipherType < shadowsocks.CipherType_AES_128_GCM || + account.CipherType > shadowsocks.CipherType_XCHACHA20_POLY1305 { + return errors.New("unsupported cipher method: ", user.Cipher) + } + config.Users[idx] = &protocol.User{ + Email: user.Email, + Level: uint32(user.Level), + Account: serial.ToTypedMessage(account), + } + return nil } - if account.Password == "" { - return nil, errors.New("Shadowsocks password is not specified.") + if err := task.ParallelForN(len(v.Users), processUser); err != nil { + return nil, err } - if account.CipherType < shadowsocks.CipherType_AES_128_GCM || - account.CipherType > shadowsocks.CipherType_XCHACHA20_POLY1305 { - return nil, errors.New("unsupported cipher method: ", user.Cipher) - } - config.Users = append(config.Users, &protocol.User{ - Email: user.Email, - Level: uint32(user.Level), - Account: serial.ToTypedMessage(account), - }) } } else { account := &shadowsocks.Account{ @@ -121,18 +130,24 @@ func buildShadowsocks2022(v *ShadowsocksServerConfig) (proto.Message, error) { config.Key = v.Password config.Network = v.NetworkList.Build() - for _, user := range v.Users { + config.Users = make([]*protocol.User, len(v.Users)) + processUser := func(idx int) error { + user := v.Users[idx] if user.Cipher != "" { - return nil, errors.New("shadowsocks 2022 (multi-user): users must have empty method") + return errors.New("shadowsocks 2022 (multi-user): users must have empty method") } account := &shadowsocks_2022.Account{ Key: user.Password, } - config.Users = append(config.Users, &protocol.User{ + config.Users[idx] = &protocol.User{ Email: user.Email, Level: uint32(user.Level), Account: serial.ToTypedMessage(account), - }) + } + return nil + } + if err := task.ParallelForN(len(v.Users), processUser); err != nil { + return nil, err } return config, nil } diff --git a/infra/conf/trojan.go b/infra/conf/trojan.go index b78b6ffc..97f1b79f 100644 --- a/infra/conf/trojan.go +++ b/infra/conf/trojan.go @@ -12,6 +12,7 @@ import ( "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/proxy/trojan" "google.golang.org/protobuf/proto" ) @@ -123,9 +124,10 @@ func (c *TrojanServerConfig) Build() (proto.Message, error) { Users: make([]*protocol.User, len(c.Clients)), } - for idx, rawUser := range c.Clients { + processClient := func(idx int) error { + rawUser := c.Clients[idx] if rawUser.Flow != "" { - return nil, errors.PrintRemovedFeatureError(`Flow for Trojan`, ``) + return errors.PrintRemovedFeatureError(`Flow for Trojan`, ``) } config.Users[idx] = &protocol.User{ @@ -135,6 +137,10 @@ func (c *TrojanServerConfig) Build() (proto.Message, error) { Password: rawUser.Password, }), } + return nil + } + if err := task.ParallelForN(len(c.Clients), processClient); err != nil { + return nil, err } for _, fb := range c.Fallbacks { diff --git a/infra/conf/vless.go b/infra/conf/vless.go index 1e14882c..669be539 100644 --- a/infra/conf/vless.go +++ b/infra/conf/vless.go @@ -13,6 +13,7 @@ import ( "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/common/uuid" "github.com/xtls/xray-core/proxy/vless" "github.com/xtls/xray-core/proxy/vless/inbound" @@ -46,19 +47,20 @@ func (c *VLessInboundConfig) Build() (proto.Message, error) { default: return nil, errors.New(`VLESS "settings.flow" doesn't support "` + c.Flow + `" in this version`) } - for idx, rawUser := range c.Clients { + processClient := func(idx int) error { + rawUser := c.Clients[idx] user := new(protocol.User) if err := json.Unmarshal(rawUser, user); err != nil { - return nil, errors.New(`VLESS clients: invalid user`).Base(err) + return errors.New(`VLESS clients: invalid user`).Base(err) } account := new(vless.Account) if err := json.Unmarshal(rawUser, account); err != nil { - return nil, errors.New(`VLESS clients: invalid user`).Base(err) + return errors.New(`VLESS clients: invalid user`).Base(err) } u, err := uuid.ParseString(account.Id) if err != nil { - return nil, err + return err } account.Id = u.String() @@ -67,7 +69,7 @@ func (c *VLessInboundConfig) Build() (proto.Message, error) { account.Flow = c.Flow case vless.XRV: default: - return nil, errors.New(`VLESS clients: "flow" doesn't support "` + account.Flow + `" in this version`) + return errors.New(`VLESS clients: "flow" doesn't support "` + account.Flow + `" in this version`) } if len(account.Testseed) < 4 { @@ -75,20 +77,25 @@ func (c *VLessInboundConfig) Build() (proto.Message, error) { } if account.Encryption != "" { - return nil, errors.New(`VLESS clients: "encryption" should not be in inbound settings`) + return errors.New(`VLESS clients: "encryption" should not be in inbound settings`) } if account.Reverse != nil { if account.Reverse.Tag == "" { - return nil, errors.New(`VLESS clients: "tag" can't be empty for "reverse"`) + return errors.New(`VLESS clients: "tag" can't be empty for "reverse"`) } if account.Reverse.Sniffing != nil { // may not be reached: error json unmarshal - return nil, errors.New(`VLESS clients: inbound's "reverse" can't have "sniffing"`) + return errors.New(`VLESS clients: inbound's "reverse" can't have "sniffing"`) } } user.Account = serial.ToTypedMessage(account) config.Clients[idx] = user + return nil + } + + if err := task.ParallelForN(len(c.Clients), processClient); err != nil { + return nil, err } config.Decryption = c.Decryption diff --git a/infra/conf/vmess.go b/infra/conf/vmess.go index cb8ff9c9..cb982254 100644 --- a/infra/conf/vmess.go +++ b/infra/conf/vmess.go @@ -7,6 +7,7 @@ import ( "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/common/uuid" "github.com/xtls/xray-core/proxy/vmess" "github.com/xtls/xray-core/proxy/vmess/inbound" @@ -73,24 +74,29 @@ func (c *VMessInboundConfig) Build() (proto.Message, error) { } config.User = make([]*protocol.User, len(c.Users)) - for idx, rawData := range c.Users { + processUser := func(idx int) error { + rawData := c.Users[idx] user := new(protocol.User) if err := json.Unmarshal(rawData, user); err != nil { - return nil, errors.New("invalid VMess user").Base(err) + return errors.New("invalid VMess user").Base(err) } account := new(VMessAccount) if err := json.Unmarshal(rawData, account); err != nil { - return nil, errors.New("invalid VMess user").Base(err) + return errors.New("invalid VMess user").Base(err) } u, err := uuid.ParseString(account.ID) if err != nil { - return nil, err + return err } account.ID = u.String() user.Account = serial.ToTypedMessage(account.Build()) config.User[idx] = user + return nil + } + if err := task.ParallelForN(len(c.Users), processUser); err != nil { + return nil, err } return config, nil