feat(feishu): enhance message handling with file and voice support, add reaction management

This commit is contained in:
jxxghp
2026-05-13 00:34:03 +08:00
parent 0989439d25
commit d86d24fc4f
10 changed files with 432 additions and 41 deletions

View File

@@ -211,6 +211,24 @@ class StreamingHandler:
# 执行最后一次刷新
await self._flush()
message_response = self._message_response
if (
message_response
and message_response.channel == MessageChannel.Feishu
and isinstance(message_response.metadata, dict)
):
stream_meta = message_response.metadata.get("feishu_streaming") or {}
card_id = str(stream_meta.get("card_id") or "").strip()
sequence = int(stream_meta.get("sequence") or 1) + 1
if card_id:
await run_in_threadpool(
_StreamChain().run_module,
"close_feishu_streaming_card",
card_id=card_id,
sequence=sequence,
source=message_response.source,
)
# 检查是否所有缓冲内容都已发送
with self._lock:
# 当前消息的文本 = buffer 中从 _msg_start_offset 开始的部分
@@ -535,6 +553,7 @@ class StreamingHandler:
chat_id=self._message_response.chat_id,
text=current_text,
title=self._title,
metadata=self._message_response.metadata,
)
if success:
with self._lock:

View File

@@ -1548,6 +1548,7 @@ class ChainBase(metaclass=ABCMeta):
text: str,
title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""
编辑已发送的消息
@@ -1569,6 +1570,7 @@ class ChainBase(metaclass=ABCMeta):
text=text,
title=title,
buttons=buttons,
metadata=metadata,
)
def send_direct_message(self, message: Notification) -> Optional[MessageResponse]:

View File

@@ -159,29 +159,25 @@ class MessageChain(ChainBase):
text=text,
)
processing_marker = self._mark_message_processing_started(
self._mark_message_processing_started(
channel=channel,
source=source,
original_message_id=original_message_id,
text=text,
)
try:
self._handle_message_core(
channel=channel,
source=source,
userid=userid,
username=username,
text=text,
original_message_id=original_message_id,
original_chat_id=original_chat_id,
images=images,
audio_refs=audio_refs,
files=files,
has_audio_input=has_audio_input,
)
finally:
self._mark_message_processing_finished(processing_marker)
self._handle_message_core(
channel=channel,
source=source,
userid=userid,
username=username,
text=text,
original_message_id=original_message_id,
original_chat_id=original_chat_id,
images=images,
audio_refs=audio_refs,
files=files,
has_audio_input=has_audio_input,
)
def _handle_message_core(
self,
@@ -341,20 +337,6 @@ class MessageChain(ChainBase):
reaction_id=str(reaction_id),
)
def _mark_message_processing_finished(
self,
marker: Optional[_ProcessingMarker],
) -> None:
"""清理渠道“消息正在处理”标记。"""
if not marker:
return
self.run_module(
"delete_feishu_message_reaction",
message_id=marker.message_id,
reaction_id=marker.reaction_id,
source=marker.source,
)
def _handle_callback(
self,
text: str,

View File

@@ -440,6 +440,7 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
text: str,
title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
metadata: Optional[dict] = None,
) -> bool:
"""
编辑消息

View File

@@ -160,6 +160,7 @@ class FeishuModule(_ModuleBase, _MessageBase[Feishu]):
text: str,
title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
metadata: Optional[dict] = None,
) -> bool:
if channel != self._channel:
return False
@@ -172,6 +173,7 @@ class FeishuModule(_ModuleBase, _MessageBase[Feishu]):
title=title,
text=text,
buttons=buttons,
metadata=metadata,
):
return True
return False
@@ -218,6 +220,7 @@ class FeishuModule(_ModuleBase, _MessageBase[Feishu]):
chat_id=result.get("chat_id"),
channel=MessageChannel.Feishu,
source=conf.name,
metadata=result.get("metadata"),
success=True,
)
return None
@@ -231,8 +234,22 @@ class FeishuModule(_ModuleBase, _MessageBase[Feishu]):
client = self.get_instance(client_config.name)
if not client:
return None
image_key = image_ref.replace("feishu://image/", "", 1)
downloaded = client._download_image_bytes(image_key)
resource_path = image_ref.replace("feishu://image/", "", 1)
message_id = None
image_key = resource_path
if "/" in resource_path:
message_id, image_key = resource_path.split("/", 1)
message_id = message_id.strip() or None
image_key = image_key.strip()
downloaded = None
if message_id:
downloaded = client._download_message_resource_bytes(
message_id=message_id,
file_key=image_key,
resource_type="image",
)
if not downloaded:
downloaded = client._download_image_bytes(image_key)
if not downloaded:
return None
content, _, content_type = downloaded
@@ -287,3 +304,17 @@ class FeishuModule(_ModuleBase, _MessageBase[Feishu]):
if not client:
return False
return client.delete_message_reaction(message_id=message_id, reaction_id=reaction_id)
def close_feishu_streaming_card(
self,
card_id: str,
sequence: int,
source: str,
) -> bool:
client_config = self.get_config(source)
if not client_config:
return False
client = self.get_instance(client_config.name)
if not client:
return False
return client.close_streaming_card(card_id=card_id, sequence=sequence)

View File

@@ -9,6 +9,14 @@ from typing import Any, Dict, List, Optional, Tuple
import lark_oapi as lark
import lark_oapi.ws.client as lark_ws_client_module
from lark_oapi.api.cardkit.v1 import (
ContentCardElementRequest,
ContentCardElementRequestBody,
CreateCardRequest,
CreateCardRequestBody,
SettingsCardRequest,
SettingsCardRequestBody,
)
from lark_oapi.api.im.v1 import (
CreateFileRequest,
CreateFileRequestBody,
@@ -24,6 +32,7 @@ from lark_oapi.api.im.v1 import (
GetMessageResourceRequest,
PatchMessageRequest,
PatchMessageRequestBody,
P2ImMessageMessageReadV1,
P2ImMessageReceiveV1,
ReplyMessageRequest,
ReplyMessageRequestBody,
@@ -40,7 +49,7 @@ from app.core.config import settings
from app.core.context import Context, MediaInfo
from app.log import logger
from app.schemas import CommingMessage, Notification
from app.schemas.types import MessageChannel
from app.schemas.types import MessageChannel, NotificationType
from app.utils.http import RequestUtils
@@ -48,6 +57,8 @@ class Feishu:
"""飞书通知客户端,负责长连接收消息与主动发送通知。"""
PROCESSING_REACTION_EMOJI = "GLANCE"
STREAM_CARD_TITLE_ELEMENT_ID = "mp_stream_title"
STREAM_CARD_BODY_ELEMENT_ID = "mp_stream_body"
def __init__(
self,
@@ -107,6 +118,7 @@ class Feishu:
level=LogLevel.INFO,
)
builder.register_p2_im_message_receive_v1(self._on_message)
builder.register_p2_im_message_message_read_v1(self._on_message_read)
builder.register_p2_card_action_trigger(self._on_card_action)
return builder.build()
@@ -202,8 +214,12 @@ class Feishu:
if message_type == "image":
image_key = str(content.get("image_key") or "").strip()
message_id = str(getattr(message, "message_id", None) or "").strip()
if image_key:
images = [CommingMessage.MessageImage(ref=f"feishu://image/{image_key}")]
if message_id:
images = [CommingMessage.MessageImage(ref=f"feishu://image/{message_id}/{image_key}")]
else:
images = [CommingMessage.MessageImage(ref=f"feishu://image/{image_key}")]
elif message_type in {"audio", "media", "file"}:
file_key = str(content.get("file_key") or "").strip()
file_name = str(content.get("file_name") or "").strip() or None
@@ -332,6 +348,17 @@ class Feishu:
}
)
@staticmethod
def _on_message_read(data: P2ImMessageMessageReadV1) -> None:
"""忽略消息已读事件,避免长连接打印未注册处理器错误。"""
event = getattr(data, "event", None)
reader = getattr(event, "reader", None)
logger.debug(
"收到飞书消息已读事件reader=%s, message_count=%s",
getattr(reader, "open_id", None) or getattr(reader, "user_id", None),
len(getattr(event, "message_id_list", None) or []),
)
def get_state(self) -> bool:
"""返回飞书客户端是否已就绪。"""
return self._ready.is_set() and self._api_client is not None
@@ -548,6 +575,171 @@ class Feishu:
"elements": elements,
}
def _build_streaming_card_payload(
self,
title: Optional[str],
text: Optional[str],
) -> Dict[str, Any]:
"""构建支持 CardKit 流式更新的飞书卡片 JSON 2.0。"""
elements: List[dict] = []
title_content = self._escape_card_text(title).strip() if title else ""
if title_content:
elements.append(
{
"tag": "markdown",
"element_id": self.STREAM_CARD_TITLE_ELEMENT_ID,
"content": f"**{title_content}**",
}
)
elements.append(
{
"tag": "markdown",
"element_id": self.STREAM_CARD_BODY_ELEMENT_ID,
"content": self._escape_card_text(text).strip() or " ",
}
)
return {
"schema": "2.0",
"config": {
"wide_screen_mode": True,
"enable_forward": True,
"update_multi": True,
"streaming_mode": True,
"summary": {
"content": title or "MoviePilot助手",
},
"streaming_config": {
"print_frequency_ms": {"default": 70},
"print_step": {"default": 1},
"print_strategy": "fast",
},
},
"body": {
"direction": "vertical",
"padding": "12px 12px 12px 12px",
"elements": elements,
},
}
def _create_streaming_card(self, title: Optional[str], text: Optional[str]) -> Optional[str]:
if not self._api_client:
return None
response = self._api_client.cardkit.v1.card.create(
CreateCardRequest.builder()
.request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(self._build_streaming_card_payload(title=title, text=text), ensure_ascii=False))
.build()
)
.build()
)
if response.success():
data = getattr(response, "data", None)
return getattr(data, "card_id", None)
logger.error(
"飞书流式卡片创建失败code=%s, msg=%s, log_id=%s",
response.code,
response.msg,
response.get_log_id(),
)
return None
def _send_streaming_card_message(
self,
title: Optional[str],
text: Optional[str],
userid: Optional[str] = None,
chat_id: Optional[str] = None,
receive_id_type: Optional[str] = None,
) -> Optional[dict]:
card_id = self._create_streaming_card(title=title, text=text)
if not card_id:
return None
receive_id, resolved_receive_id_type = self._resolve_target(
userid=userid,
chat_id=chat_id,
receive_id_type=receive_id_type,
)
result = self._send_message(
receive_id,
resolved_receive_id_type,
"interactive",
{"type": "card", "data": {"card_id": card_id}},
)
if not result:
return None
result["metadata"] = {
"feishu_streaming": {
"card_id": card_id,
"element_id": self.STREAM_CARD_BODY_ELEMENT_ID,
"sequence": 1,
}
}
return result
def _update_streaming_card_content(
self,
card_id: str,
element_id: str,
content: str,
sequence: int,
) -> bool:
if not self._api_client:
return False
response = self._api_client.cardkit.v1.card_element.content(
ContentCardElementRequest.builder()
.card_id(card_id)
.element_id(element_id)
.request_body(
ContentCardElementRequestBody.builder()
.uuid(str(uuid.uuid4()))
.content(content or " ")
.sequence(sequence)
.build()
)
.build()
)
if response.success():
return True
logger.error(
"飞书流式卡片内容更新失败card_id=%s, element_id=%s, sequence=%s, code=%s, msg=%s, log_id=%s",
card_id,
element_id,
sequence,
response.code,
response.msg,
response.get_log_id(),
)
return False
def close_streaming_card(self, card_id: str, sequence: int) -> bool:
if not self._api_client or not card_id:
return False
response = self._api_client.cardkit.v1.card.settings(
SettingsCardRequest.builder()
.card_id(card_id)
.request_body(
SettingsCardRequestBody.builder()
.settings(json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False))
.uuid(str(uuid.uuid4()))
.sequence(sequence)
.build()
)
.build()
)
if response.success():
return True
logger.error(
"飞书关闭流式卡片失败card_id=%s, sequence=%s, code=%s, msg=%s, log_id=%s",
card_id,
sequence,
response.code,
response.msg,
response.get_log_id(),
)
return False
def _send_message(self, receive_id: str, receive_id_type: str, msg_type: str, content: dict) -> Optional[dict]:
"""调用飞书 IM API 发送消息,并返回统一结果结构。"""
if not self._api_client:
@@ -915,6 +1107,29 @@ class Feishu:
original_message_id: Optional[str] = None,
) -> Optional[dict]:
"""发送通知消息,优先使用交互卡片承载按钮。"""
is_streaming_agent_text = (
message.mtype == NotificationType.Agent
and not message.buttons
and not message.link
and not original_message_id
)
if is_streaming_agent_text:
try:
result = self._send_streaming_card_message(
title=message.title,
text=message.text,
userid=userid,
chat_id=chat_id,
receive_id_type=receive_id_type,
)
except Exception as err:
logger.error(f"飞书流式卡片发送失败:{err}")
return {"success": False}
if not result:
return {"success": False}
result["chat_id"] = result.get("chat_id") or chat_id or self._user_chat_mapping.get(userid or "") or self._default_chat_id
return result
payload = self._build_card(
title=message.title,
text=message.text,
@@ -949,11 +1164,25 @@ class Feishu:
result["chat_id"] = result.get("chat_id") or chat_id or self._user_chat_mapping.get(userid or "") or self._default_chat_id
return result
def edit_message(self, message_id: str, title: Optional[str] = None, text: Optional[str] = None, buttons: Optional[List[List[dict]]] = None) -> bool:
def edit_message(self, message_id: str, title: Optional[str] = None, text: Optional[str] = None, buttons: Optional[List[List[dict]]] = None, metadata: Optional[dict] = None) -> bool:
"""编辑已发送的飞书交互卡片消息。"""
if not self._api_client:
return False
stream_meta = (metadata or {}).get("feishu_streaming") if isinstance(metadata, dict) else None
if isinstance(stream_meta, dict) and not buttons:
card_id = str(stream_meta.get("card_id") or "").strip()
element_id = str(stream_meta.get("element_id") or self.STREAM_CARD_BODY_ELEMENT_ID).strip()
sequence = int(stream_meta.get("sequence") or 1) + 1
if card_id and element_id and self._update_streaming_card_content(
card_id=card_id,
element_id=element_id,
content=self._escape_card_text(text).strip() or " ",
sequence=sequence,
):
stream_meta["sequence"] = sequence
return True
card = self._build_card(title=title, text=text, link=None, buttons=buttons)
try:
response = self._api_client.im.v1.message.patch(

View File

@@ -558,6 +558,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
text: str,
title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
metadata: Optional[dict] = None,
) -> bool:
"""
编辑消息

View File

@@ -565,6 +565,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
text: str,
title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
metadata: Optional[dict] = None,
) -> bool:
"""
编辑消息

View File

@@ -20,6 +20,8 @@ class MessageResponse(BaseModel):
channel: Optional[MessageChannel] = None
# 消息来源
source: Optional[str] = None
# 渠道自定义上下文(如飞书流式卡片 card_id/element_id/sequence
metadata: Optional[Dict[str, Any]] = None
# 是否发送成功
success: bool = False

View File

@@ -22,7 +22,7 @@ from app.modules.feishu import FeishuModule
from app.modules.feishu.feishu import Feishu
from app.schemas import Notification
from app.schemas.message import ChannelCapability, ChannelCapabilityManager
from app.schemas.types import MessageChannel
from app.schemas.types import MessageChannel, NotificationType
class TestFeishu(unittest.TestCase):
@@ -57,7 +57,14 @@ class TestFeishu(unittest.TestCase):
return response
@staticmethod
def _build_message_api(create_response=None, patch_response=None, reply_response=None, reaction_create_response=None, reaction_delete_response=None, image_create_response=None, file_create_response=None, image_get_response=None, file_get_response=None, message_resource_response=None):
def _card_create_success_response(card_id="card_test"):
response = MagicMock()
response.success.return_value = True
response.data = SimpleNamespace(card_id=card_id)
return response
@staticmethod
def _build_message_api(create_response=None, patch_response=None, reply_response=None, reaction_create_response=None, reaction_delete_response=None, card_create_response=None, card_settings_response=None, card_content_response=None, image_create_response=None, file_create_response=None, image_get_response=None, file_get_response=None, message_resource_response=None):
message_api = SimpleNamespace(
create=MagicMock(return_value=create_response),
patch=MagicMock(return_value=patch_response),
@@ -88,7 +95,18 @@ class TestFeishu(unittest.TestCase):
file=file_api,
message_resource=message_resource_api,
)
)
),
cardkit=SimpleNamespace(
v1=SimpleNamespace(
card=SimpleNamespace(
create=MagicMock(return_value=card_create_response),
settings=MagicMock(return_value=card_settings_response),
),
card_element=SimpleNamespace(
content=MagicMock(return_value=card_content_response),
),
)
),
)
return api_client, message_api
@@ -256,6 +274,74 @@ class TestFeishu(unittest.TestCase):
self.assertEqual(delete_request.message_id, "om_origin")
self.assertEqual(delete_request.reaction_id, "reaction_1")
def test_send_notification_uses_streaming_card_for_agent_text(self):
client = self._build_client()
client._api_client, message_api = self._build_message_api(
create_response=self._success_response(message_id="om_stream", chat_id="oc_stream"),
card_create_response=self._card_create_success_response("card_stream"),
)
result = client.send_notification(
Notification(
mtype=NotificationType.Agent,
title="MoviePilot助手",
text="第一帧内容",
),
userid="ou_user_stream",
)
self.assertTrue(result["success"])
self.assertEqual(result["metadata"]["feishu_streaming"]["card_id"], "card_stream")
card_request = client._api_client.cardkit.v1.card.create.call_args.args[0]
self.assertEqual(card_request.request_body.type, "card_json")
card_payload = json.loads(card_request.request_body.data)
self.assertTrue(card_payload["config"]["streaming_mode"])
self.assertEqual(card_payload["body"]["elements"][-1]["element_id"], Feishu.STREAM_CARD_BODY_ELEMENT_ID)
message_request = message_api.create.call_args.args[0]
self.assertEqual(message_request.request_body.msg_type, "interactive")
self.assertEqual(json.loads(message_request.request_body.content)["data"]["card_id"], "card_stream")
def test_edit_message_uses_cardkit_content_for_streaming_card(self):
client = self._build_client()
client._api_client, message_api = self._build_message_api(
patch_response=self._success_response(),
card_content_response=self._success_response(),
)
success = client.edit_message(
message_id="om_stream",
text="第二帧内容",
metadata={
"feishu_streaming": {
"card_id": "card_stream",
"element_id": Feishu.STREAM_CARD_BODY_ELEMENT_ID,
"sequence": 1,
}
},
)
self.assertTrue(success)
client._api_client.cardkit.v1.card_element.content.assert_called_once()
message_api.patch.assert_not_called()
content_request = client._api_client.cardkit.v1.card_element.content.call_args.args[0]
self.assertEqual(content_request.card_id, "card_stream")
self.assertEqual(content_request.element_id, Feishu.STREAM_CARD_BODY_ELEMENT_ID)
self.assertEqual(content_request.request_body.sequence, 2)
def test_close_streaming_card_updates_card_settings(self):
client = self._build_client()
client._api_client, _ = self._build_message_api(
card_settings_response=self._success_response(),
)
success = client.close_streaming_card(card_id="card_stream", sequence=3)
self.assertTrue(success)
settings_request = client._api_client.cardkit.v1.card.settings.call_args.args[0]
self.assertEqual(settings_request.card_id, "card_stream")
settings_payload = json.loads(settings_request.request_body.settings)
self.assertFalse(settings_payload["config"]["streaming_mode"])
def test_parse_message_supports_image_and_file_payloads(self):
client = self._build_client()
@@ -290,6 +376,24 @@ class TestFeishu(unittest.TestCase):
self.assertEqual(image_message.images[0].ref, "feishu://image/img_v2_test")
self.assertEqual(file_message.files[0].ref, "feishu://file/file_key/report.pdf")
def test_on_message_wraps_feishu_image_ref_with_message_id(self):
client = self._build_client()
message = SimpleNamespace(
message_id="om_img_evt",
chat_id="oc_chat_evt",
chat_type="p2p",
message_type="image",
content=json.dumps({"image_key": "img_v2_evt"}),
)
sender = SimpleNamespace(sender_id=SimpleNamespace(open_id="ou_user_evt", user_id=None))
event = SimpleNamespace(sender=sender, message=message)
with patch.object(client, "_forward_to_message_chain") as forward:
client._on_message(SimpleNamespace(event=event))
payload = forward.call_args.args[0]
self.assertEqual(payload["images"][0]["ref"], "feishu://image/om_img_evt/img_v2_evt")
def test_feishu_channel_capabilities_enable_images_and_files(self):
self.assertTrue(
ChannelCapabilityManager.supports_capability(
@@ -451,15 +555,21 @@ class TestFeishu(unittest.TestCase):
client = MagicMock()
client._download_image_bytes.return_value = (b"image", "poster.png", "image/png")
client._download_file_bytes.return_value = (b"file", "note.txt", "text/plain")
client._download_message_resource_bytes.return_value = (b"image", "poster.png", "image/png")
with patch.object(module, "get_config", return_value=SimpleNamespace(name="feishu-main")), patch.object(
module, "get_instance", return_value=client
):
data_url = module.download_feishu_image_to_data_url("feishu://image/img_v2_xxx", "feishu-main")
data_url = module.download_feishu_image_to_data_url("feishu://image/om_msg/img_v2_xxx", "feishu-main")
file_bytes = module.download_feishu_file_bytes("feishu://file/file_xxx/note.txt", "feishu-main")
self.assertTrue(data_url.startswith("data:image/png;base64,"))
self.assertEqual(file_bytes, b"file")
client._download_message_resource_bytes.assert_called_once_with(
message_id="om_msg",
file_key="img_v2_xxx",
resource_type="image",
)
def test_module_message_reaction_helpers_delegate_to_client(self):
module = FeishuModule()
@@ -476,6 +586,19 @@ class TestFeishu(unittest.TestCase):
self.assertEqual(reaction_id, "reaction_2")
self.assertTrue(deleted)
def test_module_close_streaming_card_delegates_to_client(self):
module = FeishuModule()
client = MagicMock()
client.close_streaming_card.return_value = True
with patch.object(module, "get_config", return_value=SimpleNamespace(name="feishu-main")), patch.object(
module, "get_instance", return_value=client
):
success = module.close_feishu_streaming_card("card_stream", 4, "feishu-main")
self.assertTrue(success)
client.close_streaming_card.assert_called_once_with(card_id="card_stream", sequence=4)
def test_module_post_message_prefers_file_and_voice_paths(self):
module = FeishuModule()
conf = SimpleNamespace(name="feishu-main")