From 78fc2865ea115ca7ade7995486b4368a07dedb93 Mon Sep 17 00:00:00 2001 From: Yury Kastov Date: Sat, 7 Mar 2026 13:49:46 +0300 Subject: [PATCH] Routing: Add `webhook` to `rules` (#5722) https://github.com/XTLS/Xray-core/pull/5722#issuecomment-3953836108 --- app/router/config.go | 1 + app/router/config.pb.go | 174 +++++++++++++++++------- app/router/config.proto | 7 + app/router/router.go | 60 +++++++++ app/router/webhook.go | 287 ++++++++++++++++++++++++++++++++++++++++ infra/conf/router.go | 47 ++++--- 6 files changed, 513 insertions(+), 63 deletions(-) create mode 100644 app/router/webhook.go diff --git a/app/router/config.go b/app/router/config.go index 4288f2af..4acbaf41 100644 --- a/app/router/config.go +++ b/app/router/config.go @@ -18,6 +18,7 @@ type Rule struct { RuleTag string Balancer *Balancer Condition Condition + Webhook *WebhookNotifier } func (r *Rule) GetTag() (string, error) { diff --git a/app/router/config.pb.go b/app/router/config.pb.go index 4ee7165b..40676024 100644 --- a/app/router/config.pb.go +++ b/app/router/config.pb.go @@ -129,7 +129,7 @@ func (x Config_DomainStrategy) Number() protoreflect.EnumNumber { // Deprecated: Use Config_DomainStrategy.Descriptor instead. func (Config_DomainStrategy) EnumDescriptor() ([]byte, []int) { - return file_app_router_config_proto_rawDescGZIP(), []int{10, 0} + return file_app_router_config_proto_rawDescGZIP(), []int{11, 0} } // Domain for routing decision. @@ -483,6 +483,7 @@ type RoutingRule struct { LocalPortList *net.PortList `protobuf:"bytes,18,opt,name=local_port_list,json=localPortList,proto3" json:"local_port_list,omitempty"` VlessRouteList *net.PortList `protobuf:"bytes,20,opt,name=vless_route_list,json=vlessRouteList,proto3" json:"vless_route_list,omitempty"` Process []string `protobuf:"bytes,21,rep,name=process,proto3" json:"process,omitempty"` + Webhook *WebhookConfig `protobuf:"bytes,22,opt,name=webhook,proto3" json:"webhook,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -647,6 +648,13 @@ func (x *RoutingRule) GetProcess() []string { return nil } +func (x *RoutingRule) GetWebhook() *WebhookConfig { + if x != nil { + return x.Webhook + } + return nil +} + type isRoutingRule_TargetTag interface { isRoutingRule_TargetTag() } @@ -665,6 +673,66 @@ func (*RoutingRule_Tag) isRoutingRule_TargetTag() {} func (*RoutingRule_BalancingTag) isRoutingRule_TargetTag() {} +type WebhookConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` + Deduplication uint32 `protobuf:"varint,2,opt,name=deduplication,proto3" json:"deduplication,omitempty"` + Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WebhookConfig) Reset() { + *x = WebhookConfig{} + mi := &file_app_router_config_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WebhookConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WebhookConfig) ProtoMessage() {} + +func (x *WebhookConfig) ProtoReflect() protoreflect.Message { + mi := &file_app_router_config_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WebhookConfig.ProtoReflect.Descriptor instead. +func (*WebhookConfig) Descriptor() ([]byte, []int) { + return file_app_router_config_proto_rawDescGZIP(), []int{7} +} + +func (x *WebhookConfig) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *WebhookConfig) GetDeduplication() uint32 { + if x != nil { + return x.Deduplication + } + return 0 +} + +func (x *WebhookConfig) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + type BalancingRule struct { state protoimpl.MessageState `protogen:"open.v1"` Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` @@ -678,7 +746,7 @@ type BalancingRule struct { func (x *BalancingRule) Reset() { *x = BalancingRule{} - mi := &file_app_router_config_proto_msgTypes[7] + mi := &file_app_router_config_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -690,7 +758,7 @@ func (x *BalancingRule) String() string { func (*BalancingRule) ProtoMessage() {} func (x *BalancingRule) ProtoReflect() protoreflect.Message { - mi := &file_app_router_config_proto_msgTypes[7] + mi := &file_app_router_config_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -703,7 +771,7 @@ func (x *BalancingRule) ProtoReflect() protoreflect.Message { // Deprecated: Use BalancingRule.ProtoReflect.Descriptor instead. func (*BalancingRule) Descriptor() ([]byte, []int) { - return file_app_router_config_proto_rawDescGZIP(), []int{7} + return file_app_router_config_proto_rawDescGZIP(), []int{8} } func (x *BalancingRule) GetTag() string { @@ -752,7 +820,7 @@ type StrategyWeight struct { func (x *StrategyWeight) Reset() { *x = StrategyWeight{} - mi := &file_app_router_config_proto_msgTypes[8] + mi := &file_app_router_config_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -764,7 +832,7 @@ func (x *StrategyWeight) String() string { func (*StrategyWeight) ProtoMessage() {} func (x *StrategyWeight) ProtoReflect() protoreflect.Message { - mi := &file_app_router_config_proto_msgTypes[8] + mi := &file_app_router_config_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -777,7 +845,7 @@ func (x *StrategyWeight) ProtoReflect() protoreflect.Message { // Deprecated: Use StrategyWeight.ProtoReflect.Descriptor instead. func (*StrategyWeight) Descriptor() ([]byte, []int) { - return file_app_router_config_proto_rawDescGZIP(), []int{8} + return file_app_router_config_proto_rawDescGZIP(), []int{9} } func (x *StrategyWeight) GetRegexp() bool { @@ -819,7 +887,7 @@ type StrategyLeastLoadConfig struct { func (x *StrategyLeastLoadConfig) Reset() { *x = StrategyLeastLoadConfig{} - mi := &file_app_router_config_proto_msgTypes[9] + mi := &file_app_router_config_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -831,7 +899,7 @@ func (x *StrategyLeastLoadConfig) String() string { func (*StrategyLeastLoadConfig) ProtoMessage() {} func (x *StrategyLeastLoadConfig) ProtoReflect() protoreflect.Message { - mi := &file_app_router_config_proto_msgTypes[9] + mi := &file_app_router_config_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -844,7 +912,7 @@ func (x *StrategyLeastLoadConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use StrategyLeastLoadConfig.ProtoReflect.Descriptor instead. func (*StrategyLeastLoadConfig) Descriptor() ([]byte, []int) { - return file_app_router_config_proto_rawDescGZIP(), []int{9} + return file_app_router_config_proto_rawDescGZIP(), []int{10} } func (x *StrategyLeastLoadConfig) GetCosts() []*StrategyWeight { @@ -893,7 +961,7 @@ type Config struct { func (x *Config) Reset() { *x = Config{} - mi := &file_app_router_config_proto_msgTypes[10] + mi := &file_app_router_config_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -905,7 +973,7 @@ func (x *Config) String() string { func (*Config) ProtoMessage() {} func (x *Config) ProtoReflect() protoreflect.Message { - mi := &file_app_router_config_proto_msgTypes[10] + mi := &file_app_router_config_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -918,7 +986,7 @@ func (x *Config) ProtoReflect() protoreflect.Message { // Deprecated: Use Config.ProtoReflect.Descriptor instead. func (*Config) Descriptor() ([]byte, []int) { - return file_app_router_config_proto_rawDescGZIP(), []int{10} + return file_app_router_config_proto_rawDescGZIP(), []int{11} } func (x *Config) GetDomainStrategy() Config_DomainStrategy { @@ -956,7 +1024,7 @@ type Domain_Attribute struct { func (x *Domain_Attribute) Reset() { *x = Domain_Attribute{} - mi := &file_app_router_config_proto_msgTypes[11] + mi := &file_app_router_config_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -968,7 +1036,7 @@ func (x *Domain_Attribute) String() string { func (*Domain_Attribute) ProtoMessage() {} func (x *Domain_Attribute) ProtoReflect() protoreflect.Message { - mi := &file_app_router_config_proto_msgTypes[11] + mi := &file_app_router_config_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1066,7 +1134,7 @@ const file_app_router_config_proto_rawDesc = "" + "\fcountry_code\x18\x01 \x01(\tR\vcountryCode\x12/\n" + "\x06domain\x18\x02 \x03(\v2\x17.xray.app.router.DomainR\x06domain\"=\n" + "\vGeoSiteList\x12.\n" + - "\x05entry\x18\x01 \x03(\v2\x18.xray.app.router.GeoSiteR\x05entry\"\x82\a\n" + + "\x05entry\x18\x01 \x03(\v2\x18.xray.app.router.GeoSiteR\x05entry\"\xbc\a\n" + "\vRoutingRule\x12\x12\n" + "\x03tag\x18\x01 \x01(\tH\x00R\x03tag\x12%\n" + "\rbalancing_tag\x18\f \x01(\tH\x00R\fbalancingTag\x12\x19\n" + @@ -1090,12 +1158,20 @@ const file_app_router_config_proto_rawDesc = "" + "localGeoip\x12A\n" + "\x0flocal_port_list\x18\x12 \x01(\v2\x19.xray.common.net.PortListR\rlocalPortList\x12C\n" + "\x10vless_route_list\x18\x14 \x01(\v2\x19.xray.common.net.PortListR\x0evlessRouteList\x12\x18\n" + - "\aprocess\x18\x15 \x03(\tR\aprocess\x1a=\n" + + "\aprocess\x18\x15 \x03(\tR\aprocess\x128\n" + + "\awebhook\x18\x16 \x01(\v2\x1e.xray.app.router.WebhookConfigR\awebhook\x1a=\n" + "\x0fAttributesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01B\f\n" + "\n" + - "target_tag\"\xdc\x01\n" + + "target_tag\"\xca\x01\n" + + "\rWebhookConfig\x12\x10\n" + + "\x03url\x18\x01 \x01(\tR\x03url\x12$\n" + + "\rdeduplication\x18\x02 \x01(\rR\rdeduplication\x12E\n" + + "\aheaders\x18\x03 \x03(\v2+.xray.app.router.WebhookConfig.HeadersEntryR\aheaders\x1a:\n" + + "\fHeadersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xdc\x01\n" + "\rBalancingRule\x12\x10\n" + "\x03tag\x18\x01 \x01(\tR\x03tag\x12+\n" + "\x11outbound_selector\x18\x02 \x03(\tR\x10outboundSelector\x12\x1a\n" + @@ -1136,7 +1212,7 @@ func file_app_router_config_proto_rawDescGZIP() []byte { } var file_app_router_config_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_app_router_config_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_app_router_config_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_app_router_config_proto_goTypes = []any{ (Domain_Type)(0), // 0: xray.app.router.Domain.Type (Config_DomainStrategy)(0), // 1: xray.app.router.Config.DomainStrategy @@ -1147,43 +1223,47 @@ var file_app_router_config_proto_goTypes = []any{ (*GeoSite)(nil), // 6: xray.app.router.GeoSite (*GeoSiteList)(nil), // 7: xray.app.router.GeoSiteList (*RoutingRule)(nil), // 8: xray.app.router.RoutingRule - (*BalancingRule)(nil), // 9: xray.app.router.BalancingRule - (*StrategyWeight)(nil), // 10: xray.app.router.StrategyWeight - (*StrategyLeastLoadConfig)(nil), // 11: xray.app.router.StrategyLeastLoadConfig - (*Config)(nil), // 12: xray.app.router.Config - (*Domain_Attribute)(nil), // 13: xray.app.router.Domain.Attribute - nil, // 14: xray.app.router.RoutingRule.AttributesEntry - (*net.PortList)(nil), // 15: xray.common.net.PortList - (net.Network)(0), // 16: xray.common.net.Network - (*serial.TypedMessage)(nil), // 17: xray.common.serial.TypedMessage + (*WebhookConfig)(nil), // 9: xray.app.router.WebhookConfig + (*BalancingRule)(nil), // 10: xray.app.router.BalancingRule + (*StrategyWeight)(nil), // 11: xray.app.router.StrategyWeight + (*StrategyLeastLoadConfig)(nil), // 12: xray.app.router.StrategyLeastLoadConfig + (*Config)(nil), // 13: xray.app.router.Config + (*Domain_Attribute)(nil), // 14: xray.app.router.Domain.Attribute + nil, // 15: xray.app.router.RoutingRule.AttributesEntry + nil, // 16: xray.app.router.WebhookConfig.HeadersEntry + (*net.PortList)(nil), // 17: xray.common.net.PortList + (net.Network)(0), // 18: xray.common.net.Network + (*serial.TypedMessage)(nil), // 19: xray.common.serial.TypedMessage } var file_app_router_config_proto_depIdxs = []int32{ 0, // 0: xray.app.router.Domain.type:type_name -> xray.app.router.Domain.Type - 13, // 1: xray.app.router.Domain.attribute:type_name -> xray.app.router.Domain.Attribute + 14, // 1: xray.app.router.Domain.attribute:type_name -> xray.app.router.Domain.Attribute 3, // 2: xray.app.router.GeoIP.cidr:type_name -> xray.app.router.CIDR 4, // 3: xray.app.router.GeoIPList.entry:type_name -> xray.app.router.GeoIP 2, // 4: xray.app.router.GeoSite.domain:type_name -> xray.app.router.Domain 6, // 5: xray.app.router.GeoSiteList.entry:type_name -> xray.app.router.GeoSite 2, // 6: xray.app.router.RoutingRule.domain:type_name -> xray.app.router.Domain 4, // 7: xray.app.router.RoutingRule.geoip:type_name -> xray.app.router.GeoIP - 15, // 8: xray.app.router.RoutingRule.port_list:type_name -> xray.common.net.PortList - 16, // 9: xray.app.router.RoutingRule.networks:type_name -> xray.common.net.Network + 17, // 8: xray.app.router.RoutingRule.port_list:type_name -> xray.common.net.PortList + 18, // 9: xray.app.router.RoutingRule.networks:type_name -> xray.common.net.Network 4, // 10: xray.app.router.RoutingRule.source_geoip:type_name -> xray.app.router.GeoIP - 15, // 11: xray.app.router.RoutingRule.source_port_list:type_name -> xray.common.net.PortList - 14, // 12: xray.app.router.RoutingRule.attributes:type_name -> xray.app.router.RoutingRule.AttributesEntry + 17, // 11: xray.app.router.RoutingRule.source_port_list:type_name -> xray.common.net.PortList + 15, // 12: xray.app.router.RoutingRule.attributes:type_name -> xray.app.router.RoutingRule.AttributesEntry 4, // 13: xray.app.router.RoutingRule.local_geoip:type_name -> xray.app.router.GeoIP - 15, // 14: xray.app.router.RoutingRule.local_port_list:type_name -> xray.common.net.PortList - 15, // 15: xray.app.router.RoutingRule.vless_route_list:type_name -> xray.common.net.PortList - 17, // 16: xray.app.router.BalancingRule.strategy_settings:type_name -> xray.common.serial.TypedMessage - 10, // 17: xray.app.router.StrategyLeastLoadConfig.costs:type_name -> xray.app.router.StrategyWeight - 1, // 18: xray.app.router.Config.domain_strategy:type_name -> xray.app.router.Config.DomainStrategy - 8, // 19: xray.app.router.Config.rule:type_name -> xray.app.router.RoutingRule - 9, // 20: xray.app.router.Config.balancing_rule:type_name -> xray.app.router.BalancingRule - 21, // [21:21] is the sub-list for method output_type - 21, // [21:21] is the sub-list for method input_type - 21, // [21:21] is the sub-list for extension type_name - 21, // [21:21] is the sub-list for extension extendee - 0, // [0:21] is the sub-list for field type_name + 17, // 14: xray.app.router.RoutingRule.local_port_list:type_name -> xray.common.net.PortList + 17, // 15: xray.app.router.RoutingRule.vless_route_list:type_name -> xray.common.net.PortList + 9, // 16: xray.app.router.RoutingRule.webhook:type_name -> xray.app.router.WebhookConfig + 16, // 17: xray.app.router.WebhookConfig.headers:type_name -> xray.app.router.WebhookConfig.HeadersEntry + 19, // 18: xray.app.router.BalancingRule.strategy_settings:type_name -> xray.common.serial.TypedMessage + 11, // 19: xray.app.router.StrategyLeastLoadConfig.costs:type_name -> xray.app.router.StrategyWeight + 1, // 20: xray.app.router.Config.domain_strategy:type_name -> xray.app.router.Config.DomainStrategy + 8, // 21: xray.app.router.Config.rule:type_name -> xray.app.router.RoutingRule + 10, // 22: xray.app.router.Config.balancing_rule:type_name -> xray.app.router.BalancingRule + 23, // [23:23] is the sub-list for method output_type + 23, // [23:23] is the sub-list for method input_type + 23, // [23:23] is the sub-list for extension type_name + 23, // [23:23] is the sub-list for extension extendee + 0, // [0:23] is the sub-list for field type_name } func init() { file_app_router_config_proto_init() } @@ -1195,7 +1275,7 @@ func file_app_router_config_proto_init() { (*RoutingRule_Tag)(nil), (*RoutingRule_BalancingTag)(nil), } - file_app_router_config_proto_msgTypes[11].OneofWrappers = []any{ + file_app_router_config_proto_msgTypes[12].OneofWrappers = []any{ (*Domain_Attribute_BoolValue)(nil), (*Domain_Attribute_IntValue)(nil), } @@ -1205,7 +1285,7 @@ func file_app_router_config_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_app_router_config_proto_rawDesc), len(file_app_router_config_proto_rawDesc)), NumEnums: 2, - NumMessages: 13, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/app/router/config.proto b/app/router/config.proto index 20da23ba..07fe4c51 100644 --- a/app/router/config.proto +++ b/app/router/config.proto @@ -114,6 +114,13 @@ message RoutingRule { xray.common.net.PortList vless_route_list = 20; repeated string process = 21; + WebhookConfig webhook = 22; +} + +message WebhookConfig { + string url = 1; + uint32 deduplication = 2; + map headers = 3; } message BalancingRule { diff --git a/app/router/router.go b/app/router/router.go index 790bf8e2..df770a3a 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -57,6 +57,7 @@ func (r *Router) Init(ctx context.Context, config *Config, d dns.Client, ohm out for _, rule := range config.Rule { cond, err := rule.BuildCondition() if err != nil { + r.closeWebhooks() return err } rr := &Rule{ @@ -64,10 +65,22 @@ func (r *Router) Init(ctx context.Context, config *Config, d dns.Client, ohm out Tag: rule.GetTag(), RuleTag: rule.GetRuleTag(), } + if wh := rule.GetWebhook(); wh != nil { + notifier, err := NewWebhookNotifier(wh) + if err != nil { + r.closeWebhooks() + return err + } + rr.Webhook = notifier + } btag := rule.GetBalancingTag() if len(btag) > 0 { brule, found := r.balancers[btag] if !found { + if rr.Webhook != nil { + rr.Webhook.Close() + } + r.closeWebhooks() return errors.New("balancer ", btag, " not found") } rr.Balancer = brule @@ -80,6 +93,7 @@ func (r *Router) Init(ctx context.Context, config *Config, d dns.Client, ohm out // PickRoute implements routing.Router. func (r *Router) PickRoute(ctx routing.Context) (routing.Route, error) { + originalCtx := ctx rule, ctx, err := r.pickRouteInternal(ctx) if err != nil { return nil, err @@ -88,6 +102,9 @@ func (r *Router) PickRoute(ctx routing.Context) (routing.Route, error) { if err != nil { return nil, err } + if rule.Webhook != nil { + rule.Webhook.Fire(originalCtx, tag) + } return &Route{Context: ctx, outboundTag: tag, ruleTag: rule.RuleTag}, nil } @@ -109,6 +126,11 @@ func (r *Router) ReloadRules(config *Config, shouldAppend bool) error { defer r.mu.Unlock() if !shouldAppend { + for _, rule := range r.rules { + if rule.Webhook != nil { + rule.Webhook.Close() + } + } r.balancers = make(map[string]*Balancer, len(config.BalancingRule)) r.rules = make([]*Rule, 0, len(config.Rule)) } @@ -125,12 +147,24 @@ func (r *Router) ReloadRules(config *Config, shouldAppend bool) error { r.balancers[rule.Tag] = balancer } + startIdx := len(r.rules) + closeNewWebhooks := func() { + for i := startIdx; i < len(r.rules); i++ { + if r.rules[i].Webhook != nil { + r.rules[i].Webhook.Close() + } + } + r.rules = r.rules[:startIdx] + } + for _, rule := range config.Rule { if r.RuleExists(rule.GetRuleTag()) { + closeNewWebhooks() return errors.New("duplicate ruleTag ", rule.GetRuleTag()) } cond, err := rule.BuildCondition() if err != nil { + closeNewWebhooks() return err } rr := &Rule{ @@ -138,10 +172,22 @@ func (r *Router) ReloadRules(config *Config, shouldAppend bool) error { Tag: rule.GetTag(), RuleTag: rule.GetRuleTag(), } + if wh := rule.GetWebhook(); wh != nil { + notifier, err := NewWebhookNotifier(wh) + if err != nil { + closeNewWebhooks() + return err + } + rr.Webhook = notifier + } btag := rule.GetBalancingTag() if len(btag) > 0 { brule, found := r.balancers[btag] if !found { + if rr.Webhook != nil { + rr.Webhook.Close() + } + closeNewWebhooks() return errors.New("balancer ", btag, " not found") } rr.Balancer = brule @@ -173,6 +219,8 @@ func (r *Router) RemoveRule(tag string) error { for _, rule := range r.rules { if rule.RuleTag != tag { newRules = append(newRules, rule) + } else if rule.Webhook != nil { + rule.Webhook.Close() } } r.rules = newRules @@ -233,8 +281,20 @@ func (r *Router) Start() error { return nil } +// closeWebhooks closes all webhook notifiers in the current rule set. +func (r *Router) closeWebhooks() { + for _, rule := range r.rules { + if rule.Webhook != nil { + rule.Webhook.Close() + } + } +} + // Close implements common.Closable. func (r *Router) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + r.closeWebhooks() return nil } diff --git a/app/router/webhook.go b/app/router/webhook.go new file mode 100644 index 00000000..32ae2887 --- /dev/null +++ b/app/router/webhook.go @@ -0,0 +1,287 @@ +package router + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net" + "net/http" + "path/filepath" + "runtime" + "strings" + "sync" + "syscall" + "time" + + "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/features/routing" + routing_session "github.com/xtls/xray-core/features/routing/session" +) + +// parseURL splits a webhook URL into an HTTP URL and an optional Unix socket +// path. For regular http/https URLs the input is returned unchanged with an +// empty socketPath. For Unix sockets the format is: +// +// /path/to/socket.sock:/http/path +// @abstract:/http/path +// @@padded:/http/path +// +// The :/ separator after the socket path delimits the HTTP request path. +// If omitted, "/" is used. +func parseURL(raw string) (httpURL, socketPath string) { + if len(raw) == 0 || (!filepath.IsAbs(raw) && raw[0] != '@') { + return raw, "" + } + if idx := strings.Index(raw, ":/"); idx >= 0 { + return "http://localhost" + raw[idx+1:], raw[:idx] + } + return "http://localhost/", raw +} + +// resolveSocketPath applies platform-specific transformations to a Unix +// socket path, matching the behaviour of the listen side in +// transport/internet/system_listener.go. +// +// For abstract sockets (prefix @) on Linux/Android: +// - single @ — used as-is (lock-free abstract socket) +// - double @@ — stripped to single @ and padded to +// syscall.RawSockaddrUnix{}.Path length (HAProxy compat) +func resolveSocketPath(path string) string { + if len(path) == 0 || path[0] != '@' { + return path + } + if runtime.GOOS != "linux" && runtime.GOOS != "android" { + return path + } + if len(path) > 1 && path[1] == '@' { + fullAddr := make([]byte, len(syscall.RawSockaddrUnix{}.Path)) + copy(fullAddr, path[1:]) + return string(fullAddr) + } + return path +} + +func ptr[T any](v T) *T { return &v } + +type event struct { + Email *string `json:"email"` + Level *uint32 `json:"level"` + Protocol *string `json:"protocol"` + Network *string `json:"network"` + Source *string `json:"source"` + Destination *string `json:"destination"` + OriginalTarget *string `json:"originalTarget"` + RouteTarget *string `json:"routeTarget"` + InboundTag *string `json:"inboundTag"` + InboundName *string `json:"inboundName"` + InboundLocal *string `json:"inboundLocal"` + OutboundTag *string `json:"outboundTag"` + Timestamp int64 `json:"ts"` +} + +type WebhookNotifier struct { + url string + headers map[string]string + deduplication uint32 + client *http.Client + seen sync.Map + done chan struct{} + wg sync.WaitGroup + closeOnce sync.Once +} + +func NewWebhookNotifier(cfg *WebhookConfig) (*WebhookNotifier, error) { + if cfg == nil || cfg.Url == "" { + return nil, nil + } + + httpURL, socketPath := parseURL(cfg.Url) + h := &WebhookNotifier{ + url: httpURL, + deduplication: cfg.Deduplication, + client: &http.Client{ + Timeout: 5 * time.Second, + }, + done: make(chan struct{}), + } + + if socketPath != "" { + dialAddr := resolveSocketPath(socketPath) + h.client.Transport = &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", dialAddr) + }, + } + } + + if len(cfg.Headers) > 0 { + h.headers = make(map[string]string, len(cfg.Headers)) + for k, v := range cfg.Headers { + h.headers[k] = v + } + } + + if h.deduplication > 0 { + h.wg.Add(1) + go h.cleanupLoop() + } + + return h, nil +} + +func (h *WebhookNotifier) Fire(ctx routing.Context, outboundTag string) { + ev := buildEvent(ctx, outboundTag) + + email := "" + if ev.Email != nil { + email = *ev.Email + } + if h.isDuplicate(email) { + return + } + + h.wg.Add(1) + select { + case <-h.done: + h.wg.Done() + return + default: + } + go func() { + defer h.wg.Done() + h.post(ev) + }() +} + +func buildEvent(ctx routing.Context, outboundTag string) *event { + ev := &event{ + Timestamp: time.Now().Unix(), + OutboundTag: ptr(outboundTag), + InboundTag: ptr(ctx.GetInboundTag()), + Protocol: ptr(ctx.GetProtocol()), + Network: ptr(ctx.GetNetwork().SystemString()), + } + + if user := ctx.GetUser(); user != "" { + ev.Email = ptr(user) + } + + if srcIPs := ctx.GetSourceIPs(); len(srcIPs) > 0 { + srcPort := ctx.GetSourcePort() + ev.Source = ptr(net.JoinHostPort(srcIPs[0].String(), srcPort.String())) + } + + targetPort := ctx.GetTargetPort() + if domain := ctx.GetTargetDomain(); domain != "" { + ev.Destination = ptr(net.JoinHostPort(domain, targetPort.String())) + } else if targetIPs := ctx.GetTargetIPs(); len(targetIPs) > 0 { + ev.Destination = ptr(net.JoinHostPort(targetIPs[0].String(), targetPort.String())) + } + + if localIPs := ctx.GetLocalIPs(); len(localIPs) > 0 { + localPort := ctx.GetLocalPort() + ev.InboundLocal = ptr(net.JoinHostPort(localIPs[0].String(), localPort.String())) + } + + if sctx, ok := ctx.(*routing_session.Context); ok { + enrichFromSession(ev, sctx) + } + + return ev +} + +func enrichFromSession(ev *event, sctx *routing_session.Context) { + if sctx.Inbound != nil { + ev.InboundName = ptr(sctx.Inbound.Name) + if sctx.Inbound.User != nil { + ev.Level = ptr(sctx.Inbound.User.Level) + } + } + if sctx.Outbound != nil { + if sctx.Outbound.OriginalTarget.Address != nil { + ev.OriginalTarget = ptr(sctx.Outbound.OriginalTarget.String()) + } + if sctx.Outbound.RouteTarget.Address != nil { + ev.RouteTarget = ptr(sctx.Outbound.RouteTarget.String()) + } + } +} + +func (h *WebhookNotifier) post(ev *event) { + body, err := json.Marshal(ev) + if err != nil { + errors.LogWarning(context.Background(), "webhook: marshal failed: ", err) + return + } + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, h.url, bytes.NewReader(body)) + if err != nil { + errors.LogWarning(context.Background(), "webhook: request build failed: ", err) + return + } + req.Header.Set("Content-Type", "application/json") + + for k, v := range h.headers { + req.Header.Set(k, v) + } + + resp, err := h.client.Do(req) + if err != nil { + errors.LogInfo(context.Background(), "webhook: POST failed: ", err) + return + } + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + if resp.StatusCode >= 400 { + errors.LogWarning(context.Background(), "webhook: POST returned status ", resp.StatusCode) + } +} + +func (h *WebhookNotifier) isDuplicate(email string) bool { + if h.deduplication == 0 || email == "" { + return false + } + ttl := time.Duration(h.deduplication) * time.Second + now := time.Now() + if v, loaded := h.seen.LoadOrStore(email, now); loaded { + if now.Sub(v.(time.Time)) < ttl { + return true + } + h.seen.Store(email, now) + } + return false +} + +func (h *WebhookNotifier) cleanupLoop() { + defer h.wg.Done() + ttl := time.Duration(h.deduplication) * time.Second + ticker := time.NewTicker(ttl) + defer ticker.Stop() + for { + select { + case <-h.done: + return + case <-ticker.C: + now := time.Now() + h.seen.Range(func(key, value any) bool { + if now.Sub(value.(time.Time)) >= ttl { + h.seen.Delete(key) + } + return true + }) + } + } +} + +func (h *WebhookNotifier) Close() error { + h.closeOnce.Do(func() { + close(h.done) + }) + h.wg.Wait() + h.client.CloseIdleConnections() + return nil +} diff --git a/infra/conf/router.go b/infra/conf/router.go index bc324610..a488d397 100644 --- a/infra/conf/router.go +++ b/infra/conf/router.go @@ -522,25 +522,32 @@ func ToCidrList(ips StringList) ([]*router.GeoIP, error) { return geoipList, nil } +type WebhookRuleConfig struct { + URL string `json:"url"` + Deduplication uint32 `json:"deduplication"` + Headers map[string]string `json:"headers"` +} + func parseFieldRule(msg json.RawMessage) (*router.RoutingRule, error) { type RawFieldRule struct { RouterRule - Domain *StringList `json:"domain"` - Domains *StringList `json:"domains"` - IP *StringList `json:"ip"` - Port *PortList `json:"port"` - Network *NetworkList `json:"network"` - SourceIP *StringList `json:"sourceIP"` - Source *StringList `json:"source"` - SourcePort *PortList `json:"sourcePort"` - User *StringList `json:"user"` - VlessRoute *PortList `json:"vlessRoute"` - InboundTag *StringList `json:"inboundTag"` - Protocols *StringList `json:"protocol"` - Attributes map[string]string `json:"attrs"` - LocalIP *StringList `json:"localIP"` - LocalPort *PortList `json:"localPort"` - Process *StringList `json:"process"` + Domain *StringList `json:"domain"` + Domains *StringList `json:"domains"` + IP *StringList `json:"ip"` + Port *PortList `json:"port"` + Network *NetworkList `json:"network"` + SourceIP *StringList `json:"sourceIP"` + Source *StringList `json:"source"` + SourcePort *PortList `json:"sourcePort"` + User *StringList `json:"user"` + VlessRoute *PortList `json:"vlessRoute"` + InboundTag *StringList `json:"inboundTag"` + Protocols *StringList `json:"protocol"` + Attributes map[string]string `json:"attrs"` + LocalIP *StringList `json:"localIP"` + LocalPort *PortList `json:"localPort"` + Process *StringList `json:"process"` + Webhook *WebhookRuleConfig `json:"webhook"` } rawFieldRule := new(RawFieldRule) err := json.Unmarshal(msg, rawFieldRule) @@ -657,6 +664,14 @@ func parseFieldRule(msg json.RawMessage) (*router.RoutingRule, error) { rule.Process = *rawFieldRule.Process } + if rawFieldRule.Webhook != nil && rawFieldRule.Webhook.URL != "" { + rule.Webhook = &router.WebhookConfig{ + Url: rawFieldRule.Webhook.URL, + Deduplication: rawFieldRule.Webhook.Deduplication, + Headers: rawFieldRule.Webhook.Headers, + } + } + return rule, nil }