Coverage for scheduler/base/scheduler.py: 100%
47 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 21:31 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 21:31 +0000
1"""
2Implementation of a `BaseScheduler`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import warnings
8from abc import ABC, abstractmethod
9from functools import wraps
10from logging import Logger, getLogger
11from typing import Any, Callable, List, Optional
13from scheduler.base.job import BaseJob, BaseJobType
14from scheduler.base.timingtype import (
15 TimingCyclic,
16 TimingDailyUnion,
17 TimingOnceUnion,
18 TimingWeeklyUnion,
19)
21LOGGER = getLogger("scheduler")
def select_jobs_by_tag(
    jobs: set[BaseJobType],
    tags: set[str],
    any_tag: bool,
) -> set[BaseJobType]:
    r"""
    Filter a set of |BaseJob|\ s by their `tags`.

    Parameters
    ----------
    jobs : set[BaseJob]
        Candidate |BaseJob|\ s; the set itself is not modified.
    tags : set[str]
        Tags that are compared against the ``tags`` attribute of each |BaseJob|.
    any_tag : bool
        True: A |BaseJob| is selected if it shares at least one tag with `tags`.
        False: A |BaseJob| is selected only if it carries every tag in `tags`.

    Returns
    -------
    set[BaseJob]
        New set holding the |BaseJob|\ s that matched.

    Notes
    -----
    With an empty `tags` set every |BaseJob| matches when `any_tag` is False
    (the empty set is a subset of any set) and none matches when it is True
    (the intersection is always empty).
    """
    selected = set()
    for candidate in jobs:
        if any_tag:
            # A non-empty intersection means at least one shared tag.
            is_match = bool(tags & candidate.tags)
        else:
            # Subset test: all requested tags are present on the job.
            is_match = tags <= candidate.tags
        if is_match:
            selected.add(candidate)
    return selected
def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator for marking specified function arguments as deprecated.

    The decorated function is called unchanged; the deprecated arguments are
    still forwarded to it. A `DeprecationWarning` is emitted for every name in
    `fields` that is passed *as a keyword argument* with a value other than
    ``None``. Arguments passed positionally are not inspected.

    Parameters
    ----------
    fields : List[str]
        A list of strings representing the names of the function arguments that are deprecated.

    Returns
    -------
    Callable[[Callable[..., Any]], Callable[..., Any]]
        A decorator that wraps a function and preserves its metadata.

    Examples
    --------
        @deprecated(['old_arg'])
        def some_function(new_arg, old_arg=None):
            pass

    Calling `some_function(new_arg=5, old_arg=3)` generates a deprecation warning for using 'old_arg'.
    """

    def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def real_wrapper(*args: Any, **kwargs: Any) -> Any:
            for f in fields:
                if kwargs.get(f) is not None:
                    # keep it in kwargs
                    warnings.warn(
                        (
                            f"Using the `{f}` argument is deprecated and will "
                            "be removed in the next minor release."
                        ),
                        DeprecationWarning,
                        # Level 1 is `real_wrapper` itself, level 2 is the code
                        # that called the decorated function. A higher level
                        # would blame the caller's caller (or fall off the
                        # stack for top-level calls).
                        stacklevel=2,
                    )
            return func(*args, **kwargs)

        return real_wrapper

    return wrapper
class BaseScheduler(ABC):  # NOTE maybe a typing Protocol class is better than an ABC class
    """
    Interface definition of an abstract scheduler.

    Concrete schedulers have to implement every abstract method as well as the
    `jobs` property; `BaseScheduler` itself cannot be instantiated.

    Parameters
    ----------
    logger : Optional[logging.Logger]
        Logger stored on the instance. If ``None`` the module level ``LOGGER``
        is used instead.

    Author: Jendrik A. Potyka, Fabian A. Preiss
    """

    # Double underscore: the attribute is name-mangled to `_BaseScheduler__logger`.
    __logger: Logger

    def __init__(self, logger: Optional[Logger] = None) -> None:
        # `None` is the sentinel for "no logger given"; test for it explicitly
        # instead of relying on the truthiness of the logger object.
        self.__logger = logger if logger is not None else LOGGER

    @abstractmethod
    def delete_job(self, job: BaseJob) -> None:
        """Delete a |BaseJob| from the `BaseScheduler`."""

    @abstractmethod
    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags."""
        # NOTE(review): the meaning of the returned int (presumably the number
        # of deleted jobs) is defined by the concrete scheduler - confirm there.

    @abstractmethod
    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[BaseJob]:
        r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags."""

    @abstractmethod
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a cyclic |BaseJob|."""

    @abstractmethod
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a minutely |BaseJob|."""

    @abstractmethod
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule an hourly |BaseJob|."""

    @abstractmethod
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a daily |BaseJob|."""

    @abstractmethod
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a weekly |BaseJob|."""

    @abstractmethod
    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        # Variable-length tuple: `tuple[Any]` would only admit 1-tuples.
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
    ) -> BaseJob:
        """Schedule a oneshot |BaseJob|."""

    @property
    @abstractmethod
    def jobs(self) -> set[BaseJob]:
        r"""Get the set of all |BaseJob|\ s."""