From 4e44a66df3af8436624ef32e161420c643b27f9d Mon Sep 17 00:00:00 2001 From: wumode Date: Sat, 21 Jun 2025 14:35:57 +0800 Subject: [PATCH] =?UTF-8?q?update(ClashRuleProvider):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=20V2ray=20=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.v2.json | 3 +- plugins.v2/clashruleprovider/__init__.py | 13 +- .../clashruleprovider/clash_rule_parser.py | 505 ++++++++++++++++++ 3 files changed, 516 insertions(+), 5 deletions(-) diff --git a/package.v2.json b/package.v2.json index 86495dd..4c52b82 100644 --- a/package.v2.json +++ b/package.v2.json @@ -450,11 +450,12 @@ "name": "Clash Rule Provider", "description": "随时为Clash添加一些额外的规则。", "labels": "工具", - "version": "1.1.0", + "version": "1.1.1", "icon": "Mihomo_Meta_A.png", "author": "wumode", "level": 1, "history": { + "v1.1.1": "支持解析 V2ray 订阅", "v1.1.0": "支持规则集合; 添加ACL4SSR规则集; 配置说明", "v1.0.1": "支持规则搜索, 优化细节", "v1.0.0": "支持: 规则分页; 导入规则; 代理组; 附加出站代理; 按区域分组", diff --git a/plugins.v2/clashruleprovider/__init__.py b/plugins.v2/clashruleprovider/__init__.py index fc5390c..1065bfd 100644 --- a/plugins.v2/clashruleprovider/__init__.py +++ b/plugins.v2/clashruleprovider/__init__.py @@ -19,7 +19,7 @@ from app.log import logger from app.plugins import _PluginBase from app.schemas.types import NotificationType from app.utils.http import RequestUtils -from app.plugins.clashruleprovider.clash_rule_parser import ClashRuleParser +from app.plugins.clashruleprovider.clash_rule_parser import ClashRuleParser, Converter from app.plugins.clashruleprovider.clash_rule_parser import Action, RuleType, ClashRule, MatchRule, LogicRule from app.plugins.clashruleprovider.clash_rule_parser import ProxyGroup, RuleProvider @@ -32,7 +32,7 @@ class ClashRuleProvider(_PluginBase): # 插件图标 plugin_icon = "Mihomo_Meta_A.png" # 插件版本 - plugin_version = "1.1.0" + plugin_version = "1.1.1" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -852,7 +852,12 @@ class ClashRuleProvider(_PluginBase): return False try: rs = yaml.load(ret.content, Loader=yaml.FullLoader) - logger.debug(f"{type(rs)} => {rs}") + if type(rs) is str: + all_proxies = {'name': "All Proxies", 'type': 'select', 'include-all-proxies': True} + proxies = Converter.convert_v2ray(ret.content) + if not proxies: + raise ValueError(f"Unknown content: {rs}") + rs = {'proxies': proxies, 'proxy-groups': [all_proxies, ]} if rs.get('rules') is None: rs['rules'] = [] if self._discard_rules: @@ -950,7 +955,7 @@ class ClashRuleProvider(_PluginBase): logger.warn(f"关键词过滤后无可用节点,跳过过滤") removed_proxies = [] for proxy_group in clash_config.get("proxy-groups", []): - proxy_group['proxies'] = [x for x in proxy_group.get('proxies') if x not in removed_proxies] + proxy_group['proxies'] = [x for x in proxy_group.get('proxies', []) if x not in removed_proxies] return clash_config def clash_config(self) -> Optional[Dict[str, Any]]: diff --git a/plugins.v2/clashruleprovider/clash_rule_parser.py b/plugins.v2/clashruleprovider/clash_rule_parser.py index 579f883..b0ef22a 100644 --- a/plugins.v2/clashruleprovider/clash_rule_parser.py +++ b/plugins.v2/clashruleprovider/clash_rule_parser.py @@ -2,6 +2,10 @@ import re from typing import List, Dict, Any, Optional, Union, Callable, Literal from dataclasses import dataclass from enum import Enum +from urllib.parse import urlparse, parse_qs, unquote, parse_qsl +import json +import base64 +import binascii from pydantic import BaseModel, Field, validator, HttpUrl @@ -635,3 +639,504 @@ class ClashRuleParser: for i in range(target_index, moved_index): self.rules[i].priority += 1 self.rules.sort(key=lambda x: x.priority) + + +class Converter: + """ + Converter for V2Ray Subscription + + Reference: + https://github.com/MetaCubeX/mihomo/blob/Alpha/common/convert/converter.go + https://github.com/SubConv/SubConv/blob/main/modules/convert/converter.py + """ + @staticmethod + def decode_base64(data): + # 添加适配不同 padding 的容错机制 + data = data.strip() + missing_padding = len(data) % 4 + if missing_padding: + data += '=' * (4 - missing_padding) + return base64.b64decode(data) + + @staticmethod + def try_decode_base64_json(data): + try: + return json.loads(Converter.decode_base64(data).decode('utf-8')) + except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError, TypeError): + return None + + @staticmethod + def unique_name(name_map, name): + index = name_map.get(name, 0) + name_map[name] = index + 1 + if index > 0: + return f"{name}-{index:02d}" + return name + + @staticmethod + def strtobool(val): + val = val.lower() + if val in ("y", "yes", "t", "true", "on", "1"): + return True + elif val in ("n", "no", "f", "false", "off", "0"): + return False + else: + raise ValueError(f"invalid truth value {val!r}") + + @staticmethod + def convert_v2ray(buf: bytes): + decoded = Converter.decode_base64(buf).decode("utf-8") + lines = decoded.strip().splitlines() + proxies = [] + names = {} + + for line in lines: + line = line.strip() + if not line: + continue + + if "://" not in line: + continue + + scheme, body = line.split("://", 1) + scheme = scheme.lower() + + if scheme == "vmess": + vmess_data = Converter.try_decode_base64_json(body) + if not vmess_data: + continue + + name = Converter.unique_name(names, vmess_data.get("ps", "vmess")) + net = str(vmess_data.get("net", "")).lower() + fake_type = str(vmess_data.get("type", "")).lower() + tls_mode = str(vmess_data.get("tls", "")).lower() + cipher = vmess_data.get("scy", "auto") or "auto" + alter_id = vmess_data.get("aid", 0) + + # 智能调整 network 类型 + if fake_type == "http": + net = "http" + elif net == "http": + net = "h2" + + proxy = { + "name": name, + "type": "vmess", + "server": vmess_data.get("add"), + "port": vmess_data.get("port"), + "uuid": vmess_data.get("id"), + "alterId": alter_id, + "cipher": cipher, + "tls": tls_mode.endswith("tls") or tls_mode == "reality", + "udp": True, + "xudp": True, + "skip-cert-verify": False, + "network": net + } + + # === TLS、Reality 扩展 === + if proxy["tls"]: + proxy["client-fingerprint"] = vmess_data.get("fp", "chrome") or "chrome" + alpn = vmess_data.get("alpn") + if alpn: + proxy["alpn"] = alpn.split(",") if isinstance(alpn, str) else alpn + sni = vmess_data.get("sni") + if sni: + proxy["servername"] = sni + + if tls_mode == "reality": + proxy["reality-opts"] = { + "public-key": vmess_data.get("pbk", ""), + "short-id": vmess_data.get("sid", "") + } + + path = vmess_data.get("path", "/") + host = vmess_data.get("host") + + # === 不同 network 的扩展字段处理 === + if net == "tcp": + if fake_type == "http": + proxy["http-opts"] = { + "path": path, + "headers": {"Host": host} if host else {} + } + elif net == "http": + proxy["network"] = "http" + proxy["http-opts"] = { + "path": path, + "headers": {"Host": host} if host else {} + } + elif net == "h2": + proxy["h2-opts"] = { + "path": path, + "host": [host] if host else [] + } + + elif net == "ws": + ws_headers = {"Host": host} if host else {} + ws_headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" # 可选伪装 + ws_opts = { + "path": path, + "headers": ws_headers + } + # 补充 early-data 配置 + early_data = vmess_data.get("ed") + if early_data: + try: + ws_opts["max-early-data"] = int(early_data) + except ValueError: + pass + early_data_header = vmess_data.get("edh") + if early_data_header: + ws_opts["early-data-header-name"] = early_data_header + proxy["ws-opts"] = ws_opts + + elif net == "grpc": + proxy["grpc-opts"] = { + "grpc-service-name": path + } + proxies.append(proxy) + elif scheme == "vless": + try: + parsed = urlparse(line) + query = parse_qs(parsed.query) + + uuid = parsed.username or "" + server = parsed.hostname or "" + port = parsed.port or 443 + tls_mode = query.get("security", [""])[0].lower() + tls = tls_mode == "tls" or tls_mode == "reality" + sni = query.get("sni", [""])[0] + flow = query.get("flow", [""])[0] + network = query.get("type", [""])[0] + path = query.get("path", [""])[0] + host = query.get("host", [""])[0] + + name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) + + proxy = { + "name": name, + "type": "vless", + "server": server, + "port": str(port), + "uuid": uuid, + "tls": tls, + "udp": True + } + + if sni: + proxy["sni"] = sni + if flow: + proxy["flow"] = flow + + if network: + proxy["network"] = network + if network == "ws": + proxy["ws-opts"] = { + "path": path, + "headers": {"Host": host} + } + elif network == "grpc": + proxy["grpc-opts"] = { + "grpc-service-name": path + } + + if tls_mode == "reality": + proxy["reality-opts"] = { + "public-key": query.get("pbk", [""])[0], + "short-id": query.get("sid", [""])[0] + } + proxy["client-fingerprint"] = query.get("fp", ["chrome"])[0] + alpn = query.get("alpn", [""])[0] + if alpn: + proxy["alpn"] = alpn.split(",") + + return proxy + + except Exception as e: + print(f"VLESS parse error: {e}") + return None + elif scheme == "trojan": + try: + parsed = urlparse(line) + query = dict(parse_qsl(parsed.query)) + + name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}")) + + trojan = { + "name": name, + "type": "trojan", + "server": parsed.hostname, + "port": parsed.port or 443, + "password": parsed.username or "", + "udp": True, + } + + # skip-cert-verify + try: + trojan["skip-cert-verify"] = Converter.strtobool(query.get("allowInsecure", "0")) + except ValueError: + trojan["skip-cert-verify"] = False + + # optional fields + if "sni" in query: + trojan["sni"] = query["sni"] + + alpn = query.get("alpn", "") + if alpn: + trojan["alpn"] = alpn.split(",") + + network = query.get("type", "").lower() + if network: + trojan["network"] = network + + if network == "ws": + headers = {"User-Agent": "clash"} # 或 RandUserAgent() + trojan["ws-opts"] = { + "path": query.get("path", "/"), + "headers": headers + } + + elif network == "grpc": + trojan["grpc-opts"] = { + "grpc-service-name": query.get("serviceName", "") + } + + fp = query.get("fp", "") + trojan["client-fingerprint"] = fp if fp else "chrome" + + proxies.append(trojan) + + except Exception as e: + print(f"Error parsing trojan:// link: {e}") + elif scheme == "hysteria": + try: + parsed = urlparse(line) + query = dict(parse_qsl(parsed.query)) + + name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}")) + hysteria = { + "name": name, + "type": "hysteria", + "server": parsed.hostname, + "port": parsed.port, + "auth_str": parsed.username or query.get("auth", ""), + "obfs": query.get("obfs", ""), + "sni": query.get("peer", ""), + "protocol": query.get("protocol", "") + } + + up = query.get("up", "") + down = query.get("down", "") + if not up: + up = query.get("upmbps", "") + if not down: + down = query.get("downmbps", "") + hysteria["up"] = up + hysteria["down"] = down + + # alpn split + alpn = query.get("alpn", "") + if alpn: + hysteria["alpn"] = alpn.split(",") + + # skip-cert-verify + try: + hysteria["skip-cert-verify"] = Converter.strtobool(query.get("insecure", "false")) + except ValueError: + hysteria["skip-cert-verify"] = False + + proxies.append(hysteria) + except Exception as e: + print(f"Hysteria parse error: {e}") + elif scheme in ("socks", "socks5", "socks5h"): + try: + parsed = urlparse(line) + server = parsed.hostname + port = parsed.port + username = parsed.username or "" + password = parsed.password or "" + name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) + + proxy = { + "name": name, + "type": "socks5", + "server": server, + "port": str(port), + "username": username, + "password": password, + "udp": True + } + proxies.append(proxy) + except Exception as e: + print(f"SOCKS5 parse error: {e}") + elif scheme == "ss": + try: + # 兼容 ss://base64 或 ss://base64#name + if "#" in body: + body, fragment = body.split("#", 1) + name = Converter.unique_name(names, unquote(fragment)) + else: + name = Converter.unique_name(names, "ss") + + if "@" in body: + userinfo, server = body.split("@", 1) + else: + decoded = Converter.decode_base64(body).decode() + userinfo, server = decoded.rsplit("@", 1) + + cipher, password = userinfo.split(":") + server_host, server_port = server.split(":") + + proxy = { + "name": name, + "type": "ss", + "server": server_host, + "port": server_port, + "cipher": cipher, + "password": password, + "udp": True + } + proxies.append(proxy) + except Exception as e: + print(f"SS parse error: {e}") + elif scheme == "ssr": + try: + decoded = Converter.decode_base64(body).decode() + parts, _, params_str = decoded.partition("/?") + host, port, protocol, method, obfs, password_enc = parts.split(":") + + password = Converter.decode_base64(password_enc).decode() + params = parse_qs(params_str) + + remarks = Converter.decode_base64(params.get("remarks", [""])[0]).decode() + obfsparam = Converter.decode_base64(params.get("obfsparam", [""])[0]).decode() + protoparam = Converter.decode_base64(params.get("protoparam", [""])[0]).decode() + + name = Converter.unique_name(names, remarks or f"{host}:{port}") + + proxy = { + "name": name, + "type": "ssr", + "server": host, + "port": port, + "cipher": method, + "password": password, + "obfs": obfs, + "protocol": protocol, + "udp": True + } + + if obfsparam: + proxy["obfs-param"] = obfsparam + if protoparam: + proxy["protocol-param"] = protoparam + + proxies.append(proxy) + except Exception as e: + print(f"SSR parse error: {e}") + elif scheme == "tuic": + try: + parsed = urlparse(line) + query = parse_qs(parsed.query) + + user = parsed.username or "" + password = parsed.password or "" + server = parsed.hostname + port = parsed.port or 443 + + name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) + proxy = { + "name": name, + "type": "tuic", + "server": server, + "port": str(port), + "udp": True + } + + if password: + proxy["uuid"] = user + proxy["password"] = password + else: + proxy["token"] = user + + if "congestion_control" in query: + proxy["congestion-controller"] = query["congestion_control"][0] + if "alpn" in query: + proxy["alpn"] = query["alpn"][0].split(",") + if "sni" in query: + proxy["sni"] = query["sni"][0] + if query.get("disable_sni", ["0"])[0] == "1": + proxy["disable-sni"] = True + if "udp_relay_mode" in query: + proxy["udp-relay-mode"] = query["udp_relay_mode"][0] + + proxies.append(proxy) + except Exception as e: + print(f"TUIC parse error: {e}") + elif scheme == "anytls": + try: + parsed = urlparse(line) + query = parse_qs(parsed.query) + + username = parsed.username or "" + password = parsed.password or username + server = parsed.hostname + port = parsed.port + insecure = query.get("insecure", ["0"])[0] == "1" + sni = query.get("sni", [""])[0] + fingerprint = query.get("hpkp", [""])[0] + + name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) + proxy = { + "name": name, + "type": "anytls", + "server": server, + "port": str(port), + "username": username, + "password": password, + "sni": sni, + "fingerprint": fingerprint, + "skip-cert-verify": insecure, + "udp": True + } + + proxies.append(proxy) + except Exception as e: + print(f"AnyTLS parse error: {e}") + elif scheme in ("hysteria2", "hy2"): + try: + parsed = urlparse(line) + query = parse_qs(parsed.query) + + password = parsed.username or "" + server = parsed.hostname + port = parsed.port or 443 + + name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) + proxy = { + "name": name, + "type": "hysteria2", + "server": server, + "port": str(port), + "password": password, + "obfs": query.get("obfs", [""])[0], + "obfs-password": query.get("obfs-password", [""])[0], + "sni": query.get("sni", [""])[0], + "skip-cert-verify": query.get("insecure", ["false"])[0] == "true", + "down": query.get("down", [""])[0], + "up": query.get("up", [""])[0], + "fingerprint": query.get("pinSHA256", [""])[0] + } + + if "alpn" in query: + proxy["alpn"] = query["alpn"][0].split(",") + + proxies.append(proxy) + except Exception as e: + print(f"Hysteria2 parse error: {e}") + + if not proxies: + raise ValueError("convert v2ray subscribe error: format invalid") + + return proxies \ No newline at end of file