Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion durabletask/testing/in_memory_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ def predicate(inst: OrchestrationInstance) -> bool:
instance = self._wait_for_state(request.instanceId, predicate, timeout=context.time_remaining())

if not instance:
with self._lock:
if request.instanceId in self._instances:
context.abort(grpc.StatusCode.DEADLINE_EXCEEDED,
f"Timed out waiting for instance '{request.instanceId}' to start")
return pb.GetInstanceResponse(exists=False)

return self._build_instance_response(instance, request.getInputsAndOutputs)
Expand All @@ -290,6 +294,10 @@ def WaitForInstanceCompletion(self, request: pb.GetInstanceRequest, context):
)

if not instance:
with self._lock:
if request.instanceId in self._instances:
context.abort(grpc.StatusCode.DEADLINE_EXCEEDED,
f"Timed out waiting for instance '{request.instanceId}' to complete")
return pb.GetInstanceResponse(exists=False)

return self._build_instance_response(instance, request.getInputsAndOutputs)
Expand All @@ -309,7 +317,14 @@ def RaiseEvent(self, request: pb.RaiseEventRequest, context):
)
instance.pending_events.append(event)
instance.last_updated_at = datetime.now(timezone.utc)
self._enqueue_orchestration(instance.instance_id)

# Don't dispatch work for suspended or terminal orchestrations;
# suspended events will be delivered when the orchestration is
# resumed, and terminal orchestrations can't process new events.
not_terminal = not self._is_terminal_status(instance.status)
not_suspended = instance.status != pb.ORCHESTRATION_STATUS_SUSPENDED
if not_terminal and not_suspended:
self._enqueue_orchestration(instance.instance_id)

self._logger.info(f"Raised event '{request.name}' for instance '{request.instanceId}'")
return pb.RaiseEventResponse()
Expand Down
Loading