import re from typing import List, Dict, Any, Optional, Union, Callable, Literal from dataclasses import dataclass from enum import Enum from urllib.parse import urlparse, parse_qs, unquote, parse_qsl, urlencode, urlunparse import json import base64 import binascii from pydantic import BaseModel, Field, validator, HttpUrl class RuleProvider(BaseModel): type: Literal["http", "file", "inline"] = Field(..., description="Provider type") url: Optional[HttpUrl] = Field(None, description="Must be configured if the type is http") path: Optional[str] = Field(None, description="Optional, file path, must be unique.") interval: Optional[int] = Field(None, ge=0, description="The update interval for the provider, in seconds.") proxy: Optional[str] = Field(None, description="Download/update through the specified proxy.") behavior: Optional[Literal["domain", "ipcidr", "classical"]] = Field(None, description="Behavior of the rule provider") format: Literal["yaml", "text", "mrs"] = Field("yaml", description="Format of the rule provider file") size_limit: int = Field(0, ge=0, description="The maximum size of downloadable files in bytes (0 for no limit)") payload: Optional[List[str]] = Field(None, description="Content, only effective when type is inline") @validator("url", pre=True, always=True, allow_reuse=True) def check_url_for_http_type(cls, v, values): if values.get("type") == "http" and v is None: raise ValueError("url must be configured if the type is 'http'") return v @validator("path", pre=True, always=True, allow_reuse=True) def check_path_for_file_type(cls, v, values): if values.get("type") == "file" and v is None: raise ValueError("path must be configured if the type is 'file'") return v @validator("payload", pre=True, always=True, allow_reuse=True) def handle_payload_for_non_inline_type(cls, v, values): # If type is not inline, payload should be ignored (set to None) if values.get("type") != "inline" and v is not None: return None return v @validator("payload", allow_reuse=True) def check_payload_type_for_inline(cls, v, values): if values.get("type") == "inline" and v is not None and not isinstance(v, list): raise ValueError("payload must be a list of strings when type is 'inline'") if values.get("type") == "inline" and v is None: raise ValueError("payload must be configured if the type is 'inline'") return v @validator("format", allow_reuse=True) def check_format_with_behavior(cls, v, values): behavior = values.get("behavior") if v == "mrs" and behavior not in ["domain", "ipcidr"]: raise ValueError("mrs format only supports 'domain' or 'ipcidr' behavior") return v class RuleProviders(BaseModel): __root__: dict[str, RuleProvider] class ProxyGroupBase(BaseModel): """ 包含所有代理组类型共有的通用字段。 """ # Required field name: str = Field(..., description="The name of the proxy group.") # Proxy and provider references proxies: Optional[List[str]] = Field(None, description="References to outbound proxies or other proxy groups.") use: Optional[List[str]] = Field(None, description="References to proxy provider sets.") # Health check fields url: Optional[str] = Field(None, description="Health check test address.") interval: Optional[int] = Field(None, description="Health check interval in seconds.") lazy: bool = Field(True, description="If not selected, no health checks are performed.") timeout: Optional[int] = Field(5000, description="Health check timeout in milliseconds.") max_failed_times: Optional[int] = Field(5, description="Maximum number of failures before a forced health check.") expected_status: Optional[str] = Field(None, description="Expected HTTP response status code for health checks.") # Network and routing fields disable_udp: Optional[bool] = Field(False, description="Disables UDP for this proxy group.") interface_name: Optional[str] = Field(None, description="DEPRECATED. Specifies the outbound interface.") routing_mark: Optional[int] = Field(None, description="DEPRECATED. The routing mark for outbound connections.") # Dynamic proxy inclusion include_all: Optional[bool] = Field(False, description="Includes all outbound proxies and proxy sets.") include_all_proxies: Optional[bool] = Field(False, description="Includes all outbound proxies.") include_all_providers: Optional[bool] = Field(False, description="Includes all proxy provider sets.") # Filtering filter: Optional[str] = Field(None, description="Regex to filter nodes from providers.") exclude_filter: Optional[str] = Field(None, description="Regex to exclude nodes.") exclude_type: Optional[str] = Field(None, description="Exclude nodes by adapter type, separated by '|'.") # UI fields hidden: Optional[bool] = Field(False, description="Hides the proxy group in the API.") icon: Optional[str] = Field(None, description="Icon string for the proxy group, for UI use.") @validator('expected_status', allow_reuse=True) def validate_expected_status(cls, v: Optional[str]) -> Optional[str]: if v is None or v == '*': return v pattern = re.compile(r'^\d{3}([-/]\d{3})*$') if not pattern.match(v): raise ValueError("Invalid format for expected-status.") parts = re.split(r'[/]', v) for part in parts: if '-' in part: start, end = part.split('-') if not (start.isdigit() and end.isdigit() and 100 <= int(start) < 600 and 100 <= int(end) < 600 and int(start) <= int(end)): raise ValueError(f"Invalid status code range: {part}") elif not (part.isdigit() and 100 <= int(part) < 600): raise ValueError(f"Invalid status code: {part}") return v class SelectGroup(ProxyGroupBase): type: Literal['select'] class RelayGroup(ProxyGroupBase): type: Literal['relay'] class FallbackGroup(ProxyGroupBase): type: Literal['fallback'] class UrlTestGroup(ProxyGroupBase): type: Literal['url-test'] tolerance: Optional[int] = Field(None, description="proxies switch tolerance, measured in milliseconds (ms).") class LoadBalanceGroup(ProxyGroupBase): type: Literal['load-balance'] strategy: Optional[Literal['round-robin', 'consistent-hashing', 'sticky-sessions']] = Field( 'round-robin', description="Load balancing strategy." ) # --- Discriminated Union --- ProxyGroupUnion = Union[SelectGroup, RelayGroup, FallbackGroup, UrlTestGroup, LoadBalanceGroup] class ProxyGroup(BaseModel): __root__: ProxyGroupUnion class AdditionalParam(Enum): NO_RESOLVE = 'no-resolve' SRC = 'src' class RuleType(Enum): """Enumeration of all supported Clash rule types""" DOMAIN = "DOMAIN" DOMAIN_SUFFIX = "DOMAIN-SUFFIX" DOMAIN_KEYWORD = "DOMAIN-KEYWORD" DOMAIN_REGEX = "DOMAIN-REGEX" GEOSITE = "GEOSITE" IP_CIDR = "IP-CIDR" IP_CIDR6 = "IP-CIDR6" IP_SUFFIX = "IP-SUFFIX" IP_ASN = "IP-ASN" GEOIP = "GEOIP" SRC_GEOIP = "SRC-GEOIP" SRC_IP_ASN = "SRC-IP-ASN" SRC_IP_CIDR = "SRC-IP-CIDR" SRC_IP_SUFFIX = "SRC-IP-SUFFIX" DST_PORT = "DST-PORT" SRC_PORT = "SRC-PORT" IN_PORT = "IN-PORT" IN_TYPE = "IN-TYPE" IN_USER = "IN-USER" IN_NAME = "IN-NAME" PROCESS_PATH = "PROCESS-PATH" PROCESS_PATH_REGEX = "PROCESS-PATH-REGEX" PROCESS_NAME = "PROCESS-NAME" PROCESS_NAME_REGEX = "PROCESS-NAME-REGEX" UID = "UID" NETWORK = "NETWORK" DSCP = "DSCP" RULE_SET = "RULE-SET" AND = "AND" OR = "OR" NOT = "NOT" SUB_RULE = "SUB-RULE" MATCH = "MATCH" class Action(Enum): """Enumeration of rule actions""" DIRECT = "DIRECT" REJECT = "REJECT" REJECT_DROP = "REJECT-DROP" PASS = "PASS" COMPATIBLE = "COMPATIBLE" @dataclass class ClashRule: """Represents a parsed Clash routing rule""" rule_type: RuleType payload: str action: Union[Action, str] # Can be Action enum or custom proxy group name additional_params: Optional[AdditionalParam] = None raw_rule: str = "" priority: int = 0 def condition_string(self) -> str: return f"{self.rule_type.value},{self.payload}" @dataclass class LogicRule: """Represents a logic rule (AND, OR, NOT)""" logic_type: RuleType conditions: List[Union[ClashRule, 'LogicRule']] action: Union[Action, str] raw_rule: str = "" priority: int = 0 def condition_string(self) -> str: conditions_str = ','.join([f"({c.condition_string()})" for c in self.conditions]) return f"{self.logic_type.value},({conditions_str})" @dataclass class MatchRule: """Represents a match rule""" action: Union[Action, str] raw_rule: str = "" priority: int = 0 rule_type: RuleType = RuleType.MATCH @staticmethod def condition_string() -> str: return "MATCH" class ClashRuleParser: """Parser for Clash routing rules""" def __init__(self): self.rules: List[Union[ClashRule, LogicRule, MatchRule]] = [] @staticmethod def parse_rule_line(line: str) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: """Parse a single rule line""" line = line.strip() try: # Handle logic rules (AND, OR, NOT) if line.startswith(('AND,', 'OR,', 'NOT,')): return ClashRuleParser._parse_logic_rule(line) elif line.startswith('MATCH'): return ClashRuleParser._parse_match_rule(line) # Handle regular rules return ClashRuleParser._parse_regular_rule(line) except Exception as e: return None @staticmethod def parse_rule_dict(clash_rule: Dict[str, Any]) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: if not clash_rule: return None if clash_rule.get("type") in ('AND', 'OR', 'NOT'): conditions = clash_rule.get("conditions") if not conditions: return None conditions_str = '' for condition in conditions: conditions_str += f'({condition.get("type")},{condition.get("payload")})' conditions_str = f"({conditions_str})" raw_rule = f"{clash_rule.get('type')},{conditions_str},{clash_rule.get('action')}" rule = ClashRuleParser._parse_logic_rule(raw_rule) elif clash_rule.get("type") == 'MATCH': raw_rule = f"{clash_rule.get('type')},{clash_rule.get('action')}" rule = ClashRuleParser._parse_match_rule(raw_rule) else: raw_rule = f"{clash_rule.get('type')},{clash_rule.get('payload')},{clash_rule.get('action')}" if clash_rule.get('additional_params'): raw_rule += f",{clash_rule.get('additional_params')}" rule = ClashRuleParser._parse_regular_rule(raw_rule) if rule and 'priority' in clash_rule: rule.priority = clash_rule['priority'] return rule @staticmethod def _parse_match_rule(line: str) -> MatchRule: parts = line.split(',') if len(parts) < 2: raise ValueError(f"Invalid rule format: {line}") action = parts[1] # Validate rule type try: action_enum = Action(action.upper()) final_action = action_enum except ValueError: final_action = action return MatchRule( action=final_action, raw_rule=line ) @staticmethod def _parse_regular_rule(line: str) -> ClashRule: """Parse a regular (non-logic) rule""" parts = line.split(',') if len(parts) < 3 or len(parts) > 4: raise ValueError(f"Invalid rule format: {line}") rule_type_str = parts[0].upper() payload = parts[1] action = parts[2] if not payload or not rule_type_str: raise ValueError(f"Invalid rule format: {line}") additional_params = parts[3] if len(parts) > 3 else None # Validate rule type try: rule_type = RuleType(rule_type_str) except ValueError: raise ValueError(f"Unknown rule type: {rule_type_str}") # Try to convert action to enum, otherwise keep as string (custom proxy group) try: action_enum = Action(action.upper()) final_action = action_enum except ValueError: final_action = action return ClashRule( rule_type=rule_type, payload=payload, action=final_action, additional_params=additional_params, raw_rule=line ) @staticmethod def _parse_logic_rule(line: str) -> LogicRule: """Parse a logic rule (AND, OR, NOT)""" # Extract logic type logic_rule_match = re.match(r'^(AND|OR|NOT),\((.+)\),([^,]+)$', line) if not logic_rule_match: raise ValueError(f"Cannot extract action from logic rule: {line}") logic_type_str = logic_rule_match.group(1).upper() logic_type = RuleType(logic_type_str) action = logic_rule_match.group(3) # Try to convert action to enum try: action_enum = Action(action.upper()) final_action = action_enum except ValueError: final_action = action conditions_str = logic_rule_match.group(2) conditions = ClashRuleParser._parse_logic_conditions(conditions_str) return LogicRule( logic_type=logic_type, conditions=conditions, action=final_action, raw_rule=line ) @staticmethod def _parse_logic_conditions(conditions_str: str) -> List[ClashRule]: """Parse conditions within logic rules""" conditions = [] # Simple parser for conditions like (DOMAIN,baidu.com),(NETWORK,UDP) # This is a basic implementation - more complex nested logic would need a proper parser condition_pattern = r'\(([^,]+),([^)]+)\)' matches = re.findall(condition_pattern, conditions_str) for rule_type_str, payload in matches: try: rule_type = RuleType(rule_type_str.upper()) condition = ClashRule( rule_type=rule_type, payload=payload, action="", # Logic conditions don't have actions raw_rule=f"{rule_type_str},{payload}" ) conditions.append(condition) except ValueError: continue return conditions @staticmethod def action_string(action: Union[Action, str]) -> str: return action.value if isinstance(action, Action) else action def parse_rules(self, rules_text: str) -> List[Union[ClashRule, LogicRule, MatchRule]]: """Parse multiple rules from text, preserving order and priority""" self.rules = [] lines = rules_text.strip().split('\n') priority = 0 for line in lines: rule = self.parse_rule_line(line) if rule: rule.priority = priority # Assign priority based on position self.rules.append(rule) priority += 1 return self.rules def parse_rules_from_list(self, rules_list: List[str]) -> List[Union[ClashRule, LogicRule, MatchRule]]: """Parse rules from a list of rule strings, preserving order and priority""" self.rules = [] for priority, rule_str in enumerate(rules_list): rule = self.parse_rule_line(rule_str) if rule: rule.priority = priority # Assign priority based on list position self.rules.append(rule) return self.rules def validate_rule(self, rule: ClashRule) -> bool: """Validate a parsed rule""" try: # Basic validation based on rule type if rule.rule_type in [RuleType.IP_CIDR, RuleType.IP_CIDR6]: # Validate CIDR format return '/' in rule.payload elif rule.rule_type == RuleType.DST_PORT or rule.rule_type == RuleType.SRC_PORT: # Validate port number/range return rule.payload.isdigit() or '-' in rule.payload elif rule.rule_type == RuleType.NETWORK: # Validate network type return rule.payload.lower() in ['tcp', 'udp'] elif rule.rule_type == RuleType.DOMAIN_REGEX or rule.rule_type == RuleType.PROCESS_PATH_REGEX: # Try to compile regex re.compile(rule.payload) return True return True except Exception: return False def to_list(self) -> List[str]: result = [] for rule in self.rules: result.append(rule.raw_rule) return result def to_dict(self) -> List[Dict[str, Any]]: """Convert parsed rules to dictionary format""" result = [] for rule in self.rules: if isinstance(rule, ClashRule): rule_dict = { 'type': rule.rule_type.value, 'payload': rule.payload, 'action': rule.action.value if isinstance(rule.action, Action) else rule.action, 'additional_params': rule.additional_params, 'priority': rule.priority, 'raw': rule.raw_rule } result.append(rule_dict) elif isinstance(rule, LogicRule): conditions_dict = [] for condition in rule.conditions: if isinstance(condition, ClashRule): conditions_dict.append({ 'type': condition.rule_type.value, 'payload': condition.payload }) rule_dict = { 'type': rule.logic_type.value, 'conditions': conditions_dict, 'action': rule.action.value if isinstance(rule.action, Action) else rule.action, 'priority': rule.priority, 'raw': rule.raw_rule } result.append(rule_dict) elif isinstance(rule, MatchRule): rule_dict = { 'type': 'MATCH', 'action': rule.action.value if isinstance(rule.action, Action) else rule.action, 'priority': rule.priority, 'raw': rule.raw_rule } result.append(rule_dict) return result def get_rules_by_priority(self) -> List[Union[ClashRule, LogicRule, MatchRule]]: """Get rules sorted by priority (highest priority first)""" return sorted(self.rules, key=lambda rule: rule.priority) def append_rule(self, rule: Union[ClashRule, LogicRule, MatchRule]) -> None: max_priority = max(rule.priority for rule in self.rules) if len(self.rules) else 0 rule.priority = max_priority + 1 self.rules.append(rule) # Re-sort rules to maintain order self.rules.sort(key=lambda r: r.priority) def append_rules(self, rules: List[Union[ClashRule, LogicRule, MatchRule]]) -> None: max_priority = max(rule.priority for rule in self.rules) if len(self.rules) else 0 priority = max_priority + 1 for rule in rules: rule.priority = priority self.rules.append(rule) priority += 1 self.rules.sort(key=lambda r: r.priority) def insert_rule_at_priority(self, rule: Union[ClashRule, LogicRule, MatchRule], priority: int): """Insert a rule at a specific priority position, adjusting other rules""" # Adjust priorities of existing rules for existing_rule in self.rules: if existing_rule.priority >= priority: existing_rule.priority += 1 rule.priority = priority self.rules.append(rule) # Re-sort rules to maintain order self.rules.sort(key=lambda r: r.priority) def update_rule_at_priority(self, clash_rule: Union[ClashRule, LogicRule], priority: int) -> bool: if clash_rule.priority == priority: for index, existing_rule in enumerate(self.rules): if existing_rule.priority == priority: self.rules[index] = clash_rule self.rules[index].priority = priority return True return False else: removed = self.remove_rule_at_priority(priority) if not removed: return False self.insert_rule_at_priority(clash_rule, clash_rule.priority) return True def get_rule_at_priority(self, priority: int) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: for rule in self.rules: if rule.priority == priority: return rule return None def remove_rule_at_priority(self, priority: int) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: """Remove rule at specific priority and adjust remaining priorities""" rule_to_remove = None for rule in self.rules: if rule.priority == priority: rule_to_remove = rule break if rule_to_remove: self.rules.remove(rule_to_remove) # Adjust priorities of remaining rules for rule in self.rules: if rule.priority > priority: rule.priority -= 1 return rule_to_remove return None def remove_rules(self, condition: Callable[[Union[ClashRule, LogicRule, MatchRule]], bool]): """Remove rules by lambda""" i = 0 while i < len(self.rules): if condition(self.rules[i]): priority = self.rules[i].priority for rule in self.rules: if rule.priority > priority: rule.priority -= 1 del self.rules[i] else: i += 1 def move_rule_priority(self, from_priority: int, to_priority: int) -> bool: """Move a rule from one priority position to another""" rule_to_move = None for rule in self.rules: if rule.priority == from_priority: rule_to_move = rule break if not rule_to_move: return False # Remove rule temporarily self.remove_rule_at_priority(from_priority) # Insert at new priority self.insert_rule_at_priority(rule_to_move, to_priority) return True def filter_rules_by_lambda(self, condition: Callable[[Union[ClashRule, LogicRule, MatchRule]], bool]): """Filter rules by lambda""" return [rule for rule in self.rules if condition(rule)] def filter_rules_by_type(self, rule_type: RuleType) -> List[ClashRule]: """Filter rules by type""" return [rule for rule in self.rules if isinstance(rule, ClashRule) and rule.rule_type == rule_type] def filter_rules_by_action(self, action: Union[Action, str]) -> List[Union[ClashRule, LogicRule, MatchRule]]: """Filter rules by action""" return [rule for rule in self.rules if rule.action == action] def has_rule(self, clash_rule: Union[ClashRule, LogicRule, MatchRule]) -> bool: for rule in self.rules: if rule.rule_type != RuleType.MATCH: if rule.rule_type == clash_rule.rule_type and rule.action == clash_rule.action \ and rule.payload == clash_rule.payload: return True else: if rule.rule_type == clash_rule.rule_type and rule.action == clash_rule.action: return True return False def reorder_rules( self, moved_rule_priority: int, target_priority: int, ): """ Reorder rules :param moved_rule_priority: 被移动规则的原始优先级 :param target_priority: 目标位置的优先级 """ moved_index = next(i for i, r in enumerate(self.rules) if r.priority == moved_rule_priority) target_index = next( (i for i, r in enumerate(self.rules) if r.priority == target_priority), len(self.rules) ) # 直接修改被移动规则的优先级 moved_rule = self.rules[moved_index] moved_rule.priority = target_priority if moved_index < target_index: # 向后移动:原位置到目标位置之间的规则优先级 -1 for i in range(moved_index + 1, target_index + 1): self.rules[i].priority -= 1 elif moved_index > target_index: # 向前移动:目标位置到原位置之间的规则优先级 +1 for i in range(target_index, moved_index): self.rules[i].priority += 1 self.rules.sort(key=lambda x: x.priority) class Converter: """ Converter for V2Ray Subscription Reference: https://github.com/MetaCubeX/mihomo/blob/Alpha/common/convert/converter.go https://github.com/SubConv/SubConv/blob/main/modules/convert/converter.py """ user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome' @staticmethod def decode_base64(data): # 添加适配不同 padding 的容错机制 data = data.strip() missing_padding = len(data) % 4 if missing_padding: data += '=' * (4 - missing_padding) return base64.b64decode(data) @staticmethod def try_decode_base64_json(data): try: return json.loads(Converter.decode_base64(data).decode('utf-8')) except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError, TypeError): return None @staticmethod def unique_name(name_map, name): index = name_map.get(name, 0) name_map[name] = index + 1 if index > 0: return f"{name}-{index:02d}" return name @staticmethod def strtobool(val): val = val.lower() if val in ("y", "yes", "t", "true", "on", "1"): return True elif val in ("n", "no", "f", "false", "off", "0"): return False else: raise ValueError(f"invalid truth value {val!r}") @staticmethod def convert_v2ray(v2ray_link: Union[list, bytes], skip_exception: bool = True) -> List[Dict[str, Any]]: if isinstance(v2ray_link, bytes): decoded = Converter.decode_base64(v2ray_link).decode("utf-8") lines = decoded.strip().splitlines() else: lines = v2ray_link proxies = [] names = {} for line in lines: line = line.strip() if not line: continue if "://" not in line: continue scheme, body = line.split("://", 1) scheme = scheme.lower() if scheme == "vmess": try: vmess_data = Converter.try_decode_base64_json(body) name = Converter.unique_name(names, vmess_data.get("ps", "vmess")) net = str(vmess_data.get("net", "")).lower() fake_type = str(vmess_data.get("type", "")).lower() tls_mode = str(vmess_data.get("tls", "")).lower() cipher = vmess_data.get("scy", "auto") or "auto" alter_id = vmess_data.get("aid", 0) # 调整 network 类型 if fake_type == "http": net = "http" elif net == "http": net = "h2" proxy = { "name": name, "type": "vmess", "server": vmess_data.get("add"), "port": vmess_data.get("port"), "uuid": vmess_data.get("id"), "alterId": alter_id, "cipher": cipher, "tls": tls_mode.endswith("tls") or tls_mode == "reality", "udp": True, "xudp": True, "skip-cert-verify": False, "network": net } # TLS Reality 扩展 if proxy["tls"]: proxy["client-fingerprint"] = vmess_data.get("fp", "chrome") or "chrome" alpn = vmess_data.get("alpn") if alpn: proxy["alpn"] = alpn.split(",") if isinstance(alpn, str) else alpn sni = vmess_data.get("sni") if sni: proxy["servername"] = sni if tls_mode == "reality": proxy["reality-opts"] = { "public-key": vmess_data.get("pbk", ""), "short-id": vmess_data.get("sid", "") } path = vmess_data.get("path", "/") host = vmess_data.get("host") # 不同 network 的扩展字段处理 if net == "tcp": if fake_type == "http": proxy["http-opts"] = { "path": path, "headers": {"Host": host} if host else {} } elif net == "http": proxy["network"] = "http" proxy["http-opts"] = { "path": path, "headers": {"Host": host} if host else {} } elif net == "h2": proxy["h2-opts"] = { "path": path, "host": [host] if host else [] } elif net == "ws": ws_headers = {"Host": host} if host else {} ws_headers["User-Agent"] = Converter.user_agent ws_opts = { "path": path, "headers": ws_headers } # 补充 early-data 配置 early_data = vmess_data.get("ed") if early_data: try: ws_opts["max-early-data"] = int(early_data) except ValueError: pass early_data_header = vmess_data.get("edh") if early_data_header: ws_opts["early-data-header-name"] = early_data_header proxy["ws-opts"] = ws_opts elif net == "grpc": proxy["grpc-opts"] = { "grpc-service-name": path } proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"VMESS parse error: {e}") from e elif scheme == "vless": try: parsed = urlparse(line) query = dict(parse_qsl(parsed.query)) uuid = parsed.username or "" server = parsed.hostname or "" port = parsed.port or 443 tls_mode = query.get("security", "").lower() tls = tls_mode == "tls" or tls_mode == "reality" sni = query.get("sni", "") flow = query.get("flow", "") network = query.get("type", "tcp") path = query.get("path", "") host = query.get("host", "") name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) proxy: Dict[str, Any] = { "name": name, "type": "vless", "server": server, "port": port, "uuid": uuid, "tls": tls, "udp": True } if sni: proxy["servername"] = sni if flow: proxy["flow"] = flow if tls: proxy["skip-cert-verify"] = Converter.strtobool(query.get("allowInsecure", "0")) if network: proxy["network"] = network if network in ["ws", "httpupgrade"]: headers = {"User-Agent": Converter.user_agent} if host: headers["Host"] = host ws_opts:Dict[str, Any] = { "path": path, "headers": headers } try: parsed_path = urlparse(path) q = dict(parse_qsl(parsed_path.query)) if "ed" in q: med = int(q["ed"]) if network == "ws": ws_opts["max-early-data"] = med ws_opts["early-data-header-name"] = q.get("eh", "Sec-WebSocket-Protocol") elif network == "httpupgrade": ws_opts["v2ray-http-upgrade-fast-open"] = True if "eh" in q and q["eh"]: ws_opts["early-data-header-name"] = q["eh"] except Exception: pass proxy["ws-opts"] = ws_opts elif network == "grpc": proxy["grpc-opts"] = { "grpc-service-name": query.get("serviceName", "") } if tls_mode == "reality": proxy["reality-opts"] = { "public-key": query.get("pbk", "") } if query.get("sid"): proxy["reality-opts"]["short-id"] = query.get("sid", "") proxy["client-fingerprint"] = query.get("fp", "chrome") alpn = query.get("alpn", "") if alpn: proxy["alpn"] = alpn.split(",") if tls_mode.endswith("tls"): proxy["client-fingerprint"] = query.get("fp", "chrome") alpn = query.get("alpn", "") if alpn: proxy["alpn"] = alpn.split(",") proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"VLESS parse error: {e}") from e elif scheme == "trojan": try: parsed = urlparse(line) query = dict(parse_qsl(parsed.query)) name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}")) trojan = { "name": name, "type": "trojan", "server": parsed.hostname, "port": parsed.port or 443, "password": parsed.username or "", "udp": True, } # skip-cert-verify try: trojan["skip-cert-verify"] = Converter.strtobool(query.get("allowInsecure", "0")) except ValueError: trojan["skip-cert-verify"] = False # optional fields if "sni" in query: trojan["sni"] = query["sni"] alpn = query.get("alpn", "") if alpn: trojan["alpn"] = alpn.split(",") network = query.get("type", "").lower() if network: trojan["network"] = network if network == "ws": headers = {"User-Agent": Converter.user_agent} trojan["ws-opts"] = { "path": query.get("path", "/"), "headers": headers } elif network == "grpc": trojan["grpc-opts"] = { "grpc-service-name": query.get("serviceName", "") } fp = query.get("fp", "") trojan["client-fingerprint"] = fp if fp else "chrome" proxies.append(trojan) except Exception as e: if not skip_exception: raise ValueError(f"Trojan parse error: {e}") from e elif scheme == "hysteria": try: parsed = urlparse(line) query = dict(parse_qsl(parsed.query)) name = Converter.unique_name(names, unquote(parsed.fragment or f"{parsed.hostname}:{parsed.port}")) hysteria = { "name": name, "type": "hysteria", "server": parsed.hostname, "port": parsed.port, "auth_str": parsed.username or query.get("auth", ""), "obfs": query.get("obfs", ""), "sni": query.get("peer", ""), "protocol": query.get("protocol", "") } up = query.get("up", "") down = query.get("down", "") if not up: up = query.get("upmbps", "") if not down: down = query.get("downmbps", "") hysteria["up"] = up hysteria["down"] = down # alpn split alpn = query.get("alpn", "") if alpn: hysteria["alpn"] = alpn.split(",") # skip-cert-verify try: hysteria["skip-cert-verify"] = Converter.strtobool(query.get("insecure", "false")) except ValueError: hysteria["skip-cert-verify"] = False proxies.append(hysteria) except Exception as e: if not skip_exception: raise ValueError(f"Hysteria parse error: {e}") from e elif scheme in ("socks", "socks5", "socks5h"): try: parsed = urlparse(line) server = parsed.hostname port = parsed.port username = parsed.username or "" password = parsed.password or "" name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) proxy = { "name": name, "type": "socks5", "server": server, "port": port, "username": username, "password": password, "udp": True } proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"SOCKS5 parse error: {e}") from e elif scheme == "ss": try: parsed = urlparse(line) # 兼容 ss://base64 或 ss://base64#name if parsed.fragment: name = Converter.unique_name(names, unquote(parsed.fragment)) else: name = Converter.unique_name(names, "ss") if parsed.port is None: base64_body = body.split("#")[0] parsed = urlparse(f"ss://{Converter.decode_base64(base64_body).decode('utf-8')}") cipher_raw = parsed.username cipher = cipher_raw password = parsed.password if not password: dc_buf = Converter.decode_base64(cipher_raw).decode('utf-8') if dc_buf.startswith("ss://"): dc_buf = dc_buf[len("ss://"):] dc_buf = Converter.decode_base64(dc_buf).decode('utf-8') cipher, password = dc_buf.split(":", 1) server = parsed.hostname port = parsed.port query = dict(parse_qsl(parsed.query)) proxy = { "name": name, "type": "ss", "server": server, "port": port, "cipher": cipher, "password": password, "udp": True } plugin = query.get("plugin") if plugin and ";" in plugin: query_string = "pluginName=" + plugin.replace(";", "&") plugin_info = parse_qs(query_string) plugin_name = plugin_info.get("pluginName", [""])[0] if "obfs" in plugin_name: proxy["plugin"] = "obfs" proxy["plugin-opts"] = { "mode": plugin_info.get("obfs", [""])[0], "host": plugin_info.get("obfs-host", [""])[0], } elif "v2ray-plugin" in plugin_name: proxy["plugin"] = "v2ray-plugin" proxy["plugin-opts"] = { "mode": plugin_info.get("mode", [""])[0], "host": plugin_info.get("host", [""])[0], "path": plugin_info.get("path", [""])[0], "tls": "tls" in plugin, } proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"SS parse error: {e}") from e elif scheme == "ssr": try: decoded = Converter.decode_base64(body).decode() parts, _, params_str = decoded.partition("/?") host, port, protocol, method, obfs, password_enc = parts.split(":") password = Converter.decode_base64(password_enc).decode() params = parse_qs(params_str) remarks = Converter.decode_base64(params.get("remarks", [""])[0]).decode() obfsparam = Converter.decode_base64(params.get("obfsparam", [""])[0]).decode() protoparam = Converter.decode_base64(params.get("protoparam", [""])[0]).decode() name = Converter.unique_name(names, remarks or f"{host}:{port}") proxy = { "name": name, "type": "ssr", "server": host, "port": port, "cipher": method, "password": password, "obfs": obfs, "protocol": protocol, "udp": True } if obfsparam: proxy["obfs-param"] = obfsparam if protoparam: proxy["protocol-param"] = protoparam proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"SSR parse error: {e}") from e elif scheme == "tuic": try: parsed = urlparse(line) query = parse_qs(parsed.query) user = parsed.username or "" password = parsed.password or "" server = parsed.hostname port = parsed.port or 443 name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) proxy = { "name": name, "type": "tuic", "server": server, "port": port, "udp": True } if password: proxy["uuid"] = user proxy["password"] = password else: proxy["token"] = user if "congestion_control" in query: proxy["congestion-controller"] = query["congestion_control"][0] if "alpn" in query: proxy["alpn"] = query["alpn"][0].split(",") if "sni" in query: proxy["sni"] = query["sni"][0] if query.get("disable_sni", ["0"])[0] == "1": proxy["disable-sni"] = True if "udp_relay_mode" in query: proxy["udp-relay-mode"] = query["udp_relay_mode"][0] proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"TUIC parse error: {e}") from e elif scheme == "anytls": try: parsed = urlparse(line) query = parse_qs(parsed.query) username = parsed.username or "" password = parsed.password or username server = parsed.hostname port = parsed.port insecure = query.get("insecure", ["0"])[0] == "1" sni = query.get("sni", [""])[0] fingerprint = query.get("hpkp", [""])[0] name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) proxy = { "name": name, "type": "anytls", "server": server, "port": port, "username": username, "password": password, "sni": sni, "fingerprint": fingerprint, "skip-cert-verify": insecure, "udp": True } proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"AnyTLS parse error: {e}") from e elif scheme in ("hysteria2", "hy2"): try: parsed = urlparse(line) query = dict(parse_qsl(parsed.query)) password = parsed.username or "" server = parsed.hostname port = parsed.port or 443 name = Converter.unique_name(names, unquote(parsed.fragment or f"{server}:{port}")) proxy = { "name": name, "type": "hysteria2", "server": server, "port": port, "password": password, "obfs": query.get("obfs", ""), "obfs-password": query.get("obfs-password", ""), "sni": query.get("sni", ""), "skip-cert-verify": Converter.strtobool(query.get("insecure", "false")), "down": query.get("down", ""), "up": query.get("up", ""), } if "pinSHA256" in query: proxy["fingerprint"] = query.get("pinSHA256", "") if "alpn" in query: proxy["alpn"] = query["alpn"].split(",") proxies.append(proxy) except Exception as e: if not skip_exception: raise ValueError(f"Hysteria2 parse error: {e}") from e if not proxies: if not skip_exception: raise ValueError("convert v2ray subscribe error: format invalid") return proxies