From 51981d151e75cc3846089e43f26dd86d9704082c Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 5 Jun 2026 00:01:28 +0800 Subject: [PATCH] feat(workflow): enhance execution state handling for non-JSON serializable values --- app/chain/workflow.py | 17 +++++++++++++- tests/test_workflow_execution.py | 39 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/app/chain/workflow.py b/app/chain/workflow.py index a5744f55..7e47590e 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -260,10 +260,25 @@ class WorkflowExecutor: """ 将运行期对象转换为可写入 JSON 列的数据。 """ + if isinstance(value, dict): + return { + str(key): WorkflowExecutor.make_json_safe(item) + for key, item in value.items() + } + if isinstance(value, (list, tuple, set)): + return [WorkflowExecutor.make_json_safe(item) for item in value] try: - return jsonable_encoder(value) + encoded_value = jsonable_encoder(value) except Exception: return str(value) + if isinstance(encoded_value, dict): + return { + str(key): WorkflowExecutor.make_json_safe(item) + for key, item in encoded_value.items() + } + if isinstance(encoded_value, list): + return [WorkflowExecutor.make_json_safe(item) for item in encoded_value] + return encoded_value def execute(self) -> None: """ diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py index 7da5623a..34faf97f 100644 --- a/tests/test_workflow_execution.py +++ b/tests/test_workflow_execution.py @@ -67,6 +67,15 @@ class _FakeWorkflowManager: return self.contracts.get(action_type) or {} +class _OpaqueValue: + """模拟无法直接 JSON 序列化的值。""" + + __slots__ = () + + def __str__(self): + return "opaque-value" + + def test_workflow_executor_resumes_downstream_nodes(monkeypatch): """恢复执行时应释放已完成节点的后继节点。""" calls = [] @@ -449,6 +458,36 @@ def test_workflow_executor_restores_outputs_from_execution_state(monkeypatch): assert executor.context.node_outputs["A"]["torrents"] == ["movie"] +def test_workflow_executor_keeps_execution_state_dict_for_non_json_leaf(monkeypatch): + """结构化状态遇到不可序列化叶子节点时仍应保持字典结构。""" + calls = [] + states = [] + fake_manager = _FakeWorkflowManager( + calls, + results={ + "A": lambda action, context: ActionResult( + success=True, + message=f"{action.name}完成", + context=context, + outputs={"opaque": _OpaqueValue()} + ) + } + ) + + monkeypatch.setattr(workflow_module, "WorkFlowManager", lambda: fake_manager) + monkeypatch.setattr(workflow_module.global_vars, "workflow_resume", lambda workflow_id: None) + monkeypatch.setattr(workflow_module.global_vars, "is_workflow_stopped", lambda workflow_id: False) + + executor = workflow_module.WorkflowExecutor( + _build_workflow(actions=[{"id": "A", "type": "FakeAction", "name": "动作A", "data": {}}], flows=[]), + step_callback=lambda action, context, execution_state, completed: states.append(execution_state), + ) + executor.execute() + + assert isinstance(states[-1], dict) + assert states[-1]["outputs"]["A"]["opaque"] == "opaque-value" + + def test_workflow_executor_concurrency_key_serializes_parallel_nodes(monkeypatch): """相同 concurrency_key 的并行节点不应同时运行。""" calls = []