# Source: archived-MoviePilot/app/helper/format.py
# Snapshot: 2026-05-20 22:45:00 +08:00 (1239 lines, 44 KiB, Python)
# Note: the hosting page flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
import re
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Match, Optional, Tuple
import anitopy
import parse
from app.core.config import settings
from app.core.metainfo import MetaInfoPath
from app.core.meta.metabase import MetaBase
from app.log import logger
from app.schemas import EpisodeFormatRule, FileItem
class FormatParser(object):
    """
    Extract episode numbers from file names with a ``parse``-style template.

    The template (``eformat``) is handed to ``parse.parse`` and the field named
    by ``key`` (``"ep"`` by default) is read as the episode text. An optional
    ``details`` value pins or restricts the episodes and an optional ``offset``
    expression is applied to every resulting episode number.
    """

    _key = ""
    # Alternation of separator characters. Every alternative must be non-empty:
    # an empty alternative makes ``re.split`` split at every character position.
    _split_chars = r"\.|\s+|\(|\)|\[|]|-|\+|【|】|/|~|;|&|\||#|_|「|」|~"
    # ``details`` of the form ``X-Y`` (checked from the start of the string only).
    _DETAILS_RANGE_RE = re.compile(r"\d{1,4}-\d{1,4}")
    # Accepted episode text: ``12``, ``E12``, ``EP12``, ``12-13``, ``E12-E13`` ...
    # Group 2 is the start episode, group 5 the optional end episode.
    _EPISODE_RE = re.compile(
        r"^([Ee][Pp]?)?(\d{1,4})(-([Ee][Pp]?)?(\d{1,4}))?$",
        re.IGNORECASE,
    )

    def __init__(self, eformat: str, details: Optional[str] = None, part: Optional[str] = None,
                 offset: Optional[str] = None, key: Optional[str] = "ep"):
        """
        :param eformat: ``parse`` template used to locate the episode text
        :param details: episode details, one of ``X``, ``X,Y`` or ``X-Y``
        :param part: part label, returned unchanged by ``split_episode``
        :param offset: offset expression such as ``-10`` or ``EP*2``
        :param key: name of the template field that holds the episode text
        """
        self._format = eformat
        self._start_ep = None
        self._end_ep = None
        # Normalise the offset into an expression that contains the "EP" token.
        if not offset:
            self.__offset = "EP"
        elif "EP" in offset:
            self.__offset = offset
        elif offset.startswith(("-", "+")):
            self.__offset = f"EP{offset}"
        else:
            self.__offset = f"EP+{offset}"
        self._key = key
        self._part = part or None
        if details:
            if self._DETAILS_RANGE_RE.match(details):
                # "X-Y": both bounds keep the raw string; it is split on use.
                self._start_ep = self._end_ep = details
            else:
                bounds = details.split(",")
                first = int(bounds[0])
                if len(bounds) > 1:
                    self._start_ep = first
                    # The end bound is never smaller than the start bound.
                    self._end_ep = max(first, int(bounds[1]))
                else:
                    self._start_ep = self._end_ep = first

    @property
    def format(self):
        return self._format

    @property
    def start_ep(self):
        return self._start_ep

    @property
    def end_ep(self):
        return self._end_ep

    @property
    def part(self):
        return self._part

    @property
    def offset(self):
        return self.__offset

    def match(self, file: str) -> bool:
        """
        Tell whether ``file`` is covered by this parser.

        Without a template every file matches. With a template the file must
        yield a truthy start episode (episode 0 therefore counts as "not
        found") and, when ``details`` was given, that episode must lie inside
        the configured bounds.
        """
        if not self._format:
            return True
        start, _ = self.__handle_single(file)
        if not start:
            return False
        bounds = self.__episode_bounds()
        if bounds is None:
            return True
        return bounds[0] <= start <= bounds[1]

    def split_episode(self, file_name: str, file_meta: MetaBase) -> Tuple[Optional[int], Optional[int], Optional[str]]:
        """
        Return ``(start_episode, end_episode, part)`` for a file.

        :param file_name: file name parsed with the template when one is set
        :param file_meta: recognised meta; its ``begin_episode``/``end_episode``
                          are used only when neither template nor details apply
        """
        # Explicit episode details take precedence and are returned directly.
        if self._start_ep is not None:
            if self._start_ep == self._end_ep:
                if isinstance(self._start_ep, str):
                    # `details` given as "X-Y"
                    first, last = self._start_ep.split("-")
                    single = int(first) == int(last)
                    start_ep = int(self.__eval_offset(first))
                    if single:
                        return start_ep, None, self.part
                    return start_ep, int(self.__eval_offset(last)), self.part
                # `details` given as "X"
                return int(self.__eval_offset(self._start_ep)), None, self.part
            if not self._format:
                # `details` given as "X,Y"
                return (
                    int(self.__eval_offset(self._start_ep)),
                    int(self.__eval_offset(self._end_ep)),
                    self.part,
                )
        if not self._format:
            # No template and no usable details: only apply the offset to the
            # episodes already recognised in ``file_meta``.
            begin = file_meta.begin_episode
            finish = file_meta.end_episode
            start_ep = self.__eval_offset(begin) if begin else None
            end_ep = self.__eval_offset(finish) if finish else None
            # A falsy result (including 0) is reported as None on this path.
            return int(start_ep) if start_ep else None, int(end_ep) if end_ep else None, self.part
        # A template is set: locate the episodes in the file name.
        start, end = self.__handle_single(file_name)
        return (
            int(self.__eval_offset(start)) if start else None,
            int(self.__eval_offset(end)) if end else None,
            self.part,
        )

    def __episode_bounds(self) -> Optional[Tuple[int, int]]:
        """
        Return the numeric ``(low, high)`` bounds from ``details``, or None.

        For the "X-Y" form both attributes hold the raw string, so it is split
        and converted here; comparing that string with an int would raise
        ``TypeError``.
        """
        if self._start_ep is None:
            return None
        if isinstance(self._start_ep, str):
            first, last = (int(value) for value in self._start_ep.split("-"))
            return min(first, last), max(first, last)
        return self._start_ep, self._end_ep

    def __eval_offset(self, episode):
        """
        Substitute ``episode`` for "EP" in the offset expression and evaluate it.

        NOTE(security): the expression is evaluated with ``eval``. The offset
        is configuration supplied by the caller; it must never come from an
        untrusted source.
        """
        return eval(self.__offset.replace("EP", str(episode)))

    def __handle_single(self, file: str) -> Tuple[Optional[int], Optional[int]]:
        """
        Parse one file name and return its start and end episode.

        Returns ``(None, None)`` when no template is set, the template does not
        match, the key field is missing, or the field is not valid episode text.
        The end episode is None for single-episode text.
        """
        if not self._format:
            return None, None
        ret = parse.parse(self._format, file)
        if not ret or self._key not in ret:
            return None, None
        # str() lets typed fields such as "{ep:d}" go through the same check.
        matched = self._EPISODE_RE.match(str(ret[self._key]))
        if not matched:
            return None, None
        end_text = matched.group(5)
        return int(matched.group(2)), (int(end_text) if end_text is not None else None)
@dataclass(frozen=True)
class _AutoRecommendSample:
    """
    Immutable record describing one sample file used for automatic
    template recommendation.
    """

    # File name of the sample (an empty string when the item has no name).
    file_name: str
    # (start, end) character span of the episode text inside ``file_name``.
    ep_span: Tuple[int, int]
    # Normalised episode value the generated template is expected to extract.
    expected_episode: str
    # File kind of the sample: "media", "subtitle" or "audio".
    source_kind: str = "media"
    # Episode value produced by the native meta recognition, if any.
    native_episode: Optional[str] = None
    # True when the extracted episode equals the native recognition result.
    native_verified: bool = False
    # True when the native result was used because nothing else was extracted.
    used_native_fallback: bool = False
class EpisodeFormatRuleHelper:
    """
    Helper that recommends episode-locating templates.

    Given user-defined regex rules and a list of sample files it produces a
    ``parse``-style template containing an ``{ep}`` field that ``FormatParser``
    can consume. When no rule yields a usable template, one is derived
    automatically from the sample file names.
    """

    # Media files below this size (100 MiB) are skipped unless the size filter
    # is explicitly relaxed.
    _MIN_MEDIA_FILE_SIZE_BYTES = 100 * 1024 * 1024
    # Placeholder meta handed to FormatParser.split_episode during validation.
    _EMPTY_META = MetaBase(title="")
    # "EP01-02" / "E01-E02" style ranges not preceded by an alphanumeric.
    _EP_RANGE_RE = re.compile(
        r"(?<![A-Za-z0-9])[Ee][Pp]?(\d{1,4}(?:-[Ee]?[Pp]?\d{1,4})+)(?!\d)"
    )
    # "EP01" / "E01" style single episodes not preceded by an alphanumeric.
    _EP_PREFIX_RE = re.compile(r"(?<![A-Za-z0-9])[Ee][Pp]?(\d{1,4})(?!\d)")
    # "S01E01-02" style ranges.
    _SEASON_EP_RANGE_RE = re.compile(
        r"[Ss]\d{1,4}[Ee][Pp]?(\d{1,4}(?:-[Ee]?[Pp]?\d{1,4})+)(?!\d)"
    )
    # "S01E01" style single episodes.
    _SEASON_EP_RE = re.compile(r"[Ss]\d{1,4}[Ee][Pp]?(\d{1,4})(?!\d)")
    # "#01" style episodes.
    _HASH_EP_RE = re.compile(r"(?<!\d)#(\d{1,4})(?!\d)")
    # "[01]" / "【01】" style episodes.
    _BRACKET_EP_RE = re.compile(r"[\[【](\d{1,4})[\]】]")
    # Fallback patterns tried, in this order, when anitopy finds no episode.
    _FALLBACK_EPISODE_RE = re.compile(r"第(\d{1,4})[話话]")
    _FALLBACK_EPISODE_JI_RE = re.compile(r"第(\d{1,4})集")
    _FALLBACK_PERIOD_RE = re.compile(r"。(\d{1,4})\s")
    # CJK episode marker used when locating an episode inside a file name.
    _CJK_EP_RE = re.compile(r"第(\d{1,4})(?:[話话集])")
    # Names containing these standalone tokens are treated as special samples.
    _SPECIAL_SAMPLE_RE = re.compile(
        r"(?<![A-Za-z0-9])(SP\d+|NCOP|NCED|OP|ED|MENU)(?![A-Za-z0-9])",
        re.IGNORECASE,
    )

    def recommend(
        self,
        rules: List[EpisodeFormatRule],
        sample_files: List[FileItem],
    ) -> Tuple[bool, str, Optional[dict]]:
        """
        Recommend an episode-locating template.

        Rules are tried in order; the first rule whose generated template
        passes validation wins. Without rules, or when no rule succeeds, the
        automatic recommendation is used.

        :return: ``(success, error_message, result_dict_or_None)``
        """
        if not rules:
            return self._auto_recommend(sample_files)
        if not sample_files:
            return False, "目录中没有可用于识别的媒体文件", None

        # Loop-invariant and expensive (every file is parsed), so it is built
        # lazily once instead of once per rule.
        compatibility_samples: Optional[List[_AutoRecommendSample]] = None
        for index, rule in enumerate(rules):
            matched_samples = self._match_rule(rule, sample_files)
            if not matched_samples:
                continue
            sample_file, match_result = matched_samples[0]
            episode_format = self._build_template(sample_file.name, match_result)
            if not episode_format:
                continue
            if not self._validate_template(episode_format, matched_samples):
                logger.warn(f"集数定位规则 {rule.name} 模板校验失败")
                continue
            if compatibility_samples is None:
                compatibility_samples = self._build_detected_samples(
                    self._filter_by_extension_and_size(sample_files),
                )
            if compatibility_samples and not self._validate_auto_template(
                episode_format,
                compatibility_samples,
            ):
                logger.warn(f"集数定位规则 {rule.name} 附加文件兼容性校验失败")
                continue
            logger.info(
                f"集数定位规则命中:{rule.name},样本文件:{sample_file.name}"
            )
            return True, "", {
                "rule_name": rule.name,
                "rule_index": index,
                "pattern": rule.pattern,
                "episode_format": episode_format,
                "sample_file": sample_file.name,
                "min_file_size_mb": rule.min_file_size_mb,
                "message": "已根据预定义规则生成集数定位模板",
            }
        return self._auto_recommend(sample_files)

    def _auto_recommend(
        self,
        sample_files: List[FileItem],
    ) -> Tuple[bool, str, Optional[dict]]:
        """
        Generate a template automatically: locate the episode in each sample
        file name and compare the samples with each other.

        :return: ``(success, error_message, result_dict_or_None)``
        """
        if not sample_files:
            return False, "目录中没有可用于识别的媒体文件", None

        candidates = self._filter_by_extension_and_size(sample_files)
        size_filter_relaxed = False
        if not candidates:
            # Nothing passed the size threshold: retry without it.
            candidates = self._filter_by_extension_and_size(
                sample_files, ignore_size=True
            )
            size_filter_relaxed = bool(candidates)
        if not candidates:
            return False, "无匹配自定义定位规则,智能生成失败", None

        # A single pass yields both the usable samples and the failure counters.
        valid_samples, native_conflict_count, episode_not_detected_count = (
            self._scan_candidates(candidates, log_conflicts=True)
        )
        native_verified_count = sum(
            1 for sample in valid_samples if sample.native_verified
        )
        native_fallback_count = sum(
            1 for sample in valid_samples if sample.used_native_fallback
        )

        if not valid_samples:
            if native_conflict_count:
                return (
                    False,
                    "样本命名与原生识别结果冲突,建议补充集数定位规则",
                    None,
                )
            if episode_not_detected_count:
                return False, "样本未识别到有效集数,智能生成失败", None
            return False, "无匹配自定义定位规则,智能生成失败", None
        if native_conflict_count and len(valid_samples) < len(candidates):
            return (
                False,
                "样本命名与原生识别结果冲突,建议补充集数定位规则",
                None,
            )

        majority_samples, clear_majority = self._select_base_samples(valid_samples)
        if len(valid_samples) > 1 and not clear_majority:
            logger.warn("自动生成样本未形成明确多数派,放弃推荐")
            return False, "样本命名差异过大,建议补充集数定位规则", None

        majority_names = [sample.file_name for sample in majority_samples]
        majority_spans = [sample.ep_span for sample in majority_samples]
        episode_format = self._build_ep_only_template(
            majority_names, majority_spans, use_majority=False
        )
        if not self._validate_auto_template(episode_format, majority_samples):
            # The plain {ep} template failed: try adding variable placeholders.
            diff_result = self._build_template_with_diff(
                majority_names, majority_spans, use_majority=False
            )
            if diff_result and self._validate_auto_template(
                diff_result, majority_samples
            ):
                episode_format = diff_result
            else:
                logger.warn("多文件对比未通过模板校验,自动生成失败")
                return False, "无匹配自定义定位规则,智能生成失败", None

        sample_file = majority_names[0]
        low_confidence = len(majority_samples) == 1 or size_filter_relaxed
        reasons = self._build_auto_reasons(
            sample_count=len(valid_samples),
            majority_count=len(majority_samples),
            size_filter_relaxed=size_filter_relaxed,
            native_fallback_count=native_fallback_count,
            native_verified_count=native_verified_count,
        )
        logger.info(f"智能分析生成集数定位模板:{sample_file} -> {episode_format}")
        return True, "", {
            "rule_name": "智能分析",
            "episode_format": episode_format,
            "sample_file": sample_file,
            "pattern": None,
            "sample_count": len(valid_samples),
            "majority_count": len(majority_samples),
            "confidence": "low" if low_confidence else "high",
            "size_filter_relaxed": size_filter_relaxed,
            "native_verified_count": native_verified_count,
            "native_fallback_count": native_fallback_count,
            "native_conflict_count": native_conflict_count,
            "reason": reasons[0] if reasons else None,
            "reasons": reasons,
            "message": self._build_auto_message(
                sample_count=len(valid_samples),
                majority_count=len(majority_samples),
                size_filter_relaxed=size_filter_relaxed,
                native_fallback_count=native_fallback_count,
            ),
        }

    @staticmethod
    def _build_auto_message(
        sample_count: int,
        majority_count: int,
        size_filter_relaxed: bool,
        native_fallback_count: int,
    ) -> str:
        """
        Pick the user-facing message for an automatic recommendation; the
        first applicable condition wins.
        """
        if majority_count <= 1:
            return "样本不足,仅基于单文件智能生成(仅供参考)"
        if size_filter_relaxed:
            return "已放宽体积限制智能生成模板(仅供参考)"
        if native_fallback_count:
            return "已结合原生集数识别智能生成模板(仅供参考)"
        if sample_count != majority_count:
            return "已根据多数派样本智能生成模板(仅供参考)"
        return "无匹配自定义定位规则,已智能生成(仅供参考)"

    @staticmethod
    def _build_auto_reasons(
        sample_count: int,
        majority_count: int,
        size_filter_relaxed: bool,
        native_fallback_count: int,
        native_verified_count: int,
    ) -> List[str]:
        """
        Collect every machine-readable reason code that applies to an
        automatic recommendation; never returns an empty list.
        """
        reasons: List[str] = []
        if majority_count <= 1:
            reasons.append("single_sample_only")
        if size_filter_relaxed:
            reasons.append("small_files_fallback")
        # Fallback and verification are mutually exclusive reason codes.
        if native_fallback_count:
            reasons.append("native_meta_fallback")
        elif native_verified_count:
            reasons.append("native_meta_verified")
        if sample_count != majority_count:
            reasons.append("majority_samples_only")
        if not reasons:
            reasons.append("auto_recommendation")
        return reasons

    @staticmethod
    def _filter_by_extension_and_size(
        files: List[FileItem],
        ignore_size: bool = False,
    ) -> List[FileItem]:
        """
        First filtering round: keep media, subtitle and audio files.

        Media files must additionally reach the minimum size unless
        ``ignore_size`` is set; subtitle and audio files are never size-checked.
        """
        min_size = EpisodeFormatRuleHelper._MIN_MEDIA_FILE_SIZE_BYTES
        candidates: List[FileItem] = []
        for item in files:
            file_kind = EpisodeFormatRuleHelper._get_file_kind(item)
            if file_kind == "other":
                continue
            too_small = (
                file_kind == "media"
                and not ignore_size
                and (item.size or 0) < min_size
            )
            if not too_small:
                candidates.append(item)
        return candidates

    @staticmethod
    def _get_file_kind(item: FileItem) -> str:
        """
        Classify a file as "media", "subtitle", "audio" or "other" by its
        extension, which is lower-cased and given exactly one leading dot.
        """
        extension = ""
        if item.extension:
            extension = "." + item.extension.lower().lstrip(".")
        if extension in settings.RMT_MEDIAEXT:
            return "media"
        if extension in settings.RMT_SUBEXT:
            return "subtitle"
        if extension in settings.RMT_AUDIOEXT:
            return "audio"
        return "other"

    @staticmethod
    def _sample_kind_priority(kind: str) -> int:
        """Sort priority of a file kind; lower sorts first, unknown kinds last."""
        return {
            "media": 0,
            "subtitle": 1,
            "audio": 2,
        }.get(kind, 9)

    @classmethod
    def _is_special_sample(cls, file_name: str) -> bool:
        """Tell whether the name carries a special marker (SPxx/NCOP/NCED/OP/ED/MENU)."""
        return bool(cls._SPECIAL_SAMPLE_RE.search(file_name or ""))

    @classmethod
    def _sample_sort_key(cls, entry: FileItem) -> Tuple[int, str, str]:
        """Stable ordering of candidates: file kind first, then name, then path."""
        return (
            cls._sample_kind_priority(cls._get_file_kind(entry)),
            entry.name or "",
            entry.path or "",
        )

    def _scan_candidates(
        self,
        candidates: List[FileItem],
        log_conflicts: bool = False,
    ) -> Tuple[List[_AutoRecommendSample], int, int]:
        """
        Inspect every candidate once.

        :param candidates: files that passed the extension/size filter
        :param log_conflicts: log a warning for every native-recognition conflict
        :return: ``(valid_samples, native_conflict_count, episode_not_detected_count)``
        """
        valid_samples: List[_AutoRecommendSample] = []
        native_conflict_count = 0
        episode_not_detected_count = 0
        for item in sorted(candidates, key=self._sample_sort_key):
            file_name = item.name or ""
            if self._is_special_sample(file_name):
                # Obvious extras (SP/NCOP/NCED/OP/ED/MENU) never take part in
                # recommending a template for regular episodes.
                continue
            normalized_episode, native_episode, used_native_fallback, native_verified = (
                self._extract_episode_with_native_fallback(item)
            )
            if normalized_episode and native_episode and not (
                used_native_fallback or native_verified
            ):
                native_conflict_count += 1
                if log_conflicts:
                    logger.warn(
                        "自动推荐样本与原生集数识别冲突,跳过:"
                        f"{file_name} - auto={normalized_episode}, native={native_episode}"
                    )
                continue
            expected_start, _ = self._parse_episode_value(normalized_episode)
            if expected_start is None:
                episode_not_detected_count += 1
                continue
            if expected_start <= 0:
                # Episode 00 usually belongs to a specials season.
                continue
            ep_span = self._locate_episode(file_name, normalized_episode)
            if ep_span is None:
                episode_not_detected_count += 1
                continue
            if not normalized_episode.isdigit():
                # Episode values that are not plain integers are not accepted
                # as samples; they would only skew the generated template.
                continue
            valid_samples.append(
                _AutoRecommendSample(
                    file_name=file_name,
                    ep_span=ep_span,
                    expected_episode=normalized_episode,
                    source_kind=self._get_file_kind(item),
                    native_episode=native_episode,
                    native_verified=native_verified,
                    used_native_fallback=used_native_fallback,
                )
            )
        return valid_samples, native_conflict_count, episode_not_detected_count

    def _build_detected_samples(
        self,
        candidates: List[FileItem],
    ) -> List[_AutoRecommendSample]:
        """
        Return the candidates usable as samples: non-special files with a
        positive, plain-integer episode that can be located in the file name
        and does not conflict with the native recognition.
        """
        return self._scan_candidates(candidates)[0]

    @classmethod
    def _locate_episode(
        cls,
        file_name: str,
        episode_value: str,
    ) -> Optional[Tuple[int, int]]:
        """
        Find where ``episode_value`` appears in ``file_name``.

        Known episode patterns are tried first; after that every zero-padded
        spelling of the value is searched as a delimiter-bounded token, taking
        the last occurrence.

        :return: ``(start, end)`` span of the episode text, or None
        """
        normalized_episode_value = cls._normalize_episode_value(episode_value)
        matchers = (
            cls._EP_RANGE_RE,
            cls._EP_PREFIX_RE,
            cls._SEASON_EP_RANGE_RE,
            cls._SEASON_EP_RE,
            cls._HASH_EP_RE,
            cls._BRACKET_EP_RE,
            cls._CJK_EP_RE,
        )
        for matcher in matchers:
            for match in matcher.finditer(file_name):
                if cls._episode_value_equals(
                    match.group(1),
                    normalized_episode_value,
                ):
                    return match.span(1)
        for candidate in cls._build_episode_candidates(normalized_episode_value):
            token_pattern = re.compile(
                rf"(?:(?<=^)|(?<=[\s._\-\[\]【】()]))"
                rf"{re.escape(candidate)}"
                rf"(?:(?=$)|(?=[\s._\-\[\]【】()]))"
            )
            matches = list(token_pattern.finditer(file_name))
            if matches:
                return matches[-1].span()
        return None

    @staticmethod
    def _normalize_episode_value(episode_value) -> str:
        """
        Normalise an episode value (string or list) to ``"N"`` or ``"N-M"``:
        leading E/EP prefixes are removed and empty parts are dropped.
        """
        if isinstance(episode_value, list):
            parts = [str(part) for part in episode_value]
        else:
            parts = str(episode_value).split("-")
        return "-".join(
            re.sub(r"^[Ee][Pp]?", "", part.strip())
            for part in parts
            if part.strip()
        )

    @staticmethod
    def _parse_episode_value(
        expected_episode: Optional[str],
    ) -> Tuple[Optional[int], Optional[int]]:
        """
        Parse ``"N"`` / ``"N-M"`` (optionally E/EP-prefixed) into integers.

        :return: ``(start, end)``; ``end`` is None for a single episode or when
                 both ends are equal; ``(None, None)`` when any part has no digits
        """
        if not expected_episode:
            return None, None
        numbers: List[int] = []
        for part in str(expected_episode).split("-"):
            cleaned = re.sub(r"^[Ee][Pp]?", "", part.strip())
            number_match = re.search(r"\d{1,4}", cleaned)
            if not number_match:
                return None, None
            numbers.append(int(number_match.group()))
        if not numbers:
            return None, None
        first, last = numbers[0], numbers[-1]
        if len(numbers) == 1 or last == first:
            return first, None
        return first, last

    @classmethod
    def _episode_value_equals(
        cls,
        actual_episode: Optional[str],
        expected_episode: Optional[str],
    ) -> bool:
        """Compare two episode values numerically; empty values never match."""
        if not actual_episode or not expected_episode:
            return False
        return cls._parse_episode_value(actual_episode) == cls._parse_episode_value(
            expected_episode
        )

    @classmethod
    def _build_episode_candidates(
        cls,
        episode_value: Optional[str],
    ) -> List[str]:
        """
        List the textual spellings of an episode value, zero-padded to widths
        1 to 4. Ranges also get the ``-E`` and ``-EP`` separators. Order is
        stable and duplicates are removed.
        """
        start_episode, end_episode = cls._parse_episode_value(episode_value)
        if start_episode is None:
            return []
        candidates: List[str] = []
        for width in range(1, 5):
            start_text = str(start_episode).zfill(width)
            if end_episode is None:
                candidates.append(start_text)
                continue
            end_text = str(end_episode).zfill(width)
            candidates.append(f"{start_text}-{end_text}")
            candidates.append(f"{start_text}-E{end_text}")
            candidates.append(f"{start_text}-EP{end_text}")
        # dict.fromkeys keeps first-seen order while dropping duplicates.
        return list(dict.fromkeys(candidates))

    @classmethod
    def _extract_native_episode(cls, item: FileItem) -> Optional[str]:
        """
        Run the native recognition (``MetaInfoPath``) on the item's path, or
        its name when no path is set, and return ``"N"`` / ``"N-M"``; None when
        recognition fails or finds no episode.
        """
        source_path = item.path or item.name
        if not source_path:
            return None
        try:
            meta = MetaInfoPath(Path(source_path))
        except Exception as err:
            # Best effort: a recognition failure only disables the cross-check.
            logger.warn(f"原生集数识别失败:{source_path} - {err}")
            return None
        if meta.begin_episode is None:
            return None
        if meta.end_episode is not None and meta.end_episode != meta.begin_episode:
            return f"{meta.begin_episode}-{meta.end_episode}"
        return str(meta.begin_episode)

    def _extract_episode_with_native_fallback(
        self,
        item: FileItem,
    ) -> Tuple[Optional[str], Optional[str], bool, bool]:
        """
        Extract the episode with anitopy, then the regex fallback, and
        cross-check it against the native recognition.

        :return: ``(normalized_episode, native_episode, used_native_fallback,
                 native_verified)``; both flags are False when the two sources
                 disagree
        """
        file_name = item.name or ""
        native_episode = self._extract_native_episode(item)
        episode_number = None
        try:
            episode_number = anitopy.parse(file_name).get("episode_number")
        except Exception as err:
            logger.warn(f"anitopy 解析失败:{file_name} - {err}")
        if not episode_number:
            episode_number = self._extract_episode_fallback(file_name)
        normalized_episode = (
            self._normalize_episode_value(episode_number)
            if episode_number
            else None
        )
        used_native_fallback = False
        native_verified = False
        if normalized_episode and native_episode:
            native_verified = self._episode_value_equals(
                normalized_episode, native_episode
            )
        elif not normalized_episode and native_episode:
            # Nothing extracted from the name: rely on the native result.
            normalized_episode = native_episode
            used_native_fallback = True
        return normalized_episode, native_episode, used_native_fallback, native_verified

    @classmethod
    def _extract_episode_fallback(cls, file_name: str) -> Optional[str]:
        """
        Regex fallback used when anitopy finds no episode; the patterns are
        tried in a fixed order and the first hit is returned.
        """
        for pattern in (
            cls._FALLBACK_EPISODE_RE,
            cls._FALLBACK_EPISODE_JI_RE,
            cls._FALLBACK_PERIOD_RE,
        ):
            match = pattern.search(file_name)
            if match:
                return match.group(1)
        return None

    @staticmethod
    def _select_base_samples(
        samples: Iterable[_AutoRecommendSample],
    ) -> Tuple[List[_AutoRecommendSample], bool]:
        """
        Group samples by the text preceding the episode and pick the largest
        group as the base; outliers end up in smaller groups.

        :return: ``(majority_group, clear_majority)``; ``clear_majority`` is
                 True when the largest group is strictly larger than the
                 runner-up or is the only group; ``([], False)`` for no samples
        """
        before_groups: Dict[str, List[_AutoRecommendSample]] = defaultdict(list)
        for sample in samples:
            before_groups[sample.file_name[: sample.ep_span[0]]].append(sample)
        if not before_groups:
            return [], False
        # Largest group first; ties are broken by the prefix text.
        sorted_groups = sorted(
            before_groups.items(),
            key=lambda item: (-len(item[1]), item[0]),
        )
        majority_group = sorted(
            sorted_groups[0][1],
            key=lambda item: (
                EpisodeFormatRuleHelper._sample_kind_priority(item.source_kind),
                item.file_name,
                item.ep_span[0],
                item.ep_span[1],
            ),
        )
        clear_majority = (
            len(sorted_groups) == 1
            or len(majority_group) > len(sorted_groups[1][1])
        )
        return majority_group, clear_majority

    def _majority_names_and_spans(
        self,
        file_names: List[str],
        ep_spans: List[Tuple[int, int]],
    ) -> Tuple[List[str], List[Tuple[int, int]]]:
        """Reduce parallel name/span lists to the entries of the majority group."""
        majority_samples, _ = self._select_base_samples(
            _AutoRecommendSample(
                file_name=name,
                ep_span=span,
                expected_episode="",
            )
            for name, span in zip(file_names, ep_spans)
        )
        return (
            [sample.file_name for sample in majority_samples],
            [sample.ep_span for sample in majority_samples],
        )

    def _build_ep_only_template(
        self,
        file_names: List[str],
        ep_spans: List[Tuple[int, int]],
        use_majority: bool = True,
    ) -> str:
        """
        Build a template whose only field is ``{ep}`` from the first (majority)
        file; returns an empty string when there is no file to build from.
        """
        if use_majority:
            file_names, ep_spans = self._majority_names_and_spans(file_names, ep_spans)
        if not file_names or not ep_spans:
            return ""
        return self._build_ep_template_from_file(file_names[0], ep_spans[0])

    def _build_ep_template_from_file(
        self,
        file_name: str,
        ep_span: Tuple[int, int],
    ) -> str:
        """Replace the episode span of ``file_name`` with ``{ep}``, escaping the rest."""
        start, end = ep_span
        prefix = self._escape_literal(file_name[:start])
        suffix = self._escape_literal(file_name[end:])
        return f"{prefix}{{ep}}{suffix}"

    def _build_template_with_diff(
        self,
        file_names: List[str],
        ep_spans: List[Tuple[int, int]],
        use_majority: bool = True,
    ) -> Optional[str]:
        """
        Compare several files and build a template that uses up to three extra
        placeholders (``{a}``, ``{b}``, ``{c}``) for text after the episode
        that differs between files.

        :return: the template, or None when fewer than two files are given,
                 the text before the episode differs, or the text after the
                 episode is identical everywhere
        """
        if use_majority:
            file_names, ep_spans = self._majority_names_and_spans(file_names, ep_spans)
        if len(file_names) < 2:
            return None
        before_ep_set = {name[: span[0]] for name, span in zip(file_names, ep_spans)}
        if len(before_ep_set) != 1:
            return None
        after_ep_list = [name[span[1]:] for name, span in zip(file_names, ep_spans)]
        if len(set(after_ep_list)) == 1:
            return None
        template = self._build_ep_template_from_file(file_names[0], ep_spans[0])
        for placeholder in ("a", "b", "c"):
            failed = self._find_unmatched(template, file_names)
            if not failed:
                break
            updated_template = self._insert_variable_placeholder(
                template,
                failed,
                after_ep_list,
                file_names,
                placeholder,
            )
            if updated_template == template:
                # No further variable region could be found.
                break
            template = updated_template
        return template

    @staticmethod
    def _find_unmatched(
        template: str,
        file_names: List[str],
    ) -> List[str]:
        """
        Return the file names the template does not match; all of them when no
        parser can be created for the template.
        """
        parser = EpisodeFormatRuleHelper._create_format_parser(
            template,
            context="多文件对比预校验",
        )
        if not parser:
            return list(file_names)
        return [
            name
            for name in file_names
            if not EpisodeFormatRuleHelper._safe_match_template(
                parser,
                name,
                context="多文件对比预校验",
            )
        ]

    def _insert_variable_placeholder(
        self,
        template: str,
        failed_files: List[str],
        after_ep_list: List[str],
        all_file_names: List[str],
        placeholder: str,
    ) -> str:
        """
        Add one more named placeholder to the part of the template after
        ``{ep}``, covering the next region where failed files differ from the
        base file. The template is returned unchanged when no region is found.
        """
        ep_marker = "{ep}"
        ep_pos = template.find(ep_marker)
        if ep_pos < 0:
            return template
        current_after_ep_template = template[ep_pos + len(ep_marker):]
        base_after_ep = after_ep_list[0]
        existing_spans = self._collect_placeholder_spans(
            current_after_ep_template, base_after_ep
        )
        failed_after_ep_list = [
            after_ep
            for name, after_ep in zip(all_file_names, after_ep_list)
            if name in failed_files
        ]
        next_span = self._find_next_variable_span(
            base_after_ep,
            failed_after_ep_list,
            existing_spans,
        )
        if next_span is None:
            return template
        updated_spans = existing_spans + [
            (next_span[0], next_span[1], placeholder)
        ]
        return template[:ep_pos] + ep_marker + self._render_after_ep_template(
            base_after_ep,
            updated_spans,
        )

    @staticmethod
    def _collect_placeholder_spans(
        after_ep_template: str,
        base_after_ep: str,
    ) -> List[Tuple[int, int, str]]:
        """
        Parse the base file's after-episode text with the current template and
        return ``(start, end, name)`` for every placeholder, sorted by start.
        """
        if not after_ep_template or "{" not in after_ep_template:
            return []
        result = EpisodeFormatRuleHelper._safe_parse_template(
            after_ep_template,
            base_after_ep,
            context="占位符区间收集",
        )
        if not result:
            return []
        spans = [(span[0], span[1], name) for name, span in result.spans.items()]
        spans.sort(key=lambda item: item[0])
        return spans

    def _find_next_variable_span(
        self,
        base_after_ep: str,
        failed_after_ep_list: List[str],
        existing_spans: List[Tuple[int, int, str]],
    ) -> Optional[Tuple[int, int]]:
        """
        Probe each literal gap between the existing placeholders and return
        the span, in ``base_after_ep`` coordinates, of the first gap whose text
        differs in a failed file.
        """
        # Literal stretches of the base text not yet covered by a placeholder.
        cursor = 0
        literal_gaps: List[Tuple[int, int]] = []
        for start, end, _ in existing_spans:
            if cursor < start:
                literal_gaps.append((cursor, start))
            cursor = end
        if cursor < len(base_after_ep):
            literal_gaps.append((cursor, len(base_after_ep)))

        for gap_start, gap_end in literal_gaps:
            if gap_start >= gap_end:
                continue
            # Turn the whole gap into a "probe" field to see what failed files
            # carry at that position.
            probe_template = self._render_after_ep_template(
                base_after_ep,
                existing_spans + [(gap_start, gap_end, "probe")],
            )
            base_gap = base_after_ep[gap_start:gap_end]
            probe_values: List[str] = []
            for failed_after_ep in failed_after_ep_list:
                result = self._safe_parse_template(
                    probe_template,
                    failed_after_ep,
                    context="变量区间探测",
                )
                if not result:
                    continue
                probe_value = result.named.get("probe")
                if probe_value is None or probe_value == base_gap:
                    continue
                probe_values.append(probe_value)
            if not probe_values:
                continue
            relative_span = self._calculate_variable_span(base_gap, probe_values)
            if relative_span is None:
                continue
            return gap_start + relative_span[0], gap_start + relative_span[1]
        return None

    def _calculate_variable_span(
        self,
        base_text: str,
        compare_texts: List[str],
    ) -> Optional[Tuple[int, int]]:
        """
        Strip the common prefix and suffix of all texts and return the span of
        the differing middle within ``base_text``. The prefix is shortened
        until every text keeps a non-empty middle; None when that is impossible.
        """
        candidates = [base_text] + compare_texts
        prefix_len = self._common_prefix_length(candidates)
        suffix_len = self._common_suffix_length(candidates, prefix_len)

        def middle_parts(prefix: int) -> List[str]:
            return [text[prefix:len(text) - suffix_len] for text in candidates]

        variable_parts = middle_parts(prefix_len)
        while prefix_len > 0 and not all(variable_parts):
            prefix_len -= 1
            variable_parts = middle_parts(prefix_len)
        if not all(variable_parts):
            return None
        end_pos = len(base_text) - suffix_len
        if prefix_len >= end_pos:
            return None
        return prefix_len, end_pos

    @staticmethod
    def _common_prefix_length(texts: List[str]) -> int:
        """Length of the longest prefix shared by all texts (0 for an empty list)."""
        if not texts:
            return 0
        prefix_len = 0
        # zip stops at the shortest text, which bounds the prefix length.
        for chars in zip(*texts):
            if len(set(chars)) != 1:
                break
            prefix_len += 1
        return prefix_len

    @staticmethod
    def _common_suffix_length(
        texts: List[str],
        prefix_len: int = 0,
    ) -> int:
        """
        Length of the longest suffix shared by all texts, never reaching into
        the first ``prefix_len`` characters of the shortest text.
        """
        if not texts:
            return 0
        limit = min(len(text) for text in texts) - prefix_len
        suffix_len = 0
        while suffix_len < limit:
            index = -suffix_len - 1
            current_char = texts[0][index]
            if any(text[index] != current_char for text in texts[1:]):
                break
            suffix_len += 1
        return suffix_len

    def _render_after_ep_template(
        self,
        base_after_ep: str,
        spans: List[Tuple[int, int, str]],
    ) -> str:
        """
        Render the after-episode text as a template: each span becomes
        ``{name}`` and everything else an escaped literal. Empty or overlapping
        spans are ignored.
        """
        template_parts: List[str] = []
        cursor = 0
        for start, end, name in sorted(spans, key=lambda item: item[0]):
            if start < cursor or end <= start:
                continue
            template_parts.append(
                self._escape_literal(base_after_ep[cursor:start])
            )
            template_parts.append(f"{{{name}}}")
            cursor = end
        template_parts.append(self._escape_literal(base_after_ep[cursor:]))
        return "".join(template_parts)

    def _validate_auto_template(
        self,
        episode_format: str,
        samples: List[_AutoRecommendSample],
    ) -> bool:
        """
        Validate a generated template with ``FormatParser``: every sample must
        match and yield its expected episode and, when present, its native
        episode.
        """
        if not episode_format:
            return False
        parser = self._create_format_parser(
            episode_format,
            context="自动模板校验",
        )
        if not parser:
            return False
        for sample in samples:
            if not self._safe_match_template(
                parser,
                sample.file_name,
                context="自动模板校验",
            ):
                return False
            start_episode, end_episode, _ = self._safe_split_episode(
                parser,
                sample.file_name,
                context="自动模板校验",
            )
            if not self._episode_matches(
                start_episode,
                end_episode,
                sample.expected_episode,
            ):
                return False
            if sample.native_episode and not self._episode_matches(
                start_episode,
                end_episode,
                sample.native_episode,
            ):
                return False
        return True

    @staticmethod
    def _match_rule(
        rule: EpisodeFormatRule,
        sample_files: List[FileItem],
    ) -> List[Tuple[FileItem, Match[str]]]:
        """
        Return the sample files hit by a rule together with their match
        objects. Media files below the rule's minimum size are skipped, and a
        pattern that fails to compile yields no hits.
        """
        try:
            compiled_pattern = re.compile(
                EpisodeFormatRuleHelper._normalize_pattern(rule.pattern)
            )
        except Exception as err:
            logger.warn(f"集数定位规则 {rule.name} 编译失败:{err}")
            return []
        matched_samples: List[Tuple[FileItem, Match[str]]] = []
        for item in sample_files:
            if (
                rule.min_file_size_mb
                and EpisodeFormatRuleHelper._get_file_kind(item) == "media"
                and (item.size or 0) < rule.min_file_size_mb * 1024 * 1024
            ):
                continue
            match_result = compiled_pattern.search(item.name or "")
            # Only patterns declaring an "ep" named group are usable.
            if not match_result or "ep" not in match_result.groupdict():
                continue
            matched_samples.append((item, match_result))
        return matched_samples

    def _build_template(
        self,
        file_name: str,
        match_result: Match[str],
    ) -> Optional[str]:
        """
        Turn a rule match into a template: every non-empty named group becomes
        a ``{name}`` field and the remaining text escaped literals.

        :return: the template, or None when the ``ep`` group did not capture
                 any text
        """
        group_items = []
        for group_name, group_value in match_result.groupdict().items():
            if group_value is None:
                continue
            start, end = match_result.span(group_name)
            if start < 0 or end < 0 or start == end:
                continue
            group_items.append((start, end, group_name))
        if not any(group_name == "ep" for _, _, group_name in group_items):
            return None
        # Leftmost first; at the same start the longer (outer) group wins.
        group_items.sort(key=lambda item: (item[0], -(item[1] - item[0])))
        template_parts: List[str] = []
        cursor = 0
        for start, end, group_name in group_items:
            if start < cursor:
                # Overlaps a group that was already emitted.
                continue
            template_parts.append(self._escape_literal(file_name[cursor:start]))
            template_parts.append(f"{{{group_name}}}")
            cursor = end
        template_parts.append(self._escape_literal(file_name[cursor:]))
        return "".join(template_parts)

    def _validate_template(
        self,
        episode_format: str,
        matched_samples: List[Tuple[FileItem, Match[str]]],
    ) -> bool:
        """
        Check that the rule-generated template is consumed by ``FormatParser``
        for every matched sample and yields the episode captured by the rule.
        """
        parser = self._create_format_parser(
            episode_format,
            context="规则模板校验",
        )
        if not parser:
            return False
        for item, match_result in matched_samples:
            file_name = item.name or ""
            if not self._safe_match_template(
                parser,
                file_name,
                context="规则模板校验",
            ):
                return False
            start_episode, end_episode, _ = self._safe_split_episode(
                parser,
                file_name,
                context="规则模板校验",
            )
            expected_episode = match_result.groupdict().get("ep")
            if not self._episode_matches(
                start_episode,
                end_episode,
                expected_episode,
            ):
                return False
        return True

    @staticmethod
    def _create_format_parser(
        episode_format: str,
        context: str,
    ) -> Optional[FormatParser]:
        """Create a ``FormatParser`` for the template; log and return None on failure."""
        try:
            return FormatParser(eformat=episode_format)
        except Exception as err:
            logger.warn(f"{context} 创建模板解析器失败:{episode_format} - {err}")
            return None

    @staticmethod
    def _safe_match_template(
        parser: FormatParser,
        file_name: str,
        context: str,
    ) -> bool:
        """Run ``parser.match``; any exception is logged and treated as no match."""
        try:
            return parser.match(file_name)
        except Exception as err:
            logger.warn(f"{context} 模板匹配失败:{file_name} - {err}")
            return False

    @classmethod
    def _safe_split_episode(
        cls,
        parser: FormatParser,
        file_name: str,
        context: str,
    ) -> Tuple[Optional[int], Optional[int], Optional[str]]:
        """Run ``parser.split_episode``; any exception is logged and yields Nones."""
        try:
            return parser.split_episode(
                file_name=file_name,
                file_meta=cls._EMPTY_META,
            )
        except Exception as err:
            logger.warn(f"{context} 集数拆分失败:{file_name} - {err}")
            return None, None, None

    @staticmethod
    def _safe_parse_template(
        template: str,
        file_name: str,
        context: str,
    ) -> Optional[parse.Result]:
        """Run ``parse.parse``; any exception is logged and yields None."""
        try:
            return parse.parse(template, file_name)
        except Exception as err:
            logger.warn(f"{context} parse 模板解析失败:{template} <- {file_name} - {err}")
            return None

    @classmethod
    def _episode_matches(
        cls,
        actual_start: Optional[int],
        actual_end: Optional[int],
        expected_episode: Optional[str],
    ) -> bool:
        """
        Check that the episodes extracted by a template equal the expected
        value; a single expected episode requires ``actual_end`` to be None.
        """
        expected_start, expected_end = cls._parse_episode_value(expected_episode)
        if actual_start is None or expected_start is None:
            return False
        if actual_start != expected_start:
            return False
        if expected_end is None:
            return actual_end is None
        return actual_end == expected_end

    @staticmethod
    def _normalize_pattern(pattern: str) -> str:
        """
        Convert PCRE-style named groups ``(?<name>`` into the ``(?P<name>``
        syntax understood by Python's ``re``. Look-behinds are left untouched
        because a group name cannot start with ``=`` or ``!``.
        """
        return re.sub(
            r"\(\?<([a-zA-Z_][a-zA-Z0-9_]*)>",
            r"(?P<\1>",
            pattern,
        )

    def _escape_literal(self, text: str) -> str:
        """Escape sample text for literal use in a ``parse`` template by doubling braces."""
        return text.replace("{", "{{").replace("}", "}}")