This commit is contained in:
jxxghp
2025-09-13 13:32:48 +08:00
parent ee0406a13f
commit 6c572baca5
2 changed files with 13 additions and 271 deletions

View File

@@ -1,4 +1,3 @@
import signal
import threading
import time
from pathlib import Path
@@ -18,51 +17,6 @@ from app.utils.singleton import WeakSingleton
lock = threading.Lock()
# 全局SMB实例列表用于信号处理
_smb_instances = []
def _signal_handler(signum, frame):
"""
信号处理器用于在进程终止时清理SMB连接
"""
logger.info(f"【SMB】收到信号 {signum}开始清理SMB连接...")
for smb_instance in _smb_instances:
try:
if hasattr(smb_instance, '_cleanup_connection'):
smb_instance._cleanup_connection()
except Exception as e:
logger.debug(f"【SMB】清理实例时出错: {e}")
# 注册信号处理器
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
def _safe_smb_operation(operation_func, *args, **kwargs):
"""
安全地执行SMB操作处理连接错误和线程问题
"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
return operation_func(*args, **kwargs)
except (SMBResponseException, SMBException) as e:
retry_count += 1
logger.warning(f"【SMB】操作失败尝试重试 ({retry_count}/{max_retries}): {e}")
if retry_count >= max_retries:
logger.error(f"【SMB】操作失败已达到最大重试次数: {e}")
raise
# 短暂等待后重试
import time
time.sleep(1)
except Exception as e:
logger.error(f"【SMB】操作时发生未知错误: {e}")
raise
class SMBConnectionError(Exception):
"""
@@ -88,21 +42,6 @@ class SMB(StorageBase, metaclass=WeakSingleton):
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
def __enter__(self):
"""
上下文管理器入口
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
上下文管理器出口,确保连接被正确清理
"""
try:
self._cleanup_connection()
except Exception as e:
logger.debug(f"【SMB】上下文管理器清理连接失败: {e}")
def __init__(self):
super().__init__()
self._connected = False
@@ -110,10 +49,7 @@ class SMB(StorageBase, metaclass=WeakSingleton):
self._host = None
self._username = None
self._password = None
# 注册到全局实例列表
_smb_instances.append(self)
self._init_connection()
def _init_connection(self):
@@ -179,8 +115,8 @@ class SMB(StorageBase, metaclass=WeakSingleton):
测试SMB连接
"""
try:
# 使用安全操作包装器测试连接
_safe_smb_operation(smbclient.listdir, self._server_path)
# 尝试列出根目录来测试连接
smbclient.listdir(self._server_path)
except SMBAuthenticationError as e:
raise SMBConnectionError(f"SMB认证失败{e}")
except SMBResponseException as e:
@@ -202,13 +138,6 @@ class SMB(StorageBase, metaclass=WeakSingleton):
"""
if not self._connected or not self._server_path:
raise SMBConnectionError("【SMB】连接未建立或已断开请检查配置")
# 尝试重新连接如果连接已断开
try:
self._test_connection()
except Exception as e:
logger.warning(f"【SMB】连接检查失败尝试重新连接: {e}")
self._reconnect()
def _normalize_path(self, path: Union[str, Path]) -> str:
"""
@@ -305,13 +234,6 @@ class SMB(StorageBase, metaclass=WeakSingleton):
except Exception as e:
logger.debug(f"【SMB】连接检查失败{e}")
self._connected = False
# 尝试重新连接
try:
self._reconnect()
if self._connected:
return True
except Exception as reconnect_error:
logger.debug(f"【SMB】重新连接失败{reconnect_error}")
return False
def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]:
@@ -332,11 +254,12 @@ class SMB(StorageBase, metaclass=WeakSingleton):
# 列出目录内容
try:
entries = _safe_smb_operation(smbclient.listdir, smb_path)
except Exception as e:
entries = smbclient.listdir(smb_path)
except SMBResponseException as e:
logger.error(f"【SMB】列出目录失败: {smb_path} - {e}")
return []
except SMBException as e:
logger.error(f"【SMB】列出目录失败: {smb_path} - {e}")
# 尝试重新连接
self._reconnect()
return []
items = []
@@ -356,8 +279,6 @@ class SMB(StorageBase, metaclass=WeakSingleton):
return items
except Exception as e:
logger.error(f"【SMB】列出文件失败: {e}")
# 尝试重新连接
self._reconnect()
return []
def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
@@ -740,51 +661,7 @@ class SMB(StorageBase, metaclass=WeakSingleton):
析构函数,清理连接
"""
try:
# 从全局实例列表中移除
if self in _smb_instances:
_smb_instances.remove(self)
# smbclient 自动管理连接池,但我们可以重置缓存
if hasattr(self, '_connected') and self._connected:
self._cleanup_connection()
if self._connected:
reset_connection_cache()
except Exception as e:
logger.debug(f"【SMB】清理连接失败: {e}")
def _reconnect(self):
"""
重新建立SMB连接
"""
try:
logger.info("【SMB】尝试重新连接...")
# 清理现有连接
self._cleanup_connection()
# 重新初始化连接
self._init_connection()
if self._connected:
logger.info("【SMB】重新连接成功")
else:
logger.error("【SMB】重新连接失败")
except Exception as e:
logger.error(f"【SMB】重新连接过程中出现错误: {e}")
self._connected = False
def _cleanup_connection(self):
"""
安全地清理SMB连接
"""
try:
# 标记连接为断开状态
self._connected = False
# 重置连接缓存,这会清理所有连接
reset_connection_cache()
logger.debug("【SMB】连接清理完成")
except Exception as e:
logger.debug(f"【SMB】连接清理过程中出现错误: {e}")
# 即使清理失败,也要确保连接状态被重置
self._connected = False

View File

@@ -1,5 +1,3 @@
import multiprocessing
import os
import re
from pathlib import Path
from threading import Lock
@@ -24,75 +22,6 @@ from app.utils.system import SystemUtils
lock = Lock()
def _transfer_command_worker(args):
"""
在子进程中执行文件转移命令的工作函数
:param args: 包含所有必要参数的元组
"""
# 解包参数
(fileitem, target_storage, target_file, transfer_type, storage_schemas, result_queue) = args
def __get_storage_oper(_storage: str, _func: Optional[str] = None) -> Optional[StorageBase]:
"""
获取存储操作对象
"""
for storage_schema in storage_schemas:
if storage_schema.schema \
and storage_schema.schema.value == _storage \
and (not _func or hasattr(storage_schema, _func)):
return storage_schema()
return None
try:
# 获取存储操作对象
source_oper = __get_storage_oper(fileitem.storage)
target_oper = __get_storage_oper(target_storage)
if not source_oper or not target_oper:
result_queue.put((None, f"无法创建存储操作对象: source={fileitem.storage}, target={target_storage}"))
return
# 执行原有的转移逻辑
result = TransHandler.execute_transfer_command(
fileitem, target_storage, source_oper, target_oper,
target_file, transfer_type
)
# 将结果放入队列
result_queue.put(result)
except Exception as e:
result_queue.put((None, str(e)))
def _supports_fork():
"""
检测当前系统是否支持 fork
:return: True 如果支持 forkFalse 否则
"""
try:
# Windows 不支持 fork
if os.name == 'nt':
return False
# 检查可用的启动方法
available_methods = multiprocessing.get_all_start_methods()
if 'fork' not in available_methods:
return False
# 尝试设置 fork 方法
try:
multiprocessing.set_start_method('fork', force=True)
return True
except RuntimeError:
# 如果已经设置过启动方法,检查当前方法
current_method = multiprocessing.get_start_method()
return current_method == 'fork'
except Exception as e:
logger.error(f"检测 fork 支持时出错: {str(e)}")
return False
class TransHandler:
"""
文件转移整理类
@@ -100,8 +29,7 @@ class TransHandler:
inner_lock: Lock = Lock()
def __init__(self, storage_schemas: List[StorageBase]):
self.storage_schemas = storage_schemas
def __init__(self):
self.result = None
def __reset_result(self):
@@ -382,75 +310,12 @@ class TransHandler:
finally:
self.result = None
def __transfer_command(self, fileitem: FileItem, target_storage: str,
@staticmethod
def __transfer_command(fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str,
) -> Tuple[Optional[FileItem], str]:
"""
处理单个文件,支持在 fork 进程中运行以提高内存回收
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标文件路径
:param transfer_type: 整理方式
"""
# 检查是否支持 fork
if _supports_fork():
logger.info(f"[PID:{os.getpid()}] 在新的进程中整理文件: {fileitem.path}")
# 创建队列用于进程间通信
result_queue = multiprocessing.Queue()
# 准备参数
args = (
fileitem,
target_storage,
target_file,
transfer_type,
self.storage_schemas,
result_queue
)
# 创建进程
process = multiprocessing.Process(
target=_transfer_command_worker,
args=(args,)
)
# 启动进程
process.start()
# 等待进程完成
process.join()
# 检查进程是否正常退出
if process.exitcode != 0:
logger.error(f"文件转移进程异常退出,退出码: {process.exitcode}")
return None, f"文件转移进程异常退出"
# 从队列获取结果
try:
result = result_queue.get(timeout=1)
logger.debug(f"文件转移在子进程中完成: {fileitem.path}")
return result
except Exception as e:
logger.error(f"无法从子进程获取结果:{str(e)}")
return None, f"无法从子进程获取结果:{str(e)}"
else:
# 不支持 fork直接在当前线程执行
logger.debug("当前系统不支持 fork在当前线程中执行文件转移")
return TransHandler.execute_transfer_command(
fileitem, target_storage, source_oper, target_oper,
target_file, transfer_type
)
@staticmethod
def execute_transfer_command(fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str,
) -> Tuple[Optional[FileItem], str]:
"""
处理单个文件
:param fileitem: 源文件
:param target_storage: 目标存储