From af5eae47242923d09aada4ee699c320286d67b1e Mon Sep 17 00:00:00 2001 From: birdg0 Date: Thu, 26 Mar 2026 22:57:54 +0800 Subject: [PATCH] feat: add receive_timeout parameter to Connection class - Add optional receive_timeout parameter to Connection.__init__ - Implement timeout handling in _receive_loop using asyncio.wait_for - Raise RequestError.internal_error on timeout for graceful error handling This allows users to configure a timeout for receiving messages from agents, preventing indefinite hangs when an agent becomes unresponsive. --- src/acp/connection.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/acp/connection.py b/src/acp/connection.py index aca1c19..cd8e9ee 100644 --- a/src/acp/connection.py +++ b/src/acp/connection.py @@ -73,6 +73,7 @@ def __init__( sender_factory: SenderFactory | None = None, observers: list[StreamObserver] | None = None, listening: bool = True, + receive_timeout: float | None = None, ) -> None: self._handler = handler self._writer = writer @@ -102,6 +103,7 @@ def __init__( ) self._dispatcher.start() self._observers: list[StreamObserver] = list(observers or []) + self._receive_timeout = receive_timeout async def close(self) -> None: """Stop the receive loop and cancel any in-flight handler tasks.""" @@ -148,7 +150,7 @@ async def send_notification(self, method: str, params: JsonValue | None = None) async def _receive_loop(self) -> None: try: while True: - line = await self._reader.readline() + line = await asyncio.wait_for(self._reader.readline(), timeout=self._receive_timeout) if not line: break try: @@ -160,6 +162,8 @@ async def _receive_loop(self) -> None: await self._process_message(message) except asyncio.CancelledError: return + except asyncio.TimeoutError: + raise RequestError.internal_error({"details": "Agent timeout"}) from None async def _process_message(self, message: dict[str, Any]) -> None: method = message.get("method")