diff --git a/app/modules/discord/discord.py b/app/modules/discord/discord.py index 435b09dd..d10f953e 100644 --- a/app/modules/discord/discord.py +++ b/app/modules/discord/discord.py @@ -82,6 +82,7 @@ class Discord: self._typing_tasks: Dict[str, asyncio.Task] = {} self._typing_stop_events: Dict[str, asyncio.Event] = {} self._typing_interval_seconds = 5 + self._typing_initial_delay_seconds = 1 self._typing_max_duration_seconds = 5 * 60 self._register_events() @@ -379,6 +380,7 @@ class Discord: userid: Optional[str] = None, chat_id: Optional[str] = None, max_duration_seconds: Optional[float] = None, + initial_delay_seconds: Optional[float] = None, ) -> bool: """ 持续发送 Discord typing 指示,直到显式停止或达到最大续期。 @@ -395,6 +397,7 @@ class Discord: userid=userid, chat_id=chat_id, max_duration_seconds=max_duration_seconds, + initial_delay_seconds=initial_delay_seconds, ), self._loop, ) @@ -438,6 +441,7 @@ class Discord: userid: Optional[str] = None, chat_id: Optional[str] = None, max_duration_seconds: Optional[float] = None, + initial_delay_seconds: Optional[float] = None, ) -> bool: await self._stop_typing_task(typing_key) channel = await self._resolve_channel(userid=userid, chat_id=chat_id) @@ -445,10 +449,26 @@ class Discord: return False stop_event = asyncio.Event() max_duration = max_duration_seconds or self._typing_max_duration_seconds + initial_delay = ( + self._typing_initial_delay_seconds + if initial_delay_seconds is None + else max(initial_delay_seconds, 0) + ) async def _typing_worker() -> None: started_at = self._loop.time() try: + # Discord typing 触发后也会在客户端自然保留一段时间, + # 先给短响应一个取消窗口,避免回复后残留输入状态。 + if initial_delay: + try: + await asyncio.wait_for( + stop_event.wait(), + timeout=initial_delay, + ) + return + except asyncio.TimeoutError: + pass while not stop_event.is_set(): if self._loop.time() - started_at >= max_duration: logger.warning( diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 4cf39299..9a87a659 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -46,6 +46,7 @@ class Telegram: _typing_stop_flags: Dict[str, threading.Event] = {} # chat_id -> 停止信号 _typing_lock = threading.RLock() _typing_interval_seconds = 5 + _typing_initial_delay_seconds = 1 _typing_max_duration_seconds = 5 * 60 _typing_command_max_duration_seconds = 30 _typing_callback_max_duration_seconds = 60 @@ -340,6 +341,7 @@ class Telegram: self, chat_id: Union[str, int], max_duration_seconds: Optional[float] = None, + initial_delay_seconds: Optional[float] = None, ) -> None: """ 启动持续发送正在输入状态的任务 @@ -351,11 +353,20 @@ class Telegram: # 使用独立 Event 避免同一 chat 新旧 typing 线程互相误改停止标记。 stop_event = threading.Event() max_duration = max_duration_seconds or self._typing_max_duration_seconds + initial_delay = ( + self._typing_initial_delay_seconds + if initial_delay_seconds is None + else max(initial_delay_seconds, 0) + ) def typing_worker(): - """定期发送typing状态的后台线程""" + """延迟首发并定期发送 typing 状态的后台线程。""" started_at = time.monotonic() try: + # Telegram 没有撤销 typing 的接口,短响应先等待一小段时间, + # 避免回复已经发出后客户端仍残留几秒“正在输入”。 + if initial_delay and stop_event.wait(initial_delay): + return while not stop_event.is_set(): if time.monotonic() - started_at >= max_duration: logger.warning( diff --git a/tests/test_message_processing_status.py b/tests/test_message_processing_status.py index ca2edcf2..a16c2391 100644 --- a/tests/test_message_processing_status.py +++ b/tests/test_message_processing_status.py @@ -1,10 +1,13 @@ +import asyncio import json import unittest from types import SimpleNamespace -from unittest.mock import MagicMock, patch +from unittest import IsolatedAsyncioTestCase +from unittest.mock import AsyncMock, MagicMock, patch from app.agent import _finish_processing_status from app.modules.discord import DiscordModule +from app.modules.discord.discord import Discord from app.modules.slack import SlackModule from app.schemas.message import ChannelCapability, ChannelCapabilityManager from app.schemas.types import MessageChannel @@ -144,5 +147,35 @@ class TestMessageProcessingStatus(unittest.TestCase): ) +class TestDiscordTypingLifecycle(IsolatedAsyncioTestCase): + async def test_short_typing_task_can_stop_before_first_trigger(self): + """ + 短响应在首次 Discord typing 触发前结束时,不应留下客户端自然保留的输入状态。 + """ + discord_client = Discord.__new__(Discord) + discord_client._loop = asyncio.get_running_loop() + discord_client._typing_tasks = {} + discord_client._typing_stop_events = {} + discord_client._typing_interval_seconds = 0.01 + discord_client._typing_max_duration_seconds = 1 + channel = MagicMock() + channel.trigger_typing = AsyncMock() + + with patch.object(discord_client, "_resolve_channel", return_value=channel): + started = await discord_client._start_typing_task( + typing_key="chat:30003", + chat_id="30003", + max_duration_seconds=1, + initial_delay_seconds=0.05, + ) + stopped = await discord_client._stop_typing_task("chat:30003") + await asyncio.sleep(0.08) + + self.assertTrue(started) + self.assertTrue(stopped) + channel.trigger_typing.assert_not_called() + self.assertNotIn("chat:30003", discord_client._typing_tasks) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_telegram_typing_lifecycle.py b/tests/test_telegram_typing_lifecycle.py index fafe0546..cab70de3 100644 --- a/tests/test_telegram_typing_lifecycle.py +++ b/tests/test_telegram_typing_lifecycle.py @@ -46,7 +46,11 @@ class TestTelegramTypingLifecycle(unittest.TestCase): def test_start_typing_can_stop_by_chat_id(self): telegram = self._telegram_client() - telegram._start_typing_task("chat-1", max_duration_seconds=1) + telegram._start_typing_task( + "chat-1", + max_duration_seconds=1, + initial_delay_seconds=0, + ) time.sleep(0.03) self.assertIn("chat-1", Telegram._typing_tasks) @@ -58,7 +62,11 @@ class TestTelegramTypingLifecycle(unittest.TestCase): telegram = self._telegram_client() Telegram._user_chat_mapping["10001"] = "chat-2" - telegram._start_typing_task("chat-2", max_duration_seconds=1) + telegram._start_typing_task( + "chat-2", + max_duration_seconds=1, + initial_delay_seconds=0, + ) time.sleep(0.03) self.assertTrue(telegram.stop_typing(userid="10001")) @@ -67,11 +75,32 @@ class TestTelegramTypingLifecycle(unittest.TestCase): def test_typing_task_has_max_duration_guard(self): telegram = self._telegram_client() - telegram._start_typing_task("chat-3", max_duration_seconds=0.02) + telegram._start_typing_task( + "chat-3", + max_duration_seconds=0.02, + initial_delay_seconds=0, + ) time.sleep(0.08) self.assertNotIn("chat-3", Telegram._typing_tasks) + def test_short_typing_task_can_stop_before_first_chat_action(self): + """ + 短响应在首次 typing 发出前结束时,不应留下客户端自然过期的残留状态。 + """ + telegram = self._telegram_client() + + telegram._start_typing_task( + "chat-4", + max_duration_seconds=1, + initial_delay_seconds=0.05, + ) + telegram.stop_typing(chat_id="chat-4") + time.sleep(0.08) + + telegram._bot.send_chat_action.assert_not_called() + self.assertNotIn("chat-4", Telegram._typing_tasks) + def test_agent_managed_send_msg_keeps_typing_for_worker_cleanup(self): telegram = self._telegram_client() sent = SimpleNamespace(message_id=1, chat=SimpleNamespace(id="chat-1"))