diff --git a/plugins.v2/tobypasstrackers/__init__.py b/plugins.v2/tobypasstrackers/__init__.py index ac7297f..6ec5be2 100644 --- a/plugins.v2/tobypasstrackers/__init__.py +++ b/plugins.v2/tobypasstrackers/__init__.py @@ -57,6 +57,7 @@ class ToBypassTrackers(_PluginBase): _notify = False _onlyonce: bool = False _custom_trackers: str = "" + _exempted_domains: str = "" _bypassed_sites: list = [] _china_ip_route: bool = True _china_ipv6_route: bool = True @@ -87,6 +88,7 @@ class ToBypassTrackers(_PluginBase): self._onlyonce = config.get("onlyonce") self._notify = config.get("notify") self._custom_trackers = config.get("custom_trackers") + self._exempted_domains = config.get("exempted_domains") self._bypassed_sites = config.get("bypassed_sites") or [] self._bypass_ipv4 = config.get("bypass_ipv4") self._bypass_ipv6 = config.get("bypass_ipv6") @@ -107,7 +109,7 @@ class ToBypassTrackers(_PluginBase): ) self._onlyonce = False self.__update_config() - self._scheduler.print_jobs() + # self._scheduler.print_jobs() self._scheduler.start() def get_state(self) -> bool: @@ -122,6 +124,7 @@ class ToBypassTrackers(_PluginBase): "onlyonce": self._onlyonce, "bypassed_sites": self._bypassed_sites, "custom_trackers": self._custom_trackers, + "exempted_domains": self._exempted_domains, "notify": self._notify, "dns_input": self._dns_input, "china_ip_route": self._china_ip_route, @@ -348,7 +351,8 @@ class ToBypassTrackers(_PluginBase): { 'component': 'VCol', 'props': { - 'cols': 12 + 'cols': 12, + 'md': 6 }, 'content': [ { @@ -361,6 +365,24 @@ class ToBypassTrackers(_PluginBase): } } ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 6 + }, + 'content': [ + { + 'component': 'VTextarea', + 'props': { + 'model': 'exempted_domains', + 'label': '排除的域名和IP', + 'rows': 3, + 'placeholder': '每行一个域名或IP' + } + } + ] } ] }, @@ -378,11 +400,11 @@ class ToBypassTrackers(_PluginBase): 'props': { 'type': 'info', 'variant': 'tonal', - 'text': 'DNS 服务器示例: ' - '1、94.140.14.140;' - '2、94.140.14.140:53;' - '3、[2a10:50c0::1:ff]:53;' - '4、https://unfiltered.adguard-dns.com/dns-query。' + 'text': 'DNS 服务器示例 (仅填一个): ' + '「94.140.14.140」、' + '「94.140.14.140:53」、' + '「[2a10:50c0::1:ff]:53」、' + '「https://unfiltered.adguard-dns.com/dns-query」。' '仅支持UDP和HTTPS方法, 留空使用本地DNS查询。' } } @@ -405,8 +427,9 @@ class ToBypassTrackers(_PluginBase): 'type': 'info', 'variant': 'tonal', 'text': '【订阅URL】' - 'IPv4: {Moviepilot地址}/api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey={apikey}&protocol=4; ' - 'IPv6: {Moviepilot地址}/api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey={apikey}&protocol=6。' + '「IPv4 Api」: /api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey=moviepilot&protocol=4; ' + '「IPv6 Api」: /api/v1/plugin/ToBypassTrackers/bypassed_ips?apikey=moviepilot&protocol=6; ' + '其中moviepilot修改为实际配置中的API_TOKEN的值。' } } ] @@ -428,8 +451,8 @@ class ToBypassTrackers(_PluginBase): 'type': 'info', 'variant': 'tonal', 'text': '【如何使用】' - '在OpenClash->插件设置->中国大陆IP路由 选择 绕过中国大陆; ' - '在OpenClash->插件设置->Chnroute Update 填入订阅URL。' + '在「OpenClash->插件设置->中国大陆IP路由」选择「绕过中国大陆」; ' + '在「OpenClash->插件设置->Chnroute Update」填入「订阅URL」。' } } ] @@ -445,6 +468,7 @@ class ToBypassTrackers(_PluginBase): "cron": "0 4 * * *", "bypassed_sites": [], "custom_trackers": "", + "exempted_domains": "", "dns_input": "", "china_ip_route": True, "china_ipv6_route": True, @@ -511,23 +535,53 @@ class ToBypassTrackers(_PluginBase): @eventmanager.register(EventType.PluginAction) def update_ips(self): - def __is_ip_in_subnet(ip: str, subnet: str) -> bool: + def __is_ip_in_subnet(ip_input: str, su_bnet: str) -> bool: """ Check if the given IP address is in the specified subnet. - :param ip: IP address as a string (e.g., '192.168.1.1') - :param subnet: Subnet in CIDR notation (e.g., '192.168.1.0/24') + :param ip_input: IP address as a string (e.g., '192.168.1.1') + :param su_bnet: Subnet in CIDR notation (e.g., '192.168.1.0/24') :return: True if IP is in the subnet, False otherwise """ - ip_obj = ipaddress.ip_address(ip) - subnet_obj = ipaddress.ip_network(subnet, strict=False) + ip_obj = ipaddress.ip_address(ip_input) + subnet_obj = ipaddress.ip_network(su_bnet, strict=False) return ip_obj in subnet_obj + + def __search_ip(ip, ips_list): + i = 0 + for ip_range in ips_list: + if __is_ip_in_subnet(ip, ip_range): + return i + i += 1 + return -1 + + def __exclude_ip_range(range_b: str, range_a: str): + """ + Exclude IP range A from IP range B and return the remaining subranges. + + :param range_b: The larger IP range in CIDR notation (must include range_a). + :param range_a: The smaller IP range to exclude in CIDR notation. + :return: List of remaining IP subranges in CIDR notation. + """ + net_b = ipaddress.ip_network(range_b, strict=False) + net_a = ipaddress.ip_network(range_a, strict=False) + + if not (net_a.subnet_of(net_b)): + raise ValueError("Range A is not fully contained within Range B.") + + remaining_ranges = list(net_b.address_exclude(net_a)) + + return [str(sub_net) for sub_net in remaining_ranges] # replacing = data.get('replace') chnroute6_lists_url = 'https://ispip.clang.cn/all_cn_ipv6.txt' chnroute_lists_url = 'https://ispip.clang.cn/all_cn.txt' ipv6_list = [] ip_list = [] domains = [] + success_msg = [] + failed_msg = [] + results = {} + unsupported_msg = [] if self._china_ipv6_route: # Load Chnroute6 Lists res = RequestUtils().get_res(url=chnroute6_lists_url) @@ -543,12 +597,17 @@ class ToBypassTrackers(_PluginBase): for ipr in chnroute_lists: ip_list.append(ipr) do_sites = {site.domain: site.name for site in self.siteoper.list_order_by_pri() if site.id in self._bypassed_sites} + domain_name_map = {} for site in do_sites: site_domains = self.trackers.get(site) + results[do_sites[site]] = True if site_domains: domains.extend(site_domains) + for domain in site_domains: + domain_name_map[domain] = do_sites[site] else: logger.warn(f"不支持的站点: {do_sites[site]}({site})") + unsupported_msg.append(f'【{do_sites[site]}】不支持的站点') for custom_tracker in self._custom_trackers.split('\n'): if custom_tracker: try: @@ -567,6 +626,8 @@ class ToBypassTrackers(_PluginBase): ipv6_addresses = DnsHelper.query_domain(domain, self._dns_input, 'AAAA') if ipv6_addresses is None: logger.warn(f"{domain} AAAA 记录查询失败") + failed_msg.append(f"【{domain_name_map.get(domain, domain)}】 {domain}: AAAA记录查询失败") + results[{domain_name_map.get(domain, domain)}] = False continue for address in ipv6_addresses: has_flag = False @@ -576,11 +637,13 @@ class ToBypassTrackers(_PluginBase): break if not has_flag: ipv6_list.append(ipaddress.ip_network(f"{address}/128", strict=False).compressed) - logger.info(f"{address} ({domain}) 已被添加") + logger.info(f"【{domain_name_map.get(domain, domain)}】{address} ({domain}) 已被添加") if self._bypass_ipv4: - ip_addresses = DnsHelper.query_domain(domain,self._dns_input, 'A') + ip_addresses = DnsHelper.query_domain(domain, self._dns_input, 'A') if ip_addresses is None: logger.warn(f"{domain} A 记录查询失败") + failed_msg.append(f"【{domain_name_map.get(domain, '')}】 {domain}: A记录查询失败") + results[{domain_name_map.get(domain, domain)}] = False continue for address in ip_addresses: has_flag = False @@ -590,8 +653,58 @@ class ToBypassTrackers(_PluginBase): break if not has_flag: ip_list.append(f"{address}/32") - logger.info(f"{address} ({domain}) 已被添加") + logger.info(f"【{domain_name_map.get(domain, domain)}】{address} ({domain}) 已被添加") + for result in results: + if results[result]: + success_msg.append(f"【{result}】 Trackers已被添加") + exempted_ip = [] + exempted_ipv6 = [] + for exempted_domain in self._exempted_domains.split('\n'): + if exempted_domain: + try: + socket.inet_pton(socket.AF_INET, exempted_domain) + if self._bypass_ipv4: + exempted_ip.append(f"{exempted_domain}") + except socket.error: + try: + socket.inet_pton(socket.AF_INET6, exempted_domain) + if self._bypass_ipv6: + exempted_ipv6.append(f"{exempted_domain}") + except socket.error: + ipv6_addresses = DnsHelper.query_domain(exempted_domain, self._dns_input, 'AAAA') + if ipv6_addresses: + exempted_ipv6.extend(ipv6_addresses) + ipv4_addresses = DnsHelper.query_domain(exempted_domain, self._dns_input, 'A') + if ipv4_addresses: + exempted_ip.extend(ipv4_addresses) + for ip in exempted_ip: + index = __search_ip(ip, ip_list) + if index == -1: + continue + ip_larger = ip_list[index] + ip_list.pop(index) + length = int(ip_larger.split('/')[1]) + if length < 12: + remaining_ip = __exclude_ip_range(ip_larger, f'{ip}/{length + 8}') + ip_list.extend(remaining_ip) + for ip in exempted_ipv6: + index = __search_ip(ip, ipv6_list) + if index == -1: + continue + ip_larger = ipv6_list[index] + ipv6_list.pop(index) + length = int(ip_larger.split('/')[1]) + if length < 32: + remaining_ip = __exclude_ip_range(ip_larger, f'{ip}/{min(32, length + 8)}') + ipv6_list.extend(remaining_ip) self.ipv4_txt = "\n".join(ip_list) self.ipv6_txt = "\n".join(ipv6_list) self.save_data("ipv4_txt", self.ipv4_txt) self.save_data("ipv6_txt", self.ipv6_txt) + if self._notify: + res_message = success_msg + failed_msg + res_message = "\n".join(res_message) + self.post_message(title=f"【绕过Trackers】", + mtype=NotificationType.SiteMessage, + text=f"{res_message}" + )