From 840b2f1087df6a8c63f1b9b6b23f54caaf06af57 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Tue, 24 Mar 2026 11:22:56 -0600 Subject: [PATCH 1/3] Fix flaky tests --- tests/durabletask/test_orchestration_async_e2e.py | 5 ++++- tests/durabletask/test_orchestration_e2e.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/durabletask/test_orchestration_async_e2e.py b/tests/durabletask/test_orchestration_async_e2e.py index 3a2b24bc..e1c70a3b 100644 --- a/tests/durabletask/test_orchestration_async_e2e.py +++ b/tests/durabletask/test_orchestration_async_e2e.py @@ -128,10 +128,13 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state is not None assert state.runtime_status == client.OrchestrationStatus.SUSPENDED + # Small delay to ensure the suspension is fully enforced + await asyncio.sleep(1) + # Raise an event and confirm that it does NOT complete while suspended await c.raise_orchestration_event(id, "my_event", data=42) try: - state = await c.wait_for_orchestration_completion(id, timeout=3) + state = await c.wait_for_orchestration_completion(id, timeout=5) assert False, "Orchestration should not have completed" except TimeoutError: pass diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index a6f670ce..ad04392d 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -269,10 +269,13 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state is not None assert state.runtime_status == client.OrchestrationStatus.SUSPENDED + # Small delay to ensure the suspension is fully enforced + time.sleep(1) + # Raise an event to the orchestration and confirm that it does NOT complete task_hub_client.raise_orchestration_event(id, "my_event", data=42) try: - state = task_hub_client.wait_for_orchestration_completion(id, timeout=3) + state = task_hub_client.wait_for_orchestration_completion(id, timeout=5) assert False, "Orchestration should not have completed" except TimeoutError: pass From e3209207721d1e591c6db985afc1e55feba14b37 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Tue, 24 Mar 2026 11:42:57 -0600 Subject: [PATCH 2/3] Real fix --- durabletask/testing/in_memory_backend.py | 9 ++++++++- tests/durabletask/test_orchestration_async_e2e.py | 5 +---- tests/durabletask/test_orchestration_e2e.py | 5 +---- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/durabletask/testing/in_memory_backend.py b/durabletask/testing/in_memory_backend.py index 590688ad..e98e8e15 100644 --- a/durabletask/testing/in_memory_backend.py +++ b/durabletask/testing/in_memory_backend.py @@ -309,7 +309,14 @@ def RaiseEvent(self, request: pb.RaiseEventRequest, context): ) instance.pending_events.append(event) instance.last_updated_at = datetime.now(timezone.utc) - self._enqueue_orchestration(instance.instance_id) + + # Don't dispatch work for suspended or terminal orchestrations; + # suspended events will be delivered when the orchestration is + # resumed, and terminal orchestrations can't process new events. + not_terminal = not self._is_terminal_status(instance.status) + not_suspended = instance.status != pb.ORCHESTRATION_STATUS_SUSPENDED + if not_terminal and not_suspended: + self._enqueue_orchestration(instance.instance_id) self._logger.info(f"Raised event '{request.name}' for instance '{request.instanceId}'") return pb.RaiseEventResponse() diff --git a/tests/durabletask/test_orchestration_async_e2e.py b/tests/durabletask/test_orchestration_async_e2e.py index e1c70a3b..3a2b24bc 100644 --- a/tests/durabletask/test_orchestration_async_e2e.py +++ b/tests/durabletask/test_orchestration_async_e2e.py @@ -128,13 +128,10 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state is not None assert state.runtime_status == client.OrchestrationStatus.SUSPENDED - # Small delay to ensure the suspension is fully enforced - await asyncio.sleep(1) - # Raise an event and confirm that it does NOT complete while suspended await c.raise_orchestration_event(id, "my_event", data=42) try: - state = await c.wait_for_orchestration_completion(id, timeout=5) + state = await c.wait_for_orchestration_completion(id, timeout=3) assert False, "Orchestration should not have completed" except TimeoutError: pass diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index ad04392d..a6f670ce 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -269,13 +269,10 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state is not None assert state.runtime_status == client.OrchestrationStatus.SUSPENDED - # Small delay to ensure the suspension is fully enforced - time.sleep(1) - # Raise an event to the orchestration and confirm that it does NOT complete task_hub_client.raise_orchestration_event(id, "my_event", data=42) try: - state = task_hub_client.wait_for_orchestration_completion(id, timeout=5) + state = task_hub_client.wait_for_orchestration_completion(id, timeout=3) assert False, "Orchestration should not have completed" except TimeoutError: pass From 760efbca68b24add320598aea1a30ee56ff5c8b5 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Tue, 24 Mar 2026 11:57:17 -0600 Subject: [PATCH 3/3] Also abort on WaitForInstanceCompletion --- durabletask/testing/in_memory_backend.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/durabletask/testing/in_memory_backend.py b/durabletask/testing/in_memory_backend.py index e98e8e15..21417184 100644 --- a/durabletask/testing/in_memory_backend.py +++ b/durabletask/testing/in_memory_backend.py @@ -277,6 +277,10 @@ def predicate(inst: OrchestrationInstance) -> bool: instance = self._wait_for_state(request.instanceId, predicate, timeout=context.time_remaining()) if not instance: + with self._lock: + if request.instanceId in self._instances: + context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, + f"Timed out waiting for instance '{request.instanceId}' to start") return pb.GetInstanceResponse(exists=False) return self._build_instance_response(instance, request.getInputsAndOutputs) @@ -290,6 +294,10 @@ def WaitForInstanceCompletion(self, request: pb.GetInstanceRequest, context): ) if not instance: + with self._lock: + if request.instanceId in self._instances: + context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, + f"Timed out waiting for instance '{request.instanceId}' to complete") return pb.GetInstanceResponse(exists=False) return self._build_instance_response(instance, request.getInputsAndOutputs)