mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-05-08 14:13:22 +00:00
Routing: Add webhook to rules (#5722)
https://github.com/XTLS/Xray-core/pull/5722#issuecomment-3953836108
This commit is contained in:
287
app/router/webhook.go
Normal file
287
app/router/webhook.go
Normal file
@@ -0,0 +1,287 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/xtls/xray-core/common/errors"
|
||||
"github.com/xtls/xray-core/features/routing"
|
||||
routing_session "github.com/xtls/xray-core/features/routing/session"
|
||||
)
|
||||
|
||||
// parseURL splits a webhook URL into an HTTP URL and an optional Unix socket
|
||||
// path. For regular http/https URLs the input is returned unchanged with an
|
||||
// empty socketPath. For Unix sockets the format is:
|
||||
//
|
||||
// /path/to/socket.sock:/http/path
|
||||
// @abstract:/http/path
|
||||
// @@padded:/http/path
|
||||
//
|
||||
// The :/ separator after the socket path delimits the HTTP request path.
|
||||
// If omitted, "/" is used.
|
||||
func parseURL(raw string) (httpURL, socketPath string) {
|
||||
if len(raw) == 0 || (!filepath.IsAbs(raw) && raw[0] != '@') {
|
||||
return raw, ""
|
||||
}
|
||||
if idx := strings.Index(raw, ":/"); idx >= 0 {
|
||||
return "http://localhost" + raw[idx+1:], raw[:idx]
|
||||
}
|
||||
return "http://localhost/", raw
|
||||
}
|
||||
|
||||
// resolveSocketPath applies platform-specific transformations to a Unix
|
||||
// socket path, matching the behaviour of the listen side in
|
||||
// transport/internet/system_listener.go.
|
||||
//
|
||||
// For abstract sockets (prefix @) on Linux/Android:
|
||||
// - single @ — used as-is (lock-free abstract socket)
|
||||
// - double @@ — stripped to single @ and padded to
|
||||
// syscall.RawSockaddrUnix{}.Path length (HAProxy compat)
|
||||
func resolveSocketPath(path string) string {
|
||||
if len(path) == 0 || path[0] != '@' {
|
||||
return path
|
||||
}
|
||||
if runtime.GOOS != "linux" && runtime.GOOS != "android" {
|
||||
return path
|
||||
}
|
||||
if len(path) > 1 && path[1] == '@' {
|
||||
fullAddr := make([]byte, len(syscall.RawSockaddrUnix{}.Path))
|
||||
copy(fullAddr, path[1:])
|
||||
return string(fullAddr)
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
func ptr[T any](v T) *T { return &v }
|
||||
|
||||
type event struct {
|
||||
Email *string `json:"email"`
|
||||
Level *uint32 `json:"level"`
|
||||
Protocol *string `json:"protocol"`
|
||||
Network *string `json:"network"`
|
||||
Source *string `json:"source"`
|
||||
Destination *string `json:"destination"`
|
||||
OriginalTarget *string `json:"originalTarget"`
|
||||
RouteTarget *string `json:"routeTarget"`
|
||||
InboundTag *string `json:"inboundTag"`
|
||||
InboundName *string `json:"inboundName"`
|
||||
InboundLocal *string `json:"inboundLocal"`
|
||||
OutboundTag *string `json:"outboundTag"`
|
||||
Timestamp int64 `json:"ts"`
|
||||
}
|
||||
|
||||
type WebhookNotifier struct {
|
||||
url string
|
||||
headers map[string]string
|
||||
deduplication uint32
|
||||
client *http.Client
|
||||
seen sync.Map
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func NewWebhookNotifier(cfg *WebhookConfig) (*WebhookNotifier, error) {
|
||||
if cfg == nil || cfg.Url == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
httpURL, socketPath := parseURL(cfg.Url)
|
||||
h := &WebhookNotifier{
|
||||
url: httpURL,
|
||||
deduplication: cfg.Deduplication,
|
||||
client: &http.Client{
|
||||
Timeout: 5 * time.Second,
|
||||
},
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
if socketPath != "" {
|
||||
dialAddr := resolveSocketPath(socketPath)
|
||||
h.client.Transport = &http.Transport{
|
||||
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
var d net.Dialer
|
||||
return d.DialContext(ctx, "unix", dialAddr)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if len(cfg.Headers) > 0 {
|
||||
h.headers = make(map[string]string, len(cfg.Headers))
|
||||
for k, v := range cfg.Headers {
|
||||
h.headers[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if h.deduplication > 0 {
|
||||
h.wg.Add(1)
|
||||
go h.cleanupLoop()
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (h *WebhookNotifier) Fire(ctx routing.Context, outboundTag string) {
|
||||
ev := buildEvent(ctx, outboundTag)
|
||||
|
||||
email := ""
|
||||
if ev.Email != nil {
|
||||
email = *ev.Email
|
||||
}
|
||||
if h.isDuplicate(email) {
|
||||
return
|
||||
}
|
||||
|
||||
h.wg.Add(1)
|
||||
select {
|
||||
case <-h.done:
|
||||
h.wg.Done()
|
||||
return
|
||||
default:
|
||||
}
|
||||
go func() {
|
||||
defer h.wg.Done()
|
||||
h.post(ev)
|
||||
}()
|
||||
}
|
||||
|
||||
func buildEvent(ctx routing.Context, outboundTag string) *event {
|
||||
ev := &event{
|
||||
Timestamp: time.Now().Unix(),
|
||||
OutboundTag: ptr(outboundTag),
|
||||
InboundTag: ptr(ctx.GetInboundTag()),
|
||||
Protocol: ptr(ctx.GetProtocol()),
|
||||
Network: ptr(ctx.GetNetwork().SystemString()),
|
||||
}
|
||||
|
||||
if user := ctx.GetUser(); user != "" {
|
||||
ev.Email = ptr(user)
|
||||
}
|
||||
|
||||
if srcIPs := ctx.GetSourceIPs(); len(srcIPs) > 0 {
|
||||
srcPort := ctx.GetSourcePort()
|
||||
ev.Source = ptr(net.JoinHostPort(srcIPs[0].String(), srcPort.String()))
|
||||
}
|
||||
|
||||
targetPort := ctx.GetTargetPort()
|
||||
if domain := ctx.GetTargetDomain(); domain != "" {
|
||||
ev.Destination = ptr(net.JoinHostPort(domain, targetPort.String()))
|
||||
} else if targetIPs := ctx.GetTargetIPs(); len(targetIPs) > 0 {
|
||||
ev.Destination = ptr(net.JoinHostPort(targetIPs[0].String(), targetPort.String()))
|
||||
}
|
||||
|
||||
if localIPs := ctx.GetLocalIPs(); len(localIPs) > 0 {
|
||||
localPort := ctx.GetLocalPort()
|
||||
ev.InboundLocal = ptr(net.JoinHostPort(localIPs[0].String(), localPort.String()))
|
||||
}
|
||||
|
||||
if sctx, ok := ctx.(*routing_session.Context); ok {
|
||||
enrichFromSession(ev, sctx)
|
||||
}
|
||||
|
||||
return ev
|
||||
}
|
||||
|
||||
func enrichFromSession(ev *event, sctx *routing_session.Context) {
|
||||
if sctx.Inbound != nil {
|
||||
ev.InboundName = ptr(sctx.Inbound.Name)
|
||||
if sctx.Inbound.User != nil {
|
||||
ev.Level = ptr(sctx.Inbound.User.Level)
|
||||
}
|
||||
}
|
||||
if sctx.Outbound != nil {
|
||||
if sctx.Outbound.OriginalTarget.Address != nil {
|
||||
ev.OriginalTarget = ptr(sctx.Outbound.OriginalTarget.String())
|
||||
}
|
||||
if sctx.Outbound.RouteTarget.Address != nil {
|
||||
ev.RouteTarget = ptr(sctx.Outbound.RouteTarget.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *WebhookNotifier) post(ev *event) {
|
||||
body, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
errors.LogWarning(context.Background(), "webhook: marshal failed: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, h.url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
errors.LogWarning(context.Background(), "webhook: request build failed: ", err)
|
||||
return
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
for k, v := range h.headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
resp, err := h.client.Do(req)
|
||||
if err != nil {
|
||||
errors.LogInfo(context.Background(), "webhook: POST failed: ", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}()
|
||||
if resp.StatusCode >= 400 {
|
||||
errors.LogWarning(context.Background(), "webhook: POST returned status ", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *WebhookNotifier) isDuplicate(email string) bool {
|
||||
if h.deduplication == 0 || email == "" {
|
||||
return false
|
||||
}
|
||||
ttl := time.Duration(h.deduplication) * time.Second
|
||||
now := time.Now()
|
||||
if v, loaded := h.seen.LoadOrStore(email, now); loaded {
|
||||
if now.Sub(v.(time.Time)) < ttl {
|
||||
return true
|
||||
}
|
||||
h.seen.Store(email, now)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (h *WebhookNotifier) cleanupLoop() {
|
||||
defer h.wg.Done()
|
||||
ttl := time.Duration(h.deduplication) * time.Second
|
||||
ticker := time.NewTicker(ttl)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
now := time.Now()
|
||||
h.seen.Range(func(key, value any) bool {
|
||||
if now.Sub(value.(time.Time)) >= ttl {
|
||||
h.seen.Delete(key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *WebhookNotifier) Close() error {
|
||||
h.closeOnce.Do(func() {
|
||||
close(h.done)
|
||||
})
|
||||
h.wg.Wait()
|
||||
h.client.CloseIdleConnections()
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user