Coverage for scheduler/base/job.py: 100%
147 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
1"""
2Implementation of a `BaseJob`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import datetime as dt
10import warnings
11from abc import ABC, abstractmethod
12from logging import Logger
13from typing import Any, Callable, Generic, Optional, TypeVar, cast
15from scheduler.base.definition import JobType
16from scheduler.base.job_timer import JobTimer
17from scheduler.base.job_util import (
18 check_duplicate_effective_timings,
19 check_timing_tzinfo,
20 get_pending_timer,
21 prettify_timedelta,
22 sane_timing_types,
23 set_start_check_stop_tzinfo,
24 standardize_timing_format,
25)
26from scheduler.base.timingtype import TimingJobUnion
T = TypeVar("T", bound=Callable[[], Any])


class BaseJob(ABC, Generic[T]):
    """
    Abstract definition of the basic interface for a job class.

    A `BaseJob` stores the callback together with its payload and owns one
    `JobTimer` per timing entry. The timer returned by `get_pending_timer`
    defines the next planned execution. Concrete subclasses have to provide
    `__repr__`.

    Parameters
    ----------
    job_type : JobType
        Kind of the job, handed to the timing validators and to every `JobTimer`.
    timing : TimingJobUnion
        Timing specification. It is normalised by `standardize_timing_format`
        and validated before being stored.
    handle : T
        Callback function.
    args : Optional[tuple[Any, ...]]
        Positional arguments stored for the callback, defaults to an empty tuple.
    kwargs : Optional[dict[str, Any]]
        Keyword arguments stored for the callback. A shallow copy is kept.
    max_attempts : int
        Execution limit, ``0`` stands for no limit.
    tags : Optional[set[str]]
        Tags of the job. A copy is kept.
    delay : bool
        If ``False`` the first execution targets the start time of the job
        instead of the first value of the pending timer.
    start : Optional[datetime.datetime]
        Handed together with `stop` and `tzinfo` to `set_start_check_stop_tzinfo`,
        the result of that call is stored as start time of the timers.
    stop : Optional[datetime.datetime]
        As soon as the pending execution lies after this stamp the job reports
        no remaining attempts.
    skip_missing : bool
        If ``True`` every timer that is not in the future of the reference time
        gets recalculated on `_calc_next_exec`, otherwise only the pending one.
    alias : Optional[str]
        Name used in the string representation instead of the name of the handle.
    tzinfo : Optional[datetime.tzinfo]
        Timezone the timing is validated against. It is also used to create the
        current time if `timedelta` is called without a time stamp.
    """

    __type: JobType
    __timing: TimingJobUnion
    __handle: T
    __args: tuple[Any, ...]
    __kwargs: dict[str, Any]
    __max_attempts: int
    __tags: set[str]
    __delay: bool
    __start: Optional[dt.datetime]
    __stop: Optional[dt.datetime]
    __skip_missing: bool
    __alias: Optional[str]
    __tzinfo: Optional[dt.tzinfo]
    # NOTE(review): declared here, but never assigned within this class.
    __logger: Logger

    __mark_delete: bool
    __attempts: int
    __failed_attempts: int
    __pending_timer: JobTimer
    __timers: list[JobTimer]

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobUnion,
        handle: T,
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        max_attempts: int = 0,
        tags: Optional[set[str]] = None,
        delay: bool = True,
        start: Optional[dt.datetime] = None,
        stop: Optional[dt.datetime] = None,
        skip_missing: bool = False,
        alias: Optional[str] = None,
        tzinfo: Optional[dt.tzinfo] = None,
    ) -> None:
        timing = standardize_timing_format(job_type, timing)

        # validate the normalised timing before any state is stored
        sane_timing_types(job_type, timing)
        check_timing_tzinfo(job_type, timing, tzinfo)
        check_duplicate_effective_timings(job_type, timing, tzinfo)

        self.__start = set_start_check_stop_tzinfo(start, stop, tzinfo)

        self.__type = job_type
        self.__timing = timing  # pylint: disable=unused-private-member
        # NOTE: https://github.com/python/mypy/issues/708
        #       https://github.com/python/mypy/issues/2427
        self.__handle = handle
        self.__args = () if args is None else args
        # copies protect the job from later mutation of the caller's containers
        self.__kwargs = {} if kwargs is None else kwargs.copy()
        self.__max_attempts = max_attempts
        self.__tags = set() if tags is None else tags.copy()
        self.__delay = delay
        self.__stop = stop
        self.__skip_missing = skip_missing
        self.__alias = alias
        self.__tzinfo = tzinfo

        # self.__mark_delete will be set to True if the new timer would be in the
        # future relative to the self.__stop variable
        self.__mark_delete = False
        self.__attempts = 0
        self.__failed_attempts = 0

        # create JobTimers, one per timing entry
        self.__timers = [JobTimer(job_type, tim, self.__start, skip_missing) for tim in timing]
        self.__pending_timer = get_pending_timer(self.__timers)

        self.__flag_if_past_stop()

    def __flag_if_past_stop(self) -> None:
        """Mark the job for deletion if the pending execution lies after `stop`."""
        # The flag is only ever raised here, it is never reset.
        if self.__stop is not None and self.__pending_timer.datetime > self.__stop:
            self.__mark_delete = True

    def __lt__(self, other: BaseJob[T]) -> bool:
        """Order jobs by their planned execution time."""
        return self.datetime < other.datetime

    def _calc_next_exec(self, ref_dt: dt.datetime) -> None:
        """
        Calculate the next estimated execution `datetime.datetime` of the `Job`.

        Parameters
        ----------
        ref_dt : datetime.datetime
            Reference time stamp to which the |BaseJob| calculates
            its next execution.
        """
        if self.__skip_missing:
            # advance every timer that is not in the future of the reference time
            for timer in self.__timers:
                if (timer.datetime - ref_dt).total_seconds() <= 0:
                    timer.calc_next_exec(ref_dt)
        else:
            self.__pending_timer.calc_next_exec(ref_dt)
        # timers changed, so the pending one has to be selected again
        self.__pending_timer = get_pending_timer(self.__timers)
        self.__flag_if_past_stop()

    def _repr(self) -> tuple[str, ...]:
        """Return the `repr` of the values describing the job, in a fixed order."""
        return tuple(
            repr(elem)
            for elem in (
                self.__type,
                self.__timing,
                self.__handle,
                self.__args,
                self.__kwargs,
                self.__max_attempts,
                self.__delay,
                self.__start,
                self.__stop,
                self.__skip_missing,
                self.__alias,
                self.tzinfo,
            )
        )

    @abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()  # pragma: no cover

    def _str(
        self,
    ) -> tuple[str, ...]:
        """
        Return the objects relevant for readable string representation.

        Returns
        -------
        tuple[str, ...]
            Type name (``"ONCE"`` for ``max_attempts == 1``), display name,
            argument hint, execution time cut to 19 characters, timezone name,
            prettified time until execution, attempts and maximum attempts
            (``inf`` for ``max_attempts == 0``).
        """
        dt_timedelta = self.timedelta(dt.datetime.now(self.tzinfo))
        if self.alias is not None:
            name = self.alias
            f_args = ""
        else:
            # Callables such as `functools.partial` objects or instances with a
            # `__call__` method carry no `__qualname__`, use the name of their
            # type instead of raising an AttributeError.
            name = getattr(self.handle, "__qualname__", type(self.handle).__qualname__)
            if hasattr(self.handle, "__code__"):
                # NOTE(review): `co_nlocals` also counts local variables that are
                # not parameters - kept as is to preserve the existing output.
                f_args = "(..)" if self.handle.__code__.co_nlocals else "()"
            else:
                f_args = "(?)"
        return (
            self.type.name if self.max_attempts != 1 else "ONCE",
            name,
            f_args,
            str(self.datetime)[:19],
            str(self.datetime.tzname()),
            prettify_timedelta(dt_timedelta),
            str(self.attempts),
            str(float("inf") if self.max_attempts == 0 else self.max_attempts),
        )

    def __str__(self) -> str:
        return "{0}, {1}{2}, at={3}, tz={4}, in={5}, #{6}/{7}".format(*self._str())

    def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta:
        """
        Get the `datetime.timedelta` until the next execution of this `Job`.

        Parameters
        ----------
        dt_stamp : Optional[datetime.datetime]
            Time to be compared with the planned execution time to determine the
            time difference. Defaults to the current time in the timezone given
            at construction.

        Returns
        -------
        timedelta
            `datetime.timedelta` to the next execution.
        """
        if dt_stamp is None:
            dt_stamp = dt.datetime.now(self.__tzinfo)
        if not self.__delay and self.__attempts == 0:
            # undelayed first execution targets the start time
            return cast(dt.datetime, self.__start) - dt_stamp
        return self.__pending_timer.timedelta(dt_stamp)

    @property
    def datetime(self) -> dt.datetime:
        """
        Give the `datetime.datetime` object for the planned execution.

        Returns
        -------
        datetime.datetime
            Execution `datetime.datetime` stamp.
        """
        if not self.__delay and self.__attempts == 0:
            # undelayed first execution targets the start time
            return cast(dt.datetime, self.__start)
        return self.__pending_timer.datetime

    @property
    def type(self) -> JobType:
        """
        Return the `JobType` of the `Job` instance.

        Returns
        -------
        JobType
            :class:`~scheduler.job.JobType` of the |BaseJob|.
        """
        return self.__type

    @property
    def handle(self) -> T:
        """
        Get the callback function.

        Returns
        -------
        handle
            Callback function.
        """
        return self.__handle

    @property
    def args(self) -> tuple[Any, ...]:
        r"""
        Get the positional arguments of the function handle within a `Job`.

        .. warning:: When running |BaseJob|\ s in parallel threads,
            be sure to implement possible side effects of parameter accessing in a
            thread safe manner.

        Returns
        -------
        tuple[Any]
            The payload arguments to pass to the function handle within a
            |BaseJob|.
        """
        return self.__args

    @property
    def kwargs(self) -> dict[str, Any]:
        r"""
        Get the keyword arguments of the function handle within a `Job`.

        .. warning:: When running |BaseJob|\ s in parallel threads,
            be sure to implement possible side effects of parameter accessing in a
            thread safe manner.

        Returns
        -------
        dict[str, Any]
            The payload arguments to pass to the function handle within a
            |BaseJob|.
        """
        return self.__kwargs

    @property
    def max_attempts(self) -> int:
        """
        Get the execution limit for a `Job`.

        Returns
        -------
        int
            Max execution attempts.
        """
        return self.__max_attempts

    @property
    def tags(self) -> set[str]:
        r"""
        Get the tags of a `Job`.

        Returns
        -------
        set[str]
            A copy of the tags of a |BaseJob|.
        """
        return self.__tags.copy()

    @property
    def delay(self) -> bool:
        """
        *Deprecated*: Return ``True`` if the first `Job` execution will wait for the next scheduled time.

        Returns
        -------
        bool
            If ``True`` wait with the execution for the next scheduled time. If ``False``
            the first execution will target the time of `Job.start`.
        """
        warnings.warn(
            (
                "Using the `delay` property is deprecated and will "
                "be removed in the next minor release."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        return self.__delay

    @property
    def start(self) -> Optional[dt.datetime]:
        """
        Get the timestamp at which the `JobTimer` starts.

        Returns
        -------
        Optional[datetime.datetime]
            The start datetime stamp.
        """
        return self.__start

    @property
    def stop(self) -> Optional[dt.datetime]:
        """
        Get the timestamp after which no more executions of the `Job` should be scheduled.

        Returns
        -------
        Optional[datetime.datetime]
            The stop datetime stamp.
        """
        return self.__stop

    @property
    def skip_missing(self) -> bool:
        """
        Return ``True`` if `Job` will only schedule its newest planned execution.

        Returns
        -------
        bool
            If ``True`` a |BaseJob| will only schedule its newest planned
            execution and drop older ones.
        """
        return self.__skip_missing

    @property
    def alias(self) -> Optional[str]:
        r"""
        Get the alias of the `Job`.

        Returns
        -------
        Optional[str]
            Alias of the |BaseJob|.
        """
        return self.__alias

    @property
    def tzinfo(self) -> Optional[dt.tzinfo]:
        r"""
        Get the timezone of the `Job`'s next execution.

        Returns
        -------
        Optional[datetime.tzinfo]
            Timezone of the |BaseJob|\ s next execution.
        """
        return self.datetime.tzinfo

    @property
    def _tzinfo(self) -> Optional[dt.tzinfo]:
        """
        Get the timezone of the `Scheduler` in which the `Job` is living.

        Returns
        -------
        Optional[datetime.tzinfo]
            Timezone of the |BaseJob|.
        """
        return self.__tzinfo

    @property
    def attempts(self) -> int:
        """
        Get the number of executions for a `Job`.

        Returns
        -------
        int
            Execution attempts.
        """
        return self.__attempts

    @property
    def failed_attempts(self) -> int:
        """
        Get the number of failed executions for a `Job`.

        Returns
        -------
        int
            Failed execution attempts.
        """
        return self.__failed_attempts

    @property
    def has_attempts_remaining(self) -> bool:
        """
        Check if a `Job` has remaining attempts.

        This function will return True if the |BaseJob| has open
        execution counts and the stop argument is not in the past relative to the
        next planned execution.

        Returns
        -------
        bool
            True if the |BaseJob| has execution attempts.
        """
        if self.__mark_delete:
            return False
        if self.__max_attempts == 0:
            # zero stands for an unlimited number of attempts
            return True
        return self.__attempts < self.__max_attempts


BaseJobType = TypeVar("BaseJobType", bound=BaseJob[Any])