Source code for scheduler.threading.job

"""
Implementation of job for the `threading` scheduler.

Author: Jendrik A. Potyka, Fabian A. Preiss
"""

import datetime as dt
import threading
from logging import Logger
from typing import Any, Callable, Optional

from scheduler.base.definition import JobType
from scheduler.base.job import BaseJob
from scheduler.base.timingtype import TimingJobUnion


[docs]class Job(BaseJob): r""" |Job| class bundling time and callback function methods. Parameters ---------- job_type : JobType Indicator which defines which calculations has to be used. timing : TimingWeekly Desired execution time(s). handle : Callable[..., None] Handle to a callback function. args : tuple[Any] Positional argument payload for the function handle within a |Job|. kwargs : Optional[dict[str, Any]] Keyword arguments payload for the function handle within a |Job|. max_attempts : Optional[int] Number of times the |Job| will be executed where ``0 <=> inf``. A |Job| with no free attempt will be deleted. tags : Optional[set[str]] The tags of the |Job|. delay : Optional[bool] *Deprecated*: If ``True`` wait with the execution for the next scheduled time. start : Optional[datetime.datetime] Set the reference `datetime.datetime` stamp the |Job| will be scheduled against. Default value is `datetime.datetime.now()`. stop : Optional[datetime.datetime] Define a point in time after which a |Job| will be stopped and deleted. skip_missing : Optional[bool] If ``True`` a |Job| will only schedule it's newest planned execution and drop older ones. alias : Optional[str] Overwrites the function handle name in the string representation. tzinfo : Optional[datetime.tzinfo] Set the timezone of the |Scheduler| the |Job| is scheduled in. weight : Optional[float] Relative `weight` against other |Job|\ s. Returns ------- Job Instance of a scheduled |Job|. """ __weight: float __lock: threading.RLock def __init__( self, job_type: JobType, timing: TimingJobUnion, handle: Callable[..., None], *, args: Optional[tuple[Any]] = None, kwargs: Optional[dict[str, Any]] = None, max_attempts: int = 0, tags: Optional[set[str]] = None, delay: bool = True, start: Optional[dt.datetime] = None, stop: Optional[dt.datetime] = None, skip_missing: bool = False, alias: str = None, tzinfo: Optional[dt.tzinfo] = None, weight: float = 1, ): super().__init__( job_type, timing, handle, args=args, kwargs=kwargs, max_attempts=max_attempts, tags=tags, delay=delay, start=start, stop=stop, skip_missing=skip_missing, alias=alias, tzinfo=tzinfo, ) self.__lock = threading.RLock() self.__weight = weight # pylint: disable=no-member invalid-name def _exec(self, logger: Logger) -> None: """Execute the callback function.""" with self.__lock: try: self._BaseJob__handle(*self._BaseJob__args, **self._BaseJob__kwargs) # type: ignore except Exception as err: logger.error("Unhandled exception `%s` in `%r`!", err, self) self._BaseJob__failed_attempts += 1 # type: ignore self._BaseJob__attempts += 1 # type: ignore # pylint: enable=no-member invalid-name def _calc_next_exec(self, ref_dt: dt.datetime) -> None: with self.__lock: super()._calc_next_exec(ref_dt) def __repr__(self) -> str: with self.__lock: params: tuple[str, ...] = self._repr() params_sum: str = ", ".join(params[:6] + (repr(self.__weight),) + params[6:]) return f"scheduler.Job({params_sum})" def __str__(self) -> str: return f"{super().__str__()}, w={self.weight:.3g}"
[docs] def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta: with self.__lock: return super().timedelta(dt_stamp)
@property def datetime(self) -> dt.datetime: with self.__lock: return super().datetime @property def weight(self) -> float: """ Return the weight of the `Job` instance. Returns ------- float |Job| `weight`. """ return self.__weight @property def has_attempts_remaining(self) -> bool: with self.__lock: return super().has_attempts_remaining