Coverage for scheduler/threading/scheduler.py: 100%
168 statements
« prev ^ index » next coverage.py v7.0.4, created at 2024-06-09 19:18 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2024-06-09 19:18 +0000
1"""
2Implementation of a `threading` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8import queue
9import threading
10from collections.abc import Iterable
11from logging import Logger
12from typing import Any, Callable, Optional
14import typeguard as tg
16from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
17from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag
18from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
19from scheduler.base.timingtype import (
20 TimingCyclic,
21 TimingDailyUnion,
22 TimingOnceUnion,
23 TimingWeeklyUnion,
24)
25from scheduler.error import SchedulerError
26from scheduler.message import (
27 CYCLIC_TYPE_ERROR_MSG,
28 DAILY_TYPE_ERROR_MSG,
29 HOURLY_TYPE_ERROR_MSG,
30 MINUTELY_TYPE_ERROR_MSG,
31 ONCE_TYPE_ERROR_MSG,
32 TZ_ERROR_MSG,
33 WEEKLY_TYPE_ERROR_MSG,
34)
35from scheduler.prioritization import linear_priority_function
36from scheduler.threading.job import Job
def _exec_job_worker(que: queue.Queue[Job], logger: Logger) -> None:
    """
    Drain a queue of jobs, executing each one in the calling thread.

    The worker fetches jobs without blocking and terminates as soon as the
    queue is empty. Every fetched job is acknowledged with `que.task_done()`,
    even if its execution raises, so that a `que.join()` in the dispatching
    thread cannot block forever because of a failing job.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the jobs to execute.
    logger : logging.Logger
        Logger handed to every job and used to report unexpected exceptions.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            # Nothing left to do, let the worker thread terminate.
            return

        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        except Exception:  # pylint: disable=broad-except
            # Keep the worker alive: the remaining queued jobs still have to
            # be processed, otherwise the dispatcher would wait indefinitely.
            logger.exception("Unhandled exception while executing `%r`!", job)
        finally:
            que.task_done()
class Scheduler(BaseScheduler[Job, Callable[..., None]]):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : datetime.tzinfo
        Set the timezone of the |Scheduler|.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`. 0 disables the limit.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : Optional[Iterable[Job]]
        A collection of job instances. The collection is copied, the
        |Scheduler| never modifies the object passed in.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        Raises if the timezone of one of the given |Job|\ s differs from the
        timezone of the |Scheduler|.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, Job, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[Iterable[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        self.__jobs_lock = threading.RLock()
        # Always copy: never alias (and later mutate) a set owned by the caller.
        self.__jobs: set[Job] = set(jobs) if jobs else set()

        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:  # pylint: disable=protected-access
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        with self.__jobs_lock:
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(
                ", ".join(
                    repr(elem)
                    for elem in (
                        self.__max_exec,
                        self.__tzinfo,
                        self.__priority_function,
                    )
                ),
                ", ".join(repr(job) for job in sorted(self.__jobs)),
            )

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._str() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            # One `str.format` replacement field per column, e.g. "{0:<8}".
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            if self.__tz_str is None:
                # Without a timezone the "tzinfo" column is dropped. The fields are
                # index based, so the remaining columns keep their positions.
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            table_rows = [
                fstring.format(*c_name),
                fstring.format(*("-" * width for width in c_width)),
            ]
            for job in sorted(self.__jobs):
                row = job._str()  # pylint: disable=protected-access
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                table_rows.append(fstring.format(*entries))

            return scheduler_headings + "".join(table_rows)

    def __headings(self) -> list[str]:
        """Collect the meta information shown in the heading of `__str__`."""
        with self.__jobs_lock:
            headings = [
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        # A job without remaining attempts is returned but never scheduled.
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute the given jobs in worker threads and reschedule them.

        Blocks until all jobs were executed. Afterwards the next execution of
        every job is calculated relative to `ref_dt` and jobs without remaining
        attempts are removed from the scheduler.

        Returns the number of executed jobs.
        """
        n_jobs = len(jobs)
        if n_jobs == 0:
            return 0

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # A non-positive thread count means "one worker per job". More workers
        # than jobs would only start threads that terminate immediately.
        if self.__n_threads > 0:
            n_workers = min(self.__n_threads, n_jobs)
        else:
            n_workers = n_jobs

        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(target=_exec_job_worker, args=(que, self._logger))
            worker.daemon = True
            worker.start()
            workers.append(worker)

        que.join()
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                # The job may already have been removed while it was running
                # (e.g. by its own handle or by another thread), so remove it
                # tolerantly instead of raising for an unscheduled job.
                with self.__jobs_lock:
                    self.__jobs.discard(job)

        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers
            as well as the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        # Work on a snapshot taken under the lock, so that jobs scheduled or
        # deleted concurrently cannot change the set while it is iterated.
        with self.__jobs_lock:
            jobs = list(self.__jobs)

        if force_exec_all:
            return self.__exec_jobs(jobs, ref_dt)

        # collect the current priority for all jobs
        n_jobs = len(jobs)
        job_priority: dict[Job, float] = {}
        for job in jobs:
            delta_seconds = job.timedelta(ref_dt).total_seconds()
            job_priority[job] = self.__priority_function(
                -delta_seconds,
                job,
                self.__max_exec,
                n_jobs,
            )
        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            if tags is None or tags == set():
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            if tags is None or tags == set():
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` does not match the type `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` does not match the type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` does not match the type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` does not match the type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` does not match the type `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[Iterable[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            Raises if `timing` does not match the type `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err

        # Normalize once, so that every kind of `timing` hands a real `set`
        # to the |Job|, no matter which iterable the caller provided.
        job_tags = set(tags) if tags else set()

        if isinstance(timing, dt.datetime):
            # A fixed point in time is expressed as a cyclic job with a zero
            # cycle that starts at `timing` and is limited to a single attempt.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=job_tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=job_tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        # Copy under the lock, so that a concurrent modification of the job
        # set cannot interfere with the copy.
        with self.__jobs_lock:
            return self.__jobs.copy()