Source code for scheduler.base.job

"""
Implementation of a `BaseJob`.

Author: Jendrik A. Potyka, Fabian A. Preiss
"""

from __future__ import annotations

import datetime as dt
import warnings
from abc import ABC, abstractmethod
from logging import Logger
from typing import Any, Callable, Optional, TypeVar, cast

from scheduler.base.definition import JobType
from scheduler.base.job_timer import JobTimer
from scheduler.base.job_util import (
    check_duplicate_effective_timings,
    check_timing_tzinfo,
    get_pending_timer,
    prettify_timedelta,
    sane_timing_types,
    set_start_check_stop_tzinfo,
    standardize_timing_format,
)
from scheduler.base.timingtype import TimingJobUnion


class BaseJob(ABC):
    """
    Abstract base class defining the basic interface of a job.

    A job bundles a callback (`handle`) with its payload (`args`, `kwargs`),
    one |JobTimer| per effective timing and the bookkeeping needed to decide
    when the next execution is due and whether further executions remain.
    Subclasses have to provide `__repr__`.
    """

    __type: JobType
    __timing: TimingJobUnion
    __handle: Callable[..., None]
    __args: tuple[Any, ...]
    __kwargs: dict[str, Any]
    __max_attempts: int
    __tags: set[str]
    __delay: bool
    __start: Optional[dt.datetime]
    __stop: Optional[dt.datetime]
    __skip_missing: bool
    __alias: Optional[str]
    __tzinfo: Optional[dt.tzinfo]
    __logger: Logger

    __mark_delete: bool
    __attempts: int
    __failed_attempts: int
    __pending_timer: JobTimer
    __timers: list[JobTimer]

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        max_attempts: int = 0,
        tags: Optional[set[str]] = None,
        delay: bool = True,
        start: Optional[dt.datetime] = None,
        stop: Optional[dt.datetime] = None,
        skip_missing: bool = False,
        alias: Optional[str] = None,
        tzinfo: Optional[dt.tzinfo] = None,
    ) -> None:
        """
        Validate the timing information and set up the timers of the `Job`.

        Parameters
        ----------
        job_type : JobType
            Type of the job, forwarded to the timing checks and every |JobTimer|.
        timing : TimingJobUnion
            Timing(s) of the job. Passed through `standardize_timing_format`
            before being validated and stored.
        handle : Callable[..., None]
            Callback function.
        args : Optional[tuple[Any, ...]]
            Positional payload for `handle`; defaults to an empty tuple.
        kwargs : Optional[dict[str, Any]]
            Keyword payload for `handle`; a shallow copy is stored.
        max_attempts : int
            Execution limit; ``0`` means no limit.
        tags : Optional[set[str]]
            Tags of the job; a copy is stored.
        delay : bool
            If ``False`` the first execution targets `start` instead of the
            next scheduled time.
        start : Optional[datetime.datetime]
            Start reference, processed by `set_start_check_stop_tzinfo`.
        stop : Optional[datetime.datetime]
            Timestamp after which no more executions should be scheduled.
        skip_missing : bool
            If ``True`` only the newest planned execution is scheduled.
        alias : Optional[str]
            Name used instead of the handle's name in the string representation.
        tzinfo : Optional[datetime.tzinfo]
            Timezone the timing, `start` and `stop` are checked against.
        """
        timing = standardize_timing_format(job_type, timing)

        sane_timing_types(job_type, timing)
        check_timing_tzinfo(job_type, timing, tzinfo)
        check_duplicate_effective_timings(job_type, timing, tzinfo)

        self.__start = set_start_check_stop_tzinfo(start, stop, tzinfo)

        self.__type = job_type
        self.__timing = timing  # pylint: disable=unused-private-member
        # NOTE: https://github.com/python/mypy/issues/708
        #       https://github.com/python/mypy/issues/2427
        self.__handle = handle  # type: ignore
        self.__args = () if args is None else args
        self.__kwargs = {} if kwargs is None else kwargs.copy()
        self.__max_attempts = max_attempts
        self.__tags = set() if tags is None else tags.copy()
        self.__delay = delay
        self.__stop = stop
        self.__skip_missing = skip_missing
        self.__alias = alias
        self.__tzinfo = tzinfo

        # self.__mark_delete will be set to True if the new Timer would be in
        # the future relative to the self.__stop variable
        self.__mark_delete = False
        self.__attempts = 0
        self.__failed_attempts = 0

        # create JobTimers, one per effective timing
        self.__timers = [
            JobTimer(job_type, tim, self.__start, skip_missing) for tim in timing
        ]
        self.__pending_timer = get_pending_timer(self.__timers)

        if self.__stop is not None:
            if self.__pending_timer.datetime > self.__stop:
                self.__mark_delete = True

    def __lt__(self, other: BaseJob) -> bool:
        """Order jobs by the `datetime.datetime` of their planned execution."""
        return self.datetime < other.datetime

    def _calc_next_exec(self, ref_dt: dt.datetime) -> None:
        """
        Calculate the next estimated execution `datetime.datetime` of the `Job`.

        Parameters
        ----------
        ref_dt : datetime.datetime
            Reference time stamp to which the |BaseJob| calculates its next execution.
        """
        if self.__skip_missing:
            # advance every timer that is not strictly ahead of the reference
            for timer in self.__timers:
                if (timer.datetime - ref_dt).total_seconds() <= 0:
                    timer.calc_next_exec(ref_dt)
        else:
            self.__pending_timer.calc_next_exec(ref_dt)
        self.__pending_timer = get_pending_timer(self.__timers)
        if self.__stop is not None and self.__pending_timer.datetime > self.__stop:
            self.__mark_delete = True

    def _repr(self) -> tuple[str, ...]:
        """Return the `repr` of each attribute used for the object representation."""
        return tuple(
            repr(elem)
            for elem in (
                self.__type,
                self.__timing,
                self.__handle,
                self.__args,
                self.__kwargs,
                self.__max_attempts,
                self.__delay,
                self.__start,
                self.__stop,
                self.__skip_missing,
                self.__alias,
                self.tzinfo,
            )
        )

    @abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()  # pragma: no cover

    def _str(
        self,
    ) -> tuple[str, ...]:
        """
        Return the objects relevant for readable string representation.

        Returns
        -------
        tuple[str, ...]
            Job type name (``"ONCE"`` if `max_attempts` is 1), alias or handle
            name, argument hint, execution datetime, timezone name, time until
            execution, attempts and maximum attempts.
        """
        dt_timedelta = self.timedelta(dt.datetime.now(self.tzinfo))
        handle = self.handle

        if self.alias is not None:
            name = self.alias
            f_args = ""
        else:
            # functools.partial objects and callable instances do not carry a
            # __qualname__; fall back to the name of their type instead of
            # failing with an AttributeError.
            name = getattr(handle, "__qualname__", type(handle).__qualname__)
            if hasattr(handle, "__code__"):
                # co_nlocals counts the local variables, parameters included
                f_args = "(..)" if handle.__code__.co_nlocals else "()"
            else:
                f_args = "(?)"

        return (
            self.type.name if self.max_attempts != 1 else "ONCE",
            name,
            f_args,
            str(self.datetime)[:19],
            str(self.datetime.tzname()),
            prettify_timedelta(dt_timedelta),
            str(self.attempts),
            str(float("inf") if self.max_attempts == 0 else self.max_attempts),
        )

    def __str__(self) -> str:
        return "{0}, {1}{2}, at={3}, tz={4}, in={5}, #{6}/{7}".format(*self._str())

    def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta:
        """
        Get the `datetime.timedelta` until the next execution of this `Job`.

        Parameters
        ----------
        dt_stamp : Optional[datetime.datetime]
            Time to be compared with the planned execution time to determine
            the time difference. Defaults to the current time in the timezone
            the `Job` was created with.

        Returns
        -------
        timedelta
            `datetime.timedelta` to the next execution.
        """
        if dt_stamp is None:
            dt_stamp = dt.datetime.now(self.__tzinfo)
        if not self.__delay and self.__attempts == 0:
            # an undelayed job targets its start time for the first execution
            return cast(dt.datetime, self.__start) - dt_stamp
        return self.__pending_timer.timedelta(dt_stamp)

    @property
    def datetime(self) -> dt.datetime:
        """
        Give the `datetime.datetime` object for the planned execution.

        Returns
        -------
        datetime.datetime
            Execution `datetime.datetime` stamp.
        """
        if not self.__delay and self.__attempts == 0:
            return cast(dt.datetime, self.__start)
        return self.__pending_timer.datetime

    @property
    def type(self) -> JobType:
        """
        Return the `JobType` of the `Job` instance.

        Returns
        -------
        JobType
            :class:`~scheduler.job.JobType` of the |BaseJob|.
        """
        return self.__type

    @property
    def handle(self) -> Callable[..., None]:
        """
        Get the callback function.

        Returns
        -------
        handle
            Callback function.
        """
        return self.__handle

    @property
    def args(self) -> tuple[Any, ...]:
        r"""
        Get the positional arguments of the function handle within a `Job`.

        .. warning:: When running |BaseJob|\ s in parallel threads,
            be sure to implement possible side effects of parameter accessing in a
            thread safe manner.

        Returns
        -------
        tuple[Any]
            The payload arguments to pass to the function handle within a
            |BaseJob|.
        """
        return self.__args

    @property
    def kwargs(self) -> dict[str, Any]:
        r"""
        Get the keyword arguments of the function handle within a `Job`.

        .. warning:: When running |BaseJob|\ s in parallel threads,
            be sure to implement possible side effects of parameter accessing in a
            thread safe manner.

        Returns
        -------
        dict[str, Any]
            The payload arguments to pass to the function handle within a
            |BaseJob|.
        """
        return self.__kwargs

    @property
    def max_attempts(self) -> int:
        """
        Get the execution limit for a `Job`.

        Returns
        -------
        int
            Max execution attempts.
        """
        return self.__max_attempts

    @property
    def tags(self) -> set[str]:
        r"""
        Get the tags of a `Job`.

        Returns
        -------
        set[str]
            The tags of a |BaseJob|.
        """
        return self.__tags.copy()

    @property
    def delay(self) -> bool:
        """
        *Deprecated*: Return ``True`` if the first `Job` execution will wait for the next scheduled time.

        Returns
        -------
        bool
            If ``True`` wait with the execution for the next scheduled time. If ``False``
            the first execution will target the time of `Job.start`.
        """
        warnings.warn(
            (
                "Using the `delay` property is deprecated and will "
                "be removed in the next minor release."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        return self.__delay

    @property
    def start(self) -> Optional[dt.datetime]:
        """
        Get the timestamp at which the `JobTimer` starts.

        Returns
        -------
        Optional[datetime.datetime]
            The start datetime stamp.
        """
        return self.__start

    @property
    def stop(self) -> Optional[dt.datetime]:
        """
        Get the timestamp after which no more executions of the `Job` should be scheduled.

        Returns
        -------
        Optional[datetime.datetime]
            The stop datetime stamp.
        """
        return self.__stop

    @property
    def skip_missing(self) -> bool:
        """
        Return ``True`` if `Job` will only schedule its newest planned execution.

        Returns
        -------
        bool
            If ``True`` a |BaseJob| will only schedule its newest planned
            execution and drop older ones.
        """
        return self.__skip_missing

    @property
    def alias(self) -> Optional[str]:
        r"""
        Get the alias of the `Job`.

        Returns
        -------
        Optional[str]
            Alias of the |BaseJob|.
        """
        return self.__alias

    @property
    def tzinfo(self) -> Optional[dt.tzinfo]:
        r"""
        Get the timezone of the `Job`'s next execution.

        Returns
        -------
        Optional[datetime.tzinfo]
            Timezone of the |BaseJob|\ s next execution.
        """
        return self.datetime.tzinfo

    @property
    def _tzinfo(self) -> Optional[dt.tzinfo]:
        """
        Get the timezone of the `Scheduler` in which the `Job` is living.

        Returns
        -------
        Optional[datetime.tzinfo]
            Timezone of the |BaseJob|.
        """
        return self.__tzinfo

    @property
    def attempts(self) -> int:
        """
        Get the number of executions for a `Job`.

        Returns
        -------
        int
            Execution attempts.
        """
        return self.__attempts

    @property
    def failed_attempts(self) -> int:
        """
        Get the number of failed executions for a `Job`.

        Returns
        -------
        int
            Failed execution attempts.
        """
        return self.__failed_attempts

    @property
    def has_attempts_remaining(self) -> bool:
        """
        Check if a `Job` has remaining attempts.

        This function will return True if the |BaseJob| has open
        execution counts and the stop argument is not in the past relative to the
        next planned execution.

        Returns
        -------
        bool
            True if the |BaseJob| has execution attempts.
        """
        if self.__mark_delete:
            return False
        if self.__max_attempts == 0:
            # a limit of zero means the number of executions is unbounded
            return True
        return self.__attempts < self.__max_attempts
# Type variable bound to `BaseJob`, for annotating code that is generic over
# concrete job subclasses.
BaseJobType = TypeVar("BaseJobType", bound=BaseJob)