Coverage for scheduler/base/scheduler.py: 100%
49 statements
« prev ^ index » next coverage.py v7.0.4, created at 2024-06-09 19:18 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2024-06-09 19:18 +0000
1"""
2Implementation of a `BaseScheduler`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import warnings
8from abc import ABC, abstractmethod
9from collections.abc import Iterable
10from functools import wraps
11from logging import Logger, getLogger
12from typing import Any, Callable, Generic, List, Optional, TypeVar
14from scheduler.base.job import BaseJobType
15from scheduler.base.timingtype import (
16 TimingCyclic,
17 TimingDailyUnion,
18 TimingOnceUnion,
19 TimingWeeklyUnion,
20)
# TODO: switch to the version-dependent `ParamSpec` import sketched below:
23# import sys
24# if sys.version_info < (3, 10):
25# from typing_extensions import ParamSpec
26# else:
27# from typing import ParamSpec
# Module-level default logger; `BaseScheduler.__init__` falls back to it when
# no logger is supplied.
LOGGER = getLogger("scheduler")
def select_jobs_by_tag(
    jobs: set[BaseJobType],
    tags: set[str],
    any_tag: bool,
) -> set[BaseJobType]:
    r"""
    Filter |BaseJob|\ s by comparing their tags against `tags`.

    Parameters
    ----------
    jobs : set[BaseJob]
        Candidate |BaseJob|\ s to filter.
    tags : set[str]
        Tags each |BaseJob| is compared against.
    any_tag : bool
        True: a |BaseJob| is kept if it shares at least one tag with `tags`.
        False: a |BaseJob| is kept only if it carries every tag in `tags`.

    Returns
    -------
    set[BaseJob]
        The |BaseJob|\ s satisfying the chosen matching rule.
    """

    def is_match(job_tags: set[str]) -> bool:
        # Non-empty intersection for "any", subset relation for "all".
        if any_tag:
            return bool(tags & job_tags)
        return tags <= job_tags

    return {candidate for candidate in jobs if is_match(candidate.tags)}
def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator for marking specified function arguments as deprecated.

    A `DeprecationWarning` is emitted each time one of the listed arguments is
    passed by keyword with a value other than `None`. Arguments passed
    positionally are not detected. The call is always forwarded unchanged to
    the decorated function.

    Parameters
    ----------
    fields : List[str]
        A list of strings representing the names of the function arguments
        that are deprecated.

    Returns
    -------
    Callable[[Callable[..., Any]], Callable[..., Any]]
        Decorator that wraps a function with the deprecation check.

    Examples
    --------
    @deprecated(['old_arg'])
    def some_function(new_arg, old_arg=None):
        pass

    Calling `some_function(new_arg=5, old_arg=3)` generates a deprecation
    warning for using 'old_arg'.
    """

    def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def real_wrapper(*args: Any, **kwargs: Any) -> Any:
            for field in fields:
                # The deprecated argument is only reported, never removed: it
                # stays in kwargs and is passed on to `func` below.
                if kwargs.get(field) is not None:
                    warnings.warn(
                        (
                            f"Using the `{field}` argument is deprecated and will "
                            "be removed in the next minor release."
                        ),
                        DeprecationWarning,
                        # NOTE(review): stacklevel is kept at its existing
                        # value; confirm it points at the intended caller.
                        stacklevel=3,
                    )
            return func(*args, **kwargs)

        return real_wrapper

    return wrapper
# Type variable for the `handle` parameter of the scheduling methods; bound to
# callables that take no arguments.
T = TypeVar("T", bound=Callable[[], Any])
# NOTE maybe a typing Protocol class is better than an ABC class
class BaseScheduler(ABC, Generic[BaseJobType, T]):
    """
    Abstract interface that scheduler implementations have to provide.

    Author: Jendrik A. Potyka, Fabian A. Preiss
    """

    _logger: Logger

    def __init__(self, logger: Optional[Logger] = None) -> None:
        # Fall back to the module-level logger when none (or a falsy one) is given.
        self._logger = logger or LOGGER

    @abstractmethod
    def delete_job(self, job: BaseJobType) -> None:
        """Remove the given |BaseJob| from the `BaseScheduler`."""

    @abstractmethod
    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""Remove the |BaseJob|\ s selected by `tags` from the `BaseScheduler`."""

    @abstractmethod
    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[BaseJobType]:
        r"""Return the |BaseJob|\ s of the `BaseScheduler` selected by `tags`."""

    @abstractmethod
    def cyclic(self, timing: TimingCyclic, handle: T, **kwargs) -> BaseJobType:
        """Create a |BaseJob| that is scheduled cyclically."""

    @abstractmethod
    def minutely(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a |BaseJob| that is scheduled minutely."""

    @abstractmethod
    def hourly(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a |BaseJob| that is scheduled hourly."""

    @abstractmethod
    def daily(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a |BaseJob| that is scheduled daily."""

    @abstractmethod
    def weekly(self, timing: TimingWeeklyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a |BaseJob| that is scheduled weekly."""

    @abstractmethod
    def once(
        self,
        timing: TimingOnceUnion,
        handle: T,
        *,
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> BaseJobType:
        """Create a oneshot |BaseJob|."""

    @property
    @abstractmethod
    def jobs(self) -> set[BaseJobType]:
        r"""Return the set containing all |BaseJob|\ s."""