diff --git a/icons/Mihomo_Meta_A.png b/icons/Mihomo_Meta_A.png new file mode 100755 index 0000000..eacd778 Binary files /dev/null and b/icons/Mihomo_Meta_A.png differ diff --git a/package.v2.json b/package.v2.json index bb3d819..f65a1a1 100644 --- a/package.v2.json +++ b/package.v2.json @@ -435,5 +435,17 @@ "v1.1": "推荐支持IMDB数据源; 优化海报尺寸,减少卡顿", "v1.0": "探索支持IMDb数据源" } + }, + "ClashRuleProvider": { + "name": "Clash Rule Provider", + "description": "随时为Clash添加一些额外的规则。", + "labels": "工具", + "version": "0.1.0", + "icon": "Mihomo_Meta_A.png", + "author": "wumode", + "level": 1, + "history": { + "v0.1.0": "新增ClashRuleProvider" + } } } diff --git a/plugins.v2/clashruleprovider/__init__.py b/plugins.v2/clashruleprovider/__init__.py new file mode 100644 index 0000000..cc1605b --- /dev/null +++ b/plugins.v2/clashruleprovider/__init__.py @@ -0,0 +1,624 @@ +import requests +import re +from typing import Any, Optional, List, Dict, Tuple, Union +import time +import yaml +import hashlib +from fastapi import Body, Response +from datetime import datetime, timedelta +import pytz + +from apscheduler.schedulers.background import BackgroundScheduler +from cachetools import cached, TTLCache +from apscheduler.triggers.cron import CronTrigger + +from app.core.config import settings +from app.core.event import eventmanager, Event +from app.log import logger +from app.plugins import _PluginBase +from app.schemas.types import EventType, NotificationType +from app.utils.http import RequestUtils +from app.plugins.clashruleprovider.clash_rule_parser import ClashRuleParser +from app.plugins.clashruleprovider.clash_rule_parser import Action, RuleType, ClashRule, MatchRule, LogicRule + + +class ClashRuleProvider(_PluginBase): + # 插件名称 + plugin_name = "Clash Rule Provider" + # 插件描述 + plugin_desc = "随时为Clash添加一些额外的规则。" + # 插件图标 + plugin_icon = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/" + "refs/heads/imdbsource_assets/icons/Mihomo_Meta_A.png") + # 插件版本 + plugin_version = "0.1.0" + # 插件作者 + plugin_author = "wumode" + # 作者主页 + author_url = "https://github.com/wumode" + # 插件配置项ID前缀 + plugin_config_prefix = "clashruleprovider_" + # 加载顺序 + plugin_order = 99 + # 可使用的用户级别 + auth_level = 1 + + # 插件配置 + # 启用插件 + _enabled = False + _proxy = False + _notify = False + # 订阅链接 + _sub_links = [] + # Clash 面板 URL + _clash_dashboard_url = None + # Clash 面板密钥 + _clash_dashboard_secret = None + # MoviePilot URL + _movie_pilot_url = None + _cron = '' + _timeout = 10 + _retry_times = 3 + _filter_keywords = [] + _auto_update_subscriptions = True + _ruleset_prefix = '📂<-' + + # 插件数据 + _clash_config = None + _top_rules: List[str] = [] + _ruleset_rules: List[str] = [] + _rule_provider: Dict[str, Any] = {} + _subscription_info = {} + _ruleset_names: Dict[str, str] = {} + + # protected variables + _clash_rule_parser = None + _ruleset_rule_parser = None + _custom_rule_sets = None + _scheduler: Optional[BackgroundScheduler] = None + + def init_plugin(self, config: dict = None): + self._clash_config = self.get_data("clash_config") + self._ruleset_rules = self.get_data("ruleset_rules") + self._top_rules = self.get_data("top_rules") + self._subscription_info = self.get_data("subscription_info") or \ + {"download": 0, "upload": 0, "total": 0, "expire": 0, "last_update": 0} + self._rule_provider = self.get_data("rule_provider") or {} + self._ruleset_names = self.get_data("ruleset_names") or {} + if config: + self._enabled = config.get("enabled") + self._proxy = config.get("proxy") + self._notify = config.get("notify"), + self._sub_links = config.get("sub_links") + self._clash_dashboard_url = config.get("clash_dashboard_url") + self._clash_dashboard_secret = config.get("clash_dashboard_secret") + self._movie_pilot_url = config.get("movie_pilot_url") + if self._movie_pilot_url[-1] == '/': + self._movie_pilot_url = self._movie_pilot_url[:-1] + self._cron = config.get("cron_string") + self._timeout = config.get("timeout") + self._retry_times = config.get("retry_times") + self._filter_keywords = config.get("filter_keywords") + self._ruleset_prefix = config.get("ruleset_prefix", "Custom_") + self._auto_update_subscriptions = config.get("auto_update_subscriptions") + self._clash_rule_parser = ClashRuleParser() + self._ruleset_rule_parser = ClashRuleParser() + if self._enabled: + self.__parse_config() + self._scheduler = BackgroundScheduler(timezone=settings.TZ) + self._scheduler.start() + + def get_state(self) -> bool: + return self._enabled + + @staticmethod + def get_command() -> List[Dict[str, Any]]: + pass + + def get_api(self) -> List[Dict[str, Any]]: + return [ + { + "path": "/connectivity", + "endpoint": self.test_connectivity, + "methods": ["POST"], + "auth": "bear", + "summary": "测试连接", + "description": "测试连接" + }, + { + "path": "/clash_outbound", + "endpoint": self.get_clash_outbound, + "methods": ["GET"], + "auth": "bear", + "summary": "clash outbound", + "description": "clash outbound" + }, + { + "path": "/status", + "endpoint": self.get_status, + "methods": ["GET"], + "auth": "bear", + "summary": "stated", + "description": "state" + }, + { + "path": "/rules", + "endpoint": self.get_rules, + "methods": ["GET"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/rules", + "endpoint": self.update_rules, + "methods": ["PUT"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/reorder-rules", + "endpoint": self.reorder_rules, + "methods": ["PUT"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/rule", + "endpoint": self.update_rule, + "methods": ["PUT"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/rule", + "endpoint": self.add_rule, + "methods": ["POSt"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/rule", + "endpoint": self.delete_rule, + "methods": ["DELETE"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/subscription", + "endpoint": self.get_subscription, + "methods": ["GET"], + "auth": "bear", + "summary": "clash rules", + "description": "clash rules" + }, + { + "path": "/subscription", + "endpoint": self.update_subscription, + "methods": ["PUT"], + "auth": "bear", + "summary": "update clash rules", + "description": "update clash rules" + }, + { + "path": "/rule_providers", + "endpoint": self.get_rule_providers, + "methods": ["GET"], + "auth": "bear", + "summary": "update rule providers", + "description": "update rule providers" + }, + { + "path": "/ruleset", + "endpoint": self.get_ruleset, + "methods": ["GET"], + "summary": "update rule providers", + "description": "update rule providers" + }, + { + "path": "/config", + "endpoint": self.get_clash_config, + "methods": ["GET"], + "summary": "update rule providers", + "description": "update rule providers" + } + ] + + def get_render_mode(self) -> Tuple[str, str]: + """ + 获取插件渲染模式 + :return: 1、渲染模式,支持:vue/vuetify,默认vuetify + :return: 2、组件路径,默认 dist/assets + """ + return "vue", "dist/assets" + + def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: + """ + 拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构 + """ + return [], {} + + def get_page(self) -> List[dict]: + return [] + + def stop_service(self): + """ + 退出插件 + """ + pass + + def get_service(self) -> List[Dict[str, Any]]: + if self.get_state() and self._auto_update_subscriptions: + return [{ + "id": "ClashRuleProvider", + "name": "Clash Rule Provider 服务", + "trigger": CronTrigger.from_crontab(self._cron), + "func": self.update_subscription_service, + "kwargs": {} + }] + return [] + + def __update_config(self): + # 保存配置 + self.update_config( + { + "enabled": self._enabled, + "cron": self._cron, + "proxy": self._proxy, + "notify": self._notify, + "sub_links": self._sub_links, + "clash_dashboard_url": self._clash_dashboard_url, + "clash_dashboard_secret": self._clash_dashboard_secret, + "movie_pilot_url": self._movie_pilot_url, + "retry_times": self._retry_times, + "timeout": self._timeout, + }) + + def __save_data(self): + self.__insert_ruleset() + self._top_rules = self._clash_rule_parser.to_string() + self._ruleset_rules = self._ruleset_rule_parser.to_string() + self.save_data('clash_config', self._clash_config) + self.save_data('ruleset_rules', self._ruleset_rules) + self.save_data('top_rules', self._top_rules) + self.save_data('subscription_info', self._subscription_info) + self.save_data('ruleset_names', self._ruleset_names) + self.save_data('rule_provider', self._rule_provider) + + def __parse_config(self): + if not self._top_rules: + return + self._clash_rule_parser.parse_rules_from_list(self._top_rules) + if not self._ruleset_rules: + return + self._ruleset_rule_parser.parse_rules_from_list(self._ruleset_rules) + + def test_connectivity(self, params: Dict[str, Any]) -> Dict[str, Any]: + if not self._enabled: + return {"success": False, "message": ""} + if not params.get('clash_dashboard_url') or not params.get('clash_dashboard_secret')\ + or not params.get('sub_link'): + return {"success": False, "message": "missing params"} + clash_version_url = f"{params.get('clash_dashboard_url')}/version" + ret = RequestUtils(accept_type="application/json", + headers={"authorization": f"Bearer {params.get('clash_dashboard_secret')}"} + ).get(clash_version_url) + if not ret: + return {"success": False, "message": "无法连接到Clash"} + ret = RequestUtils(accept_type="text/html", + proxies=settings.PROXY if self._proxy else None + ).get(params.get('sub_link')) + if not ret: + return {"success": False, "message": f"Unable to get {params.get('sub_link')}"} + return {"success": True, "message": "测试连接成功"} + + def get_ruleset(self, name): + if not self._ruleset_names.get(name): + return None + name = self._ruleset_names.get(name) + rules = self.__get_ruleset(name) + # if rules or ruleset in self._rule_provider: + # self._rule_provider[ruleset] = rules + res = yaml.dump({"payload": rules}, allow_unicode=True) + return Response(content=res, media_type="text/yaml") + + def get_clash_outbound(self): + outbound = self.clash_outbound(self._clash_config) + return {"success": True, "message": None, "data": {"outbound": outbound}} + + def get_status(self): + return {"success": True, "message": "", + "data": {"state": self._enabled, + "ruleset_prefix": self._ruleset_prefix, + "clash": {"rule_size": len(self._clash_config.get("rules", []))}, + "subscription_info": self._subscription_info, + "sub_url": f"{self._movie_pilot_url}/api/v1/plugin/ClashRuleProvider/config?" + f"apikey={settings.API_TOKEN}"}} + + def get_clash_config(self): + config = self.clash_config() + if not config: + return {"success": False, "message": ""} + res = yaml.dump(config, allow_unicode=True) + headers = {'Subscription-Userinfo': f'upload={self._subscription_info["upload"]}; ' + f'download={self._subscription_info["download"]}; ' + f'total={self._subscription_info["total"]}; ' + f'expire={self._subscription_info["expire"]}'} + return Response(headers=headers, content=res, media_type="text/yaml") + + def get_rules(self, rule_type: str) -> Dict[str, Any]: + if rule_type == 'ruleset': + return {"success": True, "message": None, "data": {"rules": self._ruleset_rule_parser.to_dict()}} + return {"success": True, "message": None, "data": {"rules": self._clash_rule_parser.to_dict()}} + + def delete_rule(self, params: dict = Body(...)): + if not self._enabled: + return {"success": False, "message": ""} + if params.get('type') == 'ruleset': + res = self.delete_rule_by_priority(params.get('priority'), self._ruleset_rule_parser) + if res: + self.__add_notification_job(f"{self._ruleset_prefix}{res.action.value if isinstance(res.action, Action) else res.action}") + else: + res = self.delete_rule_by_priority(params.get('priority'), self._clash_rule_parser) + return {"success": res, "message": None} + + def reorder_rules(self, params: Dict[str, Any]): + if not self._enabled: + return {"success": False, "message": ""} + moved_priority = params.get('moved_priority') + target_priority = params.get('target_priority') + try: + if params.get('type') == 'ruleset': + self.__reorder_rules(self._ruleset_rule_parser, moved_priority, target_priority) + self.__add_notification_job(f"{self._ruleset_prefix}{params.get('rule_data').get('action')}") + else: + self.__reorder_rules(self._clash_rule_parser, moved_priority, target_priority) + except Exception as e: + return {"success": False, "message": str(e)} + return {"success": True, "message": None} + + def update_rules(self, params: Dict[str, Any]): + if not self._enabled: + return {"success": False, "message": ""} + if params.get('type') == 'ruleset': + self.__update_rules(params.get('rules'), self._ruleset_rule_parser) + else: + self.__update_rules(params.get('rules'), self._clash_rule_parser) + return {"success": True, "message": None} + + def update_rule(self, params: Dict[str, Any]) -> Dict[str, Any]: + if not self._enabled: + return {"success": False, "message": ""} + if params.get('type') == 'ruleset': + res = self.update_rule_by_priority(params.get('rule_data'), self._ruleset_rule_parser) + if res: + self.__add_notification_job(f"{self._ruleset_prefix}{params.get('rule_data').get('action')}") + else: + res = self.update_rule_by_priority(params.get('rule_data'), self._clash_rule_parser) + return {"success": bool(res), "message": None} + + def add_rule(self, params: Dict[str, Any]) -> Dict[str, Any]: + if not self._enabled: + return {"success": False, "message": ""} + if params.get('type') == 'ruleset': + res = self.add_rule_by_priority(params.get('rule_data'), self._ruleset_rule_parser) + if res: + self.__add_notification_job(f"{self._ruleset_prefix}{params.get('rule_data').get('action')}") + else: + res = self.add_rule_by_priority(params.get('rule_data'), self._clash_rule_parser) + return {"success": bool(res), "message": None} + + def get_subscription(self): + if not self._sub_links: + return None + return {"success": True, "message": None, "data": {"url": self._sub_links[0]}} + + def update_subscription(self, params: Dict[str, Any]): + if not self._enabled: + return {"success": False, "message": ""} + url = params.get('url') + if not url: + return {"success": False, "message": "missing params"} + res = self.update_subscription_service() + if not res: + return {"success": True, "message": f"订阅链接 {self._sub_links[0]} 更新失败"} + return {"success": True, "message": "订阅更新成功"} + + def get_rule_providers(self): + return {"success": True, "message": None, "data": self.rule_providers()} + + @staticmethod + def clash_outbound(clash_config: Dict[str, Any]) -> Optional[List]: + if not clash_config: + return [] + outbound = [{'name': proxy_group.get("name")} for proxy_group in clash_config.get("proxy-groups")] + outbound.extend([{'name': proxy.get("name")} for proxy in clash_config.get("proxies")]) + return outbound + + def rule_providers(self) -> Optional[Dict[str, Any]]: + if not self._clash_config: + return None + rule_providers = {} + for key, value in self._clash_config.get("rule-providers", {}): + if value.get("path", '').startwith("./CRP/"): + continue + rule_providers[key] = value + return rule_providers + + def __update_rules(self, rules: List[Dict[str, Any]], rule_parser: ClashRuleParser): + rule_parser.rules = [] + for rule in rules: + clash_rule = ClashRuleParser.parse_rule_dict(rule) + rule_parser.insert_rule_at_priority(clash_rule, rule.get("priority")) + self.__save_data() + + def __reorder_rules(self, rule_parser: ClashRuleParser, moved_priority, target_priority): + rule_parser.reorder_rules(moved_priority, target_priority) + self.__save_data() + + def __get_ruleset(self, ruleset: str) -> List[str]: + if ruleset.startswith(self._ruleset_prefix): + action = ruleset[len(self._ruleset_prefix):] + else: + return [] + try: + action_enum = Action(action.upper()) + final_action = action_enum + except ValueError: + final_action = action + rules = self._ruleset_rule_parser.filter_rules_by_action(final_action) + res = [] + for rule in rules: + res.append(rule.condition_string()) + return res + + def __insert_ruleset(self): + outbounds = [] + for rule in self._ruleset_rule_parser.rules: + action_str = f"{rule.action.value}" if isinstance(rule.action, Action) else rule.action + if action_str not in outbounds: + outbounds.append(action_str) + self._clash_rule_parser.remove_rules(lambda r: r.rule_type == RuleType.RULE_SET and + r.payload.startswith(self._ruleset_prefix)) + for outbound in outbounds: + clash_rule = ClashRuleParser.parse_rule_line(f"RULE-SET,{self._ruleset_prefix}{outbound},{outbound}") + if not self._clash_rule_parser.has_rule(clash_rule): + self._clash_rule_parser.insert_rule_at_priority(clash_rule, 0) + + def update_rule_by_priority(self, rule: Dict[str, Any], rule_parser: ClashRuleParser) -> bool: + if not isinstance(rule.get("priority"), int): + return False + clash_rule = ClashRuleParser.parse_rule_dict(rule) + if not clash_rule: + return False + res = rule_parser.update_rule_at_priority(clash_rule, rule.get("priority")) + self.__save_data() + return res + + def add_rule_by_priority(self, rule: Dict[str, Any], rule_parser: ClashRuleParser) -> bool: + if not isinstance(rule.get("priority"), int): + return False + try: + clash_rule = self._clash_rule_parser.parse_rule_dict(rule) + except ValueError: + logger.warn(f"无效的输入规则: {rule}") + return False + if not clash_rule: + return False + rule_parser.insert_rule_at_priority(clash_rule, rule.get("priority")) + self.__save_data() + return True + + def delete_rule_by_priority(self, priority: int, rule_parser: ClashRuleParser + ) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: + if not isinstance(priority, int): + return None + res = rule_parser.remove_rule_at_priority(priority) + self.__save_data() + return res + + @eventmanager.register(EventType.PluginAction) + def update_subscription_service(self) -> bool: + if not self._sub_links: + return False + url = self._sub_links[0] + ret = RequestUtils(accept_type="text/html", + proxies=settings.PROXY if self._proxy else None + ).get_res(url) + if not ret: + return False + try: + rs = yaml.load(ret.content, Loader=yaml.FullLoader) + self._clash_config = self.__remove_nodes_by_keywords(rs) + except Exception as e: + logger.error(f"解析配置出错: {e}") + return False + if 'Subscription-Userinfo' in ret.headers: + matches = re.findall(r'(\w+)=(\d+)', ret.headers['Subscription-Userinfo']) + variables = {key: int(value) for key, value in matches} + self._subscription_info['download'] = variables['download'] + self._subscription_info['upload'] = variables['upload'] + self._subscription_info['total'] = variables['total'] + self._subscription_info['expire'] = variables['expire'] + self._subscription_info["last_update"] = int(time.time()) + self.save_data('subscription_info', self._subscription_info) + self.save_data('clash_config', self._clash_config) + return True + + def notify_clash(self, ruleset: str): + url = f'{self._clash_dashboard_url}/providers/rules/{ruleset}' + RequestUtils(content_type="application/json", + headers={"authorization": f"Bearer {self._clash_dashboard_secret}"} + ).put(url) + + def __add_notification_job(self, ruleset: str): + if ruleset in self._rule_provider: + self._scheduler.add_job(self.notify_clash, "date", + run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=30), + args=[ruleset], + id='CRP-notify-clash', + replace_existing=True + ) + + def __remove_nodes_by_keywords(self, clash_config: Dict[str, Any]) -> Dict[str, Any]: + removed_proxies = [] + proxies = [] + for proxy in clash_config.get("proxies", []): + has_keywords = bool(len([x for x in self._filter_keywords if x in proxy.get("name", '')])) + if has_keywords: + removed_proxies.append(proxy.get("name")) + else: + proxies.append(proxy) + if proxies: + clash_config["proxies"] = proxies + else: + logger.warn(f"关键词过滤后无可用节点,跳过过滤") + removed_proxies = [] + for proxy_group in clash_config.get("proxy-groups", []): + proxy_group['proxies'] = [x for x in proxy_group.get('proxies') if x not in removed_proxies] + clash_config["proxy-groups"] = [x for x in clash_config.get("proxy-groups", []) if x.get("proxies")] + return clash_config + + def clash_config(self) -> Optional[Dict[str, Any]]: + if not self._clash_config: + return + self.__insert_ruleset() + self._top_rules = self._clash_rule_parser.to_string() + clash_config = self._clash_config.copy() + top_rules = [] + for rule in self._clash_rule_parser.rules: + if (not isinstance(rule.action, Action) and + not len([x for x in self.clash_outbound(clash_config) if rule.action == x.get("name", '')])): + logger.warn(f"出站 {rule.action} 不存在, 绕过 {rule.raw_rule}") + continue + top_rules.append(rule.raw_rule) + clash_config["rules"] = self._top_rules + clash_config.get("rules", []) + self._rule_provider = {} + for r in self._clash_rule_parser.rules: + if r.rule_type == RuleType.RULE_SET and r.payload.startswith(self._ruleset_prefix): + action_str = f"{r.action.value}" if isinstance(r.action, Action) else r.action + path_name = hashlib.sha256(action_str.encode('utf-8')).hexdigest()[:10] + self._ruleset_names[path_name] = r.payload + sub_url = (f"{self._movie_pilot_url}/api/v1/plugin/ClashRuleProvider/ruleset?" + f"name={path_name}&apikey={settings.API_TOKEN}") + self._rule_provider[r.payload] = {"behavior": "classical", + "format": "yaml", + "interval": 3600, + "path": f"./CRP/{path_name}.yaml", + "type": "http", + "url": sub_url} + if clash_config.get("rule-providers"): + clash_config['rule-providers'].update(self._rule_provider) + else: + clash_config['rule-providers'] = self._rule_provider + for key, item in self._ruleset_names.items(): + if item not in clash_config['rule-providers']: + del self._ruleset_names[key] + self.save_data('ruleset_names', self._ruleset_names) + self.save_data('rule_provider', self._rule_provider) + return clash_config diff --git a/plugins.v2/clashruleprovider/clash_rule_parser.py b/plugins.v2/clashruleprovider/clash_rule_parser.py new file mode 100644 index 0000000..071d427 --- /dev/null +++ b/plugins.v2/clashruleprovider/clash_rule_parser.py @@ -0,0 +1,486 @@ +import re +from typing import List, Dict, Any, Optional, Union, Callable +from dataclasses import dataclass +from enum import Enum + + +class RuleType(Enum): + """Enumeration of all supported Clash rule types""" + DOMAIN = "DOMAIN" + DOMAIN_SUFFIX = "DOMAIN-SUFFIX" + DOMAIN_KEYWORD = "DOMAIN-KEYWORD" + DOMAIN_REGEX = "DOMAIN-REGEX" + GEOSITE = "GEOSITE" + + IP_CIDR = "IP-CIDR" + IP_CIDR6 = "IP-CIDR6" + IP_SUFFIX = "IP-SUFFIX" + IP_ASN = "IP-ASN" + GEOIP = "GEOIP" + + SRC_GEOIP = "SRC-GEOIP" + SRC_IP_ASN = "SRC-IP-ASN" + SRC_IP_CIDR = "SRC-IP-CIDR" + SRC_IP_SUFFIX = "SRC-IP-SUFFIX" + + DST_PORT = "DST-PORT" + SRC_PORT = "SRC-PORT" + + IN_PORT = "IN-PORT" + IN_TYPE = "IN-TYPE" + IN_USER = "IN-USER" + IN_NAME = "IN-NAME" + + PROCESS_PATH = "PROCESS-PATH" + PROCESS_PATH_REGEX = "PROCESS-PATH-REGEX" + PROCESS_NAME = "PROCESS-NAME" + PROCESS_NAME_REGEX = "PROCESS-NAME-REGEX" + + UID = "UID" + NETWORK = "NETWORK" + DSCP = "DSCP" + + RULE_SET = "RULE-SET" + AND = "AND" + OR = "OR" + NOT = "NOT" + SUB_RULE = "SUB-RULE" + + MATCH = "MATCH" + + +class Action(Enum): + """Enumeration of rule actions""" + DIRECT = "DIRECT" + REJECT = "REJECT" + REJECT_DROP = "REJECT-DROP" + PASS = "PASS" + COMPATIBLE = "COMPATIBLE" + + +@dataclass +class ClashRule: + """Represents a parsed Clash routing rule""" + rule_type: RuleType + payload: str + action: Union[Action, str] # Can be Action enum or custom proxy group name + additional_params: Optional[List[str]] = None + raw_rule: str = "" + priority: int = 0 + + def __post_init__(self): + if self.additional_params is None: + self.additional_params = [] + + def condition_string(self) -> str: + return f"{self.rule_type.value},{self.payload}" + + +@dataclass +class LogicRule: + """Represents a logic rule (AND, OR, NOT)""" + logic_type: RuleType + conditions: List[Union[ClashRule, 'LogicRule']] + action: Union[Action, str] + raw_rule: str = "" + priority: int = 0 + + def condition_string(self) -> str: + conditions_str = ','.join([f"({c.condition_string()})" for c in self.conditions]) + return f"{self.logic_type.value},({conditions_str})" + + +@dataclass +class MatchRule: + """Represents a match rule""" + action: Union[Action, str] + raw_rule: str = "" + priority: int = 0 + rule_type: RuleType = RuleType.MATCH + + @staticmethod + def condition_string() -> str: + return "MATCH" + + +class ClashRuleParser: + """Parser for Clash routing rules""" + + def __init__(self): + self.rules: List[Union[ClashRule, LogicRule, MatchRule]] = [] + + @staticmethod + def parse_rule_line(line: str) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: + """Parse a single rule line""" + line = line.strip() + try: + # Handle logic rules (AND, OR, NOT) + + if line.startswith(('AND,', 'OR,', 'NOT,')): + return ClashRuleParser._parse_logic_rule(line) + elif line.startswith('MATCH'): + return ClashRuleParser._parse_match_rule(line) + # Handle regular rules + return ClashRuleParser._parse_regular_rule(line) + + except Exception as e: + print(f"Error parsing rule '{line}': {e}") + return None + + @staticmethod + def parse_rule_dict(clash_rule: Dict[str, Any]) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: + if not clash_rule: + return None + if clash_rule.get("type") in ('AND', 'OR', 'NOT'): + conditions = clash_rule.get("conditions") + if not conditions: + return None + conditions_str = '' + for condition in conditions: + conditions_str += f'({condition.get("type")},{condition.get("payload")})' + conditions_str = f"({conditions_str})" + raw_rule = f"{clash_rule.get('type')},{conditions_str},{clash_rule.get('action')}" + return ClashRuleParser._parse_logic_rule(raw_rule) + elif clash_rule.get("type") == 'MATCH': + raw_rule = f"{clash_rule.get('type')},{clash_rule.get('action')}" + return ClashRuleParser._parse_match_rule(raw_rule) + else: + raw_rule = f"{clash_rule.get('type')},{clash_rule.get('payload')},{clash_rule.get('action')}" + return ClashRuleParser._parse_regular_rule(raw_rule) + + @staticmethod + def _parse_match_rule(line: str) -> MatchRule: + parts = line.split(',') + if len(parts) < 2: + raise ValueError(f"Invalid rule format: {line}") + action = parts[1] + # Validate rule type + try: + action_enum = Action(action.upper()) + final_action = action_enum + except ValueError: + final_action = action + + return MatchRule( + action=final_action, + raw_rule=line + ) + + @staticmethod + def _parse_regular_rule(line: str) -> ClashRule: + """Parse a regular (non-logic) rule""" + parts = line.split(',') + + if len(parts) < 3: + raise ValueError(f"Invalid rule format: {line}") + + rule_type_str = parts[0].upper() + payload = parts[1] + action = parts[2] + + if not payload or not rule_type_str: + raise ValueError(f"Invalid rule format: {line}") + + additional_params = parts[3:] if len(parts) > 3 else [] + + # Validate rule type + try: + rule_type = RuleType(rule_type_str) + except ValueError: + raise ValueError(f"Unknown rule type: {rule_type_str}") + + # Try to convert action to enum, otherwise keep as string (custom proxy group) + try: + action_enum = Action(action.upper()) + final_action = action_enum + except ValueError: + final_action = action + + return ClashRule( + rule_type=rule_type, + payload=payload, + action=final_action, + additional_params=additional_params, + raw_rule=line + ) + + @staticmethod + def _parse_logic_rule(line: str) -> LogicRule: + """Parse a logic rule (AND, OR, NOT)""" + # Extract logic type + logic_rule_match = re.match(r'^(AND|OR|NOT),\((.+)\),([^,]+)$', line) + if not logic_rule_match: + raise ValueError(f"Cannot extract action from logic rule: {line}") + logic_type_str = logic_rule_match.group(1).upper() + logic_type = RuleType(logic_type_str) + action = logic_rule_match.group(3) + # Try to convert action to enum + try: + action_enum = Action(action.upper()) + final_action = action_enum + except ValueError: + final_action = action + conditions_str = logic_rule_match.group(2) + conditions = ClashRuleParser._parse_logic_conditions(conditions_str) + + return LogicRule( + logic_type=logic_type, + conditions=conditions, + action=final_action, + raw_rule=line + ) + + @staticmethod + def _parse_logic_conditions(conditions_str: str) -> List[ClashRule]: + """Parse conditions within logic rules""" + conditions = [] + + # Simple parser for conditions like (DOMAIN,baidu.com),(NETWORK,UDP) + # This is a basic implementation - more complex nested logic would need a proper parser + condition_pattern = r'\(([^,]+),([^)]+)\)' + matches = re.findall(condition_pattern, conditions_str) + + for rule_type_str, payload in matches: + try: + rule_type = RuleType(rule_type_str.upper()) + condition = ClashRule( + rule_type=rule_type, + payload=payload, + action="", # Logic conditions don't have actions + raw_rule=f"{rule_type_str},{payload}" + ) + conditions.append(condition) + except ValueError: + print(f"Unknown rule type in logic condition: {rule_type_str}") + + return conditions + + def parse_rules(self, rules_text: str) -> List[Union[ClashRule, LogicRule, MatchRule]]: + """Parse multiple rules from text, preserving order and priority""" + self.rules = [] + lines = rules_text.strip().split('\n') + priority = 0 + + for line in lines: + rule = self.parse_rule_line(line) + if rule: + rule.priority = priority # Assign priority based on position + self.rules.append(rule) + priority += 1 + + return self.rules + + def parse_rules_from_list(self, rules_list: List[str]) -> List[Union[ClashRule, LogicRule, MatchRule]]: + """Parse rules from a list of rule strings, preserving order and priority""" + self.rules = [] + + for priority, rule_str in enumerate(rules_list): + rule = self.parse_rule_line(rule_str) + if rule: + rule.priority = priority # Assign priority based on list position + self.rules.append(rule) + + return self.rules + + def validate_rule(self, rule: ClashRule) -> bool: + """Validate a parsed rule""" + try: + # Basic validation based on rule type + if rule.rule_type in [RuleType.IP_CIDR, RuleType.IP_CIDR6]: + # Validate CIDR format + return '/' in rule.payload + + elif rule.rule_type == RuleType.DST_PORT or rule.rule_type == RuleType.SRC_PORT: + # Validate port number/range + return rule.payload.isdigit() or '-' in rule.payload + + elif rule.rule_type == RuleType.NETWORK: + # Validate network type + return rule.payload.lower() in ['tcp', 'udp'] + + elif rule.rule_type == RuleType.DOMAIN_REGEX or rule.rule_type == RuleType.PROCESS_PATH_REGEX: + # Try to compile regex + re.compile(rule.payload) + return True + + return True + + except Exception: + return False + + def to_string(self) -> List[str]: + result = [] + for rule in self.rules: + result.append(rule.raw_rule) + return result + + def to_dict(self) -> List[Dict[str, Any]]: + """Convert parsed rules to dictionary format""" + result = [] + + for rule in self.rules: + if isinstance(rule, ClashRule): + rule_dict = { + 'type': rule.rule_type.value, + 'payload': rule.payload, + 'action': rule.action.value if isinstance(rule.action, Action) else rule.action, + 'additional_params': rule.additional_params, + 'priority': rule.priority, + 'raw': rule.raw_rule + } + result.append(rule_dict) + + elif isinstance(rule, LogicRule): + conditions_dict = [] + for condition in rule.conditions: + if isinstance(condition, ClashRule): + conditions_dict.append({ + 'type': condition.rule_type.value, + 'payload': condition.payload + }) + + rule_dict = { + 'type': rule.logic_type.value, + 'conditions': conditions_dict, + 'action': rule.action.value if isinstance(rule.action, Action) else rule.action, + 'priority': rule.priority, + 'raw': rule.raw_rule + } + result.append(rule_dict) + elif isinstance(rule, MatchRule): + rule_dict = { + 'type': 'MATCH', + 'action': rule.action.value if isinstance(rule.action, Action) else rule.action, + 'priority': rule.priority, + 'raw': rule.raw_rule + } + result.append(rule_dict) + return result + + def get_rules_by_priority(self) -> List[Union[ClashRule, LogicRule, MatchRule]]: + """Get rules sorted by priority (highest priority first)""" + return sorted(self.rules, key=lambda rule: rule.priority) + + def append_rule(self, rule: Union[ClashRule, LogicRule, MatchRule]) -> None: + max_priority = max(rule.priority for rule in self.rules) if len(self.rules) else 0 + rule.priority = max_priority + 1 + self.rules.append(rule) + # Re-sort rules to maintain order + self.rules.sort(key=lambda r: r.priority) + + def insert_rule_at_priority(self, rule: Union[ClashRule, LogicRule, MatchRule], priority: int): + """Insert a rule at a specific priority position, adjusting other rules""" + # Adjust priorities of existing rules + for existing_rule in self.rules: + if existing_rule.priority >= priority: + existing_rule.priority += 1 + rule.priority = priority + self.rules.append(rule) + + # Re-sort rules to maintain order + self.rules.sort(key=lambda r: r.priority) + + def update_rule_at_priority(self, clash_rule: Union[ClashRule, LogicRule], priority: int) -> bool: + for index, existing_rule in enumerate(self.rules): + if existing_rule.priority == priority: + self.rules[index] = clash_rule + self.rules[index].priority = priority + return True + return False + + def remove_rule_at_priority(self, priority: int) -> Optional[Union[ClashRule, LogicRule, MatchRule]]: + """Remove rule at specific priority and adjust remaining priorities""" + rule_to_remove = None + for rule in self.rules: + if rule.priority == priority: + rule_to_remove = rule + break + + if rule_to_remove: + self.rules.remove(rule_to_remove) + + # Adjust priorities of remaining rules + for rule in self.rules: + if rule.priority > priority: + rule.priority -= 1 + + return rule_to_remove + return None + + def remove_rules(self, condition: Callable[[Union[ClashRule, LogicRule, MatchRule]], bool]): + """Remove rules by lambda""" + i = 0 + while i < len(self.rules): + if condition(self.rules[i]): + priority = self.rules[i].priority + for rule in self.rules: + if rule.priority > priority: + rule.priority -= 1 + del self.rules[i] + else: + i += 1 + + def move_rule_priority(self, from_priority: int, to_priority: int) -> bool: + """Move a rule from one priority position to another""" + rule_to_move = None + for rule in self.rules: + if rule.priority == from_priority: + rule_to_move = rule + break + + if not rule_to_move: + return False + + # Remove rule temporarily + self.remove_rule_at_priority(from_priority) + + # Insert at new priority + self.insert_rule_at_priority(rule_to_move, to_priority) + + return True + + def filter_rules_by_type(self, rule_type: RuleType) -> List[ClashRule]: + """Filter rules by type""" + return [rule for rule in self.rules + if isinstance(rule, ClashRule) and rule.rule_type == rule_type] + + def filter_rules_by_action(self, action: Union[Action, str]) -> List[Union[ClashRule, LogicRule, MatchRule]]: + """Filter rules by action""" + return [rule for rule in self.rules if rule.action == action] + + def has_rule(self, clash_rule: Union[ClashRule, LogicRule, MatchRule]) -> bool: + for rule in self.rules: + if rule.rule_type == clash_rule.rule_type and rule.action == clash_rule.action \ + and rule.payload == clash_rule.payload: + return True + return False + + def reorder_rules( + self, + moved_rule_priority: int, + target_priority: int, + ): + """ + Reorder rules + + :param moved_rule_priority: 被移动规则的原始优先级 + :param target_priority: 目标位置的优先级 + """ + moved_index = next(i for i, r in enumerate(self.rules) if r.priority == moved_rule_priority) + target_index = next( + (i for i, r in enumerate(self.rules) if r.priority == target_priority), + len(self.rules) + ) + # 直接修改被移动规则的优先级 + moved_rule = self.rules[moved_index] + moved_rule.priority = target_priority + + if moved_index < target_index: + # 向后移动:原位置到目标位置之间的规则优先级 -1 + for i in range(moved_index + 1, target_index + 1): + self.rules[i].priority -= 1 + elif moved_index > target_index: + # 向前移动:目标位置到原位置之间的规则优先级 +1 + for i in range(target_index, moved_index): + self.rules[i].priority += 1 + self.rules.sort(key=lambda x: x.priority)