Coverage for scheduler/base/job.py: 100%
146 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 21:31 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 21:31 +0000
1"""
2Implementation of a `BaseJob`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import datetime as dt
10import warnings
11from abc import ABC, abstractmethod
12from logging import Logger
13from typing import Any, Callable, Optional, TypeVar, cast
15from scheduler.base.definition import JobType
16from scheduler.base.job_timer import JobTimer
17from scheduler.base.job_util import (
18 check_duplicate_effective_timings,
19 check_timing_tzinfo,
20 get_pending_timer,
21 prettify_timedelta,
22 sane_timing_types,
23 set_start_check_stop_tzinfo,
24 standardize_timing_format,
25)
26from scheduler.base.timingtype import TimingJobUnion
29class BaseJob(ABC):
30 """Abstract definition basic interface for a job class."""
32 __type: JobType
33 __timing: TimingJobUnion
34 __handle: Callable[..., None]
35 __args: tuple[Any, ...]
36 __kwargs: dict[str, Any]
37 __max_attempts: int
38 __tags: set[str]
39 __delay: bool
40 __start: Optional[dt.datetime]
41 __stop: Optional[dt.datetime]
42 __skip_missing: bool
43 __alias: Optional[str]
44 __tzinfo: Optional[dt.tzinfo]
45 __logger: Logger
47 __mark_delete: bool
48 __attempts: int
49 __failed_attempts: int
50 __pending_timer: JobTimer
51 __timers: list[JobTimer]
53 def __init__(
54 self,
55 job_type: JobType,
56 timing: TimingJobUnion,
57 handle: Callable[..., None],
58 *,
59 args: Optional[tuple[Any]] = None,
60 kwargs: Optional[dict[str, Any]] = None,
61 max_attempts: int = 0,
62 tags: Optional[set[str]] = None,
63 delay: bool = True,
64 start: Optional[dt.datetime] = None,
65 stop: Optional[dt.datetime] = None,
66 skip_missing: bool = False,
67 alias: str = None,
68 tzinfo: Optional[dt.tzinfo] = None,
69 ):
70 timing = standardize_timing_format(job_type, timing)
72 sane_timing_types(job_type, timing)
73 check_timing_tzinfo(job_type, timing, tzinfo)
74 check_duplicate_effective_timings(job_type, timing, tzinfo)
76 self.__start = set_start_check_stop_tzinfo(start, stop, tzinfo)
78 self.__type = job_type
79 self.__timing = timing # pylint: disable=unused-private-member
80 # NOTE: https://github.com/python/mypy/issues/708
81 # https://github.com/python/mypy/issues/2427
82 self.__handle = handle # type: ignore
83 self.__args = () if args is None else args
84 self.__kwargs = {} if kwargs is None else kwargs.copy()
85 self.__max_attempts = max_attempts
86 self.__tags = set() if tags is None else tags.copy()
87 self.__delay = delay
88 self.__stop = stop
89 self.__skip_missing = skip_missing
90 self.__alias = alias
91 self.__tzinfo = tzinfo
93 # self.__mark_delete will be set to True if the new Timer would be in future
94 # relativ to the self.__stop variable
95 self.__mark_delete = False
96 self.__attempts = 0
97 self.__failed_attempts = 0
99 # create JobTimers
100 self.__timers = [JobTimer(job_type, tim, self.__start, skip_missing) for tim in timing]
101 self.__pending_timer = get_pending_timer(self.__timers)
103 if self.__stop is not None:
104 if self.__pending_timer.datetime > self.__stop:
105 self.__mark_delete = True
107 def __lt__(self, other: BaseJob):
108 return self.datetime < other.datetime
110 def _calc_next_exec(self, ref_dt: dt.datetime) -> None:
111 """
112 Calculate the next estimated execution `datetime.datetime` of the `Job`.
114 Parameters
115 ----------
116 ref_dt : datetime.datetime
117 Reference time stamp to which the |BaseJob| calculates
118 it's next execution.
119 """
120 if self.__skip_missing:
121 for timer in self.__timers:
122 if (timer.datetime - ref_dt).total_seconds() <= 0:
123 timer.calc_next_exec(ref_dt)
124 else:
125 self.__pending_timer.calc_next_exec(ref_dt)
126 self.__pending_timer = get_pending_timer(self.__timers)
127 if self.__stop is not None and self.__pending_timer.datetime > self.__stop:
128 self.__mark_delete = True
130 def _repr(self) -> tuple[str, ...]:
131 return tuple(
132 repr(elem)
133 for elem in (
134 self.__type,
135 self.__timing,
136 self.__handle,
137 self.__args,
138 self.__kwargs,
139 self.__max_attempts,
140 self.__delay,
141 self.__start,
142 self.__stop,
143 self.__skip_missing,
144 self.__alias,
145 self.tzinfo,
146 )
147 )
149 @abstractmethod
150 def __repr__(self) -> str:
151 raise NotImplementedError() # pragma: no cover
153 def _str(
154 self,
155 ) -> tuple[str, ...]:
156 """Return the objects relevant for readable string representation."""
157 dt_timedelta = self.timedelta(dt.datetime.now(self.tzinfo))
158 if self.alias is not None:
159 f_args = ""
160 elif hasattr(self.handle, "__code__"):
161 f_args = "(..)" if self.handle.__code__.co_nlocals else "()"
162 else:
163 f_args = "(?)"
164 return (
165 self.type.name if self.max_attempts != 1 else "ONCE",
166 self.handle.__qualname__ if self.alias is None else self.alias,
167 f_args,
168 str(self.datetime)[:19],
169 str(self.datetime.tzname()),
170 prettify_timedelta(dt_timedelta),
171 str(self.attempts),
172 str(float("inf") if self.max_attempts == 0 else self.max_attempts),
173 )
175 def __str__(self) -> str:
176 return "{0}, {1}{2}, at={3}, tz={4}, in={5}, #{6}/{7}".format(*self._str())
178 def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta:
179 """
180 Get the `datetime.timedelta` until the next execution of this `Job`.
182 Parameters
183 ----------
184 dt_stamp : Optional[datetime.datetime]
185 Time to be compared with the planned execution time to determine the time difference.
187 Returns
188 -------
189 timedelta
190 `datetime.timedelta` to the next execution.
191 """
192 if dt_stamp is None:
193 dt_stamp = dt.datetime.now(self.__tzinfo)
194 if not self.__delay and self.__attempts == 0:
195 return cast(dt.datetime, self.__start) - dt_stamp
196 return self.__pending_timer.timedelta(dt_stamp)
198 @property
199 def datetime(self) -> dt.datetime:
200 """
201 Give the `datetime.datetime` object for the planed execution.
203 Returns
204 -------
205 datetime.datetime
206 Execution `datetime.datetime` stamp.
207 """
208 if not self.__delay and self.__attempts == 0:
209 return cast(dt.datetime, self.__start)
210 return self.__pending_timer.datetime
212 @property
213 def type(self) -> JobType:
214 """
215 Return the `JobType` of the `Job` instance.
217 Returns
218 -------
219 JobType
220 :class:`~scheduler.job.JobType` of the |BaseJob|.
221 """
222 return self.__type
224 @property
225 def handle(self) -> Callable[..., None]:
226 """
227 Get the callback function.
229 Returns
230 -------
231 handle
232 Callback function.
233 """
234 return self.__handle
236 @property
237 def args(self) -> tuple[Any, ...]:
238 r"""
239 Get the positional arguments of the function handle within a `Job`.
241 .. warning:: When running |BaseJob|\ s in parallel threads,
242 be sure to implement possible side effects of parameter accessing in a
243 thread safe manner.
245 Returns
246 -------
247 tuple[Any]
248 The payload arguments to pass to the function handle within a
249 |BaseJob|.
250 """
251 return self.__args
253 @property
254 def kwargs(self) -> dict[str, Any]:
255 r"""
256 Get the keyword arguments of the function handle within a `Job`.
258 .. warning:: When running |BaseJob|\ s in parallel threads,
259 be sure to implement possible side effects of parameter accessing in a
260 thread safe manner.
262 Returns
263 -------
264 dict[str, Any]
265 The payload arguments to pass to the function handle within a
266 |BaseJob|.
267 """
268 return self.__kwargs
270 @property
271 def max_attempts(self) -> int:
272 """
273 Get the execution limit for a `Job`.
275 Returns
276 -------
277 int
278 Max execution attempts.
279 """
280 return self.__max_attempts
282 @property
283 def tags(self) -> set[str]:
284 r"""
285 Get the tags of a `Job`.
287 Returns
288 -------
289 set[str]
290 The tags of a |BaseJob|.
291 """
292 return self.__tags.copy()
294 @property
295 def delay(self) -> bool:
296 """
297 *Deprecated*: Return ``True`` if the first `Job` execution will wait for the next scheduled time.
299 Returns
300 -------
301 bool
302 If ``True`` wait with the execution for the next scheduled time. If ``False``
303 the first execution will target the time of `Job.start`.
304 """
305 warnings.warn(
306 (
307 "Using the `delay` property is deprecated and will "
308 "be removed in the next minor release."
309 ),
310 DeprecationWarning,
311 stacklevel=2,
312 )
313 return self.__delay
315 @property
316 def start(self) -> Optional[dt.datetime]:
317 """
318 Get the timestamp at which the `JobTimer` starts.
320 Returns
321 -------
322 Optional[datetime.datetime]
323 The start datetime stamp.
324 """
325 return self.__start
327 @property
328 def stop(self) -> Optional[dt.datetime]:
329 """
330 Get the timestamp after which no more executions of the `Job` should be scheduled.
332 Returns
333 -------
334 Optional[datetime.datetime]
335 The stop datetime stamp.
336 """
337 return self.__stop
339 @property
340 def skip_missing(self) -> bool:
341 """
342 Return ``True`` if `Job` will only schedule it's newest planned execution.
344 Returns
345 -------
346 bool
347 If ``True`` a |BaseJob| will only schedule it's newest planned
348 execution and drop older ones.
349 """
350 return self.__skip_missing
352 @property
353 def alias(self) -> Optional[str]:
354 r"""
355 Get the alias of the `Job`.
357 Returns
358 -------
359 Optional[str]
360 Alias of the |BaseJob|.
361 """
362 return self.__alias
364 @property
365 def tzinfo(self) -> Optional[dt.tzinfo]:
366 r"""
367 Get the timezone of the `Job`'s next execution.
369 Returns
370 -------
371 Optional[datetime.tzinfo]
372 Timezone of the |BaseJob|\ s next execution.
373 """
374 return self.datetime.tzinfo
376 @property
377 def _tzinfo(self) -> Optional[dt.tzinfo]:
378 """
379 Get the timezone of the `Scheduler` in which the `Job` is living.
381 Returns
382 -------
383 Optional[datetime.tzinfo]
384 Timezone of the |BaseJob|.
385 """
386 return self.__tzinfo
388 @property
389 def attempts(self) -> int:
390 """
391 Get the number of executions for a `Job`.
393 Returns
394 -------
395 int
396 Execution attempts.
397 """
398 return self.__attempts
400 @property
401 def failed_attempts(self) -> int:
402 """
403 Get the number of failed executions for a `Job`.
405 Returns
406 -------
407 int
408 Failed execution attempts.
409 """
410 return self.__failed_attempts
412 @property
413 def has_attempts_remaining(self) -> bool:
414 """
415 Check if a `Job` has remaining attempts.
417 This function will return True if the |BaseJob| has open
418 execution counts and the stop argument is not in the past relative to the
419 next planed execution.
421 Returns
422 -------
423 bool
424 True if the |BaseJob| has execution attempts.
425 """
426 if self.__mark_delete:
427 return False
428 if self.__max_attempts == 0:
429 return True
430 return self.__attempts < self.__max_attempts
433BaseJobType = TypeVar("BaseJobType", bound=BaseJob)