Coverage for scheduler/base/job_util.py: 100%

65 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-01-09 20:37 +0000

1""" 

2Implementation of essential functions for a `BaseJob`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import datetime as dt 

10from typing import Optional, cast 

11 

12import typeguard as tg 

13 

14from scheduler.base.definition import JOB_TIMING_TYPE_MAPPING, JobType 

15from scheduler.base.job_timer import JobTimer 

16from scheduler.base.timingtype import TimingJobUnion 

17from scheduler.error import SchedulerError 

18from scheduler.message import ( 

19 _TZ_ERROR_MSG, 

20 DUPLICATE_EFFECTIVE_TIME, 

21 START_STOP_ERROR, 

22 TZ_ERROR_MSG, 

23) 

24from scheduler.trigger.core import Weekday 

25from scheduler.util import are_times_unique, are_weekday_times_unique 

26 

27 

def prettify_timedelta(timedelta: dt.timedelta) -> str:
    """
    Format a timedelta compactly, with a plain leading minus for negatives.

    Parameters
    ----------
    timedelta : datetime.timedelta
        The duration to format.

    Returns
    -------
    str
        The string form of the absolute duration, prefixed with ``-`` when
        the duration is negative. Fractional seconds are cut off (not
        rounded), and only the text before the first comma is kept, so a
        duration whose string form starts with a days component is reduced
        to that component (e.g. ``"1 day"``).
    """
    if timedelta.total_seconds() < 0:
        # str() of a negative timedelta is hard to read ("-1 day, 23:59:59"),
        # so format the magnitude and prepend the sign manually.
        text = "-" + str(-timedelta)
    else:
        text = str(timedelta)
    leading_part, _, _ = text.partition(",")
    without_fraction, _, _ = leading_part.partition(".")
    return without_fraction

48 

49 

def get_pending_timer(timers: list[JobTimer]) -> JobTimer:
    """
    Get the timer with the largest overdue time.

    Parameters
    ----------
    timers : list[JobTimer]
        Timers to choose from; must contain at least one element.

    Returns
    -------
    JobTimer
        The timer whose `datetime` attribute is the smallest. If several
        timers share that value, the first one in `timers` is returned.

    Raises
    ------
    IndexError
        If `timers` is empty.
    """
    if not timers:
        # IndexError (rather than min()'s ValueError) keeps the exception
        # type of plain list indexing for callers that may rely on it.
        raise IndexError("cannot select a pending timer from an empty list")
    # A single linear pass is enough; no need to sort every timer.
    return min(timers, key=lambda timer: timer.datetime)

60 

61 

def sane_timing_types(job_type: JobType, timing: TimingJobUnion) -> None:
    """
    Validate that `timing` has the type required by the given `JobType`.

    Parameters
    ----------
    job_type : JobType
        :class:`~scheduler.job.JobType` to test against.
    timing : TimingJobUnion
        The `timing` object to be tested.

    Raises
    ------
    SchedulerError
        If the type check of `timing` raises a `TypeError` for this
        `JobType`, or if a `JobType.CYCLIC` job is given a `timing` whose
        length is not exactly one. The message is taken from the ``"err"``
        entry of `JOB_TIMING_TYPE_MAPPING` for `job_type`.
    """
    try:
        expected_type = JOB_TIMING_TYPE_MAPPING[job_type]["type"]
        tg.check_type("timing", timing, expected_type)
        # A wrong number of cyclic timings is funneled into the same error
        # path as a failed type check.
        if job_type == JobType.CYCLIC and len(timing) != 1:
            raise TypeError
    except TypeError as err:
        raise SchedulerError(JOB_TIMING_TYPE_MAPPING[job_type]["err"]) from err

85 

86 

def standardize_timing_format(job_type: JobType, timing: TimingJobUnion) -> TimingJobUnion:
    """
    Return `timing` in standardized form for the given `job_type`.

    Parameters
    ----------
    job_type : JobType
        The job type that decides which time fields are irrelevant.
    timing : TimingJobUnion
        The timing to standardize.

    Returns
    -------
    TimingJobUnion
        For `JobType.MINUTELY` a new list of times with hour and minute set
        to zero; for `JobType.HOURLY` a new list of times with the hour set
        to zero; for every other job type the `timing` object unchanged.
    """
    if job_type is JobType.MINUTELY:
        # Hour and minute carry no meaning for a job that runs every minute.
        return [entry.replace(hour=0, minute=0) for entry in cast(list[dt.time], timing)]
    if job_type is JobType.HOURLY:
        # The hour carries no meaning for a job that runs every hour.
        return [entry.replace(hour=0) for entry in cast(list[dt.time], timing)]
    return timing

98 

99 

def check_timing_tzinfo(
    job_type: JobType,
    timing: TimingJobUnion,
    tzinfo: Optional[dt.tzinfo],
) -> None:
    """
    Raise if `timing` is incompatible with `tzinfo` for `job_type`.

    Parameters
    ----------
    job_type : JobType
        The job type; only `WEEKLY`, `MINUTELY`, `HOURLY` and `DAILY` are
        checked, every other type passes unconditionally.
    timing : TimingJobUnion
        The timing entries to check.
    tzinfo : Optional[datetime.tzinfo]
        The timezone the entries are compared against.

    Raises
    ------
    SchedulerError
        If an entry carries a timezone while `tzinfo` is not set, or the
        other way around.
    """
    if job_type is JobType.WEEKLY:
        # Weekday entries wrap the actual time in their `time` attribute.
        times = (weekday.time for weekday in cast(list[Weekday], timing))
    elif job_type in (JobType.MINUTELY, JobType.HOURLY, JobType.DAILY):
        times = cast(list[dt.time], timing)
    else:
        return

    # Either both sides have a timezone or neither does; stop at the first
    # entry that disagrees.
    if any(bool(time.tzinfo) != bool(tzinfo) for time in times):
        raise SchedulerError(TZ_ERROR_MSG)

114 

115 

def check_duplicate_effective_timings(
    job_type: JobType,
    timing: TimingJobUnion,
    tzinfo: Optional[dt.tzinfo],
) -> None:
    """
    Raise if the given timings contain effective duplicates.

    Parameters
    ----------
    job_type : JobType
        The job type; only `WEEKLY`, `MINUTELY`, `HOURLY` and `DAILY` are
        checked, every other type passes unconditionally.
    timing : TimingJobUnion
        The timing entries to check for uniqueness.
    tzinfo : Optional[datetime.tzinfo]
        Timezone forwarded to the uniqueness check of `WEEKLY` timings; it
        is not used for the other job types.

    Raises
    ------
    SchedulerError
        If the uniqueness helper for the job type reports that the timings
        are not unique.
    """
    if job_type is JobType.WEEKLY:
        is_unique = are_weekday_times_unique(cast(list[Weekday], timing), tzinfo)
    elif job_type in (JobType.MINUTELY, JobType.HOURLY, JobType.DAILY):
        is_unique = are_times_unique(cast(list[dt.time], timing))
    else:
        return

    if not is_unique:
        raise SchedulerError(DUPLICATE_EFFECTIVE_TIME)

132 

133 

def set_start_check_stop_tzinfo(
    start: Optional[dt.datetime],
    stop: Optional[dt.datetime],
    tzinfo: Optional[dt.tzinfo],
) -> dt.datetime:
    """
    Validate `start` and `stop` against `tzinfo` and return the start time.

    Parameters
    ----------
    start : Optional[datetime.datetime]
        Requested start; when ``None`` the current time in `tzinfo` is used.
    stop : Optional[datetime.datetime]
        Requested stop; when ``None`` no stop related checks are done.
    tzinfo : Optional[datetime.tzinfo]
        The timezone `start` and `stop` have to be compatible with.

    Returns
    -------
    datetime.datetime
        The given `start`, or ``datetime.now(tzinfo)`` if `start` is ``None``.

    Raises
    ------
    SchedulerError
        If `start` or `stop` carries a timezone while `tzinfo` is not set
        (or the other way around), or if `start` is not strictly before
        `stop`.
    """
    if start is None:
        # A generated start uses `tzinfo` directly, so it needs no check.
        start = dt.datetime.now(tzinfo)
    elif bool(start.tzinfo) ^ bool(tzinfo):
        raise SchedulerError(_TZ_ERROR_MSG.format("start"))

    if stop is not None:
        # Check the timezone first: comparing a naive with an aware datetime
        # below would fail with a TypeError instead of a SchedulerError.
        if bool(stop.tzinfo) ^ bool(tzinfo):
            raise SchedulerError(_TZ_ERROR_MSG.format("stop"))
        if start >= stop:
            raise SchedulerError(START_STOP_ERROR)
    return start