Coverage for scheduler/threading/job.py: 100%
46 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:46 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:46 +0000
1"""
2Implementation of job for the `threading` scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8import threading
9from logging import Logger
10from typing import Any, Callable, Optional
12from scheduler.base.definition import JobType
13from scheduler.base.job import BaseJob
14from scheduler.base.timingtype import TimingJobUnion
class Job(BaseJob):
    r"""
    |Job| class bundling time and callback function methods.

    Variant of `BaseJob` for the `threading` scheduler: the overridden
    methods below hold a per-instance reentrant lock while they run.

    Parameters
    ----------
    job_type : JobType
        Indicator which defines which calculations have to be used.
    timing : TimingJobUnion
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.
    args : Optional[tuple[Any]]
        Positional argument payload for the function handle within a |Job|.
    kwargs : Optional[dict[str, Any]]
        Keyword arguments payload for the function handle within a |Job|.
    max_attempts : int
        Number of times the |Job| will be executed where ``0 <=> inf``.
        A |Job| with no free attempt will be deleted.
    tags : Optional[set[str]]
        The tags of the |Job|.
    delay : bool
        *Deprecated*: If ``True`` wait with the execution for the next scheduled time.
    start : Optional[datetime.datetime]
        Set the reference `datetime.datetime` stamp the |Job|
        will be scheduled against. Default value is `datetime.datetime.now()`.
    stop : Optional[datetime.datetime]
        Define a point in time after which a |Job| will be stopped
        and deleted.
    skip_missing : bool
        If ``True`` a |Job| will only schedule its newest planned
        execution and drop older ones.
    alias : Optional[str]
        Overwrites the function handle name in the string representation.
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |Scheduler| the |Job|
        is scheduled in.
    weight : float
        Relative `weight` against other |Job|\ s.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.
    """

    __weight: float
    __lock: threading.RLock

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        max_attempts: int = 0,
        tags: Optional[set[str]] = None,
        delay: bool = True,
        start: Optional[dt.datetime] = None,
        stop: Optional[dt.datetime] = None,
        skip_missing: bool = False,
        alias: Optional[str] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        weight: float = 1,
    ) -> None:
        # Everything except `weight` is forwarded unchanged to `BaseJob`;
        # `weight` is stored on this subclass only.
        super().__init__(
            job_type,
            timing,
            handle,
            args=args,
            kwargs=kwargs,
            max_attempts=max_attempts,
            tags=tags,
            delay=delay,
            start=start,
            stop=stop,
            skip_missing=skip_missing,
            alias=alias,
            tzinfo=tzinfo,
        )
        # Reentrant lock: `_exec` logs `%r` of this job while holding the lock,
        # and `__repr__` acquires the same lock again on the same thread.
        self.__lock = threading.RLock()
        self.__weight = weight

    # pylint: disable=no-member invalid-name

    def _exec(self, logger: Logger) -> None:
        """
        Execute the callback function.

        Any `Exception` raised by the callback is logged via `logger` and
        counted as a failed attempt instead of being propagated. The attempt
        counter is incremented whether or not the callback succeeded.

        Parameters
        ----------
        logger : Logger
            Logger used to report an exception raised by the callback.
        """
        with self.__lock:
            try:
                # The handle and its payload are name-mangled private
                # attributes of `BaseJob`, hence the explicit `_BaseJob__` prefix.
                self._BaseJob__handle(*self._BaseJob__args, **self._BaseJob__kwargs)  # type: ignore
            except Exception as err:
                logger.exception("Unhandled exception `%s` in `%r`!", err, self)
                self._BaseJob__failed_attempts += 1  # type: ignore
            self._BaseJob__attempts += 1  # type: ignore

    # pylint: enable=no-member invalid-name

    def _calc_next_exec(self, ref_dt: dt.datetime) -> None:
        """Delegate to `BaseJob._calc_next_exec` while holding the job's lock."""
        with self.__lock:
            super()._calc_next_exec(ref_dt)

    def __repr__(self) -> str:
        """Return a ``scheduler.Job(...)`` representation including the weight."""
        with self.__lock:
            params: tuple[str, ...] = self._repr()
            # The weight is spliced in after the first six entries returned
            # by `_repr()`; the remaining entries follow it.
            params_sum: str = ", ".join(params[:6] + (repr(self.__weight),) + params[6:])
            return f"scheduler.Job({params_sum})"

    def __str__(self) -> str:
        """Return the `BaseJob` string form with the weight appended as ``w=...``."""
        return f"{super().__str__()}, w={self.weight:.3g}"

    def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta:
        """Delegate to `BaseJob.timedelta` while holding the job's lock."""
        with self.__lock:
            return super().timedelta(dt_stamp)

    @property
    def datetime(self) -> dt.datetime:
        """Return `BaseJob.datetime`, read while holding the job's lock."""
        with self.__lock:
            return super().datetime

    @property
    def weight(self) -> float:
        """
        Return the weight of the `Job` instance.

        Returns
        -------
        float
            |Job| `weight`.
        """
        return self.__weight

    @property
    def has_attempts_remaining(self) -> bool:
        """Return `BaseJob.has_attempts_remaining`, read while holding the job's lock."""
        with self.__lock:
            return super().has_attempts_remaining