diff --git a/app/modules/filemanager/storages/smb.py b/app/modules/filemanager/storages/smb.py index 584c53d4..6924f62d 100644 --- a/app/modules/filemanager/storages/smb.py +++ b/app/modules/filemanager/storages/smb.py @@ -1,4 +1,3 @@ -import signal import threading import time from pathlib import Path @@ -18,51 +17,6 @@ from app.utils.singleton import WeakSingleton lock = threading.Lock() -# 全局SMB实例列表,用于信号处理 -_smb_instances = [] - - -def _signal_handler(signum, frame): - """ - 信号处理器,用于在进程终止时清理SMB连接 - """ - logger.info(f"【SMB】收到信号 {signum},开始清理SMB连接...") - for smb_instance in _smb_instances: - try: - if hasattr(smb_instance, '_cleanup_connection'): - smb_instance._cleanup_connection() - except Exception as e: - logger.debug(f"【SMB】清理实例时出错: {e}") - - -# 注册信号处理器 -signal.signal(signal.SIGTERM, _signal_handler) -signal.signal(signal.SIGINT, _signal_handler) - - -def _safe_smb_operation(operation_func, *args, **kwargs): - """ - 安全地执行SMB操作,处理连接错误和线程问题 - """ - max_retries = 3 - retry_count = 0 - - while retry_count < max_retries: - try: - return operation_func(*args, **kwargs) - except (SMBResponseException, SMBException) as e: - retry_count += 1 - logger.warning(f"【SMB】操作失败,尝试重试 ({retry_count}/{max_retries}): {e}") - if retry_count >= max_retries: - logger.error(f"【SMB】操作失败,已达到最大重试次数: {e}") - raise - # 短暂等待后重试 - import time - time.sleep(1) - except Exception as e: - logger.error(f"【SMB】操作时发生未知错误: {e}") - raise - class SMBConnectionError(Exception): """ @@ -88,21 +42,6 @@ class SMB(StorageBase, metaclass=WeakSingleton): # 文件块大小,默认10MB chunk_size = 10 * 1024 * 1024 - def __enter__(self): - """ - 上下文管理器入口 - """ - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - 上下文管理器出口,确保连接被正确清理 - """ - try: - self._cleanup_connection() - except Exception as e: - logger.debug(f"【SMB】上下文管理器清理连接失败: {e}") - def __init__(self): super().__init__() self._connected = False @@ -110,10 +49,7 @@ class SMB(StorageBase, metaclass=WeakSingleton): self._host = None self._username = None self._password = None - - # 注册到全局实例列表 - _smb_instances.append(self) - + self._init_connection() def _init_connection(self): @@ -179,8 +115,8 @@ class SMB(StorageBase, metaclass=WeakSingleton): 测试SMB连接 """ try: - # 使用安全操作包装器测试连接 - _safe_smb_operation(smbclient.listdir, self._server_path) + # 尝试列出根目录来测试连接 + smbclient.listdir(self._server_path) except SMBAuthenticationError as e: raise SMBConnectionError(f"SMB认证失败:{e}") except SMBResponseException as e: @@ -202,13 +138,6 @@ class SMB(StorageBase, metaclass=WeakSingleton): """ if not self._connected or not self._server_path: raise SMBConnectionError("【SMB】连接未建立或已断开,请检查配置!") - - # 尝试重新连接如果连接已断开 - try: - self._test_connection() - except Exception as e: - logger.warning(f"【SMB】连接检查失败,尝试重新连接: {e}") - self._reconnect() def _normalize_path(self, path: Union[str, Path]) -> str: """ @@ -305,13 +234,6 @@ class SMB(StorageBase, metaclass=WeakSingleton): except Exception as e: logger.debug(f"【SMB】连接检查失败:{e}") self._connected = False - # 尝试重新连接 - try: - self._reconnect() - if self._connected: - return True - except Exception as reconnect_error: - logger.debug(f"【SMB】重新连接失败:{reconnect_error}") return False def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]: @@ -332,11 +254,12 @@ class SMB(StorageBase, metaclass=WeakSingleton): # 列出目录内容 try: - entries = _safe_smb_operation(smbclient.listdir, smb_path) - except Exception as e: + entries = smbclient.listdir(smb_path) + except SMBResponseException as e: + logger.error(f"【SMB】列出目录失败: {smb_path} - {e}") + return [] + except SMBException as e: logger.error(f"【SMB】列出目录失败: {smb_path} - {e}") - # 尝试重新连接 - self._reconnect() return [] items = [] @@ -356,8 +279,6 @@ class SMB(StorageBase, metaclass=WeakSingleton): return items except Exception as e: logger.error(f"【SMB】列出文件失败: {e}") - # 尝试重新连接 - self._reconnect() return [] def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: @@ -740,51 +661,7 @@ class SMB(StorageBase, metaclass=WeakSingleton): 析构函数,清理连接 """ try: - # 从全局实例列表中移除 - if self in _smb_instances: - _smb_instances.remove(self) - - # smbclient 自动管理连接池,但我们可以重置缓存 - if hasattr(self, '_connected') and self._connected: - self._cleanup_connection() + if self._connected: + reset_connection_cache() except Exception as e: logger.debug(f"【SMB】清理连接失败: {e}") - - def _reconnect(self): - """ - 重新建立SMB连接 - """ - try: - logger.info("【SMB】尝试重新连接...") - - # 清理现有连接 - self._cleanup_connection() - - # 重新初始化连接 - self._init_connection() - - if self._connected: - logger.info("【SMB】重新连接成功") - else: - logger.error("【SMB】重新连接失败") - - except Exception as e: - logger.error(f"【SMB】重新连接过程中出现错误: {e}") - self._connected = False - - def _cleanup_connection(self): - """ - 安全地清理SMB连接 - """ - try: - # 标记连接为断开状态 - self._connected = False - - # 重置连接缓存,这会清理所有连接 - reset_connection_cache() - - logger.debug("【SMB】连接清理完成") - except Exception as e: - logger.debug(f"【SMB】连接清理过程中出现错误: {e}") - # 即使清理失败,也要确保连接状态被重置 - self._connected = False diff --git a/app/modules/filemanager/transhandler.py b/app/modules/filemanager/transhandler.py index 0a8967c9..66366a63 100644 --- a/app/modules/filemanager/transhandler.py +++ b/app/modules/filemanager/transhandler.py @@ -1,5 +1,3 @@ -import multiprocessing -import os import re from pathlib import Path from threading import Lock @@ -24,75 +22,6 @@ from app.utils.system import SystemUtils lock = Lock() -def _transfer_command_worker(args): - """ - 在子进程中执行文件转移命令的工作函数 - :param args: 包含所有必要参数的元组 - """ - # 解包参数 - (fileitem, target_storage, target_file, transfer_type, storage_schemas, result_queue) = args - - def __get_storage_oper(_storage: str, _func: Optional[str] = None) -> Optional[StorageBase]: - """ - 获取存储操作对象 - """ - for storage_schema in storage_schemas: - if storage_schema.schema \ - and storage_schema.schema.value == _storage \ - and (not _func or hasattr(storage_schema, _func)): - return storage_schema() - return None - - try: - # 获取存储操作对象 - source_oper = __get_storage_oper(fileitem.storage) - target_oper = __get_storage_oper(target_storage) - - if not source_oper or not target_oper: - result_queue.put((None, f"无法创建存储操作对象: source={fileitem.storage}, target={target_storage}")) - return - - # 执行原有的转移逻辑 - result = TransHandler.execute_transfer_command( - fileitem, target_storage, source_oper, target_oper, - target_file, transfer_type - ) - - # 将结果放入队列 - result_queue.put(result) - - except Exception as e: - result_queue.put((None, str(e))) - - -def _supports_fork(): - """ - 检测当前系统是否支持 fork - :return: True 如果支持 fork,False 否则 - """ - try: - # Windows 不支持 fork - if os.name == 'nt': - return False - - # 检查可用的启动方法 - available_methods = multiprocessing.get_all_start_methods() - if 'fork' not in available_methods: - return False - - # 尝试设置 fork 方法 - try: - multiprocessing.set_start_method('fork', force=True) - return True - except RuntimeError: - # 如果已经设置过启动方法,检查当前方法 - current_method = multiprocessing.get_start_method() - return current_method == 'fork' - except Exception as e: - logger.error(f"检测 fork 支持时出错: {str(e)}") - return False - - class TransHandler: """ 文件转移整理类 @@ -100,8 +29,7 @@ class TransHandler: inner_lock: Lock = Lock() - def __init__(self, storage_schemas: List[StorageBase]): - self.storage_schemas = storage_schemas + def __init__(self): self.result = None def __reset_result(self): @@ -382,75 +310,12 @@ class TransHandler: finally: self.result = None - def __transfer_command(self, fileitem: FileItem, target_storage: str, + @staticmethod + def __transfer_command(fileitem: FileItem, target_storage: str, source_oper: StorageBase, target_oper: StorageBase, target_file: Path, transfer_type: str, ) -> Tuple[Optional[FileItem], str]: """ - 处理单个文件,支持在 fork 进程中运行以提高内存回收 - :param fileitem: 源文件 - :param target_storage: 目标存储 - :param source_oper: 源存储操作对象 - :param target_oper: 目标存储操作对象 - :param target_file: 目标文件路径 - :param transfer_type: 整理方式 - """ - # 检查是否支持 fork - if _supports_fork(): - logger.info(f"[PID:{os.getpid()}] 在新的进程中整理文件: {fileitem.path}") - - # 创建队列用于进程间通信 - result_queue = multiprocessing.Queue() - - # 准备参数 - args = ( - fileitem, - target_storage, - target_file, - transfer_type, - self.storage_schemas, - result_queue - ) - - # 创建进程 - process = multiprocessing.Process( - target=_transfer_command_worker, - args=(args,) - ) - - # 启动进程 - process.start() - - # 等待进程完成 - process.join() - - # 检查进程是否正常退出 - if process.exitcode != 0: - logger.error(f"文件转移进程异常退出,退出码: {process.exitcode}") - return None, f"文件转移进程异常退出" - - # 从队列获取结果 - try: - result = result_queue.get(timeout=1) - logger.debug(f"文件转移在子进程中完成: {fileitem.path}") - return result - except Exception as e: - logger.error(f"无法从子进程获取结果:{str(e)}") - return None, f"无法从子进程获取结果:{str(e)}" - else: - # 不支持 fork,直接在当前线程执行 - logger.debug("当前系统不支持 fork,在当前线程中执行文件转移") - return TransHandler.execute_transfer_command( - fileitem, target_storage, source_oper, target_oper, - target_file, transfer_type - ) - - @staticmethod - def execute_transfer_command(fileitem: FileItem, target_storage: str, - source_oper: StorageBase, target_oper: StorageBase, - target_file: Path, transfer_type: str, - ) -> Tuple[Optional[FileItem], str]: - """ 处理单个文件 :param fileitem: 源文件 :param target_storage: 目标存储