Coverage for scheduler/base/job_util.py: 100%
65 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:15 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:15 +0000
1"""
2Implementation of essential functions for a `BaseJob`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import datetime as dt
10from typing import Optional, cast
12import typeguard as tg
14from scheduler.base.definition import JOB_TIMING_TYPE_MAPPING, JobType
15from scheduler.base.job_timer import JobTimer
16from scheduler.base.timingtype import TimingJobUnion
17from scheduler.error import SchedulerError
18from scheduler.message import (
19 _TZ_ERROR_MSG,
20 DUPLICATE_EFFECTIVE_TIME,
21 START_STOP_ERROR,
22 TZ_ERROR_MSG,
23)
24from scheduler.trigger.core import Weekday
25from scheduler.util import are_times_unique, are_weekday_times_unique
28def prettify_timedelta(timedelta: dt.timedelta) -> str:
29 """
30 Humanize timedelta string readability for negative values.
32 Parameters
33 ----------
34 timedelta : datetime.timedelta
35 datetime instance
37 Returns
38 -------
39 str
40 Human readable string representation rounded to seconds
41 """
42 seconds = timedelta.total_seconds()
43 if seconds < 0:
44 res = f"-{-timedelta}"
45 else:
46 res = str(timedelta)
47 return res.split(",")[0].split(".")[0]
50def get_pending_timer(timers: list[JobTimer]) -> JobTimer:
51 """Get the the timer with the largest overdue time."""
52 unsorted_timer_datetimes: dict[JobTimer, dt.datetime] = {}
53 for timer in timers:
54 unsorted_timer_datetimes[timer] = timer.datetime
55 sorted_timers = sorted(
56 unsorted_timer_datetimes,
57 key=unsorted_timer_datetimes.get, # type: ignore
58 )
59 return sorted_timers[0]
62def sane_timing_types(job_type: JobType, timing: TimingJobUnion) -> None:
63 """
64 Determine if the `JobType` is fulfilled by the type of the specified `timing`.
66 Parameters
67 ----------
68 job_type : JobType
69 :class:`~scheduler.job.JobType` to test agains.
70 timing : TimingJobUnion
71 The `timing` object to be tested.
73 Raises
74 ------
75 TypeError
76 If the `timing` object has the wrong `Type` for a specific `JobType`.
77 """
78 try:
79 tg.check_type(timing, JOB_TIMING_TYPE_MAPPING[job_type]["type"])
80 if job_type == JobType.CYCLIC:
81 if not len(timing) == 1:
82 raise TypeError
83 except TypeError as err:
84 raise SchedulerError(JOB_TIMING_TYPE_MAPPING[job_type]["err"]) from err
87def standardize_timing_format(job_type: JobType, timing: TimingJobUnion) -> TimingJobUnion:
88 r"""
89 Return timings in standardized form.
91 Clears irrelevant time positionals for `JobType.MINUTELY` and `JobType.HOURLY`.
92 """
93 if job_type is JobType.MINUTELY:
94 timing = [time.replace(hour=0, minute=0) for time in cast(list[dt.time], timing)]
95 elif job_type is JobType.HOURLY:
96 timing = [time.replace(hour=0) for time in cast(list[dt.time], timing)]
97 return timing
100def check_timing_tzinfo(
101 job_type: JobType,
102 timing: TimingJobUnion,
103 tzinfo: Optional[dt.tzinfo],
104):
105 """Raise if `timing` incompatible with `tzinfo` for `job_type`."""
106 if job_type is JobType.WEEKLY:
107 for weekday in cast(list[Weekday], timing):
108 if bool(weekday.time.tzinfo) ^ bool(tzinfo):
109 raise SchedulerError(TZ_ERROR_MSG)
110 elif job_type in (JobType.MINUTELY, JobType.HOURLY, JobType.DAILY):
111 for time in cast(list[dt.time], timing):
112 if bool(time.tzinfo) ^ bool(tzinfo):
113 raise SchedulerError(TZ_ERROR_MSG)
116def check_duplicate_effective_timings(
117 job_type: JobType,
118 timing: TimingJobUnion,
119 tzinfo: Optional[dt.tzinfo],
120):
121 """Raise given timings are not effectively duplicates."""
122 if job_type is JobType.WEEKLY:
123 if not are_weekday_times_unique(cast(list[Weekday], timing), tzinfo):
124 raise SchedulerError(DUPLICATE_EFFECTIVE_TIME)
125 elif job_type in (
126 JobType.MINUTELY,
127 JobType.HOURLY,
128 JobType.DAILY,
129 ):
130 if not are_times_unique(cast(list[dt.time], timing)):
131 raise SchedulerError(DUPLICATE_EFFECTIVE_TIME)
134def set_start_check_stop_tzinfo(
135 start: Optional[dt.datetime],
136 stop: Optional[dt.datetime],
137 tzinfo: Optional[dt.tzinfo],
138) -> dt.datetime:
139 """Raise if `start`, `stop` and `tzinfo` incompatible; Make start."""
140 if start:
141 if bool(start.tzinfo) ^ bool(tzinfo):
142 raise SchedulerError(_TZ_ERROR_MSG.format("start"))
143 else:
144 start = dt.datetime.now(tzinfo)
145 if stop:
146 if bool(stop.tzinfo) ^ bool(tzinfo):
147 raise SchedulerError(_TZ_ERROR_MSG.format("stop"))
148 if stop is not None:
149 if start >= stop:
150 raise SchedulerError(START_STOP_ERROR)
151 return start