Loopback outbound: Use DispatchLink() (#6005)

https://github.com/XTLS/Xray-core/pull/6000

Fixes https://github.com/XTLS/Xray-core/issues/5917
This commit is contained in:
风扇滑翔翼
2026-04-26 00:48:49 +08:00
committed by GitHub
parent bc590bcb56
commit 454c930d13

View File

@@ -4,13 +4,8 @@ import (
"context" "context"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/net/cnc"
"github.com/xtls/xray-core/common/retry"
"github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/core" "github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport" "github.com/xtls/xray-core/transport"
@@ -32,83 +27,24 @@ func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet
destination := ob.Target destination := ob.Target
errors.LogInfo(ctx, "opening connection to ", destination) errors.LogInfo(ctx, "opening connection to ", destination)
content := new(session.Content)
content.SkipDNSResolve = true
input := link.Reader ctx = session.ContextWithContent(ctx, content)
output := link.Writer inbound := &session.Inbound{}
originInbound := session.InboundFromContext(ctx)
if originInbound != nil {
// get a shallow copy to avoid modifying the inbound tag in upstream context
*inbound = *originInbound
}
inbound.Tag = l.config.InboundTag
ctx = session.ContextWithInbound(ctx, inbound)
var conn net.Conn err := l.dispatcherInstance.DispatchLink(ctx, destination, link)
err := retry.ExponentialBackoff(2, 100).On(func() error {
dialDest := destination
content := new(session.Content)
content.SkipDNSResolve = true
ctx = session.ContextWithContent(ctx, content)
inbound := &session.Inbound{}
originInbound := session.InboundFromContext(ctx)
if originInbound != nil {
// get a shallow copy to avoid modifying the inbound tag in upstream context
*inbound = *originInbound
}
inbound.Tag = l.config.InboundTag
ctx = session.ContextWithInbound(ctx, inbound)
rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
if err != nil {
return err
}
var readerOpt cnc.ConnectionOption
if dialDest.Network == net.Network_TCP {
readerOpt = cnc.ConnectionOutputMulti(rawConn.Reader)
} else {
readerOpt = cnc.ConnectionOutputMultiUDP(rawConn.Reader)
}
conn = cnc.NewConnection(cnc.ConnectionInputMulti(rawConn.Writer), readerOpt)
return nil
})
if err != nil { if err != nil {
return errors.New("failed to open connection to ", destination).Base(err) errors.New(ctx, "failed to process loopback connection").Base(err)
return err
} }
defer conn.Close()
requestDone := func() error {
var writer buf.Writer
if destination.Network == net.Network_TCP {
writer = buf.NewWriter(conn)
} else {
writer = &buf.SequentialWriter{Writer: conn}
}
if err := buf.Copy(input, writer); err != nil {
return errors.New("failed to process request").Base(err)
}
return nil
}
responseDone := func() error {
var reader buf.Reader
if destination.Network == net.Network_TCP {
reader = buf.NewReader(conn)
} else {
reader = buf.NewPacketReader(conn)
}
if err := buf.Copy(reader, output); err != nil {
return errors.New("failed to process response").Base(err)
}
return nil
}
if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
return errors.New("connection ends").Base(err)
}
return nil return nil
} }