# Mirror of https://github.com/jxxghp/MoviePilot-Plugins.git
# Synced 2026-03-27 10:05:57 +00:00 (974 lines, 43 KiB, Python)
import asyncio
|
|
import hashlib
|
|
import json
|
|
import pytz
|
|
import re
|
|
import time
|
|
import yaml
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, List, Optional, Tuple, Iterable, TypeVar
|
|
|
|
import jsonpatch
|
|
from fastapi import HTTPException
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
from app.core.cache import cached
|
|
from app.core.config import settings
|
|
from app.log import logger
|
|
from app.utils.http import AsyncRequestUtils
|
|
|
|
from .base import Constant
|
|
from .helper.clashruleparser import ClashRuleParser, RoutingRuleType, Action
|
|
from .helper.configconverter import Converter
|
|
from .helper.utilsprovider import UtilsProvider
|
|
from .models import ProxyGroup, Proxy, RuleProvider, RuleProviderData, ProxyData, HostData, VehicleType, SelectGroup, \
|
|
ProxyGroupData, RuleItem, RuleData, Metadata, RuleProviders
|
|
from .models.api import ClashApi, SubscriptionSetting, DataUsage, SubscriptionInfo, ConfigRequest
|
|
from .models.configuration import ClashConfig
|
|
from .models.datapatch import PatchItem, DataPatch
|
|
from .models.rule import RuleType
|
|
from .models.types import DataSource, DataKey, RuleSet, ClashKey, SupportsPatch
|
|
from .state import PluginState
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
class ClashRuleProviderService:
|
|
|
|
def __init__(
        self, plugin_id: str,
        state: PluginState,
        scheduler: Optional[AsyncIOScheduler] = None
):
    """Service layer for the Clash rule-provider plugin.

    :param plugin_id: identifier of the owning plugin, used to build API URLs.
    :param state: shared mutable plugin state (rules, proxies, patches, config).
    :param scheduler: optional async scheduler used to queue Clash refresh jobs;
        when None, ``add_notification_job`` becomes a no-op.
    """
    self.plugin_id = plugin_id
    self.state = state
    self.scheduler = scheduler
|
|
|
|
def save_rules(self):
    """Persist both rule collections (top rules and ruleset rules) to storage."""
    targets = (
        (DataKey.TOP_RULES, self.state.top_rules_manager),
        (DataKey.RULESET_RULES, self.state.ruleset_rules_manager),
    )
    for key, manager in targets:
        self.state.save_data(key, manager.export_rules())
|
|
|
|
def load_rules(self):
    """Load both rule collections from storage, defaulting to empty lists."""
    targets = (
        (DataKey.TOP_RULES, self.state.top_rules_manager),
        (DataKey.RULESET_RULES, self.state.ruleset_rules_manager),
    )
    for key, manager in targets:
        manager.import_rules(self.state.get_data(key) or [])
|
|
|
|
def _make_proxy_patch(self, src: Proxy, dst: Proxy):
    """Record a JSON patch transforming *src* into *dst*, keyed by the proxy name.

    The patch is stored in plugin state so subscription-sourced proxies can be
    customized without mutating the upstream data.
    """
    src_dict = src.model_dump(mode="json", by_alias=True, exclude_none=True)
    dst_dict = dst.model_dump(mode="json", by_alias=True, exclude_none=True)
    patch = jsonpatch.make_patch(src_dict, dst_dict)
    patches = self.state.proxy_patch
    # PATCH_LIFESPAN bounds how long the patch lives; presumably decremented
    # elsewhere in the plugin — TODO confirm.
    patches[src.name] = PatchItem(patch=patch.to_string(), lifecycle=Constant.PATCH_LIFESPAN)
    self.state.proxy_patch = patches
|
|
|
|
def _apply_patch(self, item: SupportsPatch[T], name: str, patch: DataPatch) -> T:
    """Apply the stored patch for *name* to *item* and return the result.

    Returns *item* unchanged when no patch exists for *name* or when applying
    the patch fails (the failure is logged, never raised).
    """
    try:
        if name in patch:
            return item.patch(patch[name].patch)
    except Exception as err:
        logger.error(f"Failed to apply patch for {name}: {repr(err)}")
    return item
|
|
|
|
def _apply_patches(self, items: list[Any], patches: DataPatch) -> list[Any]:
    """Patch each wrapped item in place and flag whether a patch exists for it.

    Note: ``meta.patched`` reflects patch *presence*, not whether applying it
    succeeded (a failed apply is only logged by ``_apply_patch``).
    """
    for item in items:
        item.data = self._apply_patch(item.data, item.name, patches)
        item.meta.patched = item.name in patches
    return items
|
|
|
|
def _apply_patch_to_config(self, conf: ClashConfig) -> ClashConfig:
    """Apply stored user patches to every proxy and proxy group in *conf*."""
    proxy_patches = self.state.proxy_patch
    group_patches = self.state.proxy_group_patch
    conf.proxies = [self._apply_patch(p, p.name, proxy_patches) for p in conf.proxies]
    conf.proxy_groups = [self._apply_patch(g, g.name, group_patches) for g in conf.proxy_groups]
    return conf
|
|
|
|
def _merge_subscriptions(self, config: ClashConfig):
    """Merge every enabled subscription's cached config into *config*.

    Subscriptions that are disabled — or that have no info entry yet (e.g.
    never successfully refreshed) — are skipped.
    """
    subscriptions_config = self.state.config.subscriptions_config
    subscription_info = self.state.subscription_info

    for conf in subscriptions_config:
        info = subscription_info.get(conf.url)
        # Guard: the original code dereferenced `.enabled` on the result of
        # `.get(...)`, raising AttributeError for URLs without an info entry.
        if info is None or not info.enabled:
            continue
        sub_config = self.state.get_sub_config(conf.url)
        config.merge(sub_config)
|
|
|
|
def _filter_available_items(self, items: Iterable[Any], param: ConfigRequest) -> list[Any]:
    """Unwrap `.data` from every item whose metadata is available for *param*."""
    selected = []
    for entry in items:
        if entry.meta.available(param):
            selected.append(entry.data)
    return selected
|
|
|
|
def _process_auto_rule_providers(self, config: ClashConfig):
    """Create one HTTP rule provider per distinct ruleset-rule action.

    Each provider is named `<prefix><action>` and points back at this plugin's
    /ruleset endpoint; the URL path segment is a 10-char SHA-256 digest of the
    action so the action text never appears in the path. The digest->name map
    is kept in state so ``get_ruleset`` can resolve incoming requests.
    """
    auto_rule_provider = {}
    ruleset_names = self.state.ruleset_names

    for r in self.state.ruleset_rules_manager.rules:
        rule = r.rule
        rule_provider_name = f'{self.state.config.ruleset_prefix}{rule.action}'
        if rule_provider_name not in auto_rule_provider:
            path_name = hashlib.sha256(rule.action.encode('utf-8')).hexdigest()[:10]
            ruleset_names[path_name] = rule_provider_name
            sub_url = (f"{self.state.config.movie_pilot_url}/api/v1/plugin/{self.plugin_id}/ruleset?"
                       f"name={path_name}&apikey={self.state.config.apikey or settings.API_TOKEN}")
            auto_rule_provider[rule_provider_name] = RuleProvider(
                type=VehicleType.HTTP, behavior="classical", url=sub_url, path=f"./CRP/{path_name}.yaml",
                interval=3600, format="yaml"
            )
    # Auto providers override same-named entries already present in the config.
    config.rule_providers = config.rule_providers | auto_rule_provider
    self.state.rule_provider = auto_rule_provider
    self.state.ruleset_names = ruleset_names
|
|
|
|
def _process_rules(self, config: ClashConfig, param: ConfigRequest):
    """Prepend the available top rules to *config* and pull in any ACL4SSR
    rule providers that those RULE-SET rules reference.
    """
    top_rules: list[RuleType] = []
    acl4ssr_providers_map: dict[str, RuleProvider] = {}
    acl4ssr_data = self.state.acl4ssr_providers

    for r in self.state.top_rules_manager:
        if not r.meta.available(param):
            continue
        rule = r.rule
        if rule.rule_type == RoutingRuleType.RULE_SET:
            # RULE-SET payload names the provider; attach its ACL4SSR
            # definition when we have one.
            if rule.payload in acl4ssr_data:
                acl4ssr_providers_map[rule.payload] = acl4ssr_data.get(rule.payload).data
        top_rules.append(rule)
    config.rule_providers = config.rule_providers | acl4ssr_providers_map
    # Top rules take precedence over rules already in the config.
    config.rules = top_rules + config.rules
|
|
|
|
def _cleanup_ruleset_names(self, config: ClashConfig):
    """Drop digest->name entries whose provider no longer exists in *config*."""
    names = self.state.ruleset_names
    self.state.ruleset_names = {
        digest: provider
        for digest, provider in names.items()
        if provider in config.rule_providers
    }
|
|
|
|
def _check_cycles(self, config: ClashConfig):
    """Log a warning for every reference cycle among proxy groups.

    Detection only — cycles are reported, never removed.
    """
    proxy_graph = self._build_graph(config)
    cycles = UtilsProvider.find_cycles(proxy_graph)
    if cycles:
        logger.warn("发现代理组回环:")
        for cycle in cycles:
            logger.warn(" -> ".join(cycle))
|
|
|
|
def _make_proxy_group_patch(self, src: ProxyGroup, dst: ProxyGroup):
    """Record a JSON patch transforming proxy group *src* into *dst*.

    Any operation touching the list fields ``proxies``/``use`` is collapsed
    into a single whole-field add/replace/remove op, so a stored patch keeps
    working when the underlying list changes length (per-index JSON-patch ops
    would otherwise target shifted positions).
    """
    src_dict = src.model_dump(mode="json", by_alias=True, exclude_none=True)
    dst_dict = dst.model_dump(mode="json", by_alias=True, exclude_none=True)
    patch = jsonpatch.make_patch(src_dict, dst_dict)

    # Flatten list patches to full replace to avoid index shift issues
    new_ops = []
    replaced_paths = set()
    list_fields = ["/proxies", "/use"]

    for op in patch.patch:
        path = op["path"]
        # Match both the field itself ("/proxies") and its elements ("/proxies/3").
        matched_list = next((f for f in list_fields if path == f or path.startswith(f + '/')), None)
        if matched_list:
            if matched_list not in replaced_paths:
                field_name = matched_list.strip('/')
                val = dst_dict.get(field_name)
                if val is None:
                    # Removed in dst
                    new_ops.append({"op": "remove", "path": matched_list})
                elif field_name not in src_dict:
                    # Not in src, added in dst
                    new_ops.append({"op": "add", "path": matched_list, "value": val})
                else:
                    # In src and dst, replacing
                    new_ops.append({"op": "replace", "path": matched_list, "value": val})
                replaced_paths.add(matched_list)
        else:
            new_ops.append(op)

    patch.patch = new_ops
    pg_patches = self.state.proxy_group_patch
    pg_patches[src.name] = PatchItem(patch=patch.to_string(), lifecycle=Constant.PATCH_LIFESPAN)
    self.state.proxy_group_patch = pg_patches
|
|
|
|
def organize_and_save_rules(self):
    """Re-sync the auto RULE-SET entries, then persist both rule collections."""
    self.sync_ruleset()
    self.save_rules()
|
|
|
|
def ruleset(self, ruleset: str) -> List[str]:
    """Return condition strings for the auto rule set named *ruleset*.

    The name must carry the configured prefix; the remainder is the action,
    resolved to an ``Action`` enum member when possible. Disabled rules are
    filtered out.
    """
    prefix = self.state.config.ruleset_prefix
    if not ruleset.startswith(prefix):
        return []
    action_name = ruleset.removeprefix(prefix)
    try:
        final_action = Action(action_name.upper())
    except ValueError:
        # Not a built-in action; keep the raw string (custom outbound name).
        final_action = action_name
    matched = self.state.ruleset_rules_manager.filter_rules_by_action(final_action)
    return [entry.rule.condition_string() for entry in matched if entry.meta.available()]
|
|
|
|
def sync_ruleset(self):
    """Keep auto-generated top-level RULE-SET rules in sync with ruleset rules.

    Invariant: for every action used by an enabled ruleset rule there is
    exactly one AUTO-sourced top rule ``RULE-SET,<prefix><action>,<action>``.
    Stale or mis-named auto entries are removed; missing ones are inserted at
    priority 0.
    """
    outbounds = set()
    new_outbounds = set()
    manager = self.state.top_rules_manager

    # Remove auto entries whose payload no longer matches <prefix><action>
    # (e.g. after the prefix setting changed).
    manager.remove_rules_by_lambda(
        lambda r: r.rule.rule_type == RoutingRuleType.RULE_SET and
                  r.meta.source == DataSource.AUTO and
                  r.rule.payload != f"{self.state.config.ruleset_prefix}{r.rule.action}"
    )
    rules_existed = manager.filter_rules_by_condition(
        lambda r: r.meta.source == DataSource.AUTO and r.rule.rule_type == RoutingRuleType.RULE_SET
    )
    actions_existed = {r.rule.action for r in rules_existed}

    # Collect actions of enabled ruleset rules; those without an existing
    # auto entry will get one created below.
    for r in self.state.ruleset_rules_manager:
        if r.meta.disabled:
            continue
        outbounds.add(r.rule.action)
        if r.rule.action not in actions_existed:
            new_outbounds.add(r.rule.action)

    # Remove auto entries for actions no longer used by any enabled rule.
    manager.remove_rules_by_lambda(
        lambda r: r.rule.rule_type == RoutingRuleType.RULE_SET and
                  r.meta.source == DataSource.AUTO and
                  (r.rule.action not in outbounds)
    )

    for outbound in new_outbounds:
        clash_rule = ClashRuleParser.parse_rule_line(
            f"RULE-SET,{self.state.config.ruleset_prefix}{outbound},{outbound}")
        if clash_rule is None:
            continue
        rule = RuleItem(rule=clash_rule, meta=Metadata(source=DataSource.AUTO))
        if not manager.has_rule_item(rule):
            manager.insert_rule_at_priority(rule, 0)
|
|
|
|
def append_top_rules(self, rules: List[str]):
    """Parse rule lines, append the valid ones as manual top rules, persist.

    Unparseable lines are silently dropped.
    """
    parsed = [
        RuleItem(rule=clash_rule, meta=Metadata(source=DataSource.MANUAL))
        for line in rules
        if (clash_rule := ClashRuleParser.parse_rule_line(line))
    ]
    self.state.top_rules_manager.append_rules(parsed)
    self.state.save_data(DataKey.TOP_RULES, self.state.top_rules_manager.export_rules())
|
|
|
|
def clash_outbound(self) -> list[str]:
    """Collect every outbound name a rule could target: manual groups,
    template groups, region groups (if enabled), subscription groups, and
    individual proxies."""
    names = [group.data.name for group in self.state.proxy_groups]
    template = self.state.clash_template
    if template:
        names += [group.name for group in template.proxy_groups]
    cfg = self.state.config
    if cfg.group_by_region or cfg.group_by_country:
        names += [group.name for group in self.proxy_groups_by_region()]
    names += [group.data.name for group in self.state.proxy_groups_from_subs()]
    names += [proxy.data.name for proxy in self.get_proxies()]
    return names
|
|
|
|
def delete_proxy(self, name: str) -> Tuple[bool, str]:
    """Delete a manually-added proxy by name; persists state only on success."""
    proxies = self.state.proxies
    # NOTE(review): `.pop` here is the project container's method; assumed to
    # return a falsy value (rather than raise) when the name is absent — confirm.
    deleted = proxies.pop(name)
    if deleted:
        self.state.proxies = proxies
        return True, "代理删除成功"
    return False, f"代理 {name!r} 不存在"
|
|
|
|
def delete_proxy_patch(self, name: str) -> tuple[bool, str]:
    """Remove the stored JSON patch for proxy *name*, if any."""
    patches = self.state.proxy_patch
    if name not in patches:
        return False, "补丁不存在"
    del patches.root[name]
    self.state.proxy_patch = patches
    return True, "补丁已删除"
|
|
|
|
def import_proxies(self, vehicle: str, payload: str) -> tuple[bool, str]:
    """Import proxies from share links ('LINK') or a Clash YAML doc ('YAML').

    Builds (raw_link, proxy_dict) pairs, validates each into a Proxy, and adds
    them to manual proxies. Returns (all_succeeded, summary_message); partial
    imports still persist the successful entries.
    """
    proxies = []
    if vehicle == 'LINK':
        links = payload.strip().splitlines()
        proxies = list(Converter().convert_v2ray(links, skip_exception=True, logger=logger).items())
    elif vehicle == 'YAML':
        try:
            imported = yaml.load(payload, Loader=yaml.SafeLoader)
            if not isinstance(imported, dict):
                return False, "无效的输入"
        except yaml.YAMLError as err:
            logger.error(f"Failed to import rules: {repr(err)}")
            return False, 'YAML 格式错误'
        # YAML imports carry no share link; a link is derived per-proxy below.
        proxies = [(None, p) for p in (imported.get(DataKey.PROXIES) or [])]
    if not proxies:
        return False, "无可用节点"
    success_count = 0
    error_messages = ''
    success = True
    ps = self.state.proxies
    for item in proxies:
        try:
            proxy = Proxy.model_validate(item[1])
            meta = Metadata(source=DataSource.MANUAL)
            pd = ProxyData(data=proxy, name=proxy.name, meta=meta, raw=item[0], v2ray_link=item[0])
            if not pd.v2ray_link:
                # Best-effort: derive a share link; failure is non-fatal.
                try:
                    pd.v2ray_link = Converter.convert_to_share_link(item[1])
                except Exception as err:
                    logger.debug(f"Failed to convert proxy link: {repr(err)}")
            ps.add(pd)
            success_count += 1
        except Exception as err:
            success = False
            error_messages += f"{err}\n"
    message = f"导入 {success_count}/{len(proxies)} 个代理节点. \n{error_messages}"
    self.state.proxies = ps
    return success, message
|
|
|
|
def update_proxy(self, previous_name: str, source: str, proxy: Proxy) -> tuple[bool, str]:
    """Update a proxy.

    Manual proxies are replaced in place (renaming allowed). Subscription
    proxies are never mutated directly — a JSON patch is recorded instead, so
    the name must stay unchanged.
    """
    if source == DataSource.MANUAL:
        proxies = self.state.proxies
        proxies.update(previous_name, ProxyData(data=proxy, name=proxy.name, meta=Metadata()))
        self.state.proxies = proxies
        return True, "代理更新成功"
    if previous_name != proxy.name:
        return False, "请勿修改代理名称"
    proxies = list(self.state.proxies_from_subs())
    src = next((g for g in proxies if g.name == previous_name), None)
    if src is None:
        return False, f"代理组 {previous_name!r} ({source}) 不存在"
    self._make_proxy_patch(src.data, proxy)
    return True, "代理更新成功"
|
|
|
|
def update_proxy_meta(self, name: str, meta: Metadata) -> tuple[bool, str]:
    """Replace the metadata of the manual proxy called *name*."""
    proxies = self.state.proxies
    if name in proxies:
        proxies.set_meta(name, meta)
        self.state.proxies = proxies
        return True, ''
    return False, f"The proxy name {name} does not exist"
|
|
|
|
def get_proxies(self, patched: bool = True) -> list[ProxyData]:
    """Return all proxies minus those whose name contains a filter keyword.

    :param patched: when True, stored JSON patches are applied to the result.
    """
    keywords = self.state.config.filter_keywords
    visible = [
        proxy for proxy in self.state.all_proxies
        if not any(keyword in proxy.data.name for keyword in keywords)
    ]
    if patched:
        return self._apply_patches(visible, self.state.proxy_patch)
    return visible
|
|
|
|
@cached(maxsize=1, ttl=86400, skip_empty=True)
def _get_countries_data(self) -> List[Dict[str, str]]:
    """Load the bundled countries.json (cached for a day; [] on any failure).

    Each entry is expected to carry keys like 'abbr', 'emoji', 'chinese',
    'english', 'continent' — see usages in ``_group_by_region``.
    """
    file_path = settings.ROOT_PATH / 'app' / 'plugins' / self.plugin_id.lower() / 'countries.json'
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"加载国家/地区文件错误:{e}")
        return []
|
|
|
|
def proxy_groups_by_region(self) -> list[ProxyGroupData]:
    """Build the automatic continent/country proxy groups for current proxies."""
    cfg = self.state.config
    country_table = self._get_countries_data()
    current_proxies = self.get_proxies()
    return self._group_by_region(
        country_table, current_proxies, cfg.group_by_region, cfg.group_by_country
    )
|
|
|
|
@cached(maxsize=2, ttl=86400)
def _group_by_region(self, countries: list[dict[str, str]], proxies: list[ProxyData], group_by_continent: bool,
                     group_by_country: bool) -> list[ProxyGroupData]:
    """Group proxies into select-type proxy groups by continent and/or country.

    Also derives an 'AsiaExceptChina' group and a parent group listing all
    country groups. Returned groups are AUTO-sourced ProxyGroupData wrappers.
    """
    continent_groups = {}
    country_groups = {}
    continent_map = {
        '欧洲': 'Europe', '亚洲': 'Asia', '大洋洲': 'Oceania', '非洲': 'Africa',
        '北美洲': 'NorthAmerica', '南美洲': 'SouthAmerica'
    }
    proxy_groups: list[ProxyGroup] = []
    hk = next((c for c in countries if c['abbr'] == 'HK'), {})
    tw = next((c for c in countries if c['abbr'] == 'TW'), {})

    for proxy_data in proxies:
        proxy_node = proxy_data.data
        country = ClashRuleProviderService._country_from_node(countries, proxy_node.name)
        if not country:
            continue
        # HK/TW nodes often match 'CN' first; reclassify from name markers.
        if country.get("abbr") == "CN":
            if any(key in proxy_node.name for key in ("🇭🇰", "HK", "香港")):
                country = hk
            if any(key in proxy_node.name for key in ("🇹🇼", "TW", "台湾")):
                country = tw
        continent = continent_map.get(country["continent"])
        if continent and group_by_continent:
            continent_groups.setdefault(continent, []).append(proxy_node.name)
        if group_by_country:
            country_groups.setdefault(f"{country.get('emoji')} {country.get('chinese')}", []).append(
                proxy_node.name)
    for continent, nodes in continent_groups.items():
        if nodes:
            proxy_groups.append(ProxyGroup(root=SelectGroup(name=continent, proxies=nodes)))

    # Asia minus mainland-China/HK nodes, matched by name markers.
    excluded = ('中国', '香港', 'CN', 'HK', '🇨🇳', '🇭🇰')
    for continent_node in continent_groups.get('Asia', []):
        if any(x in continent_node for x in excluded):
            continue
        continent_groups.setdefault('AsiaExceptChina', []).append(continent_node)
    if continent_groups.get('AsiaExceptChina'):
        pg = SelectGroup(name="AsiaExceptChina", proxies=continent_groups['AsiaExceptChina'])
        proxy_groups.append(ProxyGroup(root=pg))
    for country, nodes in country_groups.items():
        if len(nodes):
            proxy_groups.append(ProxyGroup(root=SelectGroup(name=country, proxies=nodes)))
    country_group = list(country_groups.keys())
    if country_group:
        # Parent group whose members are the country groups themselves.
        proxy_groups.append(ProxyGroup(root=SelectGroup(name="🏴☠️国家分组", proxies=country_group)))
    ret = [ProxyGroupData(name=p.name, data=p, meta=Metadata(source=DataSource.AUTO)) for p in proxy_groups]
    return ret
|
|
|
|
@staticmethod
|
|
def _country_from_node(countries: List[Dict[str, str]], node_name: str) -> Optional[Dict[str, str]]:
|
|
node_name_lower = node_name.lower()
|
|
for country in countries:
|
|
if country.get('emoji') and country['emoji'] in node_name:
|
|
return country
|
|
if (
|
|
(country.get('chinese') and country['chinese'] in node_name) or
|
|
(country.get('english') and country['english'].lower() in node_name_lower)
|
|
):
|
|
return country
|
|
return None
|
|
|
|
@staticmethod
def _build_graph(config: ClashConfig) -> Dict[str, Any]:
    """Build a directed graph of group references: group name -> member group names.

    Only members that are themselves proxy groups become edges; plain proxy
    nodes are ignored.
    """
    groups = config.proxy_groups
    known_names = {group.name for group in groups}
    return {
        group.name: [member for member in group.proxies if member in known_names]
        for group in groups
    }
|
|
|
|
async def fetch_clash_data(self, endpoint: str) -> Dict:
    """GET *endpoint* from the configured Clash dashboard API as JSON.

    :raises HTTPException: 502 when the request fails or yields no JSON.
    """
    headers = {"Authorization": f"Bearer {self.state.config.dashboard_secret}"}
    url = f"{self.state.config.dashboard_url}/{endpoint}"
    response = await AsyncRequestUtils().get_json(url, headers=headers, timeout=10)
    if response is None:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {endpoint}")
    return response
|
|
|
|
def get_subscription_user_info(self) -> DataUsage:
    """Aggregate usage across all subscriptions.

    Upload/download/total are summed; expire is the latest timestamp seen.
    """
    aggregate = DataUsage()
    for info in self.state.subscription_info.root.values():
        aggregate.upload += info.upload
        aggregate.download += info.download
        aggregate.total += info.total
        if info.expire > aggregate.expire:
            aggregate.expire = info.expire
    return aggregate
|
|
|
|
@staticmethod
async def async_notify_clash(ruleset: str, api_url: str, api_secret: str):
    """
    Ask a Clash instance to re-fetch the given rule provider.

    Sends PUT /providers/rules/<ruleset>; Clash answers 204 on success.
    Failures are logged, not raised.
    """
    logger.info(f"正在刷新 [{ruleset}] {api_url} ...")
    url = f'{api_url}/providers/rules/{ruleset}'
    resp = await AsyncRequestUtils(content_type="application/json",
                                   headers={"authorization": f"Bearer {api_secret}"}
                                   ).put_res(url)
    if resp and resp.status_code == 204:
        logger.info(f"[{ruleset}] {api_url} 刷新完成")
    else:
        logger.warn(f"[{ruleset}] {api_url} 刷新失败")
|
|
|
|
def add_notification_job(self, ruleset_names: List[str]):
    """Schedule a delayed one-shot Clash refresh for each known rule set.

    No-op when the plugin is disabled or no scheduler was supplied. Only
    names present in ``state.rule_provider`` are scheduled. The job id embeds
    the ruleset name so re-scheduling replaces any pending refresh for it.
    """
    if not self.state.config.enabled or not self.scheduler:
        return
    for ruleset in ruleset_names:
        if ruleset in self.state.rule_provider:
            self.scheduler.add_job(
                ClashRuleProviderService.async_notify_clash, "date",
                # Delay gives the new config time to be served before Clash re-pulls.
                run_date=datetime.now(tz=pytz.timezone(settings.TZ)) +
                         timedelta(seconds=self.state.config.refresh_delay),
                args=(ruleset, self.state.config.dashboard_url,
                      self.state.config.dashboard_secret),
                id=f'CRP-notify-clash{ruleset}', replace_existing=True,
                misfire_grace_time=Constant.MISFIRE_GRACE_TIME
            )
|
|
|
|
def build_clash_config(self, param: ConfigRequest) -> ClashConfig | None:
    """Assemble the full Clash configuration served to a client.

    Starts from the template (or an empty config), then layers in
    subscriptions, proxies, proxy groups, rule providers, rules, and hosts,
    applying stored user patches along the way.
    """
    if not self.state.clash_template:
        config = ClashConfig()
    else:
        # Deep copy so mutations below never leak into the stored template.
        config = self.state.clash_template.model_copy(deep=True)

    # Merge subscriptions
    self._merge_subscriptions(config)

    # Add proxies
    config.proxies += self._filter_available_items(self.state.proxies, param)
    config.proxies = list(
        filter(lambda p: not any(kw in p.name for kw in self.state.config.filter_keywords), config.proxies)
    )
    # Add proxy groups
    config.proxy_groups += self._filter_available_items(self.state.proxy_groups, param)

    # Add region groups
    if self.state.config.group_by_region or self.state.config.group_by_country:
        config.proxy_groups += [pg.data for pg in self.proxy_groups_by_region()]

    # Add rule providers (Load once)
    current_rule_providers = self.state.rule_providers
    rule_providers = {}
    for rp_data in current_rule_providers:
        if rp_data.meta.available(param):
            rule_providers[rp_data.name] = rp_data.data
    config.rule_providers = config.rule_providers | rule_providers

    # Apply patches
    config = self._apply_patch_to_config(config)

    # Sync and add auto rule providers
    self.sync_ruleset()
    self._process_auto_rule_providers(config)

    # Add rules (including ACL4SSR)
    self._process_rules(config, param)

    # Add Hosts
    hosts = self.state.hosts.to_dict(self.state.config.best_cf_ip)
    if hosts:
        config.hosts = config.hosts or {}
        config.hosts = config.hosts | hosts

    # Cleanup ruleset names
    self._cleanup_ruleset_names(config)

    # Cycle check
    self._check_cycles(config)

    return config
|
|
|
|
def delete_proxy_group(self, name: str) -> Tuple[bool, str]:
    """
    Deletes a proxy group by name and saves the state.

    Returns a (success, message) tuple; state is persisted only when a group
    was actually removed.
    """
    pgs = self.state.proxy_groups
    deleted = pgs.pop(name)
    if deleted:
        self.state.proxy_groups = pgs
        return True, "代理组删除成功"
    return False, f"代理组 {name!r} 不存在"
|
|
|
|
def delete_proxy_group_patch(self, name: str) -> tuple[bool, str]:
    """Remove the stored JSON patch for proxy group *name*, if any."""
    patches = self.state.proxy_group_patch
    if name not in patches:
        return False, "补丁不存在"
    del patches.root[name]
    self.state.proxy_group_patch = patches
    return True, "补丁已删除"
|
|
|
|
def update_proxy_group_meta(self, name: str, meta: Metadata) -> tuple[bool, str]:
    """Replace the metadata of the manual proxy group called *name*."""
    pgs = self.state.proxy_groups
    if not pgs.set_meta(name, meta):
        return False, f"代理组 {name!r} 不存在"
    self.state.proxy_groups = pgs
    return True, ""
|
|
|
|
def add_proxy_group(self, proxy_group: ProxyGroup) -> tuple[bool, str]:
    """
    Adds a new proxy group, saves the state, and returns (success, message).

    Any failure (e.g. duplicate name rejected by the container) is logged and
    reported as a user-facing error string.
    """
    try:
        pgs = self.state.proxy_groups
        pgs.add(ProxyGroupData(data=proxy_group, name=proxy_group.name, meta=Metadata(source=DataSource.MANUAL)))
        self.state.proxy_groups = pgs
    except Exception as e:
        logger.error(f"Failed to add proxy group: {repr(e)}")
        return False, "代理组添加失败"
    return True, "代理组添加成功"
|
|
|
|
def get_proxy_groups(self, patched: bool = True) -> list[ProxyGroupData]:
    """Return all proxy groups plus the automatic region groups.

    :param patched: when True, stored JSON patches are applied to the result.
    """
    pgs = self.state.all_proxy_groups
    pgs += self.proxy_groups_by_region()
    if not patched:
        return pgs
    return self._apply_patches(pgs, self.state.proxy_group_patch)
|
|
|
|
def update_proxy_group(self, previous_name: str, source: str, proxy_group: ProxyGroup) -> tuple[bool, str]:
    """Update a proxy group.

    Manual groups are replaced in place (renaming allowed). Auto region
    groups are never mutated directly — a JSON patch is recorded instead, so
    the name must stay unchanged.
    """
    if source == DataSource.MANUAL:
        pgs = self.state.proxy_groups
        pgs.update(previous_name, ProxyGroupData(data=proxy_group, name=proxy_group.name, meta=Metadata()))
        self.state.proxy_groups = pgs
        return True, "代理组更新成功"
    if previous_name != proxy_group.name:
        return False, "请勿修改代理组名称"
    pgs = self.proxy_groups_by_region()
    src = next((g for g in pgs if g.name == previous_name), None)
    if src is None:
        return False, f"代理组 {previous_name!r} ({source}) 不存在"
    self._make_proxy_group_patch(src.data, proxy_group)
    return True, "代理组更新成功"
|
|
|
|
def update_rule_provider(self, name: str, rule_provider: RuleProviderData) -> Tuple[bool, str]:
    """
    Updates a rule provider.

    Replaces the entry stored under *name* and persists the collection.
    """
    providers = self.state.rule_providers
    if name in providers:
        providers.update(name, rule_provider)
        self.state.rule_providers = providers
        return True, "规则集更新成功"
    return False, f"规则集 {name!r} 不存在"
|
|
|
|
def update_rule_providers_meta(self, name: str, meta: Metadata) -> tuple[bool, str]:
    """Update metadata of the rule provider *name*.

    Looks in manual providers first, then in the ACL4SSR providers; only the
    collection that contained the name is persisted.
    """
    rps = self.state.rule_providers
    if name in rps:
        res = rps.set_meta(name, meta)
        if res:
            self.state.rule_providers = rps
            return True, ""

    arps = self.state.acl4ssr_providers
    if name in arps:
        res = arps.set_meta(name, meta)
        if res:
            self.state.acl4ssr_providers = arps
            return True, ""
    return False, f"规则集 {name!r} 不存在"
|
|
|
|
def update_rule_meta(self, rule_type: RuleSet, priority: int, meta: Metadata) -> tuple[bool, str]:
    """Update metadata of the rule at *priority* in the chosen rule set.

    On success, queues a Clash refresh for ruleset rules and persists all
    rules.
    """
    manager = self.state.get_rule_manager(rule_type)
    rule = manager.get_rule_at_priority(priority)
    if not rule:
        return False, "规则不存在"
    res = manager.update_rule_meta_at_priority(priority, meta)
    if res:
        if rule_type == RuleSet.RULESET:
            self.add_notification_job([f"{self.state.config.ruleset_prefix}{rule.rule.action}"])
        self.organize_and_save_rules()
        return True, ""
    return False, "更新规则元数据失败"
|
|
|
|
def delete_rule_provider(self, name: str) -> tuple[bool, str]:
    """Delete the rule provider stored under *name*; persists only on success."""
    rps = self.state.rule_providers
    deleted = rps.pop(name)
    if deleted:
        self.state.rule_providers = rps
        # Fixed: was an f-string with no placeholders (ruff F541).
        return True, "规则集删除成功"
    return False, f"规则集 {name!r} 不存在"
|
|
|
|
def add_rule_provider(self, name: str, rule_provider: RuleProvider) -> tuple[bool, str]:
    """Add a manual rule provider under *name* and persist the collection.

    Failures (e.g. a duplicate rejected by the container) are logged and
    reported as a user-facing error string.
    """
    try:
        rps = self.state.rule_providers
        rps.add(RuleProviderData(data=rule_provider, name=name, meta=Metadata(source=DataSource.MANUAL)))
        self.state.rule_providers = rps
    except Exception as e:
        logger.error(f"Failed to add rule provider: {repr(e)}")
        return False, "规则集添加失败"
    return True, "规则集添加成功"
|
|
|
|
async def test_connectivity(self, clash_apis: List[ClashApi], sub_links: List[str]) -> Tuple[bool, str]:
    """Concurrently probe every Clash API (/version) and subscription URL.

    Returns (False, message) naming the first URL that produced no response,
    or (True, "") when all answered. A falsy response counts as unreachable;
    request failures are assumed to surface as None/falsy results rather than
    exceptions — TODO confirm AsyncRequestUtils semantics.
    """
    tasks = []
    urls = []
    for d in clash_apis:
        headers = {"authorization": f"Bearer {d.secret}"}
        url = f"{d.url}/version"
        task = asyncio.create_task(
            AsyncRequestUtils(accept_type="application/json", headers=headers, timeout=5).get_res(url)
        )
        urls.append(url)
        tasks.append(task)
    for sub_link in sub_links:
        task = asyncio.create_task(
            AsyncRequestUtils(
                accept_type="text/html", proxies=settings.PROXY if self.state.config.proxy else None,
                timeout=5).get(sub_link)
        )
        urls.append(sub_link)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        if not result:
            return False, f"无法连接到 {urls[i]}"
    return True, ""
|
|
|
|
def get_status(self) -> Dict[str, Any]:
    """Summarize plugin status for the frontend (flags, usage info, sub URL)."""
    data = {
        "state": self.state.config.enabled,
        "ruleset_prefix": self.state.config.ruleset_prefix,
        "preset_identifiers": self.state.config.identifiers,
        "best_cf_ip": self.state.config.best_cf_ip,
        "geoRules": self.state.geo_rules,
        "subscription_info": self.state.subscription_info,
        # URL clients use to fetch the generated config from this plugin.
        "sub_url": f"{self.state.config.movie_pilot_url}/api/v1/plugin/{self.plugin_id}/config?"
                   f"apikey={self.state.config.apikey or settings.API_TOKEN}"
    }
    return data
|
|
|
|
def get_rules(self, ruleset: RuleSet) -> list[RuleData]:
    """Export the chosen rule set as a plain list of rule data objects."""
    return self.state.get_rule_manager(ruleset).to_list()
|
|
|
|
def reorder_rules(self, rule_type: RuleSet, moved_priority: int, target_priority: int) -> tuple[bool, str]:
    """Move the rule at *moved_priority* to *target_priority*.

    Queues a Clash refresh for ruleset rules; persists all rules on success.
    """
    manager = self.state.get_rule_manager(rule_type)
    try:
        rule = manager.reorder_rules(moved_priority, target_priority)
        if rule_type == RuleSet.RULESET:
            self.add_notification_job(
                [f"{self.state.config.ruleset_prefix}{rule.rule.action}"])
    except Exception as e:
        logger.info(f"Failed to reorder rules: {repr(e)}")
        return False, "规则移动失败"
    self.organize_and_save_rules()
    return True, ""
|
|
|
|
def update_rule(self, rule_type: RuleSet, priority: int, rule_data: RuleData) -> tuple[bool, str]:
    """Replace the rule at *priority*, optionally moving it to a new priority.

    For ruleset rules a Clash refresh is queued for the affected rule set(s):
    the new action's set and, when the action changed, the previous one too.
    Rules are persisted regardless of whether the manager reported success.
    """
    try:
        dst_priority = rule_data.priority
        src_priority = priority
        clash_rule = ClashRuleParser.parse_rule_dict(rule_data.model_dump(mode='json', exclude_none=True))
        if not clash_rule:
            return False, f"无效的规则: {rule_data!r}"
        manager = self.state.get_rule_manager(rule_type)
        original_rule = manager.get_rule_at_priority(src_priority)
        # Preserve the original provenance; only the modification time changes.
        meta = Metadata(source=original_rule.meta.source, time_modified=time.time())
        rule_item = RuleItem(rule=clash_rule, meta=meta)
        if rule_type == RuleSet.RULESET:
            res = manager.update_rule_at_priority(rule_item, src_priority, dst_priority)
            if res:
                ruleset_to_notify = [f"{self.state.config.ruleset_prefix}{clash_rule.action}"]
                if rule_data.action != original_rule.rule.action:
                    ruleset_to_notify.append(f"{self.state.config.ruleset_prefix}{original_rule.rule.action}")
                self.add_notification_job(ruleset_to_notify)
        else:
            res = manager.update_rule_at_priority(rule_item, src_priority, dst_priority)
    except Exception as err:
        logger.info(f"Failed to update rules: {repr(err)}")
        return False, "更新规则出错"
    self.organize_and_save_rules()
    return res, ""
|
|
|
|
def add_rule(self, rule_type: RuleSet, rule_data: RuleData) -> tuple[bool, str]:
    """Insert a new manual rule at the priority carried by *rule_data*.

    Queues a Clash refresh for ruleset rules; persists all rules on success.
    """
    try:
        priority = rule_data.priority
        clash_rule = ClashRuleParser.parse_rule_dict(rule_data.model_dump(mode='json', exclude_none=True))
        if not clash_rule:
            return False, f"无效的输入规则: {rule_data.model_dump(mode='json', exclude_none=True)}"
        meta = Metadata(source=DataSource.MANUAL, time_modified=time.time())
        rule_item = RuleItem(rule=clash_rule, meta=meta)
        if rule_type == RuleSet.RULESET:
            self.state.ruleset_rules_manager.insert_rule_at_priority(rule_item, priority)
            self.add_notification_job([f"{self.state.config.ruleset_prefix}{clash_rule.action}"])
        else:
            self.state.top_rules_manager.insert_rule_at_priority(rule_item, priority)
    except Exception as err:
        logger.info(f"Failed to add rule: {repr(err)}")
        return False, "添加规则出错"
    self.organize_and_save_rules()
    return True, ""
|
|
|
|
def delete_rule(self, ruleset: RuleSet, priority: int):
    """Remove the rule at *priority*; refresh its rule set if needed, then persist."""
    manager = self.state.get_rule_manager(ruleset)
    removed = manager.remove_rule_at_priority(priority)
    if ruleset == RuleSet.RULESET and removed:
        self.add_notification_job([f"{self.state.config.ruleset_prefix}{removed.rule.action}"])
    self.organize_and_save_rules()
|
|
|
|
def delete_rules(self, ruleset: RuleSet, priorities: list[int]):
    """Remove rules at the given priorities; refresh affected rule sets, persist."""
    manager = self.state.get_rule_manager(ruleset)
    removed = manager.remove_rules_at_priorities(priorities)
    if ruleset == RuleSet.RULESET and removed:
        prefix = self.state.config.ruleset_prefix
        affected = {entry.rule.action for entry in removed}
        self.add_notification_job([f"{prefix}{action}" for action in affected])
    self.organize_and_save_rules()
|
|
|
|
def set_rules_status(self, ruleset: RuleSet, priorities: dict[int, bool]):
    """Enable/disable rules at the given priorities; refresh affected sets, persist."""
    manager = self.state.get_rule_manager(ruleset)
    changed = manager.update_rules_at_priorities(priorities)
    if ruleset == RuleSet.RULESET and changed:
        prefix = self.state.config.ruleset_prefix
        affected = {entry.rule.action for entry in changed}
        self.add_notification_job([f"{prefix}{action}" for action in affected])
    self.organize_and_save_rules()
|
|
|
|
def import_rules(self, vehicle: str, payload: str) -> tuple[bool, str]:
    """Import top rules from *payload*.

    Only vehicle == 'YAML' is parsed (a Clash doc whose `rules` key holds rule
    lines); any other vehicle results in an empty (no-op) import.
    """
    rules: List[str] = []
    if vehicle == 'YAML':
        try:
            imported_rules = yaml.load(payload, Loader=yaml.SafeLoader)
            if not isinstance(imported_rules, dict):
                return False, "无效的输入"
        except yaml.YAMLError as err:
            logger.error(f"Failed to import rules: {repr(err)}")
            return False, 'YAML 格式错误'
        rules = imported_rules.get(ClashKey.RULES, [])
    self.append_top_rules(rules)
    return True, ""
|
|
|
|
def get_ruleset(self, name: str) -> Optional[str]:
    """Render the rule set registered under path digest *name* as YAML payload.

    Returns None when the digest is unknown.
    """
    provider_name = self.state.ruleset_names.get(name)
    if provider_name is None:
        return None
    payload = self.ruleset(provider_name)
    return yaml.dump({"payload": payload}, allow_unicode=True)
|
|
|
|
def update_hosts(self, domain: str, host: HostData) -> tuple[bool, str]:
    """Insert or replace the host entry stored under *domain* and persist."""
    hosts = self.state.hosts
    hosts.update(domain, host)
    self.state.hosts = hosts
    # NOTE(review): the message reports host.domain while the key is *domain*;
    # presumably the two always match — confirm at call sites.
    return True, f"Host for domain {host.domain} updated successfully."
|
|
|
|
def delete_host(self, domain: str) -> tuple[bool, str]:
    """Delete the host entry for *domain*; persists only when something was removed."""
    hosts = self.state.hosts
    count_before = len(hosts)
    hosts.delete(domain)
    if len(hosts) == count_before:
        # Nothing removed: the domain was not present.
        return False, f'Host for domain {domain} not found.'
    self.state.hosts = hosts
    return True, ''
|
|
|
|
async def refresh_subscription(self, url: str) -> Tuple[bool, str]:
|
|
sub_conf = next((conf for conf in self.state.config.subscriptions_config if conf.url == url), None)
|
|
if not sub_conf:
|
|
return False, f"Configuration for {url} not found."
|
|
config, info = await self.async_get_subscription(url)
|
|
if not config:
|
|
return False, f"订阅链接 {url} 更新失败"
|
|
|
|
sub_configs = self.state.sub_configs
|
|
sub_configs[url] = config
|
|
self.state.sub_configs = sub_configs
|
|
|
|
sub_info_map = self.state.subscription_info
|
|
info.enabled = sub_info_map.get(url).enabled
|
|
sub_info_map[url] = info
|
|
self.state.subscription_info = sub_info_map
|
|
return True, "订阅更新成功"
|
|
|
|
def update_subscription_info(self, sub_setting: SubscriptionSetting):
|
|
sub_info = self.state.subscription_info
|
|
sub_info.set(sub_setting)
|
|
self.state.subscription_info = sub_info
|
|
|
|
    async def async_get_subscription(self, url: str) -> tuple[ClashConfig | None, SubscriptionInfo | None]:
        """Download and parse a subscription URL.

        Retries the HTTP GET up to ``retry_times``. The payload may be a
        Clash YAML config, or a V2Ray share-link list (detected when YAML
        parsing yields a plain string), which is converted into a minimal
        config with a single "All Proxies" select group.

        Returns ``(config, info)`` on success, ``(None, None)`` on any
        failure (empty url, download failure, or parse/validation error).
        """
        if not url:
            return None, None
        logger.info(f"正在刷新 {UtilsProvider.get_url_domain(url)} ...")
        ret = None
        raw_proxies = {}
        # Retry the download; stop at the first non-empty response.
        for _ in range(self.state.config.retry_times):
            ret = await AsyncRequestUtils(
                accept_type="text/html", timeout=self.state.config.timeout, ua="clash.meta",
                proxies=settings.PROXY if self.state.config.proxy else None
            ).get_res(url)
            if ret:
                break
        if not ret:
            logger.warning(f"{UtilsProvider.get_url_domain(url)} 刷新失败.")
            return None, None
        try:
            content = ret.content
            rs = yaml.safe_load(content)
            if isinstance(rs, str):
                # Content is not structured YAML — treat it as a V2Ray
                # share-link payload and convert it to Clash proxies.
                proxies = Converter().convert_v2ray(content)
                if not proxies:
                    raise ValueError("Unknown content type")
                rs = {
                    ClashKey.PROXIES: proxies.values(),
                    ClashKey.PROXY_GROUPS: [
                        {ClashKey.NAME: "All Proxies", 'type': 'select', 'include-all-proxies': True}
                    ]
                }
                # Map each proxy name back to its original share link.
                raw_proxies = {p['name']: link for link, p in proxies.items()}
            if not isinstance(rs, dict):
                raise ValueError("Subscription content is not a valid dictionary.")
            rs: dict[str, Any] = rs
            logger.info(f"已刷新: {UtilsProvider.get_url_domain(url)}. 节点数量: {len(rs.get(ClashKey.PROXIES, []))}")
            conf = ClashConfig.model_validate(rs)
        except Exception as e:
            logger.error(f"解析配置出错: {e}")
            return None, None
        info = {"last_update": int(time.time()), "proxy_num": conf.node_num}
        if 'Subscription-Userinfo' in ret.headers:
            # Header format e.g. "upload=123; download=456; total=789; expire=0".
            matches = re.findall(r'(\w+)=(\d+)', ret.headers['Subscription-Userinfo'])
            variables = {key: int(value) for key, value in matches}
            info.update(variables)
        sub_info = SubscriptionInfo(**info)
        conf.raw_proxies = raw_proxies
        return conf, sub_info
|
|
|
|
async def async_refresh_subscriptions(self) -> Dict[str, bool]:
|
|
res = {}
|
|
sub_info_map = self.state.subscription_info
|
|
sub_configs_map = self.state.sub_configs
|
|
|
|
for sub_conf in self.state.config.subscriptions_config:
|
|
url = sub_conf.url
|
|
if not sub_info_map.get(url).enabled:
|
|
continue
|
|
conf, sub_info = await self.async_get_subscription(url)
|
|
if not conf:
|
|
res[url] = False
|
|
continue
|
|
sub_info_map[url] = sub_info
|
|
res[url] = True
|
|
sub_configs_map[url] = conf
|
|
self.state.subscription_info = sub_info_map
|
|
self.state.sub_configs = sub_configs_map
|
|
return res
|
|
|
|
async def async_refresh_acl4ssr(self):
|
|
logger.info("正在刷新 ACL4SSR ...")
|
|
paths = ['Clash/Providers', 'Clash/Providers/Ruleset']
|
|
api_url = f"{Constant.ACL4SSR_API}/contents/%s"
|
|
branch = 'master'
|
|
new_providers = []
|
|
names = set()
|
|
for path in paths:
|
|
response = await AsyncRequestUtils().get_res(api_url % path, headers=settings.GITHUB_HEADERS,
|
|
params={'ref': branch})
|
|
if not response:
|
|
continue
|
|
files = response.json()
|
|
yaml_files = [f for f in files if f["type"] == "file" and f[ClashKey.NAME].endswith((".yaml", ".yml"))]
|
|
for f in yaml_files:
|
|
name = f"{self.state.config.acl4ssr_prefix}{f[ClashKey.NAME][:f[ClashKey.NAME].rfind('.')]}"
|
|
if name in names:
|
|
continue
|
|
file_path = f"./ACL4SSR/{f['name']}"
|
|
provider = RuleProvider(
|
|
type=VehicleType.HTTP, path=file_path, url=f["download_url"], interval=600, behavior="classical",
|
|
format="yaml"
|
|
)
|
|
meta = Metadata(source=DataSource.ACL4SSR)
|
|
new_providers.append(RuleProviderData(name=name, data=provider, meta=meta))
|
|
names.add(name)
|
|
|
|
self.state.acl4ssr_providers = RuleProviders.model_validate(new_providers)
|
|
logger.info(f"ACL4SSR 规则集刷新完成. 规则集数量: {len(self.state.acl4ssr_providers)}")
|
|
|
|
    async def async_refresh_geo_dat(self):
        """Refresh the cached geo rule name lists (e.g. geoip/geosite) from GitHub."""
        logger.info("正在刷新 Geo Rules ...")
        branch = 'meta'
        api_url = f"{Constant.METACUBEX_RULE_DAT_API}/contents/geo"
        resp = await AsyncRequestUtils().get_res(api_url, headers=settings.GITHUB_HEADERS, params={'ref': branch})
        if not resp:
            return

        geo_rules = self.state.geo_rules
        for path in resp.json():
            # Only descend into directories that correspond to a field on the
            # geo-rules model (e.g. "geoip", "geosite").
            if path["type"] == "dir" and path["name"] in geo_rules.model_fields:
                tree_sha = path["sha"]
                # List the directory's contents in one request via the git trees API.
                url = f"{Constant.METACUBEX_RULE_DAT_API}/git/trees/{tree_sha}"
                res = await AsyncRequestUtils().get_res(url, headers=settings.GITHUB_HEADERS, params={'ref': branch})
                if not res:
                    continue
                tree = res.json()
                # Rule names are blob paths with the .yaml/.yml suffix stripped.
                yaml_files = [item["path"][:item["path"].rfind('.')] for item in tree["tree"] if
                              item["type"] == "blob" and item['path'].endswith((".yaml", ".yml"))]
                setattr(geo_rules, path["name"], yaml_files)
        self.state.geo_rules = geo_rules
        logger.info(f"Geo Rules 更新完成. 规则数量: "
                    f"geoip({len(self.state.geo_rules.geoip)}), geosite({len(self.state.geo_rules.geosite)})")
|
|
|
|
def check_patch_lifetime(self):
|
|
pp = self.state.proxy_patch
|
|
proxies = self.state.all_proxies
|
|
pp.update_patch({g.name for g in proxies}, lifespan=Constant.PATCH_LIFESPAN)
|
|
self.state.proxy_patch = pp
|
|
|
|
groups = self.proxy_groups_by_region() + self.state.all_proxy_groups
|
|
pgp = self.state.proxy_group_patch
|
|
pgp.update_patch({g.name for g in groups}, lifespan=Constant.PATCH_LIFESPAN)
|
|
self.state.proxy_group_patch = pgp
|
|
|
|
rpp = self.state.rule_provider_patch
|
|
rule_providers = self.state.all_rule_providers
|
|
rpp.update_patch({g.name for g in rule_providers}, lifespan=Constant.PATCH_LIFESPAN)
|
|
self.state.rule_provider_patch = rpp
|