Coverage for scheduler/base/job_timer.py: 100%
42 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-01-09 20:23 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-01-09 20:23 +0000
1"""
2Implementation of the essential timer for a `BaseJob`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import datetime as dt
10import threading
11from typing import Optional, cast
13from scheduler.base.definition import JobType
14from scheduler.base.timingtype import TimingJobTimerUnion
15from scheduler.trigger.core import Weekday
16from scheduler.util import JOB_NEXT_DAYLIKE_MAPPING, next_weekday_time_occurrence
class JobTimer:
    """
    Internal `datetime.datetime` bookkeeping for a |BaseJob|.

    The timer stores one "next execution" timestamp and moves it forward
    according to the job type each time :meth:`calc_next_exec` is called.

    Parameters
    ----------
    job_type : JobType
        Indicator that selects which calculation is used.
    timing : TimingJobTimerUnion
        Desired execution time(s).
    start : datetime.datetime
        Timestamp reference from which future executions will be calculated.
    skip_missing : bool
        If ``True`` a |BaseJob| will only schedule its newest planned
        execution and drop older ones.
    """

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobTimerUnion,
        start: dt.datetime,
        skip_missing: bool = False,
    ):
        # Reentrant lock: `calc_next_exec` may call itself while holding it.
        self.__lock = threading.RLock()
        self.__job_type = job_type
        self.__timing = timing
        self.__next_exec = start
        self.__skip = skip_missing
        # Move from the raw start reference to the first planned execution.
        self.calc_next_exec()

    def calc_next_exec(self, ref: Optional[dt.datetime] = None) -> None:
        """
        Advance the stored execution `datetime.datetime` stamp by one step.

        Parameters
        ----------
        ref : Optional[datetime.datetime]
            Datetime reference for scheduling the next execution datetime.
            It is only taken into account when the timer was created with
            ``skip_missing=True``.
        """
        with self.__lock:
            if self.__job_type == JobType.CYCLIC:
                # Cyclic jobs add their interval to the previous execution,
                # or to `ref` when missed executions are skipped.
                if self.__skip and ref is not None:
                    self.__next_exec = ref
                self.__next_exec += cast(dt.timedelta, self.__timing)
                return

            if self.__job_type == JobType.WEEKLY:
                weekday = cast(Weekday, self.__timing)
                self.__timing = weekday
                if weekday.time.tzinfo:
                    # Express the reference in the timezone of the target time.
                    self.__next_exec = self.__next_exec.astimezone(weekday.time.tzinfo)
                self.__next_exec = next_weekday_time_occurrence(
                    self.__next_exec, weekday, weekday.time
                )
            else:  # self.__job_type in JOB_NEXT_DAYLIKE_MAPPING
                clock_time = cast(dt.time, self.__timing)
                self.__timing = clock_time
                # NOTE(review): this branch gates the conversion on the
                # reference's tzinfo while the weekly branch gates on the
                # timing's tzinfo - confirm callers guarantee that both are
                # aware or both are naive.
                if self.__next_exec.tzinfo:
                    self.__next_exec = self.__next_exec.astimezone(clock_time.tzinfo)
                advance = JOB_NEXT_DAYLIKE_MAPPING[self.__job_type]
                self.__next_exec = advance(self.__next_exec, clock_time)

            # Without skipping (or without a reference) the step is complete.
            if not self.__skip or ref is None:
                return

            if self.__next_exec < ref:
                # The computed execution lies before `ref`: restart from
                # `ref` and advance once more (no reference is passed on).
                self.__next_exec = ref
                self.calc_next_exec()

    @property
    def datetime(self) -> dt.datetime:
        """
        Get the `datetime.datetime` object for the planned execution.

        Returns
        -------
        datetime.datetime
            Execution `datetime.datetime` stamp.
        """
        with self.__lock:
            planned = self.__next_exec
        return planned

    def timedelta(self, dt_stamp: dt.datetime) -> dt.timedelta:
        """
        Get the `datetime.timedelta` until the execution of this `Job`.

        Parameters
        ----------
        dt_stamp : datetime.datetime
            Time to be compared with the planned execution time
            to determine the time difference.

        Returns
        -------
        datetime.timedelta
            Planned execution time minus `dt_stamp`.
        """
        with self.__lock:
            remaining = self.__next_exec - dt_stamp
        return remaining