Coverage for scheduler/base/scheduler.py: 100%
49 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
1"""Implementation of a `BaseScheduler`.
3Author: Jendrik A. Potyka, Fabian A. Preiss
4"""
6import warnings
7from abc import ABC, abstractmethod
8from collections.abc import Iterable
9from functools import wraps
10from logging import Logger, getLogger
11from typing import Any, Callable, Generic, List, Optional, TypeVar
13from scheduler.base.job import BaseJobType
14from scheduler.base.timingtype import (
15 TimingCyclic,
16 TimingDailyUnion,
17 TimingOnceUnion,
18 TimingWeeklyUnion,
19)
21# TODO:
22# import sys
23# if sys.version_info < (3, 10):
24# from typing_extensions import ParamSpec
25# else:
26# from typing import ParamSpec
# Shared fallback logger of the package; `BaseScheduler.__init__` uses it
# whenever no custom logger is supplied.
LOGGER: Logger = getLogger("scheduler")
def select_jobs_by_tag(
    jobs: set[BaseJobType],
    tags: set[str],
    any_tag: bool,
) -> set[BaseJobType]:
    r"""
    Filter |BaseJob|\ s by comparing their tags against `tags`.

    Parameters
    ----------
    jobs : set[BaseJob]
        Candidate |BaseJob|\ s to filter.
    tags : set[str]
        Tags a |BaseJob| is compared against.
    any_tag : bool
        True: a |BaseJob| is selected as soon as it shares at least one tag
        with `tags`.
        False: a |BaseJob| is selected only if it carries every tag in `tags`.

    Returns
    -------
    set[BaseJob]
        The |BaseJob|\ s that satisfy the chosen matching rule.

    Notes
    -----
    With an empty `tags` set, no |BaseJob| is selected when `any_tag` is True
    (an empty intersection is falsy) and every |BaseJob| is selected when
    `any_tag` is False (the empty set is a subset of any set).
    """

    def _is_selected(job: BaseJobType) -> bool:
        if any_tag:
            # A non-empty intersection means at least one common tag.
            return bool(tags & job.tags)
        # Subset test: all requested tags must be present on the job.
        return tags <= job.tags

    return {candidate for candidate in jobs if _is_selected(candidate)}
def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator for marking specified function arguments as deprecated.

    The decorated function is called unchanged; a `DeprecationWarning` is
    emitted beforehand for every listed argument that is passed by keyword
    with a value other than ``None``. Arguments passed positionally are not
    inspected.

    Parameters
    ----------
    fields : List[str]
        A list of strings representing the names of the function arguments that are deprecated.

    Returns
    -------
    Callable[[Callable[..., Any]], Callable[..., Any]]
        A decorator that wraps a function with the deprecation check.

    Examples
    --------
    .. code-block:: python

        @deprecated(["old_arg"])
        def some_function(new_arg, old_arg=None):
            pass

    Calling `some_function(new_arg=5, old_arg=3)` generates a deprecation warning for using 'old_arg'.
    """

    def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def real_wrapper(*args: Any, **kwargs: Any) -> Any:
            for name in fields:
                # Only an explicit, non-None keyword value counts as usage;
                # the value itself stays in kwargs and is forwarded to func.
                if kwargs.get(name) is not None:
                    warnings.warn(
                        (
                            f"Using the `{name}` argument is deprecated and will "
                            "be removed in the next minor release."
                        ),
                        DeprecationWarning,
                        # Level 1 is this wrapper, level 2 is the code that
                        # called the decorated function -- the place that
                        # has to be changed by the user.
                        stacklevel=2,
                    )
            return func(*args, **kwargs)

        return real_wrapper

    return wrapper
# Type variable for the `handle` callback accepted by the scheduling methods;
# bound to callables that take no arguments.
T = TypeVar("T", bound=Callable[[], Any])


class BaseScheduler(
    ABC, Generic[BaseJobType, T]
):  # NOTE maybe a typing Protocol class is better than an ABC class
    """
    Abstract interface that every scheduler implementation has to fulfil.

    Subclasses provide the removal, lookup and scheduling operations declared
    below; this base class itself only stores the logger.

    Author: Jendrik A. Potyka, Fabian A. Preiss
    """

    _logger: Logger

    def __init__(self, logger: Optional[Logger] = None) -> None:
        """
        Store the logger used by the scheduler.

        Parameters
        ----------
        logger : Optional[logging.Logger]
            Custom logger. The module-level ``LOGGER`` is used if omitted.
        """
        # A falsy `logger` (notably None) falls back to the module logger.
        self._logger = logger or LOGGER

    @abstractmethod
    def delete_job(self, job: BaseJobType) -> None:
        """
        Delete a |BaseJob| from the `BaseScheduler`.

        Parameters
        ----------
        job : BaseJob
            The |BaseJob| to remove.
        """

    @abstractmethod
    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags used to choose the |BaseJob|\ s. How ``None`` is treated is
            left to the implementation.
        any_tag : bool
            False: To match a |BaseJob| all tags have to match.
            True: To match a |BaseJob| at least one tag has to match.

        Returns
        -------
        int
            NOTE(review): presumably the number of deleted |BaseJob|\ s --
            confirm against the concrete implementations.
        """

    @abstractmethod
    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[BaseJobType]:
        r"""
        Get a set of |BaseJob|\ s from the `BaseScheduler` by tags.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags used to choose the |BaseJob|\ s. How ``None`` is treated is
            left to the implementation.
        any_tag : bool
            False: To match a |BaseJob| all tags have to match.
            True: To match a |BaseJob| at least one tag has to match.

        Returns
        -------
        set[BaseJob]
            The selected |BaseJob|\ s.
        """

    @abstractmethod
    def cyclic(self, timing: TimingCyclic, handle: T, **kwargs) -> BaseJobType:
        """
        Schedule a cyclic |BaseJob|.

        Parameters
        ----------
        timing : TimingCyclic
            Timing specification of the |BaseJob|.
        handle : T
            Callable the |BaseJob| is created for.
        **kwargs
            Further keyword arguments; their meaning is defined by the
            implementation.

        Returns
        -------
        BaseJob
            The scheduled |BaseJob|.
        """

    @abstractmethod
    def minutely(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """
        Schedule a minutely |BaseJob|.

        Parameters
        ----------
        timing : TimingDailyUnion
            Timing specification of the |BaseJob|.
        handle : T
            Callable the |BaseJob| is created for.
        **kwargs
            Further keyword arguments; their meaning is defined by the
            implementation.

        Returns
        -------
        BaseJob
            The scheduled |BaseJob|.
        """

    @abstractmethod
    def hourly(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """
        Schedule an hourly |BaseJob|.

        Parameters
        ----------
        timing : TimingDailyUnion
            Timing specification of the |BaseJob|.
        handle : T
            Callable the |BaseJob| is created for.
        **kwargs
            Further keyword arguments; their meaning is defined by the
            implementation.

        Returns
        -------
        BaseJob
            The scheduled |BaseJob|.
        """

    @abstractmethod
    def daily(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """
        Schedule a daily |BaseJob|.

        Parameters
        ----------
        timing : TimingDailyUnion
            Timing specification of the |BaseJob|.
        handle : T
            Callable the |BaseJob| is created for.
        **kwargs
            Further keyword arguments; their meaning is defined by the
            implementation.

        Returns
        -------
        BaseJob
            The scheduled |BaseJob|.
        """

    @abstractmethod
    def weekly(self, timing: TimingWeeklyUnion, handle: T, **kwargs) -> BaseJobType:
        """
        Schedule a weekly |BaseJob|.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Timing specification of the |BaseJob|.
        handle : T
            Callable the |BaseJob| is created for.
        **kwargs
            Further keyword arguments; their meaning is defined by the
            implementation.

        Returns
        -------
        BaseJob
            The scheduled |BaseJob|.
        """

    @abstractmethod
    def once(
        self,
        timing: TimingOnceUnion,
        handle: T,
        *,
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> BaseJobType:
        """
        Schedule a oneshot |BaseJob|.

        All parameters after `handle` are keyword-only.

        Parameters
        ----------
        timing : TimingOnceUnion
            Timing specification of the |BaseJob|.
        handle : T
            Callable the |BaseJob| is created for.
        args : Optional[tuple[Any]]
            NOTE(review): presumably positional arguments for `handle` --
            confirm against the concrete implementations.
        kwargs : Optional[dict[str, Any]]
            NOTE(review): presumably keyword arguments for `handle` --
            confirm against the concrete implementations.
        tags : Optional[Iterable[str]]
            Tags to attach to the |BaseJob|.
        alias : Optional[str]
            Alias of the |BaseJob|.

        Returns
        -------
        BaseJob
            The scheduled |BaseJob|.
        """

    @property
    @abstractmethod
    def jobs(self) -> set[BaseJobType]:
        r"""
        Get the set of all |BaseJob|\ s.

        Returns
        -------
        set[BaseJob]
            Every |BaseJob| of the `BaseScheduler`.
        """