Coverage for scheduler/threading/scheduler.py: 100%
164 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-03-19 22:04 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-03-19 22:04 +0000
1"""
2Implementation of a `threading` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8import queue
9import threading
10from logging import Logger
11from typing import Any, Callable, Optional
13import typeguard as tg
15from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
16from scheduler.base.job import BaseJob
17from scheduler.base.scheduler import (
18 BaseScheduler,
19 _warn_deprecated_delay,
20 select_jobs_by_tag,
21)
22from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
23from scheduler.base.timingtype import (
24 TimingCyclic,
25 TimingDailyUnion,
26 TimingOnceUnion,
27 TimingWeeklyUnion,
28)
29from scheduler.error import SchedulerError
30from scheduler.message import (
31 CYCLIC_TYPE_ERROR_MSG,
32 DAILY_TYPE_ERROR_MSG,
33 HOURLY_TYPE_ERROR_MSG,
34 MINUTELY_TYPE_ERROR_MSG,
35 ONCE_TYPE_ERROR_MSG,
36 TZ_ERROR_MSG,
37 WEEKLY_TYPE_ERROR_MSG,
38)
39from scheduler.prioritization import linear_priority_function
40from scheduler.threading.job import Job
def _exec_job_worker(que: "queue.Queue[Job]", logger: Logger) -> None:
    """
    Drain a queue of jobs, executing each one in the calling thread.

    Jobs are fetched without blocking; the worker returns as soon as the
    queue is found empty.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the jobs to execute.
    logger : logging.Logger
        Logger handed to every job's ``_exec`` call.

    Notes
    -----
    Every fetched job is acknowledged with ``task_done()`` even when its
    ``_exec`` call raises. Without this, a failing job would leave the queue's
    unfinished-task counter above zero and ``que.join()`` would never return.
    The exception itself still propagates and ends this worker.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            # nothing left to do for this worker
            return
        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        finally:
            que.task_done()
class Scheduler(BaseScheduler):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : datetime.tzinfo
        Set the timezone of the |Scheduler|.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : set[Job]
        A collection of job instances.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, BaseJob, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[set[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        # Reentrant, because locked methods call other locked methods
        # (e.g. `__str__` -> `__headings`).
        self.__jobs_lock = threading.RLock()
        self.__jobs: set[Job] = jobs or set()
        # Every pre-existing job has to share the scheduler's timezone.
        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        """Return a representation listing the settings and all scheduled jobs."""
        with self.__jobs_lock:
            settings = ", ".join(
                repr(elem)
                for elem in (
                    self.__max_exec,
                    self.__tzinfo,
                    self.__priority_function,
                )
            )
            job_reprs = ", ".join([repr(job) for job in sorted(self.jobs)])
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(settings, job_reprs)

    def __str__(self) -> str:
        """Return a heading line followed by a table with one row per job."""
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._repr() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            if self.__tz_str is None:
                # Drop the tzinfo column. The remaining placeholders keep their
                # positional indices, so the full tuples can still be passed to
                # `format`; the unused fourth entry is simply ignored.
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            table_rows = [
                fstring.format(*c_name),
                fstring.format(*("-" * width for width in c_width)),
            ]
            for job in sorted(self.jobs):
                row = job._str()
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                table_rows.append(fstring.format(*entries))

            return scheduler_headings + "".join(table_rows)

    def __headings(self) -> list[str]:
        """Collect the ``key=value`` entries shown in the heading of `__str__`."""
        with self.__jobs_lock:
            headings = [
                # a `max_exec` of 0 means "no limit" and is displayed as inf
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        # A job without remaining attempts is returned but never stored.
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute the given jobs on worker threads and update their schedule.

        Parameters
        ----------
        jobs : list[Job]
            The jobs to execute.
        ref_dt : datetime.datetime
            Reference time passed to every job for computing its next execution.

        Returns
        -------
        int
            Number of jobs handed to the workers.
        """
        n_jobs = len(jobs)

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # `n_threads == 0` means one worker per job. Otherwise never start more
        # workers than there are jobs; surplus workers would find the queue
        # empty and exit immediately.
        if self.__n_threads:
            n_workers = min(self.__n_threads, n_jobs)
        else:
            n_workers = n_jobs

        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(
                target=_exec_job_worker, args=(que, self._BaseScheduler__logger)
            )
            worker.daemon = True
            worker.start()
            workers.append(worker)

        # Wait until every queued job was processed, then for the workers to end.
        que.join()
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                self.delete_job(job)

        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers
            and the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        if force_exec_all:
            # Copy under the lock so that scheduling or deleting from another
            # thread cannot resize the set while it is being iterated.
            with self.__jobs_lock:
                all_jobs = list(self.__jobs)
            return self.__exec_jobs(all_jobs, ref_dt)

        # collect the current priority for all jobs
        job_priority: dict[Job, float] = {}
        with self.__jobs_lock:
            # Read the count under the same lock as the iteration so both
            # describe the same state of the job set.
            n_jobs = len(self.__jobs)
            for job in self.__jobs:
                delta_seconds = job.timedelta(ref_dt).total_seconds()
                job_priority[job] = self.__priority_function(
                    -delta_seconds,
                    job,
                    self.__max_exec,
                    n_jobs,
                )
        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers both `None` and an empty set. Comparing against
            # `{}` does not: that literal is an empty dict, which never equals
            # a set.
            if not tags:
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers both `None` and an empty set. Comparing against
            # `{}` does not: that literal is an empty dict, which never equals
            # a set.
            if not tags:
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[list[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a cyclic job with a zero
            # interval, a single attempt and `timing` as its start.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        # Any other timing type selects its job type via the mapping.
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        return self.__jobs.copy()