Merge pull request #867 from wumode/imdbsource

This commit is contained in:
jxxghp
2025-08-05 06:52:46 +08:00
committed by GitHub
6 changed files with 770 additions and 175 deletions

View File

@@ -418,28 +418,30 @@
"name": "绕过Trackers",
"description": "提供tracker服务器IP地址列表帮助IPv6连接绕过OpenClash。",
"labels": "工具",
"version": "1.4.1",
"version": "1.4.2",
"icon": "Clash_A.png",
"author": "wumode",
"level": 2,
"history": {
"v1.0": "支持自定义Trackers",
"v1.1": "更新列表后发送通知",
"v1.2": "修复Trackers加载错误",
"v1.3": "新增一些Trackers",
"v1.4.2": "修复插件动作",
"v1.4.1": "修复通知类型错误",
"v1.4": "异步查询DNS",
"v1.4.1": "修复通知类型错误"
"v1.3": "新增一些Trackers",
"v1.2": "修复Trackers加载错误",
"v1.1": "更新列表后发送通知",
"v1.0": "支持自定义Trackers"
}
},
"ImdbSource": {
"name": "IMDb源",
"description": "让探索推荐和媒体识别支持IMDb数据源。",
"labels": "探索",
"version": "1.5.2",
"version": "1.5.3",
"icon": "IMDb_IOS-OSX_App.png",
"author": "wumode",
"level": 1,
"history": {
"v1.5.3": "异步执行; 修复 bugs (主程序版本需要高于 2.6.8)",
"v1.5.2": "修复一些bugs",
"v1.5.1": "改进媒体id转换; 支持二级分类和自定义推荐",
"v1.5.0": "支持媒体识别",
@@ -461,11 +463,12 @@
"name": "Clash Rule Provider",
"description": "随时为Clash添加一些额外的规则。",
"labels": "工具",
"version": "1.3.1",
"version": "1.3.2",
"icon": "Mihomo_Meta_A.png",
"author": "wumode",
"level": 1,
"history": {
"v1.3.2": "注册插件动作",
"v1.3.1": "支持配置 Hosts",
"v1.2.8": "改进导入界面",
"v1.2.7": "修复分享链接解析错误",

View File

@@ -5,6 +5,7 @@
- 即时通知 Clash 刷新规则集合
- 基于 Meta 内核丰富的代理组配置,提供灵活的路由功能
- 支持按大洲分组节点
- GEO 规则输入提示
- 支持 [ACL4SSR](https://github.com/ACL4SSR/ACL4SSR) 规则集合
## 配置说明
@@ -35,4 +36,8 @@
在**高级选项**中启用按大洲分组节点。选择Asia以外的代理组设置`url`: `https://chatgpt.com/` , `expected-status`: `200`
![](https://images2.imgbox.com/e2/37/EoITSfRi_o.jpg)
![](https://images2.imgbox.com/e2/37/EoITSfRi_o.jpg)
### Hosts
如果需要自动更新此处使用的 Cloudflare IP, 可以通过其它[插件](https://github.com/wumode/MoviePilot-Addons)实现。

View File

@@ -20,6 +20,8 @@ from sse_starlette.sse import EventSourceResponse
from app import schemas
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.schemas.types import EventType
from app.log import logger
from app.schemas.types import NotificationType
from app.utils.ip import IpUtils
@@ -38,7 +40,7 @@ class ClashRuleProvider(_PluginBase):
# 插件图标
plugin_icon = "Mihomo_Meta_A.png"
# 插件版本
plugin_version = "1.3.1"
plugin_version = "1.3.2"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -103,6 +105,7 @@ class ClashRuleProvider(_PluginBase):
_geo_rules: Dict[str, List[str]] = {'geoip': [], 'geosite': []}
def init_plugin(self, config: dict = None):
self.stop_service()
self._ruleset_rules = self.get_data("ruleset_rules")
self._top_rules = self.get_data("top_rules")
self._proxy_groups = self.get_data("proxy_groups") or []
@@ -486,7 +489,14 @@ class ClashRuleProvider(_PluginBase):
"""
退出插件
"""
pass
if self._scheduler:
try:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error(f"退出插件失败:{e}")
def get_service(self) -> List[Dict[str, Any]]:
if self.get_state() and self._auto_update_subscriptions and self._sub_links:
@@ -576,10 +586,10 @@ class ClashRuleProvider(_PluginBase):
async def fetch_clash_data(self, endpoint: str) -> Dict:
clash_headers = {"Authorization": f"Bearer {self._clash_dashboard_secret}"}
url = f"{self._clash_dashboard_url}/{endpoint}"
response = await AsyncRequestUtils().get_res(url, headers=clash_headers, timeout=10)
response = await AsyncRequestUtils().get_json(url, headers=clash_headers, timeout=10)
if response is None:
raise HTTPException(status_code=502, detail=f"Failed to fetch {endpoint}")
return response.json()
return response
async def clash_proxy(self, path: str) -> Dict:
return await self.fetch_clash_data(path)
@@ -1329,6 +1339,8 @@ class ClashRuleProvider(_PluginBase):
return None
def __add_notification_job(self, ruleset_names: List[str]):
if not self._enabled or not self._scheduler:
return
for ruleset in ruleset_names:
if ruleset in self._rule_provider:
self._scheduler.add_job(self.notify_clash, "date",
@@ -1509,3 +1521,15 @@ class ClashRuleProvider(_PluginBase):
def best_cf_ipv6(self) -> List[str]:
v6 = [ip for ip in self._best_cf_ip if IpUtils.is_ipv6(ip)]
return v6
@eventmanager.register(EventType.PluginAction)
def update_cloudflare_ips_handler(self, event:Event = None):
event_data = event.event_data
if not event_data or event_data.get("action") != "update_cloudflare_ips":
return
ips = event_data.get("ips")
if isinstance(ips, str):
ips = [ips]
if isinstance(ips, list):
logger.info(f"更新 Cloudflare 优选 IP ...")
self.update_best_cf_ip(ips)

View File

@@ -1,4 +1,4 @@
from typing import Optional, Any, List, Dict, Tuple
from typing import Any, Callable, Coroutine, Dict, Optional, List, Tuple
from datetime import datetime
import re
import urllib.parse
@@ -14,7 +14,7 @@ from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData,
from app.schemas.types import ChainEventType
from app.plugins.imdbsource.imdbhelper import ImdbHelper
from app import schemas
from app.utils.http import RequestUtils
from app.utils.http import AsyncRequestUtils, RequestUtils
from app.schemas.types import MediaType
from app.core.meta import MetaBase
from app.core.context import MediaInfo
@@ -29,7 +29,7 @@ class ImdbSource(_PluginBase):
# 插件图标
plugin_icon = "IMDb_IOS-OSX_App.png"
# 插件版本
plugin_version = "1.5.2"
plugin_version = "1.5.3"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -48,14 +48,14 @@ class ImdbSource(_PluginBase):
_recognize_media: bool = False
_interests: List[str] = []
_component_size: str = 'medium'
_recognition_mode = 'auxiliary'
_recognition_mode: str = 'auxiliary'
# 私有属性
_imdb_helper = None
_cache = {"discover": [], "trending": [], "imdb_top_250": [], "staff_picks": {}}
_img_proxy_prefix = ''
_imdb_helper: Optional[ImdbHelper] = None
_cache: Dict[str, Any] = {"discover": [], "trending": [], "imdb_top_250": [], "staff_picks": {}}
_img_proxy_prefix: str = ''
_scheduler: Optional[BackgroundScheduler] = None
_original_method = None
_original_method: Optional[Callable[..., Coroutine[Any, Any, Optional[MediaInfo]]]] = None
def init_plugin(self, config: dict = None):
@@ -78,6 +78,23 @@ class ImdbSource(_PluginBase):
return plugin_instance.recognize_media(meta, mtype)
return result
async def patched_async_recognize_media(chain_self, meta: MetaBase = None,
mtype: Optional[MediaType] = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
bangumiid: Optional[int] = None,
episode_group: Optional[str] = None,
cache: bool = True):
# 调用原始方法
if not plugin_instance._original_method:
return None
result = await plugin_instance._original_method(chain_self, meta, mtype, tmdbid, doubanid, bangumiid,
episode_group, cache)
if result is None and plugin_instance._enabled and plugin_instance._recognize_media:
logger.info(f"通过插件 {plugin_instance.plugin_name} 执行async_recognize_media ...")
return await plugin_instance.async_recognize_media(meta, mtype)
return result
# 给 patch 函数加唯一标记
patched_recognize_media._patched_by = id(self)
# 保存原始方法
@@ -85,6 +102,12 @@ class ImdbSource(_PluginBase):
ChainBase.recognize_media._patched_by == id(self)):
self._original_method = getattr(ChainBase, "recognize_media", None)
patched_async_recognize_media._patched_by = id(self)
# 保存原始方法
if not (hasattr(ChainBase.async_recognize_media, "_patched_by") and
ChainBase.async_recognize_media._patched_by == id(self)):
self._original_async_method = getattr(ChainBase, "async_recognize_media", None)
if config:
self._enabled = config.get("enabled")
self._proxy = config.get("proxy")
@@ -113,12 +136,21 @@ class ImdbSource(_PluginBase):
if not (hasattr(ChainBase.recognize_media, "_patched_by") and
ChainBase.recognize_media._patched_by == id(self)):
ChainBase.recognize_media = patched_recognize_media
# 替换 ChainBase.async_recognize_media
if not (hasattr(ChainBase.async_recognize_media, "_patched_by") and
ChainBase.async_recognize_media._patched_by == id(self)):
ChainBase.async_recognize_media = patched_async_recognize_media
else:
# 恢复 ChainBase.recognize_media
if (hasattr(ChainBase.recognize_media, "_patched_by") and
ChainBase.recognize_media._patched_by == id(self) and
self._original_method):
ChainBase.recognize_media = self._original_method
# 恢复 ChainBase.async_recognize_media
if (hasattr(ChainBase.async_recognize_media, "_patched_by") and
ChainBase.async_recognize_media._patched_by == id(self) and
self._original_async_method):
ChainBase.async_recognize_media = self._original_async_method
else:
self.stop_service()
@@ -712,6 +744,10 @@ class ImdbSource(_PluginBase):
ChainBase.recognize_media._patched_by == id(self) and
self._original_method):
ChainBase.recognize_media = self._original_method
if (hasattr(ChainBase.async_recognize_media, "_patched_by") and
ChainBase.async_recognize_media._patched_by == id(self) and
self._original_async_method):
ChainBase.async_recognize_media = self._original_async_method
def get_module(self) -> Dict[str, Any]:
"""
@@ -723,6 +759,7 @@ class ImdbSource(_PluginBase):
"""
modules = {}
if self._recognize_media and self._recognition_mode == 'hijacking':
modules['async_recognize_media'] = self.async_recognize_media
modules['recognize_media'] = self.recognize_media
return modules
@@ -745,7 +782,7 @@ class ImdbSource(_PluginBase):
if entries:
imdb_items = self._imdb_helper.vertical_list_page_items(
titles=[entry.get('ttconst', '') for entry in entries],
names=[item for entry in entries for item in entry.get("relatedconst", [])],
names=[item for entry in entries for item in entry.get("relatedconst", []) if item.startswith("nm")],
images=[entry.get('rmconst', '') for entry in entries],
)
if not entries or not imdb_items:
@@ -756,9 +793,11 @@ class ImdbSource(_PluginBase):
title = ""
if movie_info.get("titleText"):
title = movie_info.get("titleText", {}).get("text", "")
release_year = 0
release_year = ''
if movie_info.get("releaseYear"):
release_year = movie_info.get("releaseYear", {}).get("year")
if not release_year and movie_info.get("releaseDate"):
release_year = movie_info.get("releaseDate", {}).get("year")
poster_path = None
if movie_info.get("primaryImage"):
primary_image = movie_info.get("primaryImage").get("url")
@@ -773,12 +812,13 @@ class ImdbSource(_PluginBase):
runtime = movie_info.get("runtime").get("seconds")
overview = ''
if movie_info.get("plot"):
overview = movie_info.get("plot").get("plotText").get("plainText")
if movie_info.get("plot", {}).get("plotText"):
overview = movie_info.get("plot").get("plotText").get("plainText")
return schemas.MediaInfo(
type="电影",
title=title,
year=f'{release_year}',
title_year=f"{title} ({release_year})",
title_year=f"{title} ({release_year})" if release_year else title,
mediaid_prefix="imdb",
media_id=str(movie_info.get("id")),
poster_path=poster_path,
@@ -792,9 +832,11 @@ class ImdbSource(_PluginBase):
title = ""
if series_info.get("titleText"):
title = series_info.get("titleText", {}).get("text", "")
release_year = 0
release_year = ''
if series_info.get("releaseYear"):
release_year = series_info.get("releaseYear", {}).get("year")
if not release_year and series_info.get("releaseDate"):
release_year = series_info.get("releaseDate", {}).get("year")
poster_path = None
if series_info.get("primaryImage"):
primary_image = series_info.get("primaryImage").get("url")
@@ -809,7 +851,7 @@ class ImdbSource(_PluginBase):
runtime = series_info.get("runtime").get("seconds")
overview = ''
if series_info.get("plot"):
if series_info.get("plot").get("plotText"):
if series_info.get("plot", {}).get("plotText"):
overview = series_info.get("plot").get("plotText").get("plainText")
release_date_str = '0000-00-00'
if series_info.get("releaseDate"):
@@ -819,7 +861,7 @@ class ImdbSource(_PluginBase):
type="电视剧",
title=title,
year=f'{release_year}',
title_year=f"{title} ({release_year})",
title_year=f"{title} ({release_year})" if release_year else title,
mediaid_prefix="imdb",
media_id=str(series_info.get("id")),
release_date=release_date_str,
@@ -840,7 +882,7 @@ class ImdbSource(_PluginBase):
return True
return False
def trending(self, interest: str, page: int = 1) -> List[schemas.MediaInfo]:
async def trending(self, interest: str, page: int = 1) -> List[schemas.MediaInfo]:
if not self._imdb_helper:
return []
if interest not in self._imdb_helper.interest_id:
@@ -859,7 +901,7 @@ class ImdbSource(_PluginBase):
results.extend(self._cache[interest])
remaining = count - len(results)
self._cache[interest] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
data = await self._imdb_helper.async_advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
@@ -882,7 +924,7 @@ class ImdbSource(_PluginBase):
res.append(self.__series_to_media(item.get('node').get("title")))
return res
def imdb_top_250(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
async def imdb_top_250(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if not self._imdb_helper:
return []
title_types = ("movie",)
@@ -898,12 +940,13 @@ class ImdbSource(_PluginBase):
results.extend(self._cache["imdb_top_250"])
remaining = count - len(results)
self._cache["imdb_top_250"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="USER_RATING",
sort_order="DESC",
ranked=("TOP_RATED_MOVIES-250",)
)
data = await self._imdb_helper.async_advanced_title_search(
first_page=first_page,
title_types=title_types,
sort_by="USER_RATING",
sort_order="DESC",
ranked=("TOP_RATED_MOVIES-250",)
)
if not data:
new_results = []
else:
@@ -919,7 +962,7 @@ class ImdbSource(_PluginBase):
res.append(self.__movie_to_media(item.get('node').get("title")))
return res
def imdb_trending(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
async def imdb_trending(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if not self._imdb_helper:
return []
title_types = ("tvSeries", "tvMiniSeries", "tvShort", 'movie')
@@ -935,11 +978,12 @@ class ImdbSource(_PluginBase):
results.extend(self._cache["discover"])
remaining = count - len(results)
self._cache["discover"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
)
data = await self._imdb_helper.async_advanced_title_search(
first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
)
if not data:
new_results = []
else:
@@ -957,18 +1001,18 @@ class ImdbSource(_PluginBase):
res.append(self.__series_to_media(item.get('node').get("title")))
return res
def imdb_discover(self, mtype: str = "series",
country: str = None,
lang: str = None,
genre: str = None,
sort_by: str = 'POPULARITY',
sort_order: str = 'DESC',
using_rating: bool = False,
user_rating: str = None,
year: str = None,
award: str = None,
ranked_list: str = None,
page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
async def imdb_discover(self, mtype: str = "series",
country: str = None,
lang: str = None,
genre: str = None,
sort_by: str = 'POPULARITY',
sort_order: str = 'DESC',
using_rating: bool = False,
user_rating: str = None,
year: str = None,
award: str = None,
ranked_list: str = None,
page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if not self._imdb_helper:
return []
@@ -1037,19 +1081,20 @@ class ImdbSource(_PluginBase):
else:
results.extend(self._cache["discover"])
remaining = count - len(results)
self._cache["discover"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_type,
genres=genres,
sort_by=sort_by,
sort_order=sort_order,
rating_min=user_rating,
countries=countries,
languages=languages,
release_date_end=release_date_end,
release_date_start=release_date_start,
award_constraint=awards,
ranked=ranked_lists)
self._cache["discover"] = []
data = await self._imdb_helper.async_advanced_title_search(
first_page=first_page,
title_types=title_type,
genres=genres,
sort_by=sort_by,
sort_order=sort_order,
rating_min=user_rating,
countries=countries,
languages=languages,
release_date_end=release_date_end,
release_date_start=release_date_start,
award_constraint=awards,
ranked=ranked_lists)
if not data:
new_results = []
else:
@@ -1682,7 +1727,7 @@ class ImdbSource(_PluginBase):
event_data.extra_sources.append(imdb_source)
@eventmanager.register(ChainEventType.MediaRecognizeConvert)
def media_recognize_covert(self, event: Event) -> Optional[dict]:
async def async_media_recognize_covert(self, event: Event) -> Optional[dict]:
if not self._enabled:
return
event_data: MediaRecognizeConvertEventData = event.event_data
@@ -1693,7 +1738,7 @@ class ImdbSource(_PluginBase):
if not event_data.mediaid.startswith("imdb"):
return
imdb_id = event_data.mediaid[5:]
tmdb_id = ImdbSource.imdb_to_tmdb(imdb_id)
tmdb_id = await self.async_imdb_to_tmdb(imdb_id)
if tmdb_id is not None:
event_data.media_dict["id"] = tmdb_id
@@ -1783,7 +1828,68 @@ class ImdbSource(_PluginBase):
if info:
info = self._imdb_helper.update_info(info.get('id'), info=info) or info
mediainfo = ImdbSource._convert_mediainfo(info)
mediainfo.tmdb_id = ImdbSource.imdb_to_tmdb(info.get('id'), mediainfo)
mediainfo.tmdb_id = self.imdb_to_tmdb(info.get('id'), mediainfo)
cat = ImdbHelper.get_category(info.get('media_type'), info)
mediainfo.set_category(cat)
logger.info(f"{meta.name} IMDb 识别结果:{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.imdb_id}")
return mediainfo
return None
async def async_recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
**kwargs) -> Optional[MediaInfo]:
"""
异步识别媒体信息
:param meta: 识别的元数据
:param mtype: 识别的媒体类型
:return: 识别的媒体信息,包括剧集信息
"""
if not self._enabled:
return None
if not meta:
return None
elif not meta.name:
logger.warn("识别媒体信息时未提供元数据名称")
return None
else:
if mtype:
meta.type = mtype
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, 'zh-hans') if meta.cn_name else None
names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
for name in names:
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
if meta.type == MediaType.UNKNOWN and not meta.year:
info = await self._imdb_helper.async_match_by(name)
else:
if meta.type == MediaType.TV:
info = await self._imdb_helper.async_match(name=name, year=meta.year, mtype=meta.type,
season_year=meta.year,
season_number=meta.begin_season)
if not info:
# 去掉年份再查一次
info = await self._imdb_helper.async_match(name=name, mtype=meta.type)
else:
# 有年份先按电影查
info = await self._imdb_helper.async_match(name=name, year=meta.year, mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = await self._imdb_helper.async_match(name=name, year=meta.year, mtype=MediaType.TV)
if not info:
# 去掉年份和类型再查一次
info = await self._imdb_helper.async_match_by(name=name)
if info:
break
if info:
info = await self._imdb_helper.async_update_info(info.get('id'), info=info) or info
mediainfo = ImdbSource._convert_mediainfo(info)
mediainfo.tmdb_id = await self.async_imdb_to_tmdb(info.get('id'), mediainfo)
cat = ImdbHelper.get_category(info.get('media_type'), info)
mediainfo.set_category(cat)
logger.info(f"{meta.name} IMDb 识别结果:{mediainfo.type.value} "
@@ -1832,16 +1938,7 @@ class ImdbSource(_PluginBase):
return mediainfo
@staticmethod
def imdb_to_tmdb(imdb_id: str, media_info: Optional[MediaInfo] = None) -> Optional[int]:
api_key = settings.TMDB_API_KEY
api_url = (
f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}"
f"?api_key={api_key}&external_source=imdb_id"
)
ret = RequestUtils(accept_type="application/json").get_res(api_url)
if not ret:
return None
data = ret.json()
def _match_results(data: dict, media_info: Optional[MediaInfo] = None) -> Optional[int]:
# 合并两种结果
all_results = []
for key in ["movie_results", "tv_results"]:
@@ -1899,3 +1996,27 @@ class ImdbSource(_PluginBase):
# 最终按人气返回
most_popular = pick_most_popular(filtered)
return most_popular.get("id") if most_popular else None
def imdb_to_tmdb(self, imdb_id: str, media_info: Optional[MediaInfo] = None) -> Optional[int]:
api_key = settings.TMDB_API_KEY
api_url = (
f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}"
f"?api_key={api_key}&external_source=imdb_id"
)
data = RequestUtils(accept_type="application/json", proxies=settings.PROXY if self._proxy else None
).get_json(api_url)
if not data:
return None
return self._match_results(data, media_info)
async def async_imdb_to_tmdb(self, imdb_id: str, media_info: Optional[MediaInfo] = None) -> Optional[int]:
api_key = settings.TMDB_API_KEY
api_url = (
f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}"
f"?api_key={api_key}&external_source=imdb_id"
)
data = await AsyncRequestUtils(accept_type="application/json", proxies=settings.PROXY if self._proxy else None
).get_json(api_url)
if not data:
return None
return self._match_results(data, media_info)

File diff suppressed because one or more lines are too long

View File

@@ -12,7 +12,7 @@ from apscheduler.triggers.cron import CronTrigger
from fastapi import Response
from app.core.config import settings
from app.core.event import eventmanager
from app.core.event import eventmanager, Event
from app.db.site_oper import SiteOper
from app.log import logger
from app.plugins import _PluginBase
@@ -29,7 +29,7 @@ class ToBypassTrackers(_PluginBase):
# 插件图标
plugin_icon = "Clash_A.png"
# 插件版本
plugin_version = "1.4.1"
plugin_version = "1.4.2"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -128,7 +128,14 @@ class ToBypassTrackers(_PluginBase):
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
return [{
"cmd": "/refresh_tracker_ips",
"event": EventType.PluginAction,
"desc": "更新 Tracker IP 列表",
"data": {
"action": "refresh_tracker_ips"
}
}]
def get_api(self) -> List[Dict[str, Any]]:
"""
@@ -495,7 +502,7 @@ class ToBypassTrackers(_PluginBase):
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))
logger.error(f"退出插件失败:{e}")
def get_service(self) -> List[Dict[str, Any]]:
"""
@@ -518,13 +525,13 @@ class ToBypassTrackers(_PluginBase):
}]
return []
def bypassed_ips(self, protocol: str):
def bypassed_ips(self, protocol: str) -> Response:
if protocol == '6':
return Response(content=self.ipv6_txt, media_type="text/plain")
return Response(content=self.ipv4_txt, media_type="text/plain")
@eventmanager.register(EventType.PluginAction)
def update_ips(self):
def update_ips(self, event: Optional[Event]=None):
def __is_ip_in_subnet(ip_input: str, su_bnet: str) -> bool:
"""
Check if the given IP address is in the specified subnet.
@@ -537,10 +544,10 @@ class ToBypassTrackers(_PluginBase):
subnet_obj = ipaddress.ip_network(su_bnet, strict=False)
return ip_obj in subnet_obj
def __search_ip(ip, ips_list):
def __search_ip(_ip, ips_list):
i = 0
for ip_range in ips_list:
if __is_ip_in_subnet(ip, ip_range):
if __is_ip_in_subnet(_ip, ip_range):
return i
i += 1
return -1
@@ -592,6 +599,10 @@ class ToBypassTrackers(_PluginBase):
for domain_ in domains_])
await asyncio.gather(*tasks)
if event:
event_data = event.event_data
if not event_data or event_data.get("action") != "refresh_tracker_ips":
return
query_helper = DnsHelper(self._dns_input)
logger.info(f"开始通过 {query_helper.method_name} 解析DNS")
chnroute6_lists_url = "https://ispip.clang.cn/all_cn_ipv6.txt"