Files
xray-core/transport/internet/splithttp/dialer.go

607 lines
18 KiB
Go

package splithttp
import (
"context"
gotls "crypto/tls"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptrace"
"net/url"
reflect "reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/apernet/quic-go"
"github.com/apernet/quic-go/http3"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/net/cnc"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/browser_dialer"
"github.com/xtls/xray-core/transport/internet/hysteria/congestion"
"github.com/xtls/xray-core/transport/internet/hysteria/congestion/bbr"
"github.com/xtls/xray-core/transport/internet/hysteria/udphop"
"github.com/xtls/xray-core/transport/internet/reality"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
"github.com/xtls/xray-core/transport/pipe"
"golang.org/x/net/http2"
)
type dialerConf struct {
net.Destination
*internet.MemoryStreamConfig
}
var (
globalDialerMap map[dialerConf]*XmuxManager
globalDialerAccess sync.Mutex
)
func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (DialerClient, *XmuxClient) {
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
if browser_dialer.HasBrowserDialer() && realityConfig == nil {
return &BrowserDialerClient{transportConfig: streamSettings.ProtocolSettings.(*Config)}, nil
}
globalDialerAccess.Lock()
defer globalDialerAccess.Unlock()
if globalDialerMap == nil {
globalDialerMap = make(map[dialerConf]*XmuxManager)
}
key := dialerConf{dest, streamSettings}
xmuxManager, found := globalDialerMap[key]
if !found {
transportConfig := streamSettings.ProtocolSettings.(*Config)
var xmuxConfig XmuxConfig
if transportConfig.Xmux != nil {
xmuxConfig = *transportConfig.Xmux
}
xmuxManager = NewXmuxManager(xmuxConfig, func() XmuxConn {
return createHTTPClient(dest, streamSettings)
})
globalDialerMap[key] = xmuxManager
}
xmuxClient := xmuxManager.GetXmuxClient(ctx)
return xmuxClient.XmuxConn.(DialerClient), xmuxClient
}
func decideHTTPVersion(tlsConfig *tls.Config, realityConfig *reality.Config) string {
if realityConfig != nil {
return "2"
}
if tlsConfig == nil {
return "1.1"
}
if len(tlsConfig.NextProtocol) != 1 {
return "2"
}
if tlsConfig.NextProtocol[0] == "http/1.1" {
return "1.1"
}
if tlsConfig.NextProtocol[0] == "h3" {
return "3"
}
return "2"
}
func createHTTPClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
if httpVersion == "3" {
dest.Network = net.Network_UDP // better to keep this line
}
var gotlsConfig *gotls.Config
if tlsConfig != nil {
gotlsConfig = tlsConfig.GetTLSConfig(tls.WithDestination(dest))
}
transportConfig := streamSettings.ProtocolSettings.(*Config)
dialContext := func(ctxInner context.Context) (net.Conn, error) {
conn, err := internet.DialSystem(ctxInner, dest, streamSettings.SocketSettings)
if err != nil {
return nil, err
}
if streamSettings.TcpmaskManager != nil {
newConn, err := streamSettings.TcpmaskManager.WrapConnClient(conn)
if err != nil {
conn.Close()
return nil, errors.New("mask err").Base(err)
}
conn = newConn
}
if realityConfig != nil {
return reality.UClient(conn, realityConfig, ctxInner, dest)
}
if gotlsConfig != nil {
if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil {
conn = tls.UClient(conn, gotlsConfig, fingerprint)
if err := conn.(*tls.UConn).HandshakeContext(ctxInner); err != nil {
return nil, err
}
} else {
conn = tls.Client(conn, gotlsConfig)
}
}
return conn, nil
}
var keepAlivePeriod time.Duration
if streamSettings.ProtocolSettings.(*Config).Xmux != nil {
keepAlivePeriod = time.Duration(streamSettings.ProtocolSettings.(*Config).Xmux.HKeepAlivePeriod) * time.Second
}
var transport http.RoundTripper
if httpVersion == "3" {
quicParams := streamSettings.QuicParams
if quicParams == nil {
quicParams = &internet.QuicParams{
BbrProfile: string(bbr.ProfileStandard),
UdpHop: &internet.UdpHop{},
}
}
quicConfig := &quic.Config{
InitialStreamReceiveWindow: quicParams.InitStreamReceiveWindow,
MaxStreamReceiveWindow: quicParams.MaxStreamReceiveWindow,
InitialConnectionReceiveWindow: quicParams.InitConnReceiveWindow,
MaxConnectionReceiveWindow: quicParams.MaxConnReceiveWindow,
MaxIdleTimeout: time.Duration(quicParams.MaxIdleTimeout) * time.Second,
KeepAlivePeriod: time.Duration(quicParams.KeepAlivePeriod) * time.Second,
MaxIncomingStreams: quicParams.MaxIncomingStreams,
DisablePathMTUDiscovery: quicParams.DisablePathMtuDiscovery || (runtime.GOOS != "linux" && runtime.GOOS != "windows" && runtime.GOOS != "darwin"),
}
if quicParams.MaxIdleTimeout == 0 {
quicConfig.MaxIdleTimeout = net.ConnIdleTimeout
}
if quicParams.KeepAlivePeriod == 0 {
if keepAlivePeriod == 0 {
quicConfig.KeepAlivePeriod = net.QuicgoH3KeepAlivePeriod
}
}
if quicParams.MaxIncomingStreams == 0 {
// these two are defaults of quic-go/http3. the default of quic-go (no
// http3) is different, so it is hardcoded here for clarity.
// https://github.com/quic-go/quic-go/blob/b8ea5c798155950fb5bbfdd06cad1939c9355878/http3/client.go#L36-L39
quicConfig.MaxIncomingStreams = -1
}
transport = &http3.Transport{
QUICConfig: quicConfig,
TLSClientConfig: gotlsConfig,
Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (*quic.Conn, error) {
udpHopDialer := func(addr *net.UDPAddr) (net.PacketConn, error) {
conn, err := internet.DialSystem(ctx, net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)), streamSettings.SocketSettings)
if err != nil {
errors.LogInfoInner(context.Background(), err, "skip hop: failed to dial to dest")
return nil, errors.New("")
}
var pktConn net.PacketConn
switch c := conn.(type) {
case *internet.PacketConnWrapper:
pktConn = c.PacketConn
default:
panic(reflect.TypeOf(c))
}
return pktConn, nil
}
var pktConn net.PacketConn
var udpAddr *net.UDPAddr
if len(quicParams.UdpHop.Ports) > 0 {
index := rand.Intn(len(quicParams.UdpHop.Ports))
dest.Port = net.Port(quicParams.UdpHop.Ports[index])
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to dial to dest").Base(err)
}
switch c := conn.(type) {
case *internet.PacketConnWrapper:
pktConn = c.PacketConn
udpAddr = conn.RemoteAddr().(*net.UDPAddr)
default:
panic(reflect.TypeOf(c))
}
pktConn = udphop.NewUDPHopPacketConn(udphop.ToAddrs(udpAddr.IP, quicParams.UdpHop.Ports), time.Duration(quicParams.UdpHop.IntervalMin)*time.Second, time.Duration(quicParams.UdpHop.IntervalMax)*time.Second, udpHopDialer, pktConn, index)
} else {
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
if err != nil {
return nil, errors.New("failed to dial to dest").Base(err)
}
switch c := conn.(type) {
case *internet.PacketConnWrapper:
pktConn = c.PacketConn
udpAddr = c.RemoteAddr().(*net.UDPAddr)
case *cnc.Connection:
pktConn = &internet.FakePacketConn{Conn: c}
udpAddr = &net.UDPAddr{IP: c.RemoteAddr().(*net.TCPAddr).IP, Port: c.RemoteAddr().(*net.TCPAddr).Port}
default:
panic(reflect.TypeOf(c))
}
}
if streamSettings.UdpmaskManager != nil {
newConn, err := streamSettings.UdpmaskManager.WrapPacketConnClient(pktConn)
if err != nil {
pktConn.Close()
return nil, errors.New("mask err").Base(err)
}
pktConn = newConn
}
conn, err := quic.DialEarly(ctx, pktConn, udpAddr, tlsCfg, cfg)
if err != nil {
return nil, err
}
switch quicParams.Congestion {
case "reno":
case "", "bbr":
congestion.UseBBR(conn, bbr.Profile(quicParams.BbrProfile))
case "force-brutal":
congestion.UseBrutal(conn, quicParams.BrutalUp)
default:
panic(quicParams.Congestion)
}
return conn, nil
},
}
} else if httpVersion == "2" {
if keepAlivePeriod == 0 {
keepAlivePeriod = net.ChromeH2KeepAlivePeriod
}
if keepAlivePeriod < 0 {
keepAlivePeriod = 0
}
transport = &http2.Transport{
DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) {
return dialContext(ctxInner)
},
IdleConnTimeout: net.ConnIdleTimeout,
ReadIdleTimeout: keepAlivePeriod,
}
} else {
httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) {
return dialContext(ctxInner)
}
transport = &http.Transport{
DialTLSContext: httpDialContext,
DialContext: httpDialContext,
IdleConnTimeout: net.ConnIdleTimeout,
// chunked transfer download with KeepAlives is buggy with
// http.Client and our custom dial context.
DisableKeepAlives: true,
}
}
client := &DefaultDialerClient{
transportConfig: transportConfig,
client: &http.Client{
Transport: transport,
},
httpVersion: httpVersion,
uploadRawPool: &sync.Pool{},
dialUploadConn: dialContext,
}
return client
}
func init() {
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
}
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
if httpVersion == "3" {
dest.Network = net.Network_UDP
}
transportConfiguration := streamSettings.ProtocolSettings.(*Config)
var requestURL url.URL
if tlsConfig != nil || realityConfig != nil {
requestURL.Scheme = "https"
} else {
requestURL.Scheme = "http"
}
requestURL.Host = transportConfiguration.Host
if requestURL.Host == "" && tlsConfig != nil {
requestURL.Host = tlsConfig.ServerName
}
if requestURL.Host == "" && realityConfig != nil {
requestURL.Host = realityConfig.ServerName
}
if requestURL.Host == "" {
requestURL.Host = dest.Address.String()
}
if browser_dialer.HasBrowserDialer() && realityConfig == nil {
// For Browser Dialer's optimized IP and non-standard port
if !(requestURL.Scheme == "http" && dest.Port == 80) && !(requestURL.Scheme == "https" && dest.Port == 443) {
requestURL.Host += ":" + dest.Port.String()
}
}
requestURL.Path = transportConfiguration.GetNormalizedPath()
requestURL.RawQuery = transportConfiguration.GetNormalizedQuery()
httpClient, xmuxClient := getHTTPClient(ctx, dest, streamSettings)
mode := transportConfiguration.Mode
if mode == "" || mode == "auto" {
mode = "packet-up"
if realityConfig != nil {
mode = "stream-one"
if transportConfiguration.DownloadSettings != nil {
mode = "stream-up"
}
}
}
sessionId := ""
if mode != "stream-one" {
sessionIdUuid := uuid.New()
sessionId = sessionIdUuid.String()
}
errors.LogInfo(ctx, fmt.Sprintf("XHTTP is dialing to %s, mode %s, HTTP version %s, host %s", dest, mode, httpVersion, requestURL.Host))
requestURL2 := requestURL
httpClient2 := httpClient
xmuxClient2 := xmuxClient
if transportConfiguration.DownloadSettings != nil {
globalDialerAccess.Lock()
if streamSettings.DownloadSettings == nil {
streamSettings.DownloadSettings = common.Must2(internet.ToMemoryStreamConfig(transportConfiguration.DownloadSettings))
if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.Penetrate {
streamSettings.DownloadSettings.SocketSettings = streamSettings.SocketSettings
}
}
globalDialerAccess.Unlock()
memory2 := streamSettings.DownloadSettings
dest2 := *memory2.Destination // just panic
tlsConfig2 := tls.ConfigFromStreamSettings(memory2)
realityConfig2 := reality.ConfigFromStreamSettings(memory2)
httpVersion2 := decideHTTPVersion(tlsConfig2, realityConfig2)
if httpVersion2 == "3" {
dest2.Network = net.Network_UDP
}
if tlsConfig2 != nil || realityConfig2 != nil {
requestURL2.Scheme = "https"
} else {
requestURL2.Scheme = "http"
}
config2 := memory2.ProtocolSettings.(*Config)
requestURL2.Host = config2.Host
if requestURL2.Host == "" && tlsConfig2 != nil {
requestURL2.Host = tlsConfig2.ServerName
}
if requestURL2.Host == "" && realityConfig2 != nil {
requestURL2.Host = realityConfig2.ServerName
}
if requestURL2.Host == "" {
requestURL2.Host = dest2.Address.String()
}
if browser_dialer.HasBrowserDialer() && realityConfig2 == nil {
// For Browser Dialer's optimized IP and non-standard port
if !(requestURL2.Scheme == "http" && dest2.Port == 80) && !(requestURL2.Scheme == "https" && dest2.Port == 443) {
requestURL2.Host += ":" + dest2.Port.String()
}
}
requestURL2.Path = config2.GetNormalizedPath()
requestURL2.RawQuery = config2.GetNormalizedQuery()
httpClient2, xmuxClient2 = getHTTPClient(ctx, dest2, memory2)
errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
}
if xmuxClient != nil {
xmuxClient.OpenUsage.Add(1)
}
if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
xmuxClient2.OpenUsage.Add(1)
}
var closed atomic.Int32
reader, writer := io.Pipe()
conn := splitConn{
writer: writer,
onClose: func() {
if closed.Add(1) > 1 {
return
}
if xmuxClient != nil {
xmuxClient.OpenUsage.Add(-1)
}
if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
xmuxClient2.OpenUsage.Add(-1)
}
},
}
var err error
if mode == "stream-one" {
requestURL.Path = transportConfiguration.GetNormalizedPath()
if xmuxClient != nil {
xmuxClient.LeftRequests.Add(-1)
}
conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, false)
if err != nil { // browser dialer only
return nil, err
}
return stat.Connection(&conn), nil
} else { // stream-down
if xmuxClient2 != nil {
xmuxClient2.LeftRequests.Add(-1)
}
conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(ctx, requestURL2.String(), sessionId, nil, false)
if err != nil { // browser dialer only
return nil, err
}
}
if mode == "stream-up" {
if xmuxClient != nil {
xmuxClient.LeftRequests.Add(-1)
}
_, _, _, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, true)
if err != nil { // browser dialer only
return nil, err
}
return stat.Connection(&conn), nil
}
scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()
if scMaxEachPostBytes.From <= 0 {
panic("`scMaxEachPostBytes` should be bigger than 0")
}
maxUploadSize := scMaxEachPostBytes.rand()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
// uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(max(0, maxUploadSize-buf.Size)))
conn.writer = uploadWriter{
uploadPipeWriter,
maxUploadSize,
}
go func() {
var seq int64
var lastWrite time.Time
for {
// by offloading the uploads into a buffered pipe, multiple conn.Write
// calls get automatically batched together into larger POST requests.
// without batching, bandwidth is extremely limited.
remainder, err := uploadPipeReader.ReadMultiBuffer()
if err != nil {
break
}
doSplit := atomic.Bool{}
for doSplit.Store(true); doSplit.Load(); {
var chunk buf.MultiBuffer
remainder, chunk = buf.SplitSize(remainder, maxUploadSize)
if chunk.IsEmpty() {
break
}
wroteRequest := done.New()
ctx := httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
WroteRequest: func(httptrace.WroteRequestInfo) {
wroteRequest.Close()
},
})
seqStr := strconv.FormatInt(seq, 10)
seq += 1
if scMinPostsIntervalMs.From > 0 {
time.Sleep(time.Duration(scMinPostsIntervalMs.rand())*time.Millisecond - time.Since(lastWrite))
}
lastWrite = time.Now()
if xmuxClient != nil && (xmuxClient.LeftRequests.Add(-1) <= 0 ||
(xmuxClient.UnreusableAt != time.Time{} && lastWrite.After(xmuxClient.UnreusableAt))) {
httpClient, xmuxClient = getHTTPClient(ctx, dest, streamSettings)
}
go func() {
err := httpClient.PostPacket(
ctx,
requestURL.String(),
sessionId,
seqStr,
chunk,
)
wroteRequest.Close()
if err != nil {
errors.LogInfoInner(ctx, err, "failed to send upload")
uploadPipeReader.Interrupt()
doSplit.Store(false)
}
}()
if _, ok := httpClient.(*DefaultDialerClient); ok {
<-wroteRequest.Wait()
}
}
}
}()
return stat.Connection(&conn), nil
}
// A wrapper around pipe that ensures the size limit is exactly honored.
//
// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
// single MultiBuffer exceeds the size limit, and then starts blocking on the
// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
// bytes than the size limit. We work around this by splitting a potentially
// too large write up into multiple.
type uploadWriter struct {
*pipe.Writer
maxLen int32
}
func (w uploadWriter) Write(b []byte) (int, error) {
/*
capacity := int(w.maxLen - w.Len())
if capacity > 0 && capacity < len(b) {
b = b[:capacity]
}
*/
buffer := buf.MultiBufferContainer{}
common.Must2(buffer.Write(b))
var writed int
for _, buff := range buffer.MultiBuffer {
err := w.WriteMultiBuffer(buf.MultiBuffer{buff})
if err != nil {
return writed, err
}
writed += int(buff.Len())
}
return writed, nil
}