diff --git a/durabletask/testing/in_memory_backend.py b/durabletask/testing/in_memory_backend.py index 590688ad..21417184 100644 --- a/durabletask/testing/in_memory_backend.py +++ b/durabletask/testing/in_memory_backend.py @@ -277,6 +277,10 @@ def predicate(inst: OrchestrationInstance) -> bool: instance = self._wait_for_state(request.instanceId, predicate, timeout=context.time_remaining()) if not instance: + with self._lock: + if request.instanceId in self._instances: + context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, + f"Timed out waiting for instance '{request.instanceId}' to start") return pb.GetInstanceResponse(exists=False) return self._build_instance_response(instance, request.getInputsAndOutputs) @@ -290,6 +294,10 @@ def WaitForInstanceCompletion(self, request: pb.GetInstanceRequest, context): ) if not instance: + with self._lock: + if request.instanceId in self._instances: + context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, + f"Timed out waiting for instance '{request.instanceId}' to complete") return pb.GetInstanceResponse(exists=False) return self._build_instance_response(instance, request.getInputsAndOutputs) @@ -309,7 +317,14 @@ def RaiseEvent(self, request: pb.RaiseEventRequest, context): ) instance.pending_events.append(event) instance.last_updated_at = datetime.now(timezone.utc) - self._enqueue_orchestration(instance.instance_id) + + # Don't dispatch work for suspended or terminal orchestrations; + # suspended events will be delivered when the orchestration is + # resumed, and terminal orchestrations can't process new events. + not_terminal = not self._is_terminal_status(instance.status) + not_suspended = instance.status != pb.ORCHESTRATION_STATUS_SUSPENDED + if not_terminal and not_suspended: + self._enqueue_orchestration(instance.instance_id) self._logger.info(f"Raised event '{request.name}' for instance '{request.instanceId}'") return pb.RaiseEventResponse()