Coverage for scheduler/threading/scheduler.py: 100%
164 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:34 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:34 +0000
1"""
2Implementation of a `threading` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8import queue
9import threading
10from logging import Logger
11from typing import Any, Callable, Optional
13import typeguard as tg
15from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
16from scheduler.base.job import BaseJob
17from scheduler.base.scheduler import (
18 BaseScheduler,
19 deprecated,
20 select_jobs_by_tag,
21)
22from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
23from scheduler.base.timingtype import (
24 TimingCyclic,
25 TimingDailyUnion,
26 TimingOnceUnion,
27 TimingWeeklyUnion,
28)
29from scheduler.error import SchedulerError
30from scheduler.message import (
31 CYCLIC_TYPE_ERROR_MSG,
32 DAILY_TYPE_ERROR_MSG,
33 HOURLY_TYPE_ERROR_MSG,
34 MINUTELY_TYPE_ERROR_MSG,
35 ONCE_TYPE_ERROR_MSG,
36 TZ_ERROR_MSG,
37 WEEKLY_TYPE_ERROR_MSG,
38)
39from scheduler.prioritization import linear_priority_function
40from scheduler.threading.job import Job
def _exec_job_worker(que: "queue.Queue[Job]", logger: Logger):
    r"""
    Execute queued |Job|\ s in the calling thread until the queue is empty.

    Jobs are fetched without blocking; the worker returns as soon as the
    queue reports that it is empty.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the |Job|\ s that are to be executed.
    logger : logging.Logger
        Logger instance that is handed to every executed |Job|.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            # No work left for this worker.
            return
        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        finally:
            # Always account for the fetched item, even when the job raises;
            # otherwise a `que.join()` in the caller would wait for an item
            # that is never marked as done.
            que.task_done()
class Scheduler(BaseScheduler):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : datetime.tzinfo
        Set the timezone of the |Scheduler|.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : set[Job]
        A collection of job instances.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, BaseJob, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[set[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        # Re-entrant, because locked methods call other locked members
        # (e.g. `__repr__` reads the `jobs` property).
        self.__jobs_lock = threading.RLock()
        self.__jobs: set[Job] = jobs or set()
        # All jobs have to share the timezone setting of the scheduler.
        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        with self.__jobs_lock:
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(
                ", ".join(
                    (
                        repr(elem)
                        for elem in (
                            self.__max_exec,
                            self.__tzinfo,
                            self.__priority_function,
                        )
                    )
                ),
                ", ".join([repr(job) for job in sorted(self.jobs)]),
            )

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._repr() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            # One positional `str.format` placeholder per column, e.g. "{0:<8}".
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            # Without a timezone name the "tzinfo" column is left out. The
            # remaining placeholders keep their positional index, so the same
            # argument tuples can still be passed to `format`.
            if self.__tz_str is None:
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            job_table = fstring.format(*c_name) + fstring.format(
                *("-" * width for width in c_width)
            )
            for job in sorted(self.jobs):
                row = job._str()
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                job_table += fstring.format(*entries)

            return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the entries of the meta heading used by `__str__`."""
        with self.__jobs_lock:
            headings = [
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
            return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        # A job without remaining attempts is returned but never stored.
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute the given jobs with worker threads and update their schedule.

        Parameters
        ----------
        jobs : list[Job]
            The jobs to execute.
        ref_dt : datetime.datetime
            Reference time used to calculate the next execution of every job.

        Returns
        -------
        int
            Number of executed jobs.
        """
        n_jobs = len(jobs)
        if not jobs:
            return 0

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # `n_threads == 0` means one worker per job. Never start more workers
        # than there are jobs, surplus workers would exit immediately.
        if self.__n_threads:
            n_workers = min(self.__n_threads, n_jobs)
        else:
            n_workers = n_jobs

        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(
                target=_exec_job_worker,
                args=(que, self._BaseScheduler__logger),
                daemon=True,
            )
            worker.start()
            workers.append(worker)

        que.join()
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                self.delete_job(job)

        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers
            and the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        if force_exec_all:
            # Take the snapshot under the lock, so a concurrent modification
            # of the job set can not interfere with the iteration.
            with self.__jobs_lock:
                all_jobs = list(self.__jobs)
            return self.__exec_jobs(all_jobs, ref_dt)

        # collect the current priority for all jobs
        job_priority: dict[Job, float] = {}
        with self.__jobs_lock:
            n_jobs = len(self.__jobs)
            for job in self.__jobs:
                delta_seconds = job.timedelta(ref_dt).total_seconds()
                job_priority[job] = self.__priority_function(
                    -delta_seconds,
                    job,
                    self.__max_exec,
                    n_jobs,
                )

        # The lock is released before the jobs are run; they are executed in
        # worker threads that could not re-enter a lock held by this thread.

        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers `None` as well as an empty set. A comparison
            # against `{}` (an empty dict) is never true for a set.
            if not tags:
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers `None` as well as an empty set. A comparison
            # against `{}` (an empty dict) is never true for a set.
            if not tags:
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` is not of type `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` is not of type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` is not of type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` is not of type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` is not of type `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ):
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[list[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` is not of type `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a cyclic job with a zero
            # cycle, a single attempt and the given datetime as its start.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        # Every other timing is mapped to its job type and limited to a
        # single attempt.
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        # Copy under the lock, consistent with `get_jobs`.
        with self.__jobs_lock:
            return self.__jobs.copy()