Coverage for scheduler/base/job_timer.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-01-09 20:04 +0000

1""" 

2Implementation of the essential timer for a `BaseJob`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import datetime as dt 

10import threading 

11from typing import Optional, cast 

12 

13from scheduler.base.definition import JobType 

14from scheduler.base.timingtype import TimingJobTimerUnion 

15from scheduler.trigger.core import Weekday 

16from scheduler.util import JOB_NEXT_DAYLIKE_MAPPING, next_weekday_time_occurrence 

17 

18 

class JobTimer:
    """
    Keep track of the next execution `datetime.datetime` of a |BaseJob|.

    Parameters
    ----------
    job_type : JobType
        Indicator that selects which calculation rule is applied to `timing`.
    timing : TimingJobTimerUnion
        Desired execution time(s).
    start : datetime.datetime
        Timestamp reference from which future executions are calculated.
    skip_missing : bool
        If ``True`` a |BaseJob| will only schedule its newest planned
        execution and drop older ones.
    """

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobTimerUnion,
        start: dt.datetime,
        skip_missing: bool = False,
    ):
        self.__lock = threading.RLock()
        self.__job_type = job_type
        self.__timing = timing
        self.__next_exec = start
        self.__skip = skip_missing
        # Advance once from `start` so that the first planned execution is available.
        self.calc_next_exec()

    def calc_next_exec(self, ref: Optional[dt.datetime] = None) -> None:
        """
        Advance the stored execution `datetime.datetime` to the next occurrence.

        Parameters
        ----------
        ref : Optional[datetime.datetime]
            Datetime reference for scheduling the next execution datetime.
            It is only taken into account if the timer was created with
            ``skip_missing=True``.
        """
        with self.__lock:
            # The reference only matters when missed executions are skipped.
            catch_up: Optional[dt.datetime] = ref if self.__skip else None

            if self.__job_type == JobType.CYCLIC:
                # Cyclic jobs add the interval to the reference (when skipping)
                # or to the previously planned execution.
                if catch_up is not None:
                    self.__next_exec = catch_up
                self.__next_exec = self.__next_exec + cast(dt.timedelta, self.__timing)
                return

            if self.__job_type == JobType.WEEKLY:
                weekday = cast(Weekday, self.__timing)
                target_tz = weekday.time.tzinfo
                if target_tz:
                    self.__next_exec = self.__next_exec.astimezone(target_tz)
                self.__next_exec = next_weekday_time_occurrence(
                    self.__next_exec, weekday, weekday.time
                )
            else:
                # Every remaining job type is dispatched via JOB_NEXT_DAYLIKE_MAPPING.
                daytime = cast(dt.time, self.__timing)
                if self.__next_exec.tzinfo:
                    self.__next_exec = self.__next_exec.astimezone(daytime.tzinfo)
                self.__next_exec = JOB_NEXT_DAYLIKE_MAPPING[self.__job_type](
                    self.__next_exec, daytime
                )

            # The result still lies before the reference: restart the
            # calculation from the reference itself.
            if catch_up is not None and self.__next_exec < catch_up:
                self.__next_exec = catch_up
                self.calc_next_exec()

    @property
    def datetime(self) -> dt.datetime:
        """
        Get the `datetime.datetime` object for the planned execution.

        Returns
        -------
        datetime.datetime
            Execution `datetime.datetime` stamp.
        """
        with self.__lock:
            planned = self.__next_exec
        return planned

    def timedelta(self, dt_stamp: dt.datetime) -> dt.timedelta:
        """
        Get the `datetime.timedelta` until the execution of this `Job`.

        Parameters
        ----------
        dt_stamp : datetime.datetime
            Time that is subtracted from the planned execution time
            to determine the time difference.

        Returns
        -------
        datetime.timedelta
            `datetime.timedelta` to the execution.
        """
        with self.__lock:
            remaining = self.__next_exec - dt_stamp
        return remaining