# Source: MoviePilot-Plugins/plugins.v2/clashruleprovider/clashruleparser.py
# (Python, 1217 lines, 48 KiB)

import re
from typing import List, Dict, Any, Optional, Union, Callable, Literal
from dataclasses import dataclass
from enum import Enum
from urllib.parse import urlparse, parse_qs, unquote, parse_qsl, urlencode, urlunparse
import json
import base64
import binascii
from pydantic import BaseModel, Field, validator, HttpUrl
class RuleProvider(BaseModel):
    """Schema of one Clash rule-provider entry.

    The validators enforce the per-type requirements: an ``http`` provider
    needs ``url``, a ``file`` provider needs ``path``, an ``inline`` provider
    needs a ``payload`` list, and the ``mrs`` format is only accepted together
    with the ``domain`` or ``ipcidr`` behavior.
    """

    type: Literal["http", "file", "inline"] = Field(..., description="Provider type")
    url: Optional[HttpUrl] = Field(None, description="Must be configured if the type is http")
    path: Optional[str] = Field(None, description="Optional, file path, must be unique.")
    interval: Optional[int] = Field(None, ge=0, description="The update interval for the provider, in seconds.")
    proxy: Optional[str] = Field(None, description="Download/update through the specified proxy.")
    behavior: Optional[Literal["domain", "ipcidr", "classical"]] = Field(
        None,
        description="Behavior of the rule provider",
    )
    format: Literal["yaml", "text", "mrs"] = Field("yaml", description="Format of the rule provider file")
    size_limit: int = Field(0, ge=0, description="The maximum size of downloadable files in bytes (0 for no limit)")
    payload: Optional[List[str]] = Field(None, description="Content, only effective when type is inline")

    @validator("url", pre=True, always=True, allow_reuse=True)
    def check_url_for_http_type(cls, v, values):
        """Reject an ``http`` provider that carries no ``url``."""
        if v is None and values.get("type") == "http":
            raise ValueError("url must be configured if the type is 'http'")
        return v

    @validator("path", pre=True, always=True, allow_reuse=True)
    def check_path_for_file_type(cls, v, values):
        """Reject a ``file`` provider that carries no ``path``."""
        if v is None and values.get("type") == "file":
            raise ValueError("path must be configured if the type is 'file'")
        return v

    @validator("payload", pre=True, always=True, allow_reuse=True)
    def handle_payload_for_non_inline_type(cls, v, values):
        """Drop ``payload`` (force it to ``None``) unless the provider is inline."""
        is_inline = values.get("type") == "inline"
        return v if is_inline else None

    @validator("payload", allow_reuse=True)
    def check_payload_type_for_inline(cls, v, values):
        """Require an inline provider to have a list payload."""
        if values.get("type") != "inline":
            return v
        if v is None:
            raise ValueError("payload must be configured if the type is 'inline'")
        if not isinstance(v, list):
            raise ValueError("payload must be a list of strings when type is 'inline'")
        return v

    @validator("format", allow_reuse=True)
    def check_format_with_behavior(cls, v, values):
        """Allow the ``mrs`` format only for ``domain``/``ipcidr`` behaviors."""
        if v == "mrs" and values.get("behavior") not in ("domain", "ipcidr"):
            raise ValueError("mrs format only supports 'domain' or 'ipcidr' behavior")
        return v
class RuleProviders(BaseModel):
    """Custom-root model: a mapping from string keys to ``RuleProvider`` entries."""

    # Keys are presumably the provider names used in the Clash config -- verify against callers.
    __root__: dict[str, RuleProvider]
class ProxyGroupBase(BaseModel):
    """
    Common fields shared by every proxy group type.
    """
    # Required field
    name: str = Field(..., description="The name of the proxy group.")

    # Proxy and provider references
    proxies: Optional[List[str]] = Field(None, description="References to outbound proxies or other proxy groups.")
    use: Optional[List[str]] = Field(None, description="References to proxy provider sets.")

    # Health check fields
    url: Optional[str] = Field(None, description="Health check test address.")
    interval: Optional[int] = Field(None, description="Health check interval in seconds.")
    lazy: bool = Field(True, description="If not selected, no health checks are performed.")
    timeout: Optional[int] = Field(5000, description="Health check timeout in milliseconds.")
    max_failed_times: Optional[int] = Field(5, description="Maximum number of failures before a forced health check.")
    expected_status: Optional[str] = Field(None, description="Expected HTTP response status code for health checks.")

    # Network and routing fields
    disable_udp: Optional[bool] = Field(False, description="Disables UDP for this proxy group.")
    interface_name: Optional[str] = Field(None, description="DEPRECATED. Specifies the outbound interface.")
    routing_mark: Optional[int] = Field(None, description="DEPRECATED. The routing mark for outbound connections.")

    # Dynamic proxy inclusion
    include_all: Optional[bool] = Field(False, description="Includes all outbound proxies and proxy sets.")
    include_all_proxies: Optional[bool] = Field(False, description="Includes all outbound proxies.")
    include_all_providers: Optional[bool] = Field(False, description="Includes all proxy provider sets.")

    # Filtering
    filter: Optional[str] = Field(None, description="Regex to filter nodes from providers.")
    exclude_filter: Optional[str] = Field(None, description="Regex to exclude nodes.")
    exclude_type: Optional[str] = Field(None, description="Exclude nodes by adapter type, separated by '|'.")

    # UI fields
    hidden: Optional[bool] = Field(False, description="Hides the proxy group in the API.")
    icon: Optional[str] = Field(None, description="Icon string for the proxy group, for UI use.")

    @validator('expected_status', allow_reuse=True)
    def validate_expected_status(cls, v: Optional[str]) -> Optional[str]:
        """Validate the ``expected_status`` expression.

        Accepted values are ``None``, ``'*'``, or a ``/``-separated list whose
        items are either a three-digit status code (``200``) or an inclusive
        range of two codes (``200-299``). Every code must lie in 100..599 and a
        range must not be descending.

        :raises ValueError: if the expression is malformed or a code/range is
            out of bounds.
        """
        if v is None or v == '*':
            return v
        # Each item is one code with at most ONE optional "-code" suffix, so a
        # chained range such as "200-300-400" is reported as a format error
        # instead of failing later while unpacking the range bounds.
        if not re.fullmatch(r'\d{3}(-\d{3})?(/\d{3}(-\d{3})?)*', v):
            raise ValueError("Invalid format for expected-status.")
        for part in v.split('/'):
            bounds = [int(code) for code in part.split('-')]
            if len(bounds) == 2:
                start, end = bounds
                if not 100 <= start <= end < 600:
                    raise ValueError(f"Invalid status code range: {part}")
            elif not 100 <= bounds[0] < 600:
                raise ValueError(f"Invalid status code: {part}")
        return v
class SelectGroup(ProxyGroupBase):
    """Proxy group whose ``type`` is ``'select'``."""
    type: Literal['select']


class RelayGroup(ProxyGroupBase):
    """Proxy group whose ``type`` is ``'relay'``."""
    type: Literal['relay']


class FallbackGroup(ProxyGroupBase):
    """Proxy group whose ``type`` is ``'fallback'``."""
    type: Literal['fallback']


class UrlTestGroup(ProxyGroupBase):
    """Proxy group whose ``type`` is ``'url-test'``; adds an optional ``tolerance``."""
    type: Literal['url-test']
    tolerance: Optional[int] = Field(None, description="proxies switch tolerance, measured in milliseconds (ms).")


class LoadBalanceGroup(ProxyGroupBase):
    """Proxy group whose ``type`` is ``'load-balance'``; adds a ``strategy`` (default ``'round-robin'``)."""
    type: Literal['load-balance']
    strategy: Optional[Literal['round-robin', 'consistent-hashing', 'sticky-sessions']] = Field(
        'round-robin',
        description="Load balancing strategy."
    )


# Union of all concrete proxy-group models; each member declares a distinct `type` literal.
ProxyGroupUnion = Union[SelectGroup, RelayGroup, FallbackGroup, UrlTestGroup, LoadBalanceGroup]


class ProxyGroup(BaseModel):
    """Custom-root model wrapping one of the concrete proxy-group models."""
    __root__: ProxyGroupUnion
class AdditionalParam(Enum):
    """Optional trailing parameter of a rule line."""

    NO_RESOLVE = 'no-resolve'
    SRC = 'src'
class RuleType(Enum):
    """All rule types the parser recognises; each value is the keyword used in a rule line."""

    # Domain based
    DOMAIN = "DOMAIN"
    DOMAIN_SUFFIX = "DOMAIN-SUFFIX"
    DOMAIN_KEYWORD = "DOMAIN-KEYWORD"
    DOMAIN_REGEX = "DOMAIN-REGEX"
    GEOSITE = "GEOSITE"

    # Destination address based
    IP_CIDR = "IP-CIDR"
    IP_CIDR6 = "IP-CIDR6"
    IP_SUFFIX = "IP-SUFFIX"
    IP_ASN = "IP-ASN"
    GEOIP = "GEOIP"

    # Source address based
    SRC_GEOIP = "SRC-GEOIP"
    SRC_IP_ASN = "SRC-IP-ASN"
    SRC_IP_CIDR = "SRC-IP-CIDR"
    SRC_IP_SUFFIX = "SRC-IP-SUFFIX"

    # Ports
    DST_PORT = "DST-PORT"
    SRC_PORT = "SRC-PORT"

    # Inbound attributes
    IN_PORT = "IN-PORT"
    IN_TYPE = "IN-TYPE"
    IN_USER = "IN-USER"
    IN_NAME = "IN-NAME"

    # Process attributes
    PROCESS_PATH = "PROCESS-PATH"
    PROCESS_PATH_REGEX = "PROCESS-PATH-REGEX"
    PROCESS_NAME = "PROCESS-NAME"
    PROCESS_NAME_REGEX = "PROCESS-NAME-REGEX"
    UID = "UID"

    # Miscellaneous
    NETWORK = "NETWORK"
    DSCP = "DSCP"
    RULE_SET = "RULE-SET"

    # Logic / structural rules
    AND = "AND"
    OR = "OR"
    NOT = "NOT"
    SUB_RULE = "SUB-RULE"
    MATCH = "MATCH"
class Action(Enum):
    """Built-in rule actions; any other action string is kept as-is by the parser."""

    DIRECT = "DIRECT"
    REJECT = "REJECT"
    REJECT_DROP = "REJECT-DROP"
    PASS = "PASS"
    COMPATIBLE = "COMPATIBLE"
@dataclass
class ClashRule:
    """A regular (non-logic) routing rule of the form ``TYPE,payload,action[,param]``."""

    rule_type: RuleType
    payload: str
    # Either a built-in Action or the name of a custom proxy group.
    action: Union[Action, str]
    # NOTE(review): annotated as AdditionalParam, but the parser in this file
    # stores the raw string taken from the rule line -- confirm intended type.
    additional_params: Optional[AdditionalParam] = None
    raw_rule: str = ""
    priority: int = 0

    def condition_string(self) -> str:
        """Return the matching part of the rule, ``TYPE,payload``, without the action."""
        return "{},{}".format(self.rule_type.value, self.payload)
@dataclass
class LogicRule:
    """A logic rule (AND, OR, NOT) combining one or more conditions."""

    logic_type: RuleType
    conditions: List[Union[ClashRule, 'LogicRule']]
    action: Union[Action, str]
    raw_rule: str = ""
    priority: int = 0

    def condition_string(self) -> str:
        """Return ``TYPE,((cond1),(cond2),...)`` built from the nested conditions."""
        wrapped = [f"({cond.condition_string()})" for cond in self.conditions]
        joined = ','.join(wrapped)
        return f"{self.logic_type.value},({joined})"
@dataclass
class MatchRule:
    """The catch-all ``MATCH,action`` rule."""

    action: Union[Action, str]
    raw_rule: str = ""
    priority: int = 0
    rule_type: RuleType = RuleType.MATCH

    @staticmethod
    def condition_string() -> str:
        """Return the constant condition of a match rule."""
        return "MATCH"
class ClashRuleParser:
    """Parse Clash routing rules and manage them as a priority-ordered list.

    Every rule carries an integer ``priority``; a lower number means the rule
    comes earlier. The parse methods number rules from 0 and the insert/append
    helpers re-sort ``self.rules`` by priority after each change.
    """

    def __init__(self):
        self.rules: List[Union[ClashRule, LogicRule, MatchRule]] = []

    @staticmethod
    def _resolve_action(action: str) -> Union[Action, str]:
        """Return the matching ``Action`` member, or *action* unchanged (custom proxy group)."""
        try:
            return Action(action.upper())
        except ValueError:
            return action

    @staticmethod
    def _action_value(action: Union[Action, str]) -> str:
        """Return the plain string form of *action*."""
        return action.value if isinstance(action, Action) else action

    @staticmethod
    def _rule_key(rule: Union[ClashRule, LogicRule, MatchRule]) -> tuple:
        """Build a comparison key (class, condition, action) that works for every rule class."""
        return type(rule), rule.condition_string(), rule.action

    @staticmethod
    def parse_rule_line(line: str) -> Optional[Union[ClashRule, LogicRule, MatchRule]]:
        """Parse a single rule line.

        :return: the parsed rule, or ``None`` when the line cannot be parsed
            (parsing is deliberately best-effort here).
        """
        line = line.strip()
        try:
            # Logic rules (AND, OR, NOT) carry a parenthesised condition list.
            if line.startswith(('AND,', 'OR,', 'NOT,')):
                return ClashRuleParser._parse_logic_rule(line)
            if line.startswith('MATCH'):
                return ClashRuleParser._parse_match_rule(line)
            return ClashRuleParser._parse_regular_rule(line)
        except Exception:
            return None

    @staticmethod
    def parse_rule_dict(clash_rule: Dict[str, Any]) -> Optional[Union[ClashRule, LogicRule, MatchRule]]:
        """Build a rule from its dictionary form (the shape produced by ``to_dict``).

        The dictionary is rendered to a rule line and parsed by the same
        helpers as text input. A ``priority`` key, if present, is copied onto
        the result.

        :return: the parsed rule, or ``None`` for an empty dict or a logic rule
            without conditions.
        :raises ValueError: if the rendered rule line is invalid.
        """
        if not clash_rule:
            return None
        rule_type = clash_rule.get("type")
        action = clash_rule.get('action')
        if rule_type in ('AND', 'OR', 'NOT'):
            conditions = clash_rule.get("conditions")
            if not conditions:
                return None
            # Conditions must be comma separated, e.g. ((DOMAIN,a.com),(NETWORK,UDP)),
            # otherwise the stored raw_rule is not a valid Clash rule line.
            conditions_str = ','.join(
                f'({condition.get("type")},{condition.get("payload")})' for condition in conditions
            )
            raw_rule = f"{rule_type},({conditions_str}),{action}"
            rule = ClashRuleParser._parse_logic_rule(raw_rule)
        elif rule_type == 'MATCH':
            raw_rule = f"{rule_type},{action}"
            rule = ClashRuleParser._parse_match_rule(raw_rule)
        else:
            raw_rule = f"{rule_type},{clash_rule.get('payload')},{action}"
            if clash_rule.get('additional_params'):
                raw_rule += f",{clash_rule.get('additional_params')}"
            rule = ClashRuleParser._parse_regular_rule(raw_rule)
        if rule and 'priority' in clash_rule:
            rule.priority = clash_rule['priority']
        return rule

    @staticmethod
    def _parse_match_rule(line: str) -> MatchRule:
        """Parse ``MATCH,action``.

        :raises ValueError: if the action part is missing.
        """
        parts = line.split(',')
        if len(parts) < 2:
            raise ValueError(f"Invalid rule format: {line}")
        return MatchRule(
            action=ClashRuleParser._resolve_action(parts[1]),
            raw_rule=line
        )

    @staticmethod
    def _parse_regular_rule(line: str) -> ClashRule:
        """Parse a regular (non-logic) rule: ``TYPE,payload,action[,param]``.

        :raises ValueError: on a wrong number of fields, an empty type/payload
            or an unknown rule type.
        """
        parts = line.split(',')
        if not 3 <= len(parts) <= 4:
            raise ValueError(f"Invalid rule format: {line}")
        rule_type_str = parts[0].upper()
        payload, action = parts[1], parts[2]
        if not payload or not rule_type_str:
            raise ValueError(f"Invalid rule format: {line}")
        additional_params = parts[3] if len(parts) > 3 else None
        try:
            rule_type = RuleType(rule_type_str)
        except ValueError:
            raise ValueError(f"Unknown rule type: {rule_type_str}") from None
        return ClashRule(
            rule_type=rule_type,
            payload=payload,
            action=ClashRuleParser._resolve_action(action),
            additional_params=additional_params,
            raw_rule=line
        )

    @staticmethod
    def _parse_logic_rule(line: str) -> LogicRule:
        """Parse a logic rule: ``AND|OR|NOT,(<conditions>),action``.

        :raises ValueError: if the line does not have that overall shape.
        """
        logic_rule_match = re.match(r'^(AND|OR|NOT),\((.+)\),([^,]+)$', line)
        if not logic_rule_match:
            raise ValueError(f"Cannot extract action from logic rule: {line}")
        logic_type = RuleType(logic_rule_match.group(1).upper())
        conditions = ClashRuleParser._parse_logic_conditions(logic_rule_match.group(2))
        return LogicRule(
            logic_type=logic_type,
            conditions=conditions,
            action=ClashRuleParser._resolve_action(logic_rule_match.group(3)),
            raw_rule=line
        )

    @staticmethod
    def _parse_logic_conditions(conditions_str: str) -> List[ClashRule]:
        """Parse the flat conditions of a logic rule, e.g. ``(DOMAIN,baidu.com),(NETWORK,UDP)``.

        This is a simple regex scan; nested logic is not reconstructed, and
        conditions with an unknown rule type are skipped.
        """
        conditions = []
        for rule_type_str, payload in re.findall(r'\(([^,]+),([^)]+)\)', conditions_str):
            try:
                rule_type = RuleType(rule_type_str.upper())
            except ValueError:
                continue
            conditions.append(ClashRule(
                rule_type=rule_type,
                payload=payload,
                action="",  # Conditions inside a logic rule have no action of their own
                raw_rule=f"{rule_type_str},{payload}"
            ))
        return conditions

    def parse_rules(self, rules_text: str) -> List[Union[ClashRule, LogicRule, MatchRule]]:
        """Parse newline-separated rules, replacing the current rule list.

        Priorities are assigned consecutively from 0 to the lines that parse;
        unparsable lines are skipped.
        """
        self.rules = []
        priority = 0
        for line in rules_text.strip().split('\n'):
            rule = self.parse_rule_line(line)
            if rule:
                rule.priority = priority
                self.rules.append(rule)
                priority += 1
        return self.rules

    def parse_rules_from_list(self, rules_list: List[str]) -> List[Union[ClashRule, LogicRule, MatchRule]]:
        """Parse rules from a list of rule strings, replacing the current rule list.

        Each rule's priority is its position in *rules_list*.
        """
        # NOTE(review): unlike parse_rules, a skipped (unparsable) entry leaves a
        # gap in the priorities here -- confirm whether callers rely on that.
        self.rules = []
        for priority, rule_str in enumerate(rules_list):
            rule = self.parse_rule_line(rule_str)
            if rule:
                rule.priority = priority
                self.rules.append(rule)
        return self.rules

    def validate_rule(self, rule: ClashRule) -> bool:
        """Run a light sanity check on the payload of a parsed rule.

        :return: ``False`` when the payload fails its type-specific check or
            any error occurs while checking; ``True`` otherwise.
        """
        try:
            rule_type = rule.rule_type
            if rule_type in (RuleType.IP_CIDR, RuleType.IP_CIDR6):
                # CIDR notation needs a prefix length separator.
                return '/' in rule.payload
            if rule_type in (RuleType.DST_PORT, RuleType.SRC_PORT):
                # A single port number or a range.
                return rule.payload.isdigit() or '-' in rule.payload
            if rule_type == RuleType.NETWORK:
                return rule.payload.lower() in ['tcp', 'udp']
            if rule_type in (RuleType.DOMAIN_REGEX, RuleType.PROCESS_PATH_REGEX):
                # Raises (and is reported as invalid) when the pattern does not compile.
                re.compile(rule.payload)
            return True
        except Exception:
            return False

    def to_list(self) -> List[str]:
        """Return the raw rule line of every stored rule, in list order."""
        return [rule.raw_rule for rule in self.rules]

    def to_dict(self) -> List[Dict[str, Any]]:
        """Convert the stored rules to a list of plain dictionaries."""
        result = []
        for rule in self.rules:
            if isinstance(rule, ClashRule):
                result.append({
                    'type': rule.rule_type.value,
                    'payload': rule.payload,
                    'action': self._action_value(rule.action),
                    'additional_params': rule.additional_params,
                    'priority': rule.priority,
                    'raw': rule.raw_rule
                })
            elif isinstance(rule, LogicRule):
                # Only plain conditions are exported; nested logic rules are left out.
                conditions_dict = [
                    {'type': condition.rule_type.value, 'payload': condition.payload}
                    for condition in rule.conditions
                    if isinstance(condition, ClashRule)
                ]
                result.append({
                    'type': rule.logic_type.value,
                    'conditions': conditions_dict,
                    'action': self._action_value(rule.action),
                    'priority': rule.priority,
                    'raw': rule.raw_rule
                })
            elif isinstance(rule, MatchRule):
                result.append({
                    'type': 'MATCH',
                    'action': self._action_value(rule.action),
                    'priority': rule.priority,
                    'raw': rule.raw_rule
                })
        return result

    def get_rules_by_priority(self) -> List[Union[ClashRule, LogicRule, MatchRule]]:
        """Return the rules sorted by ascending priority number (first rule first)."""
        return sorted(self.rules, key=lambda rule: rule.priority)

    def _next_priority(self) -> int:
        """Return the priority that follows the current maximum (0 for an empty list)."""
        # -1 as the empty default keeps numbering 0-based, like the parse methods.
        return max((rule.priority for rule in self.rules), default=-1) + 1

    def append_rule(self, rule: Union[ClashRule, LogicRule, MatchRule]) -> None:
        """Append *rule* after the rule with the highest priority number."""
        rule.priority = self._next_priority()
        self.rules.append(rule)
        self.rules.sort(key=lambda r: r.priority)

    def append_rules(self, rules: List[Union[ClashRule, LogicRule, MatchRule]]) -> None:
        """Append *rules* in order, numbering them consecutively after the current maximum."""
        for priority, rule in enumerate(rules, start=self._next_priority()):
            rule.priority = priority
            self.rules.append(rule)
        self.rules.sort(key=lambda r: r.priority)

    def insert_rule_at_priority(self, rule: Union[ClashRule, LogicRule, MatchRule], priority: int):
        """Insert *rule* at *priority*, shifting rules at or after it one step back."""
        for existing_rule in self.rules:
            if existing_rule.priority >= priority:
                existing_rule.priority += 1
        rule.priority = priority
        self.rules.append(rule)
        self.rules.sort(key=lambda r: r.priority)

    def update_rule_at_priority(self, clash_rule: Union[ClashRule, LogicRule], priority: int) -> bool:
        """Replace the rule stored at *priority* with *clash_rule*.

        If ``clash_rule.priority`` differs from *priority*, the old rule is
        removed and *clash_rule* is inserted at its own priority instead.

        :return: ``False`` if no rule exists at *priority*, else ``True``.
        """
        if clash_rule.priority == priority:
            for index, existing_rule in enumerate(self.rules):
                if existing_rule.priority == priority:
                    self.rules[index] = clash_rule
                    self.rules[index].priority = priority
                    return True
            return False
        removed = self.remove_rule_at_priority(priority)
        if not removed:
            return False
        self.insert_rule_at_priority(clash_rule, clash_rule.priority)
        return True

    def get_rule_at_priority(self, priority: int) -> Optional[Union[ClashRule, LogicRule, MatchRule]]:
        """Return the first rule whose priority equals *priority*, or ``None``."""
        return next((rule for rule in self.rules if rule.priority == priority), None)

    def remove_rule_at_priority(self, priority: int) -> Optional[Union[ClashRule, LogicRule, MatchRule]]:
        """Remove the rule at *priority* and close the gap it leaves.

        :return: the removed rule, or ``None`` if there was none.
        """
        rule_to_remove = self.get_rule_at_priority(priority)
        if not rule_to_remove:
            return None
        self.rules.remove(rule_to_remove)
        for rule in self.rules:
            if rule.priority > priority:
                rule.priority -= 1
        return rule_to_remove

    def remove_rules(self, condition: Callable[[Union[ClashRule, LogicRule, MatchRule]], bool]):
        """Remove every rule for which *condition* returns true, closing priority gaps."""
        i = 0
        while i < len(self.rules):
            if condition(self.rules[i]):
                priority = self.rules[i].priority
                for rule in self.rules:
                    if rule.priority > priority:
                        rule.priority -= 1
                del self.rules[i]
            else:
                i += 1

    def move_rule_priority(self, from_priority: int, to_priority: int) -> bool:
        """Move the rule at *from_priority* to *to_priority*.

        :return: ``False`` if no rule exists at *from_priority*, else ``True``.
        """
        rule_to_move = self.remove_rule_at_priority(from_priority)
        if not rule_to_move:
            return False
        self.insert_rule_at_priority(rule_to_move, to_priority)
        return True

    def filter_rules_by_type(self, rule_type: RuleType) -> List[ClashRule]:
        """Return the regular rules whose type equals *rule_type*."""
        return [rule for rule in self.rules
                if isinstance(rule, ClashRule) and rule.rule_type == rule_type]

    def filter_rules_by_action(self, action: Union[Action, str]) -> List[Union[ClashRule, LogicRule, MatchRule]]:
        """Return every rule whose action equals *action*."""
        return [rule for rule in self.rules if rule.action == action]

    def has_rule(self, clash_rule: Union[ClashRule, LogicRule, MatchRule]) -> bool:
        """Return whether an equivalent rule (same class, condition and action) is stored.

        Rules are compared through ``condition_string()``, which every rule
        class provides, so logic and match rules can be compared without
        touching attributes they do not have.
        """
        wanted = self._rule_key(clash_rule)
        return any(self._rule_key(rule) == wanted for rule in self.rules)

    def reorder_rules(
            self,
            moved_rule_priority: int,
            target_priority: int,
    ):
        """
        Reorder rules
        :param moved_rule_priority: original priority of the rule being moved
        :param target_priority: priority of the position to move it to
        :raises StopIteration: if no rule has ``moved_rule_priority``
        """
        moved_index = next(i for i, r in enumerate(self.rules) if r.priority == moved_rule_priority)
        target_index = next(
            (i for i, r in enumerate(self.rules) if r.priority == target_priority),
            len(self.rules)
        )
        # Assign the new priority to the moved rule directly.
        moved_rule = self.rules[moved_index]
        moved_rule.priority = target_priority
        if moved_index < target_index:
            # Moving backwards: rules between the old and the new position shift by -1.
            # target_index is len(self.rules) when the target priority is unknown,
            # so clamp it to the last valid index.
            last_index = min(target_index, len(self.rules) - 1)
            for i in range(moved_index + 1, last_index + 1):
                self.rules[i].priority -= 1
        elif moved_index > target_index:
            # Moving forwards: rules between the new and the old position shift by +1.
            for i in range(target_index, moved_index):
                self.rules[i].priority += 1
        self.rules.sort(key=lambda x: x.priority)
class Converter:
    """
    Converter for V2Ray Subscription

    Turns share links (``vmess://``, ``vless://``, ``trojan://``, ...) into
    proxy dictionaries.

    Reference:
    https://github.com/MetaCubeX/mihomo/blob/Alpha/common/convert/converter.go
    https://github.com/SubConv/SubConv/blob/main/modules/convert/converter.py
    """
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome'

    # URL scheme -> (label used in error messages, name of the parsing helper).
    _SCHEME_PARSERS = {
        "vmess": ("VMESS", "_parse_vmess"),
        "vless": ("VLESS", "_parse_vless"),
        "trojan": ("Trojan", "_parse_trojan"),
        "hysteria": ("Hysteria", "_parse_hysteria"),
        "socks": ("SOCKS5", "_parse_socks5"),
        "socks5": ("SOCKS5", "_parse_socks5"),
        "socks5h": ("SOCKS5", "_parse_socks5"),
        "ss": ("SS", "_parse_ss"),
        "ssr": ("SSR", "_parse_ssr"),
        "tuic": ("TUIC", "_parse_tuic"),
        "anytls": ("AnyTLS", "_parse_anytls"),
        "hysteria2": ("Hysteria2", "_parse_hysteria2"),
        "hy2": ("Hysteria2", "_parse_hysteria2"),
    }

    @staticmethod
    def decode_base64(data):
        """Decode base64 *data* (``str`` or ``bytes``), tolerating missing padding.

        Both the standard and the URL-safe alphabet are accepted.

        :return: the decoded bytes
        :raises binascii.Error: if the input is not valid base64
        """
        data = data.strip()
        # Keep the input type so that padding can be appended to bytes as well
        # as to str, and map the URL-safe alphabet onto the standard one.
        if isinstance(data, str):
            data = data.translate(str.maketrans("-_", "+/"))
            pad = "="
        else:
            data = bytes(data).translate(bytes.maketrans(b"-_", b"+/"))
            pad = b"="
        return base64.b64decode(data + pad * (-len(data) % 4))

    @staticmethod
    def try_decode_base64_json(data):
        """Decode base64-encoded JSON; return ``None`` if decoding or parsing fails."""
        try:
            return json.loads(Converter.decode_base64(data).decode('utf-8'))
        except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError, TypeError):
            return None

    @staticmethod
    def unique_name(name_map, name):
        """Return *name*, suffixed with ``-NN`` from its second use on.

        *name_map* counts how often each name has been requested and is
        updated in place.
        """
        index = name_map.get(name, 0)
        name_map[name] = index + 1
        if index > 0:
            return f"{name}-{index:02d}"
        return name

    @staticmethod
    def strtobool(val):
        """Convert a truth-value string to ``bool``.

        :raises ValueError: if *val* is not a recognised truth value
        """
        val = val.lower()
        if val in ("y", "yes", "t", "true", "on", "1"):
            return True
        elif val in ("n", "no", "f", "false", "off", "0"):
            return False
        else:
            raise ValueError(f"invalid truth value {val!r}")

    @staticmethod
    def convert_v2ray(v2ray_link: Union[list, bytes], skip_exception: bool = True) -> List[Dict[str, Any]]:
        """Convert share links into a list of proxy dictionaries.

        :param v2ray_link: a list of share-link strings, or the base64-encoded
            subscription body as ``bytes`` (one link per line once decoded)
        :param skip_exception: when true, links that fail to parse are skipped
            silently; when false, the first failure raises
        :return: the converted proxies, in input order
        :raises ValueError: only when *skip_exception* is false -- on the first
            link that fails to parse, or when no proxy could be converted
        """
        if isinstance(v2ray_link, bytes):
            decoded = Converter.decode_base64(v2ray_link).decode("utf-8")
            lines = decoded.strip().splitlines()
        else:
            lines = v2ray_link
        proxies = []
        names = {}
        for line in lines:
            line = line.strip()
            if not line or "://" not in line:
                continue
            scheme, body = line.split("://", 1)
            entry = Converter._SCHEME_PARSERS.get(scheme.lower())
            if entry is None:
                continue
            label, parser_name = entry
            try:
                proxies.append(getattr(Converter, parser_name)(line, body, names))
            except Exception as e:
                if not skip_exception:
                    raise ValueError(f"{label} parse error: {e}") from e
        if not proxies and not skip_exception:
            raise ValueError("convert v2ray subscribe error: format invalid")
        return proxies

    @staticmethod
    def _parse_vmess(line, body, names):
        """Build a vmess proxy from the base64-encoded JSON in *body*."""
        vmess_data = Converter.try_decode_base64_json(body)
        name = Converter.unique_name(names, vmess_data.get("ps", "vmess"))
        net = str(vmess_data.get("net", "")).lower()
        fake_type = str(vmess_data.get("type", "")).lower()
        tls_mode = str(vmess_data.get("tls", "")).lower()
        cipher = vmess_data.get("scy", "auto") or "auto"
        alter_id = vmess_data.get("aid", 0)
        # Normalise the network type: HTTP camouflage becomes "http", while a
        # plain "http" network is treated as "h2".
        if fake_type == "http":
            net = "http"
        elif net == "http":
            net = "h2"
        proxy = {
            "name": name,
            "type": "vmess",
            "server": vmess_data.get("add"),
            "port": vmess_data.get("port"),
            "uuid": vmess_data.get("id"),
            "alterId": alter_id,
            "cipher": cipher,
            "tls": tls_mode.endswith("tls") or tls_mode == "reality",
            "udp": True,
            "xudp": True,
            "skip-cert-verify": False,
            "network": net
        }
        # TLS / Reality extensions
        if proxy["tls"]:
            proxy["client-fingerprint"] = vmess_data.get("fp", "chrome") or "chrome"
            alpn = vmess_data.get("alpn")
            if alpn:
                proxy["alpn"] = alpn.split(",") if isinstance(alpn, str) else alpn
            sni = vmess_data.get("sni")
            if sni:
                proxy["servername"] = sni
            if tls_mode == "reality":
                proxy["reality-opts"] = {
                    "public-key": vmess_data.get("pbk", ""),
                    "short-id": vmess_data.get("sid", "")
                }
        path = vmess_data.get("path", "/")
        host = vmess_data.get("host")
        # Transport-specific options. A "tcp" network with HTTP camouflage was
        # already rewritten to "http" above, so it needs no branch of its own.
        if net == "http":
            proxy["http-opts"] = {
                "path": path,
                "headers": {"Host": host} if host else {}
            }
        elif net == "h2":
            proxy["h2-opts"] = {
                "path": path,
                "host": [host] if host else []
            }
        elif net == "ws":
            ws_headers = {"Host": host} if host else {}
            ws_headers["User-Agent"] = Converter.user_agent
            ws_opts = {
                "path": path,
                "headers": ws_headers
            }
            # Optional early-data settings
            early_data = vmess_data.get("ed")
            if early_data:
                try:
                    ws_opts["max-early-data"] = int(early_data)
                except ValueError:
                    pass
            early_data_header = vmess_data.get("edh")
            if early_data_header:
                ws_opts["early-data-header-name"] = early_data_header
            proxy["ws-opts"] = ws_opts
        elif net == "grpc":
            proxy["grpc-opts"] = {
                "grpc-service-name": path
            }
        return proxy

    @staticmethod
    def _parse_vless(line, body, names):
        """Build a vless proxy from the URL in *line*."""
        parsed = urlparse(line)
        query = dict(parse_qsl(parsed.query))
        uuid = parsed.username or ""
        server = parsed.hostname or ""
        port = parsed.port or 443
        tls_mode = query.get("security", "").lower()
        tls = tls_mode == "tls" or tls_mode == "reality"
        sni = query.get("sni", "")
        flow = query.get("flow", "")
        network = query.get("type", "tcp")
        path = query.get("path", "")
        host = query.get("host", "")
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
        proxy: Dict[str, Any] = {
            "name": name,
            "type": "vless",
            "server": server,
            "port": port,
            "uuid": uuid,
            "tls": tls,
            "udp": True
        }
        if sni:
            proxy["servername"] = sni
        if flow:
            proxy["flow"] = flow
        if tls:
            proxy["skip-cert-verify"] = Converter.strtobool(query.get("allowInsecure", "0"))
        if network:
            proxy["network"] = network
            if network in ["ws", "httpupgrade"]:
                headers = {"User-Agent": Converter.user_agent}
                if host:
                    headers["Host"] = host
                ws_opts: Dict[str, Any] = {"path": path, "headers": headers}
                # Early-data hints are carried in the query string of the path;
                # failing to read them must not drop the proxy.
                try:
                    parsed_path = urlparse(path)
                    q = dict(parse_qsl(parsed_path.query))
                    if "ed" in q:
                        med = int(q["ed"])
                        if network == "ws":
                            ws_opts["max-early-data"] = med
                            ws_opts["early-data-header-name"] = q.get("eh", "Sec-WebSocket-Protocol")
                        elif network == "httpupgrade":
                            ws_opts["v2ray-http-upgrade-fast-open"] = True
                    if "eh" in q and q["eh"]:
                        ws_opts["early-data-header-name"] = q["eh"]
                except Exception:
                    pass
                proxy["ws-opts"] = ws_opts
            elif network == "grpc":
                proxy["grpc-opts"] = {
                    "grpc-service-name": query.get("serviceName", "")
                }
        if tls_mode == "reality":
            proxy["reality-opts"] = {
                "public-key": query.get("pbk", "")
            }
            if query.get("sid"):
                proxy["reality-opts"]["short-id"] = query.get("sid", "")
        # Reality and TLS share the fingerprint / ALPN handling.
        if tls_mode == "reality" or tls_mode.endswith("tls"):
            proxy["client-fingerprint"] = query.get("fp", "chrome")
            alpn = query.get("alpn", "")
            if alpn:
                proxy["alpn"] = alpn.split(",")
        return proxy

    @staticmethod
    def _parse_trojan(line, body, names):
        """Build a trojan proxy from the URL in *line*."""
        parsed = urlparse(line)
        query = dict(parse_qsl(parsed.query))
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}"))
        trojan = {
            "name": name,
            "type": "trojan",
            "server": parsed.hostname,
            "port": parsed.port or 443,
            "password": parsed.username or "",
            "udp": True,
        }
        # An unreadable allowInsecure value falls back to strict verification.
        try:
            trojan["skip-cert-verify"] = Converter.strtobool(query.get("allowInsecure", "0"))
        except ValueError:
            trojan["skip-cert-verify"] = False
        # Optional fields
        if "sni" in query:
            trojan["sni"] = query["sni"]
        alpn = query.get("alpn", "")
        if alpn:
            trojan["alpn"] = alpn.split(",")
        network = query.get("type", "").lower()
        if network:
            trojan["network"] = network
            if network == "ws":
                headers = {"User-Agent": Converter.user_agent}
                trojan["ws-opts"] = {
                    "path": query.get("path", "/"),
                    "headers": headers
                }
            elif network == "grpc":
                trojan["grpc-opts"] = {
                    "grpc-service-name": query.get("serviceName", "")
                }
        fp = query.get("fp", "")
        trojan["client-fingerprint"] = fp if fp else "chrome"
        return trojan

    @staticmethod
    def _parse_hysteria(line, body, names):
        """Build a hysteria (v1) proxy from the URL in *line*."""
        parsed = urlparse(line)
        query = dict(parse_qsl(parsed.query))
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}"))
        hysteria = {
            "name": name,
            "type": "hysteria",
            "server": parsed.hostname,
            "port": parsed.port,
            "auth_str": parsed.username or query.get("auth", ""),
            "obfs": query.get("obfs", ""),
            "sni": query.get("peer", ""),
            "protocol": query.get("protocol", "")
        }
        # "up"/"down" take precedence over the "upmbps"/"downmbps" spellings.
        hysteria["up"] = query.get("up", "") or query.get("upmbps", "")
        hysteria["down"] = query.get("down", "") or query.get("downmbps", "")
        alpn = query.get("alpn", "")
        if alpn:
            hysteria["alpn"] = alpn.split(",")
        try:
            hysteria["skip-cert-verify"] = Converter.strtobool(query.get("insecure", "false"))
        except ValueError:
            hysteria["skip-cert-verify"] = False
        return hysteria

    @staticmethod
    def _parse_socks5(line, body, names):
        """Build a socks5 proxy from the URL in *line*."""
        parsed = urlparse(line)
        server = parsed.hostname
        port = parsed.port
        username = parsed.username or ""
        password = parsed.password or ""
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
        return {
            "name": name,
            "type": "socks5",
            "server": server,
            "port": port,
            "username": username,
            "password": password,
            "udp": True
        }

    @staticmethod
    def _parse_ss(line, body, names):
        """Build a shadowsocks proxy from *line*.

        Handles both ``ss://base64[#name]`` (everything encoded) and
        ``ss://userinfo@host:port[#name]``.
        """
        parsed = urlparse(line)
        if parsed.fragment:
            name = Converter.unique_name(names, unquote(parsed.fragment))
        else:
            name = Converter.unique_name(names, "ss")
        if parsed.port is None:
            # No visible port: the whole body before the fragment is base64.
            base64_body = body.split("#")[0]
            parsed = urlparse(f"ss://{Converter.decode_base64(base64_body).decode('utf-8')}")
        cipher_raw = parsed.username
        cipher = cipher_raw
        password = parsed.password
        if not password:
            # The user-info part is base64("cipher:password").
            dc_buf = Converter.decode_base64(cipher_raw).decode('utf-8')
            if dc_buf.startswith("ss://"):
                dc_buf = dc_buf[len("ss://"):]
                dc_buf = Converter.decode_base64(dc_buf).decode('utf-8')
            cipher, password = dc_buf.split(":", 1)
        server = parsed.hostname
        port = parsed.port
        query = dict(parse_qsl(parsed.query))
        proxy = {
            "name": name,
            "type": "ss",
            "server": server,
            "port": port,
            "cipher": cipher,
            "password": password,
            "udp": True
        }
        plugin = query.get("plugin")
        if plugin and ";" in plugin:
            # "name;k=v;k2=v2" is re-read as a query string to split the options.
            query_string = "pluginName=" + plugin.replace(";", "&")
            plugin_info = parse_qs(query_string)
            plugin_name = plugin_info.get("pluginName", [""])[0]
            if "obfs" in plugin_name:
                proxy["plugin"] = "obfs"
                proxy["plugin-opts"] = {
                    "mode": plugin_info.get("obfs", [""])[0],
                    "host": plugin_info.get("obfs-host", [""])[0],
                }
            elif "v2ray-plugin" in plugin_name:
                proxy["plugin"] = "v2ray-plugin"
                proxy["plugin-opts"] = {
                    "mode": plugin_info.get("mode", [""])[0],
                    "host": plugin_info.get("host", [""])[0],
                    "path": plugin_info.get("path", [""])[0],
                    "tls": "tls" in plugin,
                }
        return proxy

    @staticmethod
    def _parse_ssr(line, body, names):
        """Build a shadowsocksR proxy from the base64-encoded *body*."""
        decoded = Converter.decode_base64(body).decode()
        parts, _, params_str = decoded.partition("/?")
        host, port, protocol, method, obfs, password_enc = parts.split(":")
        password = Converter.decode_base64(password_enc).decode()
        params = parse_qs(params_str)
        remarks = Converter.decode_base64(params.get("remarks", [""])[0]).decode()
        obfsparam = Converter.decode_base64(params.get("obfsparam", [""])[0]).decode()
        protoparam = Converter.decode_base64(params.get("protoparam", [""])[0]).decode()
        name = Converter.unique_name(names, remarks or f"{host}:{port}")
        proxy = {
            "name": name,
            "type": "ssr",
            "server": host,
            "port": port,
            "cipher": method,
            "password": password,
            "obfs": obfs,
            "protocol": protocol,
            "udp": True
        }
        if obfsparam:
            proxy["obfs-param"] = obfsparam
        if protoparam:
            proxy["protocol-param"] = protoparam
        return proxy

    @staticmethod
    def _parse_tuic(line, body, names):
        """Build a tuic proxy from the URL in *line*."""
        parsed = urlparse(line)
        query = parse_qs(parsed.query)
        user = parsed.username or ""
        password = parsed.password or ""
        server = parsed.hostname
        port = parsed.port or 443
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
        proxy = {
            "name": name,
            "type": "tuic",
            "server": server,
            "port": port,
            "udp": True
        }
        # With a password the user part is a uuid; without one it is a token.
        if password:
            proxy["uuid"] = user
            proxy["password"] = password
        else:
            proxy["token"] = user
        if "congestion_control" in query:
            proxy["congestion-controller"] = query["congestion_control"][0]
        if "alpn" in query:
            proxy["alpn"] = query["alpn"][0].split(",")
        if "sni" in query:
            proxy["sni"] = query["sni"][0]
        if query.get("disable_sni", ["0"])[0] == "1":
            proxy["disable-sni"] = True
        if "udp_relay_mode" in query:
            proxy["udp-relay-mode"] = query["udp_relay_mode"][0]
        return proxy

    @staticmethod
    def _parse_anytls(line, body, names):
        """Build an anytls proxy from the URL in *line*."""
        parsed = urlparse(line)
        query = parse_qs(parsed.query)
        username = parsed.username or ""
        password = parsed.password or username
        server = parsed.hostname
        port = parsed.port
        insecure = query.get("insecure", ["0"])[0] == "1"
        sni = query.get("sni", [""])[0]
        fingerprint = query.get("hpkp", [""])[0]
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
        return {
            "name": name,
            "type": "anytls",
            "server": server,
            "port": port,
            "username": username,
            "password": password,
            "sni": sni,
            "fingerprint": fingerprint,
            "skip-cert-verify": insecure,
            "udp": True
        }

    @staticmethod
    def _parse_hysteria2(line, body, names):
        """Build a hysteria2 proxy from the URL in *line*."""
        parsed = urlparse(line)
        query = dict(parse_qsl(parsed.query))
        password = parsed.username or ""
        server = parsed.hostname
        port = parsed.port or 443
        name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}"))
        proxy = {
            "name": name,
            "type": "hysteria2",
            "server": server,
            "port": port,
            "password": password,
            "obfs": query.get("obfs", ""),
            "obfs-password": query.get("obfs-password", ""),
            "sni": query.get("sni", ""),
            "skip-cert-verify": Converter.strtobool(query.get("insecure", "false")),
            "down": query.get("down", ""),
            "up": query.get("up", ""),
        }
        if "pinSHA256" in query:
            proxy["fingerprint"] = query.get("pinSHA256", "")
        if "alpn" in query:
            proxy["alpn"] = query["alpn"].split(",")
        return proxy