# Source code for scheduler.base.job_timer

"""
Implementation of the essential timer for a `BaseJob`.

Author: Jendrik A. Potyka, Fabian A. Preiss
"""

from __future__ import annotations

import datetime as dt
import threading
from typing import Optional, cast

from scheduler.base.definition import JobType
from scheduler.base.timingtype import TimingJobTimerUnion
from scheduler.trigger.core import Weekday
from scheduler.util import JOB_NEXT_DAYLIKE_MAPPING, next_weekday_time_occurrence


class JobTimer:
    """
    The class provides the internal `datetime.datetime` calculations for a |BaseJob|.

    Parameters
    ----------
    job_type : JobType
        Indicator which defines which calculations have to be used.
    timing : TimingJobTimerUnion
        Desired execution time(s).
    start : datetime.datetime
        Timestamp reference from which future executions will be calculated.
    skip_missing : bool
        If ``True`` a |BaseJob| will only schedule its newest planned
        execution and drop older ones.
    """

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobTimerUnion,
        start: dt.datetime,
        skip_missing: bool = False,
    ):
        # Re-entrant lock: `calc_next_exec` may call itself while holding it.
        self.__lock = threading.RLock()
        self.__job_type = job_type
        self.__timing = timing
        self.__next_exec = start
        self.__skip = skip_missing
        # Advance from `start` to the first planned execution right away.
        self.calc_next_exec()

    def calc_next_exec(self, ref: Optional[dt.datetime] = None) -> None:
        """
        Generate the next execution `datetime.datetime` stamp.

        Parameters
        ----------
        ref : Optional[datetime.datetime]
            Datetime reference for scheduling the next execution datetime.
            It is only taken into account when the timer was created with
            ``skip_missing=True``.
        """
        with self.__lock:
            if self.__job_type == JobType.CYCLIC:
                # With skipping enabled the cycle restarts at `ref` instead of
                # at the previously planned execution.
                if self.__skip and ref is not None:
                    self.__next_exec = ref
                self.__next_exec = self.__next_exec + cast(dt.timedelta, self.__timing)
                return

            if self.__job_type == JobType.WEEKLY:
                weekday = cast(Weekday, self.__timing)
                # Move the stamp into the timezone of the target time before
                # looking up the next occurrence.
                if weekday.time.tzinfo:
                    self.__next_exec = self.__next_exec.astimezone(weekday.time.tzinfo)
                self.__next_exec = next_weekday_time_occurrence(
                    self.__next_exec, weekday, weekday.time
                )
            else:  # self.__job_type in JOB_NEXT_DAYLIKE_MAPPING:
                day_time = cast(dt.time, self.__timing)
                # NOTE(review): unlike the weekly branch, this checks the tzinfo
                # of the current stamp but converts to the tzinfo of the timing;
                # `astimezone(None)` converts to the system local timezone.
                # Presumably callers guarantee both are aware or both naive -
                # confirm.
                if self.__next_exec.tzinfo:
                    self.__next_exec = self.__next_exec.astimezone(day_time.tzinfo)
                self.__next_exec = JOB_NEXT_DAYLIKE_MAPPING[self.__job_type](
                    self.__next_exec, day_time
                )

            # The computed stamp still lies before `ref`: restart from `ref`
            # and calculate once more (without a reference).
            if self.__skip and ref is not None and self.__next_exec < ref:
                self.__next_exec = ref
                self.calc_next_exec()

    @property
    def datetime(self) -> dt.datetime:
        """
        Get the `datetime.datetime` object for the planned execution.

        Returns
        -------
        datetime.datetime
            Execution `datetime.datetime` stamp.
        """
        with self.__lock:
            return self.__next_exec

    def timedelta(self, dt_stamp: dt.datetime) -> dt.timedelta:
        """
        Get the `datetime.timedelta` until the execution of this `Job`.

        Parameters
        ----------
        dt_stamp : datetime.datetime
            Time to be compared with the planned execution time to determine
            the time difference.

        Returns
        -------
        datetime.timedelta
            `datetime.timedelta` to the execution. The value is negative when
            `dt_stamp` lies after the planned execution.
        """
        with self.__lock:
            return self.__next_exec - dt_stamp