mirror of
https://github.com/d0zingcat/MoviePilot-Plugins.git
synced 2026-05-13 23:16:47 +00:00
update(ClashRuleProvider): 支持解析 V2ray 订阅
This commit is contained in:
@@ -19,7 +19,7 @@ from app.log import logger
|
||||
from app.plugins import _PluginBase
|
||||
from app.schemas.types import NotificationType
|
||||
from app.utils.http import RequestUtils
|
||||
from app.plugins.clashruleprovider.clash_rule_parser import ClashRuleParser
|
||||
from app.plugins.clashruleprovider.clash_rule_parser import ClashRuleParser, Converter
|
||||
from app.plugins.clashruleprovider.clash_rule_parser import Action, RuleType, ClashRule, MatchRule, LogicRule
|
||||
from app.plugins.clashruleprovider.clash_rule_parser import ProxyGroup, RuleProvider
|
||||
|
||||
@@ -32,7 +32,7 @@ class ClashRuleProvider(_PluginBase):
|
||||
# 插件图标
|
||||
plugin_icon = "Mihomo_Meta_A.png"
|
||||
# 插件版本
|
||||
plugin_version = "1.1.0"
|
||||
plugin_version = "1.1.1"
|
||||
# 插件作者
|
||||
plugin_author = "wumode"
|
||||
# 作者主页
|
||||
@@ -852,7 +852,12 @@ class ClashRuleProvider(_PluginBase):
|
||||
return False
|
||||
try:
|
||||
rs = yaml.load(ret.content, Loader=yaml.FullLoader)
|
||||
logger.debug(f"{type(rs)} => {rs}")
|
||||
if type(rs) is str:
|
||||
all_proxies = {'name': "All Proxies", 'type': 'select', 'include-all-proxies': True}
|
||||
proxies = Converter.convert_v2ray(ret.content)
|
||||
if not proxies:
|
||||
raise ValueError(f"Unknown content: {rs}")
|
||||
rs = {'proxies': proxies, 'proxy-groups': [all_proxies, ]}
|
||||
if rs.get('rules') is None:
|
||||
rs['rules'] = []
|
||||
if self._discard_rules:
|
||||
@@ -950,7 +955,7 @@ class ClashRuleProvider(_PluginBase):
|
||||
logger.warn(f"关键词过滤后无可用节点,跳过过滤")
|
||||
removed_proxies = []
|
||||
for proxy_group in clash_config.get("proxy-groups", []):
|
||||
proxy_group['proxies'] = [x for x in proxy_group.get('proxies') if x not in removed_proxies]
|
||||
proxy_group['proxies'] = [x for x in proxy_group.get('proxies', []) if x not in removed_proxies]
|
||||
return clash_config
|
||||
|
||||
def clash_config(self) -> Optional[Dict[str, Any]]:
|
||||
|
||||
@@ -2,6 +2,10 @@ import re
|
||||
from typing import List, Dict, Any, Optional, Union, Callable, Literal
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from urllib.parse import urlparse, parse_qs, unquote, parse_qsl
|
||||
import json
|
||||
import base64
|
||||
import binascii
|
||||
|
||||
from pydantic import BaseModel, Field, validator, HttpUrl
|
||||
|
||||
@@ -635,3 +639,504 @@ class ClashRuleParser:
|
||||
for i in range(target_index, moved_index):
|
||||
self.rules[i].priority += 1
|
||||
self.rules.sort(key=lambda x: x.priority)
|
||||
|
||||
|
||||
class Converter:
|
||||
"""
|
||||
Converter for V2Ray Subscription
|
||||
|
||||
Reference:
|
||||
https://github.com/MetaCubeX/mihomo/blob/Alpha/common/convert/converter.go
|
||||
https://github.com/SubConv/SubConv/blob/main/modules/convert/converter.py
|
||||
"""
|
||||
@staticmethod
|
||||
def decode_base64(data):
|
||||
# 添加适配不同 padding 的容错机制
|
||||
data = data.strip()
|
||||
missing_padding = len(data) % 4
|
||||
if missing_padding:
|
||||
data += '=' * (4 - missing_padding)
|
||||
return base64.b64decode(data)
|
||||
|
||||
@staticmethod
|
||||
def try_decode_base64_json(data):
|
||||
try:
|
||||
return json.loads(Converter.decode_base64(data).decode('utf-8'))
|
||||
except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError, TypeError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def unique_name(name_map, name):
|
||||
index = name_map.get(name, 0)
|
||||
name_map[name] = index + 1
|
||||
if index > 0:
|
||||
return f"{name}-{index:02d}"
|
||||
return name
|
||||
|
||||
@staticmethod
|
||||
def strtobool(val):
|
||||
val = val.lower()
|
||||
if val in ("y", "yes", "t", "true", "on", "1"):
|
||||
return True
|
||||
elif val in ("n", "no", "f", "false", "off", "0"):
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"invalid truth value {val!r}")
|
||||
|
||||
@staticmethod
|
||||
def convert_v2ray(buf: bytes):
|
||||
decoded = Converter.decode_base64(buf).decode("utf-8")
|
||||
lines = decoded.strip().splitlines()
|
||||
proxies = []
|
||||
names = {}
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if "://" not in line:
|
||||
continue
|
||||
|
||||
scheme, body = line.split("://", 1)
|
||||
scheme = scheme.lower()
|
||||
|
||||
if scheme == "vmess":
|
||||
vmess_data = Converter.try_decode_base64_json(body)
|
||||
if not vmess_data:
|
||||
continue
|
||||
|
||||
name = Converter.unique_name(names, vmess_data.get("ps", "vmess"))
|
||||
net = str(vmess_data.get("net", "")).lower()
|
||||
fake_type = str(vmess_data.get("type", "")).lower()
|
||||
tls_mode = str(vmess_data.get("tls", "")).lower()
|
||||
cipher = vmess_data.get("scy", "auto") or "auto"
|
||||
alter_id = vmess_data.get("aid", 0)
|
||||
|
||||
# 智能调整 network 类型
|
||||
if fake_type == "http":
|
||||
net = "http"
|
||||
elif net == "http":
|
||||
net = "h2"
|
||||
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "vmess",
|
||||
"server": vmess_data.get("add"),
|
||||
"port": vmess_data.get("port"),
|
||||
"uuid": vmess_data.get("id"),
|
||||
"alterId": alter_id,
|
||||
"cipher": cipher,
|
||||
"tls": tls_mode.endswith("tls") or tls_mode == "reality",
|
||||
"udp": True,
|
||||
"xudp": True,
|
||||
"skip-cert-verify": False,
|
||||
"network": net
|
||||
}
|
||||
|
||||
# === TLS、Reality 扩展 ===
|
||||
if proxy["tls"]:
|
||||
proxy["client-fingerprint"] = vmess_data.get("fp", "chrome") or "chrome"
|
||||
alpn = vmess_data.get("alpn")
|
||||
if alpn:
|
||||
proxy["alpn"] = alpn.split(",") if isinstance(alpn, str) else alpn
|
||||
sni = vmess_data.get("sni")
|
||||
if sni:
|
||||
proxy["servername"] = sni
|
||||
|
||||
if tls_mode == "reality":
|
||||
proxy["reality-opts"] = {
|
||||
"public-key": vmess_data.get("pbk", ""),
|
||||
"short-id": vmess_data.get("sid", "")
|
||||
}
|
||||
|
||||
path = vmess_data.get("path", "/")
|
||||
host = vmess_data.get("host")
|
||||
|
||||
# === 不同 network 的扩展字段处理 ===
|
||||
if net == "tcp":
|
||||
if fake_type == "http":
|
||||
proxy["http-opts"] = {
|
||||
"path": path,
|
||||
"headers": {"Host": host} if host else {}
|
||||
}
|
||||
elif net == "http":
|
||||
proxy["network"] = "http"
|
||||
proxy["http-opts"] = {
|
||||
"path": path,
|
||||
"headers": {"Host": host} if host else {}
|
||||
}
|
||||
elif net == "h2":
|
||||
proxy["h2-opts"] = {
|
||||
"path": path,
|
||||
"host": [host] if host else []
|
||||
}
|
||||
|
||||
elif net == "ws":
|
||||
ws_headers = {"Host": host} if host else {}
|
||||
ws_headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" # 可选伪装
|
||||
ws_opts = {
|
||||
"path": path,
|
||||
"headers": ws_headers
|
||||
}
|
||||
# 补充 early-data 配置
|
||||
early_data = vmess_data.get("ed")
|
||||
if early_data:
|
||||
try:
|
||||
ws_opts["max-early-data"] = int(early_data)
|
||||
except ValueError:
|
||||
pass
|
||||
early_data_header = vmess_data.get("edh")
|
||||
if early_data_header:
|
||||
ws_opts["early-data-header-name"] = early_data_header
|
||||
proxy["ws-opts"] = ws_opts
|
||||
|
||||
elif net == "grpc":
|
||||
proxy["grpc-opts"] = {
|
||||
"grpc-service-name": path
|
||||
}
|
||||
proxies.append(proxy)
|
||||
elif scheme == "vless":
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
uuid = parsed.username or ""
|
||||
server = parsed.hostname or ""
|
||||
port = parsed.port or 443
|
||||
tls_mode = query.get("security", [""])[0].lower()
|
||||
tls = tls_mode == "tls" or tls_mode == "reality"
|
||||
sni = query.get("sni", [""])[0]
|
||||
flow = query.get("flow", [""])[0]
|
||||
network = query.get("type", [""])[0]
|
||||
path = query.get("path", [""])[0]
|
||||
host = query.get("host", [""])[0]
|
||||
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
|
||||
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "vless",
|
||||
"server": server,
|
||||
"port": str(port),
|
||||
"uuid": uuid,
|
||||
"tls": tls,
|
||||
"udp": True
|
||||
}
|
||||
|
||||
if sni:
|
||||
proxy["sni"] = sni
|
||||
if flow:
|
||||
proxy["flow"] = flow
|
||||
|
||||
if network:
|
||||
proxy["network"] = network
|
||||
if network == "ws":
|
||||
proxy["ws-opts"] = {
|
||||
"path": path,
|
||||
"headers": {"Host": host}
|
||||
}
|
||||
elif network == "grpc":
|
||||
proxy["grpc-opts"] = {
|
||||
"grpc-service-name": path
|
||||
}
|
||||
|
||||
if tls_mode == "reality":
|
||||
proxy["reality-opts"] = {
|
||||
"public-key": query.get("pbk", [""])[0],
|
||||
"short-id": query.get("sid", [""])[0]
|
||||
}
|
||||
proxy["client-fingerprint"] = query.get("fp", ["chrome"])[0]
|
||||
alpn = query.get("alpn", [""])[0]
|
||||
if alpn:
|
||||
proxy["alpn"] = alpn.split(",")
|
||||
|
||||
return proxy
|
||||
|
||||
except Exception as e:
|
||||
print(f"VLESS parse error: {e}")
|
||||
return None
|
||||
elif scheme == "trojan":
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
query = dict(parse_qsl(parsed.query))
|
||||
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}"))
|
||||
|
||||
trojan = {
|
||||
"name": name,
|
||||
"type": "trojan",
|
||||
"server": parsed.hostname,
|
||||
"port": parsed.port or 443,
|
||||
"password": parsed.username or "",
|
||||
"udp": True,
|
||||
}
|
||||
|
||||
# skip-cert-verify
|
||||
try:
|
||||
trojan["skip-cert-verify"] = Converter.strtobool(query.get("allowInsecure", "0"))
|
||||
except ValueError:
|
||||
trojan["skip-cert-verify"] = False
|
||||
|
||||
# optional fields
|
||||
if "sni" in query:
|
||||
trojan["sni"] = query["sni"]
|
||||
|
||||
alpn = query.get("alpn", "")
|
||||
if alpn:
|
||||
trojan["alpn"] = alpn.split(",")
|
||||
|
||||
network = query.get("type", "").lower()
|
||||
if network:
|
||||
trojan["network"] = network
|
||||
|
||||
if network == "ws":
|
||||
headers = {"User-Agent": "clash"} # 或 RandUserAgent()
|
||||
trojan["ws-opts"] = {
|
||||
"path": query.get("path", "/"),
|
||||
"headers": headers
|
||||
}
|
||||
|
||||
elif network == "grpc":
|
||||
trojan["grpc-opts"] = {
|
||||
"grpc-service-name": query.get("serviceName", "")
|
||||
}
|
||||
|
||||
fp = query.get("fp", "")
|
||||
trojan["client-fingerprint"] = fp if fp else "chrome"
|
||||
|
||||
proxies.append(trojan)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error parsing trojan:// link: {e}")
|
||||
elif scheme == "hysteria":
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
query = dict(parse_qsl(parsed.query))
|
||||
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}"))
|
||||
hysteria = {
|
||||
"name": name,
|
||||
"type": "hysteria",
|
||||
"server": parsed.hostname,
|
||||
"port": parsed.port,
|
||||
"auth_str": parsed.username or query.get("auth", ""),
|
||||
"obfs": query.get("obfs", ""),
|
||||
"sni": query.get("peer", ""),
|
||||
"protocol": query.get("protocol", "")
|
||||
}
|
||||
|
||||
up = query.get("up", "")
|
||||
down = query.get("down", "")
|
||||
if not up:
|
||||
up = query.get("upmbps", "")
|
||||
if not down:
|
||||
down = query.get("downmbps", "")
|
||||
hysteria["up"] = up
|
||||
hysteria["down"] = down
|
||||
|
||||
# alpn split
|
||||
alpn = query.get("alpn", "")
|
||||
if alpn:
|
||||
hysteria["alpn"] = alpn.split(",")
|
||||
|
||||
# skip-cert-verify
|
||||
try:
|
||||
hysteria["skip-cert-verify"] = Converter.strtobool(query.get("insecure", "false"))
|
||||
except ValueError:
|
||||
hysteria["skip-cert-verify"] = False
|
||||
|
||||
proxies.append(hysteria)
|
||||
except Exception as e:
|
||||
print(f"Hysteria parse error: {e}")
|
||||
elif scheme in ("socks", "socks5", "socks5h"):
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
server = parsed.hostname
|
||||
port = parsed.port
|
||||
username = parsed.username or ""
|
||||
password = parsed.password or ""
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
|
||||
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "socks5",
|
||||
"server": server,
|
||||
"port": str(port),
|
||||
"username": username,
|
||||
"password": password,
|
||||
"udp": True
|
||||
}
|
||||
proxies.append(proxy)
|
||||
except Exception as e:
|
||||
print(f"SOCKS5 parse error: {e}")
|
||||
elif scheme == "ss":
|
||||
try:
|
||||
# 兼容 ss://base64 或 ss://base64#name
|
||||
if "#" in body:
|
||||
body, fragment = body.split("#", 1)
|
||||
name = Converter.unique_name(names, unquote(fragment))
|
||||
else:
|
||||
name = Converter.unique_name(names, "ss")
|
||||
|
||||
if "@" in body:
|
||||
userinfo, server = body.split("@", 1)
|
||||
else:
|
||||
decoded = Converter.decode_base64(body).decode()
|
||||
userinfo, server = decoded.rsplit("@", 1)
|
||||
|
||||
cipher, password = userinfo.split(":")
|
||||
server_host, server_port = server.split(":")
|
||||
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "ss",
|
||||
"server": server_host,
|
||||
"port": server_port,
|
||||
"cipher": cipher,
|
||||
"password": password,
|
||||
"udp": True
|
||||
}
|
||||
proxies.append(proxy)
|
||||
except Exception as e:
|
||||
print(f"SS parse error: {e}")
|
||||
elif scheme == "ssr":
|
||||
try:
|
||||
decoded = Converter.decode_base64(body).decode()
|
||||
parts, _, params_str = decoded.partition("/?")
|
||||
host, port, protocol, method, obfs, password_enc = parts.split(":")
|
||||
|
||||
password = Converter.decode_base64(password_enc).decode()
|
||||
params = parse_qs(params_str)
|
||||
|
||||
remarks = Converter.decode_base64(params.get("remarks", [""])[0]).decode()
|
||||
obfsparam = Converter.decode_base64(params.get("obfsparam", [""])[0]).decode()
|
||||
protoparam = Converter.decode_base64(params.get("protoparam", [""])[0]).decode()
|
||||
|
||||
name = Converter.unique_name(names, remarks or f"{host}:{port}")
|
||||
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "ssr",
|
||||
"server": host,
|
||||
"port": port,
|
||||
"cipher": method,
|
||||
"password": password,
|
||||
"obfs": obfs,
|
||||
"protocol": protocol,
|
||||
"udp": True
|
||||
}
|
||||
|
||||
if obfsparam:
|
||||
proxy["obfs-param"] = obfsparam
|
||||
if protoparam:
|
||||
proxy["protocol-param"] = protoparam
|
||||
|
||||
proxies.append(proxy)
|
||||
except Exception as e:
|
||||
print(f"SSR parse error: {e}")
|
||||
elif scheme == "tuic":
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
user = parsed.username or ""
|
||||
password = parsed.password or ""
|
||||
server = parsed.hostname
|
||||
port = parsed.port or 443
|
||||
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "tuic",
|
||||
"server": server,
|
||||
"port": str(port),
|
||||
"udp": True
|
||||
}
|
||||
|
||||
if password:
|
||||
proxy["uuid"] = user
|
||||
proxy["password"] = password
|
||||
else:
|
||||
proxy["token"] = user
|
||||
|
||||
if "congestion_control" in query:
|
||||
proxy["congestion-controller"] = query["congestion_control"][0]
|
||||
if "alpn" in query:
|
||||
proxy["alpn"] = query["alpn"][0].split(",")
|
||||
if "sni" in query:
|
||||
proxy["sni"] = query["sni"][0]
|
||||
if query.get("disable_sni", ["0"])[0] == "1":
|
||||
proxy["disable-sni"] = True
|
||||
if "udp_relay_mode" in query:
|
||||
proxy["udp-relay-mode"] = query["udp_relay_mode"][0]
|
||||
|
||||
proxies.append(proxy)
|
||||
except Exception as e:
|
||||
print(f"TUIC parse error: {e}")
|
||||
elif scheme == "anytls":
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
username = parsed.username or ""
|
||||
password = parsed.password or username
|
||||
server = parsed.hostname
|
||||
port = parsed.port
|
||||
insecure = query.get("insecure", ["0"])[0] == "1"
|
||||
sni = query.get("sni", [""])[0]
|
||||
fingerprint = query.get("hpkp", [""])[0]
|
||||
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "anytls",
|
||||
"server": server,
|
||||
"port": str(port),
|
||||
"username": username,
|
||||
"password": password,
|
||||
"sni": sni,
|
||||
"fingerprint": fingerprint,
|
||||
"skip-cert-verify": insecure,
|
||||
"udp": True
|
||||
}
|
||||
|
||||
proxies.append(proxy)
|
||||
except Exception as e:
|
||||
print(f"AnyTLS parse error: {e}")
|
||||
elif scheme in ("hysteria2", "hy2"):
|
||||
try:
|
||||
parsed = urlparse(line)
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
password = parsed.username or ""
|
||||
server = parsed.hostname
|
||||
port = parsed.port or 443
|
||||
|
||||
name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
|
||||
proxy = {
|
||||
"name": name,
|
||||
"type": "hysteria2",
|
||||
"server": server,
|
||||
"port": str(port),
|
||||
"password": password,
|
||||
"obfs": query.get("obfs", [""])[0],
|
||||
"obfs-password": query.get("obfs-password", [""])[0],
|
||||
"sni": query.get("sni", [""])[0],
|
||||
"skip-cert-verify": query.get("insecure", ["false"])[0] == "true",
|
||||
"down": query.get("down", [""])[0],
|
||||
"up": query.get("up", [""])[0],
|
||||
"fingerprint": query.get("pinSHA256", [""])[0]
|
||||
}
|
||||
|
||||
if "alpn" in query:
|
||||
proxy["alpn"] = query["alpn"][0].split(",")
|
||||
|
||||
proxies.append(proxy)
|
||||
except Exception as e:
|
||||
print(f"Hysteria2 parse error: {e}")
|
||||
|
||||
if not proxies:
|
||||
raise ValueError("convert v2ray subscribe error: format invalid")
|
||||
|
||||
return proxies
|
||||
Reference in New Issue
Block a user