mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-05-08 14:13:22 +00:00
Agent-Logs-Url: https://github.com/XTLS/Xray-core/sessions/84d72770-6ad0-447a-8d86-94d692972a05 Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
648 lines
19 KiB
Go
648 lines
19 KiB
Go
package splithttp
|
|
|
|
import (
|
|
"context"
|
|
gotls "crypto/tls"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/http/httptrace"
|
|
"net/url"
|
|
reflect "reflect"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/apernet/quic-go"
|
|
"github.com/apernet/quic-go/http3"
|
|
"github.com/xtls/xray-core/common"
|
|
"github.com/xtls/xray-core/common/buf"
|
|
"github.com/xtls/xray-core/common/errors"
|
|
"github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/signal/done"
|
|
"github.com/xtls/xray-core/common/uuid"
|
|
"github.com/xtls/xray-core/transport/internet"
|
|
"github.com/xtls/xray-core/transport/internet/browser_dialer"
|
|
"github.com/xtls/xray-core/transport/internet/hysteria/congestion"
|
|
"github.com/xtls/xray-core/transport/internet/hysteria/congestion/bbr"
|
|
"github.com/xtls/xray-core/transport/internet/hysteria/udphop"
|
|
"github.com/xtls/xray-core/transport/internet/reality"
|
|
"github.com/xtls/xray-core/transport/internet/stat"
|
|
"github.com/xtls/xray-core/transport/internet/tls"
|
|
"github.com/xtls/xray-core/transport/pipe"
|
|
"golang.org/x/net/http2"
|
|
)
|
|
|
|
type dialerConf struct {
|
|
net.Destination
|
|
*internet.MemoryStreamConfig
|
|
}
|
|
|
|
type errorDialerClient struct {
|
|
err error
|
|
}
|
|
|
|
func (c *errorDialerClient) IsClosed() bool { return true }
|
|
|
|
func (c *errorDialerClient) OpenStream(context.Context, string, string, io.Reader, bool) (io.ReadCloser, net.Addr, net.Addr, error) {
|
|
return nil, nil, nil, c.err
|
|
}
|
|
|
|
func (c *errorDialerClient) PostPacket(context.Context, string, string, string, buf.MultiBuffer) error {
|
|
return c.err
|
|
}
|
|
|
|
var (
|
|
globalDialerMap map[dialerConf]*XmuxManager
|
|
globalDialerAccess sync.Mutex
|
|
)
|
|
|
|
func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (DialerClient, *XmuxClient) {
|
|
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
|
|
browserDialer := ""
|
|
if streamSettings.SocketSettings != nil {
|
|
if browser_dialer.IsBrowserDialerProxy(streamSettings.SocketSettings.DialerProxy) {
|
|
browserDialer = streamSettings.SocketSettings.DialerProxy
|
|
}
|
|
}
|
|
|
|
if browserDialer != "" && realityConfig == nil {
|
|
transportConfig := streamSettings.ProtocolSettings.(*Config)
|
|
if transportConfig.Mode != "auto" && transportConfig.Mode != "packet-up" {
|
|
return &errorDialerClient{
|
|
err: errors.New("browserDialer with splithttp only supports modes \"auto\" or \"packet-up\", got: \"", transportConfig.Mode, "\""),
|
|
}, nil
|
|
}
|
|
return &BrowserDialerClient{
|
|
transportConfig: transportConfig,
|
|
browserDialer: browserDialer,
|
|
}, nil
|
|
}
|
|
|
|
globalDialerAccess.Lock()
|
|
defer globalDialerAccess.Unlock()
|
|
|
|
if globalDialerMap == nil {
|
|
globalDialerMap = make(map[dialerConf]*XmuxManager)
|
|
}
|
|
|
|
key := dialerConf{dest, streamSettings}
|
|
|
|
xmuxManager, found := globalDialerMap[key]
|
|
|
|
if !found {
|
|
transportConfig := streamSettings.ProtocolSettings.(*Config)
|
|
var xmuxConfig XmuxConfig
|
|
if transportConfig.Xmux != nil {
|
|
xmuxConfig = *transportConfig.Xmux
|
|
}
|
|
|
|
xmuxManager = NewXmuxManager(xmuxConfig, func() XmuxConn {
|
|
return createHTTPClient(dest, streamSettings)
|
|
})
|
|
globalDialerMap[key] = xmuxManager
|
|
}
|
|
|
|
xmuxClient := xmuxManager.GetXmuxClient(ctx)
|
|
return xmuxClient.XmuxConn.(DialerClient), xmuxClient
|
|
}
|
|
|
|
func decideHTTPVersion(tlsConfig *tls.Config, realityConfig *reality.Config) string {
|
|
if realityConfig != nil {
|
|
return "2"
|
|
}
|
|
if tlsConfig == nil {
|
|
return "1.1"
|
|
}
|
|
if len(tlsConfig.NextProtocol) != 1 {
|
|
return "2"
|
|
}
|
|
if tlsConfig.NextProtocol[0] == "http/1.1" {
|
|
return "1.1"
|
|
}
|
|
if tlsConfig.NextProtocol[0] == "h3" {
|
|
return "3"
|
|
}
|
|
return "2"
|
|
}
|
|
|
|
func createHTTPClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
|
|
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
|
|
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
|
|
|
|
httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
|
|
if httpVersion == "3" {
|
|
dest.Network = net.Network_UDP // better to keep this line
|
|
}
|
|
|
|
var gotlsConfig *gotls.Config
|
|
|
|
if tlsConfig != nil {
|
|
gotlsConfig = tlsConfig.GetTLSConfig(tls.WithDestination(dest))
|
|
}
|
|
|
|
transportConfig := streamSettings.ProtocolSettings.(*Config)
|
|
|
|
dialContext := func(ctxInner context.Context) (net.Conn, error) {
|
|
conn, err := internet.DialSystem(ctxInner, dest, streamSettings.SocketSettings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if streamSettings.TcpmaskManager != nil {
|
|
newConn, err := streamSettings.TcpmaskManager.WrapConnClient(conn)
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, errors.New("mask err").Base(err)
|
|
}
|
|
conn = newConn
|
|
}
|
|
|
|
if realityConfig != nil {
|
|
return reality.UClient(conn, realityConfig, ctxInner, dest)
|
|
}
|
|
|
|
if gotlsConfig != nil {
|
|
if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil {
|
|
conn = tls.UClient(conn, gotlsConfig, fingerprint)
|
|
if err := conn.(*tls.UConn).HandshakeContext(ctxInner); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
conn = tls.Client(conn, gotlsConfig)
|
|
}
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
var keepAlivePeriod time.Duration
|
|
if streamSettings.ProtocolSettings.(*Config).Xmux != nil {
|
|
keepAlivePeriod = time.Duration(streamSettings.ProtocolSettings.(*Config).Xmux.HKeepAlivePeriod) * time.Second
|
|
}
|
|
|
|
var transport http.RoundTripper
|
|
|
|
if httpVersion == "3" {
|
|
quicParams := streamSettings.QuicParams
|
|
if quicParams == nil {
|
|
quicParams = &internet.QuicParams{
|
|
BbrProfile: string(bbr.ProfileStandard),
|
|
UdpHop: &internet.UdpHop{},
|
|
}
|
|
}
|
|
|
|
quicConfig := &quic.Config{
|
|
InitialStreamReceiveWindow: quicParams.InitStreamReceiveWindow,
|
|
MaxStreamReceiveWindow: quicParams.MaxStreamReceiveWindow,
|
|
InitialConnectionReceiveWindow: quicParams.InitConnReceiveWindow,
|
|
MaxConnectionReceiveWindow: quicParams.MaxConnReceiveWindow,
|
|
MaxIdleTimeout: time.Duration(quicParams.MaxIdleTimeout) * time.Second,
|
|
KeepAlivePeriod: time.Duration(quicParams.KeepAlivePeriod) * time.Second,
|
|
MaxIncomingStreams: quicParams.MaxIncomingStreams,
|
|
DisablePathMTUDiscovery: quicParams.DisablePathMtuDiscovery,
|
|
}
|
|
if quicParams.MaxIdleTimeout == 0 {
|
|
quicConfig.MaxIdleTimeout = net.ConnIdleTimeout
|
|
}
|
|
if quicParams.KeepAlivePeriod == 0 {
|
|
if keepAlivePeriod == 0 {
|
|
quicConfig.KeepAlivePeriod = net.QuicgoH3KeepAlivePeriod
|
|
}
|
|
}
|
|
if quicParams.MaxIncomingStreams == 0 {
|
|
// these two are defaults of quic-go/http3. the default of quic-go (no
|
|
// http3) is different, so it is hardcoded here for clarity.
|
|
// https://github.com/quic-go/quic-go/blob/b8ea5c798155950fb5bbfdd06cad1939c9355878/http3/client.go#L36-L39
|
|
quicConfig.MaxIncomingStreams = -1
|
|
}
|
|
|
|
transport = &http3.Transport{
|
|
QUICConfig: quicConfig,
|
|
TLSClientConfig: gotlsConfig,
|
|
Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (*quic.Conn, error) {
|
|
udphopDialer := func(addr *net.UDPAddr) (net.PacketConn, error) {
|
|
conn, err := internet.DialSystem(ctx, net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)), streamSettings.SocketSettings)
|
|
if err != nil {
|
|
errors.LogDebug(context.Background(), "skip hop: failed to dial to dest")
|
|
conn.Close()
|
|
return nil, errors.New()
|
|
}
|
|
|
|
var udpConn net.PacketConn
|
|
|
|
switch c := conn.(type) {
|
|
case *internet.PacketConnWrapper:
|
|
udpConn = c.PacketConn
|
|
case *net.UDPConn:
|
|
udpConn = c
|
|
default:
|
|
errors.LogDebug(context.Background(), "skip hop: udphop requires being at the outermost level ", reflect.TypeOf(c))
|
|
conn.Close()
|
|
return nil, errors.New()
|
|
}
|
|
|
|
return udpConn, nil
|
|
}
|
|
|
|
var index int
|
|
if len(quicParams.UdpHop.Ports) > 0 {
|
|
index = rand.Intn(len(quicParams.UdpHop.Ports))
|
|
dest.Port = net.Port(quicParams.UdpHop.Ports[index])
|
|
}
|
|
|
|
conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var udpConn net.PacketConn
|
|
var udpAddr *net.UDPAddr
|
|
|
|
switch c := conn.(type) {
|
|
case *internet.PacketConnWrapper:
|
|
udpConn = c.PacketConn
|
|
udpAddr, err = net.ResolveUDPAddr("udp", c.Dest.String())
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
case *net.UDPConn:
|
|
udpConn = c
|
|
udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
default:
|
|
udpConn = &internet.FakePacketConn{Conn: c}
|
|
udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
if len(quicParams.UdpHop.Ports) > 0 {
|
|
conn.Close()
|
|
return nil, errors.New("udphop requires being at the outermost level ", reflect.TypeOf(c))
|
|
}
|
|
}
|
|
|
|
if len(quicParams.UdpHop.Ports) > 0 {
|
|
addr := &udphop.UDPHopAddr{
|
|
IP: udpAddr.IP,
|
|
Ports: quicParams.UdpHop.Ports,
|
|
}
|
|
udpConn, err = udphop.NewUDPHopPacketConn(addr, index, quicParams.UdpHop.IntervalMin, quicParams.UdpHop.IntervalMax, udphopDialer, udpConn)
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, errors.New("udphop err").Base(err)
|
|
}
|
|
}
|
|
|
|
if streamSettings.UdpmaskManager != nil {
|
|
udpConn, err = streamSettings.UdpmaskManager.WrapPacketConnClient(udpConn)
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, errors.New("mask err").Base(err)
|
|
}
|
|
}
|
|
|
|
quicConn, err := quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch quicParams.Congestion {
|
|
case "force-brutal":
|
|
errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion brutal bytes per second ", quicParams.BrutalUp)
|
|
congestion.UseBrutal(quicConn, quicParams.BrutalUp)
|
|
case "reno":
|
|
errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion reno")
|
|
default:
|
|
errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion bbr ", quicParams.BbrProfile)
|
|
congestion.UseBBR(quicConn, bbr.Profile(quicParams.BbrProfile))
|
|
}
|
|
|
|
return quicConn, nil
|
|
},
|
|
}
|
|
} else if httpVersion == "2" {
|
|
if keepAlivePeriod == 0 {
|
|
keepAlivePeriod = net.ChromeH2KeepAlivePeriod
|
|
}
|
|
if keepAlivePeriod < 0 {
|
|
keepAlivePeriod = 0
|
|
}
|
|
transport = &http2.Transport{
|
|
DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) {
|
|
return dialContext(ctxInner)
|
|
},
|
|
IdleConnTimeout: net.ConnIdleTimeout,
|
|
ReadIdleTimeout: keepAlivePeriod,
|
|
}
|
|
} else {
|
|
httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) {
|
|
return dialContext(ctxInner)
|
|
}
|
|
|
|
transport = &http.Transport{
|
|
DialTLSContext: httpDialContext,
|
|
DialContext: httpDialContext,
|
|
IdleConnTimeout: net.ConnIdleTimeout,
|
|
// chunked transfer download with KeepAlives is buggy with
|
|
// http.Client and our custom dial context.
|
|
DisableKeepAlives: true,
|
|
}
|
|
}
|
|
|
|
client := &DefaultDialerClient{
|
|
transportConfig: transportConfig,
|
|
client: &http.Client{
|
|
Transport: transport,
|
|
},
|
|
httpVersion: httpVersion,
|
|
uploadRawPool: &sync.Pool{},
|
|
dialUploadConn: dialContext,
|
|
}
|
|
|
|
return client
|
|
}
|
|
|
|
func init() {
|
|
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
|
|
}
|
|
|
|
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
|
|
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
|
|
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
|
|
|
|
httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
|
|
if httpVersion == "3" {
|
|
dest.Network = net.Network_UDP
|
|
}
|
|
|
|
transportConfiguration := streamSettings.ProtocolSettings.(*Config)
|
|
var requestURL url.URL
|
|
|
|
if tlsConfig != nil || realityConfig != nil {
|
|
requestURL.Scheme = "https"
|
|
} else {
|
|
requestURL.Scheme = "http"
|
|
}
|
|
requestURL.Host = transportConfiguration.Host
|
|
if requestURL.Host == "" && tlsConfig != nil {
|
|
requestURL.Host = tlsConfig.ServerName
|
|
}
|
|
if requestURL.Host == "" && realityConfig != nil {
|
|
requestURL.Host = realityConfig.ServerName
|
|
}
|
|
if requestURL.Host == "" {
|
|
requestURL.Host = dest.Address.String()
|
|
}
|
|
|
|
requestURL.Path = transportConfiguration.GetNormalizedPath()
|
|
requestURL.RawQuery = transportConfiguration.GetNormalizedQuery()
|
|
|
|
httpClient, xmuxClient := getHTTPClient(ctx, dest, streamSettings)
|
|
|
|
mode := transportConfiguration.Mode
|
|
if mode == "" || mode == "auto" {
|
|
mode = "packet-up"
|
|
if realityConfig != nil {
|
|
mode = "stream-one"
|
|
if transportConfiguration.DownloadSettings != nil {
|
|
mode = "stream-up"
|
|
}
|
|
}
|
|
}
|
|
|
|
sessionId := ""
|
|
if mode != "stream-one" {
|
|
sessionIdUuid := uuid.New()
|
|
sessionId = sessionIdUuid.String()
|
|
}
|
|
|
|
errors.LogInfo(ctx, fmt.Sprintf("XHTTP is dialing to %s, mode %s, HTTP version %s, host %s", dest, mode, httpVersion, requestURL.Host))
|
|
|
|
requestURL2 := requestURL
|
|
httpClient2 := httpClient
|
|
xmuxClient2 := xmuxClient
|
|
if transportConfiguration.DownloadSettings != nil {
|
|
globalDialerAccess.Lock()
|
|
if streamSettings.DownloadSettings == nil {
|
|
streamSettings.DownloadSettings = common.Must2(internet.ToMemoryStreamConfig(transportConfiguration.DownloadSettings))
|
|
if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.Penetrate {
|
|
streamSettings.DownloadSettings.SocketSettings = streamSettings.SocketSettings
|
|
}
|
|
}
|
|
globalDialerAccess.Unlock()
|
|
memory2 := streamSettings.DownloadSettings
|
|
dest2 := *memory2.Destination // just panic
|
|
tlsConfig2 := tls.ConfigFromStreamSettings(memory2)
|
|
realityConfig2 := reality.ConfigFromStreamSettings(memory2)
|
|
httpVersion2 := decideHTTPVersion(tlsConfig2, realityConfig2)
|
|
if httpVersion2 == "3" {
|
|
dest2.Network = net.Network_UDP
|
|
}
|
|
if tlsConfig2 != nil || realityConfig2 != nil {
|
|
requestURL2.Scheme = "https"
|
|
} else {
|
|
requestURL2.Scheme = "http"
|
|
}
|
|
config2 := memory2.ProtocolSettings.(*Config)
|
|
requestURL2.Host = config2.Host
|
|
if requestURL2.Host == "" && tlsConfig2 != nil {
|
|
requestURL2.Host = tlsConfig2.ServerName
|
|
}
|
|
if requestURL2.Host == "" && realityConfig2 != nil {
|
|
requestURL2.Host = realityConfig2.ServerName
|
|
}
|
|
if requestURL2.Host == "" {
|
|
requestURL2.Host = dest2.Address.String()
|
|
}
|
|
requestURL2.Path = config2.GetNormalizedPath()
|
|
requestURL2.RawQuery = config2.GetNormalizedQuery()
|
|
httpClient2, xmuxClient2 = getHTTPClient(ctx, dest2, memory2)
|
|
errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
|
|
}
|
|
|
|
if xmuxClient != nil {
|
|
xmuxClient.OpenUsage.Add(1)
|
|
}
|
|
if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
|
|
xmuxClient2.OpenUsage.Add(1)
|
|
}
|
|
var closed atomic.Int32
|
|
|
|
reader, writer := io.Pipe()
|
|
conn := splitConn{
|
|
writer: writer,
|
|
onClose: func() {
|
|
if closed.Add(1) > 1 {
|
|
return
|
|
}
|
|
if xmuxClient != nil {
|
|
xmuxClient.OpenUsage.Add(-1)
|
|
}
|
|
if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
|
|
xmuxClient2.OpenUsage.Add(-1)
|
|
}
|
|
},
|
|
}
|
|
|
|
var err error
|
|
if mode == "stream-one" {
|
|
requestURL.Path = transportConfiguration.GetNormalizedPath()
|
|
if xmuxClient != nil {
|
|
xmuxClient.LeftRequests.Add(-1)
|
|
}
|
|
conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, false)
|
|
if err != nil { // browser dialer only
|
|
return nil, err
|
|
}
|
|
return stat.Connection(&conn), nil
|
|
} else { // stream-down
|
|
if xmuxClient2 != nil {
|
|
xmuxClient2.LeftRequests.Add(-1)
|
|
}
|
|
conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(ctx, requestURL2.String(), sessionId, nil, false)
|
|
if err != nil { // browser dialer only
|
|
return nil, err
|
|
}
|
|
}
|
|
if mode == "stream-up" {
|
|
if xmuxClient != nil {
|
|
xmuxClient.LeftRequests.Add(-1)
|
|
}
|
|
_, _, _, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, true)
|
|
if err != nil { // browser dialer only
|
|
return nil, err
|
|
}
|
|
return stat.Connection(&conn), nil
|
|
}
|
|
|
|
scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
|
|
scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()
|
|
|
|
if scMaxEachPostBytes.From <= 0 {
|
|
panic("`scMaxEachPostBytes` should be bigger than 0")
|
|
}
|
|
|
|
maxUploadSize := scMaxEachPostBytes.rand()
|
|
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
|
|
// code relies on this behavior. Subtract 1 so that together with
|
|
// uploadWriter wrapper, exact size limits can be enforced
|
|
// uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))
|
|
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(max(0, maxUploadSize-buf.Size)))
|
|
|
|
conn.writer = uploadWriter{
|
|
uploadPipeWriter,
|
|
maxUploadSize,
|
|
}
|
|
|
|
go func() {
|
|
var seq int64
|
|
var lastWrite time.Time
|
|
|
|
for {
|
|
// by offloading the uploads into a buffered pipe, multiple conn.Write
|
|
// calls get automatically batched together into larger POST requests.
|
|
// without batching, bandwidth is extremely limited.
|
|
remainder, err := uploadPipeReader.ReadMultiBuffer()
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
doSplit := atomic.Bool{}
|
|
for doSplit.Store(true); doSplit.Load(); {
|
|
var chunk buf.MultiBuffer
|
|
remainder, chunk = buf.SplitSize(remainder, maxUploadSize)
|
|
if chunk.IsEmpty() {
|
|
break
|
|
}
|
|
|
|
wroteRequest := done.New()
|
|
|
|
ctx := httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
|
|
WroteRequest: func(httptrace.WroteRequestInfo) {
|
|
wroteRequest.Close()
|
|
},
|
|
})
|
|
|
|
seqStr := strconv.FormatInt(seq, 10)
|
|
seq += 1
|
|
|
|
if scMinPostsIntervalMs.From > 0 {
|
|
time.Sleep(time.Duration(scMinPostsIntervalMs.rand())*time.Millisecond - time.Since(lastWrite))
|
|
}
|
|
|
|
lastWrite = time.Now()
|
|
|
|
if xmuxClient != nil && (xmuxClient.LeftRequests.Add(-1) <= 0 ||
|
|
(xmuxClient.UnreusableAt != time.Time{} && lastWrite.After(xmuxClient.UnreusableAt))) {
|
|
httpClient, xmuxClient = getHTTPClient(ctx, dest, streamSettings)
|
|
}
|
|
|
|
go func() {
|
|
err := httpClient.PostPacket(
|
|
ctx,
|
|
requestURL.String(),
|
|
sessionId,
|
|
seqStr,
|
|
chunk,
|
|
)
|
|
wroteRequest.Close()
|
|
if err != nil {
|
|
errors.LogInfoInner(ctx, err, "failed to send upload")
|
|
uploadPipeReader.Interrupt()
|
|
doSplit.Store(false)
|
|
}
|
|
}()
|
|
|
|
if _, ok := httpClient.(*DefaultDialerClient); ok {
|
|
<-wroteRequest.Wait()
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return stat.Connection(&conn), nil
|
|
}
|
|
|
|
// A wrapper around pipe that ensures the size limit is exactly honored.
|
|
//
|
|
// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
|
|
// single MultiBuffer exceeds the size limit, and then starts blocking on the
|
|
// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
|
|
// bytes than the size limit. We work around this by splitting a potentially
|
|
// too large write up into multiple.
|
|
type uploadWriter struct {
|
|
*pipe.Writer
|
|
maxLen int32
|
|
}
|
|
|
|
func (w uploadWriter) Write(b []byte) (int, error) {
|
|
/*
|
|
capacity := int(w.maxLen - w.Len())
|
|
if capacity > 0 && capacity < len(b) {
|
|
b = b[:capacity]
|
|
}
|
|
*/
|
|
|
|
buffer := buf.MultiBufferContainer{}
|
|
common.Must2(buffer.Write(b))
|
|
|
|
var writed int
|
|
for _, buff := range buffer.MultiBuffer {
|
|
err := w.WriteMultiBuffer(buf.MultiBuffer{buff})
|
|
if err != nil {
|
|
return writed, err
|
|
}
|
|
writed += int(buff.Len())
|
|
}
|
|
return writed, nil
|
|
}
|