Coverage for scheduler/base/scheduler.py: 100%
47 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:46 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:46 +0000
1"""
2Implementation of a `BaseScheduler`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import warnings
8from abc import ABC, abstractmethod
9from logging import Logger, getLogger
10from functools import wraps
11from typing import Any, Callable, Optional, List
13from scheduler.base.job import BaseJob, BaseJobType
14from scheduler.base.timingtype import (
15 TimingCyclic,
16 TimingDailyUnion,
17 TimingOnceUnion,
18 TimingWeeklyUnion,
19)
21LOGGER = getLogger("scheduler")
24def select_jobs_by_tag(
25 jobs: set[BaseJobType],
26 tags: set[str],
27 any_tag: bool,
28) -> set[BaseJobType]:
29 r"""
30 Select |BaseJob|\ s by matching `tags`.
32 Parameters
33 ----------
34 jobs : set[BaseJob]
35 Unfiltered set of |BaseJob|\ s.
36 tags : set[str]
37 Tags to filter |BaseJob|\ s.
38 any_tag : bool
39 False: To match a |BaseJob| all tags have to match.
40 True: To match a |BaseJob| at least one tag has to match.
42 Returns
43 -------
44 set[BaseJob]
45 Selected |BaseJob|\ s.
46 """
47 if any_tag:
48 return {job for job in jobs if tags & job.tags}
49 return {job for job in jobs if tags <= job.tags}
52def deprecated(fields: List[str]):
53 def wrapper(func):
54 @wraps(func)
55 def real_wrapper(*args, **kwargs):
56 for f in fields:
57 if f in kwargs and kwargs[f] is not None:
58 # keep it in kwargs
59 warnings.warn(
60 (
61 f"Using the `{f}` argument is deprecated and will "
62 "be removed in the next minor release."
63 ),
64 DeprecationWarning,
65 stacklevel=3,
66 )
67 return func(*args, **kwargs)
68 return real_wrapper
70 return wrapper
73class BaseScheduler(ABC): # NOTE maybe a typing Protocol class is better than an ABC class
74 """
75 Interface definition of an abstract scheduler.
77 Author: Jendrik A. Potyka, Fabian A. Preiss
78 """
80 __logger: Logger
82 def __init__(self, logger: Optional[Logger] = None) -> None:
83 self.__logger = logger if logger else LOGGER
85 @abstractmethod
86 def delete_job(self, job: BaseJob) -> None:
87 """Delete a |BaseJob| from the `BaseScheduler`."""
89 @abstractmethod
90 def delete_jobs(
91 self,
92 tags: Optional[set[str]] = None,
93 any_tag: bool = False,
94 ) -> int:
95 r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags."""
97 @abstractmethod
98 def get_jobs(
99 self,
100 tags: Optional[set[str]] = None,
101 any_tag: bool = False,
102 ) -> set[BaseJob]:
103 r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags."""
105 @abstractmethod
106 def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> BaseJob:
107 """Schedule a cyclic |BaseJob|."""
109 @abstractmethod
110 def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
111 """Schedule a minutely |BaseJob|."""
113 @abstractmethod
114 def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
115 """Schedule an hourly |BaseJob|."""
117 @abstractmethod
118 def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
119 """Schedule a daily |BaseJob|."""
121 @abstractmethod
122 def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
123 """Schedule a weekly |BaseJob|."""
125 @abstractmethod
126 def once(
127 self,
128 timing: TimingOnceUnion,
129 handle: Callable[..., None],
130 *,
131 args: tuple[Any] = None,
132 kwargs: Optional[dict[str, Any]] = None,
133 tags: Optional[list[str]] = None,
134 ) -> BaseJob:
135 """Schedule a oneshot |BaseJob|."""
137 @property
138 @abstractmethod
139 def jobs(self) -> set[BaseJob]:
140 r"""Get the set of all |BaseJob|\ s."""