-
Notifications
You must be signed in to change notification settings - Fork 25
Improve TimerTask #122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Improve TimerTask #122
Changes from all commits
36910b2
57d8b7d
9777a36
87dc5f7
ceb9c0d
ea7f3f6
a55ed39
5c2728c
1f940f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| [flake8] | ||
| ignore = E501,C901 | ||
| ignore = E501,C901,W503 | ||
| exclude = | ||
| .git | ||
| *_pb2* | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,7 +98,7 @@ def set_custom_status(self, custom_status: Any) -> None: | |
| pass | ||
|
|
||
| @abstractmethod | ||
| def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task: | ||
| def create_timer(self, fire_at: Union[datetime, timedelta]) -> CancellableTask: | ||
| """Create a Timer Task to fire after at the specified deadline. | ||
|
|
||
| Parameters | ||
|
|
@@ -228,10 +228,10 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput | |
| """ | ||
| pass | ||
|
|
||
| # TOOD: Add a timeout parameter, which allows the task to be canceled if the event is | ||
| # TOOD: Add a timeout parameter, which allows the task to be cancelled if the event is | ||
| # not received within the specified timeout. This requires support for task cancellation. | ||
| @abstractmethod | ||
| def wait_for_external_event(self, name: str) -> CompletableTask: | ||
| def wait_for_external_event(self, name: str) -> CancellableTask: | ||
| """Wait asynchronously for an event to be raised with the name `name`. | ||
|
|
||
| Parameters | ||
|
|
@@ -324,6 +324,10 @@ class OrchestrationStateError(Exception): | |
| pass | ||
|
|
||
|
|
||
| class TaskCancelledError(Exception): | ||
| """Exception type for cancelled orchestration tasks.""" | ||
|
|
||
|
|
||
| class Task(ABC, Generic[T]): | ||
| """Abstract base class for asynchronous tasks in a durable orchestration.""" | ||
| _result: T | ||
|
|
@@ -435,6 +439,48 @@ def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]): | |
| self._parent.on_child_completed(self) | ||
|
|
||
|
|
||
| class CancellableTask(CompletableTask[T]): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd. |
||
| """A completable task that can be cancelled before it finishes.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| super().__init__() | ||
| self._is_cancelled = False | ||
| self._cancel_handler: Optional[Callable[[], None]] = None | ||
|
|
||
| @property | ||
| def is_cancelled(self) -> bool: | ||
| """Returns True if the task was cancelled, False otherwise.""" | ||
| return self._is_cancelled | ||
|
|
||
| def get_result(self) -> T: | ||
| if self._is_cancelled: | ||
| raise TaskCancelledError('The task was cancelled.') | ||
| return super().get_result() | ||
andystaples marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def set_cancel_handler(self, cancel_handler: Callable[[], None]) -> None: | ||
| self._cancel_handler = cancel_handler | ||
|
|
||
| def cancel(self) -> bool: | ||
| """Attempts to cancel this task. | ||
|
|
||
| Returns | ||
| ------- | ||
| bool | ||
| True if cancellation was applied, False if the task had already completed. | ||
| """ | ||
| if self._is_complete: | ||
| return False | ||
|
|
||
| if self._cancel_handler is not None: | ||
| self._cancel_handler() | ||
|
|
||
| self._is_cancelled = True | ||
| self._is_complete = True | ||
| if self._parent is not None: | ||
| self._parent.on_child_completed(self) | ||
| return True | ||
|
|
||
|
|
||
| class RetryableTask(CompletableTask[T]): | ||
| """A task that can be retried according to a retry policy.""" | ||
|
|
||
|
|
@@ -474,13 +520,32 @@ def compute_next_delay(self) -> Optional[timedelta]: | |
| return None | ||
|
|
||
|
|
||
| class TimerTask(CompletableTask[T]): | ||
| class TimerTask(CancellableTask[None]): | ||
| def set_retryable_parent(self, retryable_task: RetryableTask): | ||
| self._retryable_parent = retryable_task | ||
|
|
||
| def complete(self, _: datetime) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd. |
||
| super().complete(None) | ||
|
|
||
| def __init__(self) -> None: | ||
|
|
||
| class LongTimerTask(TimerTask): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than a dedicated LongTimer class - try to make the existing Timer support long timers also. Similar how to dotnet has a single CreateTimer. https://github.com/microsoft/durabletask-dotnet/blob/45ab40eeff0090f5124a98d0ca15148b144cf79c/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs#L266 |
||
| def __init__(self, final_fire_at: datetime, maximum_timer_interval: timedelta): | ||
| super().__init__() | ||
| self._final_fire_at = final_fire_at | ||
| self._maximum_timer_interval = maximum_timer_interval | ||
|
|
||
| def set_retryable_parent(self, retryable_task: RetryableTask): | ||
| self._retryable_parent = retryable_task | ||
| def start(self, current_utc_datetime: datetime) -> datetime: | ||
| return self._get_next_fire_at(current_utc_datetime) | ||
|
|
||
| def complete(self, current_utc_datetime: datetime) -> Optional[datetime]: | ||
| if current_utc_datetime < self._final_fire_at: | ||
| return self._get_next_fire_at(current_utc_datetime) | ||
| return super().complete(current_utc_datetime) | ||
|
|
||
| def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime: | ||
| if current_utc_datetime + self._maximum_timer_interval < self._final_fire_at: | ||
| return current_utc_datetime + self._maximum_timer_interval | ||
| return self._final_fire_at | ||
|
|
||
|
|
||
| class WhenAnyTask(CompositeTask[Task]): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The .NET SDK handles long timer chunking entirely within Please merge class TimerTask(CancellableTask[None]):
def __init__(self, final_fire_at: Optional[datetime] = None,
maximum_timer_interval: Optional[timedelta] = None):
super().__init__()
self._final_fire_at = final_fire_at
self._maximum_timer_interval = maximum_timer_interval
def set_retryable_parent(self, retryable_task: RetryableTask):
self._retryable_parent = retryable_task
def complete(self, current_utc_datetime: datetime) -> Optional[datetime]:
if (self._final_fire_at is not None
and self._maximum_timer_interval is not None
and current_utc_datetime < self._final_fire_at):
return self._get_next_fire_at(current_utc_datetime)
super().complete(None)
return None
def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime:
if current_utc_datetime + self._maximum_timer_interval < self._final_fire_at:
return current_utc_datetime + self._maximum_timer_interval
return self._final_fire_atDisclaimer: This review was generated by GitHub Copilot on behalf of Bernd. |
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the .NET SDK,
MaximumTimerIntervaldefaults to 3 days inDurableTaskWorkerOptionsand is not overridden for DTS — DTS workers still use the default 3-day maximum. Here you're passingNoneto disable chunking entirely for DTS. Can you confirm this is the intended behavior? If DTS truly supports indefinite timers, theNoneis fine, but it'd be good to document the rationale (e.g., "DTS natively supports long timers so chunking is unnecessary").Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.