Coverage for scheduler/base/scheduler.py: 100%
39 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-01-09 20:37 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-01-09 20:37 +0000
1"""
2Implementation of a `BaseScheduler`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import warnings
8from abc import ABC, abstractmethod
9from logging import Logger, getLogger
10from typing import Any, Callable, Optional
12from scheduler.base.job import BaseJob, BaseJobType
13from scheduler.base.timingtype import (
14 TimingCyclic,
15 TimingDailyUnion,
16 TimingOnceUnion,
17 TimingWeeklyUnion,
18)
20LOGGER = getLogger("scheduler")
23def select_jobs_by_tag(
24 jobs: set[BaseJobType],
25 tags: set[str],
26 any_tag: bool,
27) -> set[BaseJobType]:
28 r"""
29 Select |BaseJob|\ s by matching `tags`.
31 Parameters
32 ----------
33 jobs : set[BaseJob]
34 Unfiltered set of |BaseJob|\ s.
35 tags : set[str]
36 Tags to filter |BaseJob|\ s.
37 any_tag : bool
38 False: To match a |BaseJob| all tags have to match.
39 True: To match a |BaseJob| at least one tag has to match.
41 Returns
42 -------
43 set[BaseJob]
44 Selected |BaseJob|\ s.
45 """
46 if any_tag:
47 return {job for job in jobs if tags & job.tags}
48 return {job for job in jobs if tags <= job.tags}
51def _warn_deprecated_delay(delay: Optional[bool] = None, **kwargs):
52 if delay is not None:
53 warnings.warn(
54 (
55 "Using the `delay` argument is deprecated and will "
56 "be removed in the next minor release."
57 ),
58 DeprecationWarning,
59 stacklevel=3,
60 )
63class BaseScheduler(ABC): # NOTE maybe a typing Protocol class is better than an ABC class
64 """
65 Interface definition of an abstract scheduler.
67 Author: Jendrik A. Potyka, Fabian A. Preiss
68 """
70 __logger: Logger
72 def __init__(self, logger: Optional[Logger] = None) -> None:
73 self.__logger = logger if logger else LOGGER
75 @abstractmethod
76 def delete_job(self, job: BaseJob) -> None:
77 """Delete a |BaseJob| from the `BaseScheduler`."""
79 @abstractmethod
80 def delete_jobs(
81 self,
82 tags: Optional[set[str]] = None,
83 any_tag: bool = False,
84 ) -> int:
85 r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags."""
87 @abstractmethod
88 def get_jobs(
89 self,
90 tags: Optional[set[str]] = None,
91 any_tag: bool = False,
92 ) -> set[BaseJob]:
93 r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags."""
95 @abstractmethod
96 def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> BaseJob:
97 """Schedule a cyclic |BaseJob|."""
99 @abstractmethod
100 def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
101 """Schedule a minutely |BaseJob|."""
103 @abstractmethod
104 def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
105 """Schedule an hourly |BaseJob|."""
107 @abstractmethod
108 def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
109 """Schedule a daily |BaseJob|."""
111 @abstractmethod
112 def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
113 """Schedule a weekly |BaseJob|."""
115 @abstractmethod
116 def once(
117 self,
118 timing: TimingOnceUnion,
119 handle: Callable[..., None],
120 *,
121 args: tuple[Any] = None,
122 kwargs: Optional[dict[str, Any]] = None,
123 tags: Optional[list[str]] = None,
124 ) -> BaseJob:
125 """Schedule a oneshot |BaseJob|."""
127 @property
128 @abstractmethod
129 def jobs(self) -> set[BaseJob]:
130 r"""Get the set of all |BaseJob|\ s."""