Merge pull request #712 from wumode/tobypasstrackers

This commit is contained in:
jxxghp
2025-03-18 16:47:21 +08:00
committed by GitHub
3 changed files with 136 additions and 22 deletions

View File

@@ -395,12 +395,13 @@
"name": "绕过Trackers",
"description": "提供tracker服务器IP地址列表帮助IPv6连接绕过OpenClash",
"labels": "工具",
"version": "1.0",
"version": "1.1",
"icon": "Clash_A.png",
"author": "wumode",
"level": 2,
"history": {
"v1.0": "支持自定义Trackers"
"v1.0": "支持自定义Trackers",
"v1.1": "更新列表后发送通知"
}
}
}
}

View File

@@ -57,6 +57,7 @@ class ToBypassTrackers(_PluginBase):
_notify = False
_onlyonce: bool = False
_custom_trackers: str = ""
_exempted_domains: str = ""
_bypassed_sites: list = []
_china_ip_route: bool = True
_china_ipv6_route: bool = True
@@ -87,6 +88,7 @@ class ToBypassTrackers(_PluginBase):
self._onlyonce = config.get("onlyonce")
self._notify = config.get("notify")
self._custom_trackers = config.get("custom_trackers")
self._exempted_domains = config.get("exempted_domains")
self._bypassed_sites = config.get("bypassed_sites") or []
self._bypass_ipv4 = config.get("bypass_ipv4")
self._bypass_ipv6 = config.get("bypass_ipv6")
@@ -107,7 +109,7 @@ class ToBypassTrackers(_PluginBase):
)
self._onlyonce = False
self.__update_config()
self._scheduler.print_jobs()
# self._scheduler.print_jobs()
self._scheduler.start()
def get_state(self) -> bool:
@@ -122,6 +124,7 @@ class ToBypassTrackers(_PluginBase):
"onlyonce": self._onlyonce,
"bypassed_sites": self._bypassed_sites,
"custom_trackers": self._custom_trackers,
"exempted_domains": self._exempted_domains,
"notify": self._notify,
"dns_input": self._dns_input,
"china_ip_route": self._china_ip_route,
@@ -348,7 +351,8 @@ class ToBypassTrackers(_PluginBase):
{
'component': 'VCol',
'props': {
'cols': 12
'cols': 12,
'md': 6
},
'content': [
{
@@ -361,6 +365,24 @@ class ToBypassTrackers(_PluginBase):
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'exempted_domains',
'label': '排除的域名和IP',
'rows': 3,
'placeholder': '每行一个域名或IP'
}
}
]
}
]
},
@@ -378,11 +400,11 @@ class ToBypassTrackers(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': 'DNS 服务器示例: '
'1、94.140.14.140;'
'2、94.140.14.140:53;'
'3、[2a10:50c0::1:ff]:53;'
'4、https://unfiltered.adguard-dns.com/dns-query。'
'text': 'DNS 服务器示例 (仅填一个): '
'94.140.14.140」、'
'94.140.14.140:53」、'
'[2a10:50c0::1:ff]:53」、'
'https://unfiltered.adguard-dns.com/dns-query'
'仅支持UDP和HTTPS方法, 留空使用本地DNS查询。'
}
}
@@ -405,8 +427,9 @@ class ToBypassTrackers(_PluginBase):
'type': 'info',
'variant': 'tonal',
'text': '【订阅URL】'
'IPv4: {Moviepilot地址}/api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey={apikey}&protocol=4; '
'IPv6: {Moviepilot地址}/api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey={apikey}&protocol=6'
'IPv4 Api」: /api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey=moviepilot&protocol=4; '
'IPv6 Api」: /api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey=moviepilot&protocol=6; '
'其中moviepilot修改为实际配置中的API_TOKEN的值。'
}
}
]
@@ -428,8 +451,8 @@ class ToBypassTrackers(_PluginBase):
'type': 'info',
'variant': 'tonal',
'text': '【如何使用】'
'在OpenClash->插件设置->中国大陆IP路由 选择 绕过中国大陆; '
'在OpenClash->插件设置->Chnroute Update 填入订阅URL。'
'OpenClash->插件设置->中国大陆IP路由选择绕过中国大陆; '
'OpenClash->插件设置->Chnroute Update填入订阅URL'
}
}
]
@@ -445,6 +468,7 @@ class ToBypassTrackers(_PluginBase):
"cron": "0 4 * * *",
"bypassed_sites": [],
"custom_trackers": "",
"exempted_domains": "",
"dns_input": "",
"china_ip_route": True,
"china_ipv6_route": True,
@@ -511,23 +535,53 @@ class ToBypassTrackers(_PluginBase):
@eventmanager.register(EventType.PluginAction)
def update_ips(self):
def __is_ip_in_subnet(ip: str, subnet: str) -> bool:
def __is_ip_in_subnet(ip_input: str, su_bnet: str) -> bool:
"""
Check if the given IP address is in the specified subnet.
:param ip: IP address as a string (e.g., '192.168.1.1')
:param subnet: Subnet in CIDR notation (e.g., '192.168.1.0/24')
:param ip_input: IP address as a string (e.g., '192.168.1.1')
:param su_bnet: Subnet in CIDR notation (e.g., '192.168.1.0/24')
:return: True if IP is in the subnet, False otherwise
"""
ip_obj = ipaddress.ip_address(ip)
subnet_obj = ipaddress.ip_network(subnet, strict=False)
ip_obj = ipaddress.ip_address(ip_input)
subnet_obj = ipaddress.ip_network(su_bnet, strict=False)
return ip_obj in subnet_obj
def __search_ip(ip, ips_list):
i = 0
for ip_range in ips_list:
if __is_ip_in_subnet(ip, ip_range):
return i
i += 1
return -1
def __exclude_ip_range(range_b: str, range_a: str):
"""
Exclude IP range A from IP range B and return the remaining subranges.
:param range_b: The larger IP range in CIDR notation (must include range_a).
:param range_a: The smaller IP range to exclude in CIDR notation.
:return: List of remaining IP subranges in CIDR notation.
"""
net_b = ipaddress.ip_network(range_b, strict=False)
net_a = ipaddress.ip_network(range_a, strict=False)
if not (net_a.subnet_of(net_b)):
raise ValueError("Range A is not fully contained within Range B.")
remaining_ranges = list(net_b.address_exclude(net_a))
return [str(sub_net) for sub_net in remaining_ranges]
# replacing = data.get('replace')
chnroute6_lists_url = 'https://ispip.clang.cn/all_cn_ipv6.txt'
chnroute_lists_url = 'https://ispip.clang.cn/all_cn.txt'
ipv6_list = []
ip_list = []
domains = []
success_msg = []
failed_msg = []
results = {}
unsupported_msg = []
if self._china_ipv6_route:
# Load Chnroute6 Lists
res = RequestUtils().get_res(url=chnroute6_lists_url)
@@ -543,12 +597,17 @@ class ToBypassTrackers(_PluginBase):
for ipr in chnroute_lists:
ip_list.append(ipr)
do_sites = {site.domain: site.name for site in self.siteoper.list_order_by_pri() if site.id in self._bypassed_sites}
domain_name_map = {}
for site in do_sites:
site_domains = self.trackers.get(site)
results[do_sites[site]] = True
if site_domains:
domains.extend(site_domains)
for domain in site_domains:
domain_name_map[domain] = do_sites[site]
else:
logger.warn(f"不支持的站点: {do_sites[site]}({site})")
unsupported_msg.append(f'{do_sites[site]}】不支持的站点')
for custom_tracker in self._custom_trackers.split('\n'):
if custom_tracker:
try:
@@ -567,6 +626,8 @@ class ToBypassTrackers(_PluginBase):
ipv6_addresses = DnsHelper.query_domain(domain, self._dns_input, 'AAAA')
if ipv6_addresses is None:
logger.warn(f"{domain} AAAA 记录查询失败")
failed_msg.append(f"{domain_name_map.get(domain, domain)}{domain}: AAAA记录查询失败")
results[{domain_name_map.get(domain, domain)}] = False
continue
for address in ipv6_addresses:
has_flag = False
@@ -576,11 +637,13 @@ class ToBypassTrackers(_PluginBase):
break
if not has_flag:
ipv6_list.append(ipaddress.ip_network(f"{address}/128", strict=False).compressed)
logger.info(f"{address} ({domain}) 已被添加")
logger.info(f"{domain_name_map.get(domain, domain)}{address} ({domain}) 已被添加")
if self._bypass_ipv4:
ip_addresses = DnsHelper.query_domain(domain,self._dns_input, 'A')
ip_addresses = DnsHelper.query_domain(domain, self._dns_input, 'A')
if ip_addresses is None:
logger.warn(f"{domain} A 记录查询失败")
failed_msg.append(f"{domain_name_map.get(domain, '')}{domain}: A记录查询失败")
results[{domain_name_map.get(domain, domain)}] = False
continue
for address in ip_addresses:
has_flag = False
@@ -590,8 +653,58 @@ class ToBypassTrackers(_PluginBase):
break
if not has_flag:
ip_list.append(f"{address}/32")
logger.info(f"{address} ({domain}) 已被添加")
logger.info(f"{domain_name_map.get(domain, domain)}{address} ({domain}) 已被添加")
for result in results:
if results[result]:
success_msg.append(f"{result}】 Trackers已被添加")
exempted_ip = []
exempted_ipv6 = []
for exempted_domain in self._exempted_domains.split('\n'):
if exempted_domain:
try:
socket.inet_pton(socket.AF_INET, exempted_domain)
if self._bypass_ipv4:
exempted_ip.append(f"{exempted_domain}")
except socket.error:
try:
socket.inet_pton(socket.AF_INET6, exempted_domain)
if self._bypass_ipv6:
exempted_ipv6.append(f"{exempted_domain}")
except socket.error:
ipv6_addresses = DnsHelper.query_domain(exempted_domain, self._dns_input, 'AAAA')
if ipv6_addresses:
exempted_ipv6.extend(ipv6_addresses)
ipv4_addresses = DnsHelper.query_domain(exempted_domain, self._dns_input, 'A')
if ipv4_addresses:
exempted_ip.extend(ipv4_addresses)
for ip in exempted_ip:
index = __search_ip(ip, ip_list)
if index == -1:
continue
ip_larger = ip_list[index]
ip_list.pop(index)
length = int(ip_larger.split('/')[1])
if length < 12:
remaining_ip = __exclude_ip_range(ip_larger, f'{ip}/{length + 8}')
ip_list.extend(remaining_ip)
for ip in exempted_ipv6:
index = __search_ip(ip, ipv6_list)
if index == -1:
continue
ip_larger = ipv6_list[index]
ipv6_list.pop(index)
length = int(ip_larger.split('/')[1])
if length < 32:
remaining_ip = __exclude_ip_range(ip_larger, f'{ip}/{min(32, length + 8)}')
ipv6_list.extend(remaining_ip)
self.ipv4_txt = "\n".join(ip_list)
self.ipv6_txt = "\n".join(ipv6_list)
self.save_data("ipv4_txt", self.ipv4_txt)
self.save_data("ipv6_txt", self.ipv6_txt)
if self._notify:
res_message = success_msg + failed_msg
res_message = "\n".join(res_message)
self.post_message(title=f"【绕过Trackers】",
mtype=NotificationType.SiteMessage,
text=f"{res_message}"
)