Coverage for scheduler/threading/scheduler.py: 100%
164 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-01-09 20:37 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-01-09 20:37 +0000
1"""
2Implementation of a `threading` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8import queue
9import threading
10from logging import Logger
11from typing import Any, Callable, Optional
13import typeguard as tg
15from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
16from scheduler.base.job import BaseJob
17from scheduler.base.scheduler import BaseScheduler, _warn_deprecated_delay, select_jobs_by_tag
18from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
19from scheduler.base.timingtype import (
20 TimingCyclic,
21 TimingDailyUnion,
22 TimingOnceUnion,
23 TimingWeeklyUnion,
24)
25from scheduler.error import SchedulerError
26from scheduler.message import (
27 CYCLIC_TYPE_ERROR_MSG,
28 DAILY_TYPE_ERROR_MSG,
29 HOURLY_TYPE_ERROR_MSG,
30 MINUTELY_TYPE_ERROR_MSG,
31 ONCE_TYPE_ERROR_MSG,
32 TZ_ERROR_MSG,
33 WEEKLY_TYPE_ERROR_MSG,
34)
35from scheduler.prioritization import linear_priority_function
36from scheduler.threading.job import Job
def _exec_job_worker(que: queue.Queue[Job], logger: Logger):
    r"""
    Execute |Job|\ s taken from `que` until the queue is empty, then return.

    Used as the target of the worker threads started by the |Scheduler|.
    Every item taken from the queue is marked with ``que.task_done()``,
    even when its execution raises. This guarantees that a caller waiting
    in ``que.join()`` is released.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the |Job|\ s to execute.
    logger : logging.Logger
        Logger passed on to the |Job| execution and used to report
        unexpected errors escaping it.
    """
    while True:
        try:
            job = que.get_nowait()
        except queue.Empty:
            # Nothing left to do for this worker.
            return
        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        except Exception:  # pylint: disable=broad-except
            # Keep draining the queue: if this worker died here, the remaining
            # jobs would never be processed and `que.join()` would block forever.
            logger.exception("Unhandled exception in worker executing %r!", job)
        finally:
            # Always account for the item so `que.join()` can return.
            que.task_done()
class Scheduler(BaseScheduler):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |Scheduler|.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`. 0 means no limit.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : Optional[set[Job]]
        A collection of job instances. The collection is copied, later changes
        to the passed set do not affect the |Scheduler|.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If a |Job| in `jobs` has a timezone different from `tzinfo`.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, BaseJob, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[set[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        # Reentrant: methods holding the lock call other methods that acquire
        # it again (e.g. `__repr__`/`__str__` -> `jobs`).
        self.__jobs_lock = threading.RLock()
        # Copy the passed collection so the scheduler never shares (and later
        # mutates) a set owned by the caller.
        self.__jobs: set[Job] = set(jobs) if jobs else set()
        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:  # pylint: disable=protected-access
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        with self.__jobs_lock:
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(
                ", ".join(
                    repr(elem)
                    for elem in (
                        self.__max_exec,
                        self.__tzinfo,
                        self.__priority_function,
                    )
                ),
                ", ".join([repr(job) for job in sorted(self.jobs)]),
            )

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._str() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            if self.__tz_str is None:
                # Drop the "tzinfo" column. The remaining placeholders keep their
                # explicit indices, so the full 7-tuples below still format.
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            job_table = fstring.format(*c_name) + fstring.format(
                *("-" * width for width in c_width)
            )
            for job in sorted(self.jobs):
                row = job._str()  # pylint: disable=protected-access
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                job_table += fstring.format(*entries)

            return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the meta information fields shown at the top of `str(self)`."""
        with self.__jobs_lock:
            headings = [
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """
        Encapsulate the `Job` and add the `Scheduler`'s timezone.

        The created `Job` is only added to the scheduler if it still has
        attempts remaining; it is returned in either case.
        """
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        r"""
        Execute `jobs` in worker threads and update their schedule afterwards.

        Blocks until every given |Job| has been executed. |Job|\ s without
        remaining attempts are removed from the scheduler.

        Parameters
        ----------
        jobs : list[Job]
            |Job|\ s to execute.
        ref_dt : datetime.datetime
            Reference time used to compute each |Job|'s next execution.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        n_jobs = len(jobs)
        if not n_jobs:
            return 0

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # n_threads == 0 means "one worker per job"; never start more workers
        # than there are jobs, surplus workers would just exit immediately.
        n_workers = min(self.__n_threads, n_jobs) if self.__n_threads else n_jobs
        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(
                target=_exec_job_worker, args=(que, self._BaseScheduler__logger)
            )
            worker.daemon = True
            worker.start()
            workers.append(worker)

        que.join()
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                # The job may already have been removed in the meantime (e.g.
                # via `delete_job`/`delete_jobs` from another thread); that is
                # not an error here, so use `discard` instead of `delete_job`.
                with self.__jobs_lock:
                    self.__jobs.discard(job)

        return n_jobs

    def __select_overdue_jobs(self, ref_dt: dt.datetime) -> list[Job]:
        r"""
        Return the |Job|\ s to execute at `ref_dt`, highest priority first.

        Only |Job|\ s with a priority greater zero are selected, limited to
        `max_exec` entries if a limit is set. Must be called while holding
        the jobs lock.
        """
        n_jobs = len(self.__jobs)
        job_priority: dict[Job, float] = {}
        for job in self.__jobs:
            delta_seconds = job.timedelta(ref_dt).total_seconds()
            job_priority[job] = self.__priority_function(
                -delta_seconds,
                job,
                self.__max_exec,
                n_jobs,
            )
        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        return [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers
            and the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        # Select the jobs under the lock, but execute them without holding it,
        # so worker threads are not blocked if they access this scheduler.
        with self.__jobs_lock:
            if force_exec_all:
                selected_jobs = list(self.__jobs)
            else:
                selected_jobs = self.__select_overdue_jobs(ref_dt)
        return self.__exec_jobs(selected_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        with self.__jobs_lock:
            try:
                self.__jobs.remove(job)
            except KeyError:
                raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` treats `None` and an empty set alike; note that `{}` is
            # an empty dict literal and never compares equal to a set.
            if not tags:
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` treats `None` and an empty set alike; note that `{}` is
            # an empty dict literal and never compares equal to a set.
            if not tags:
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` has an invalid type.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingCyclic)
        except TypeError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property,
        this information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` has an invalid type.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingDailyUnion)
        except TypeError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` has an invalid type.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingDailyUnion)
        except TypeError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` has an invalid type.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingDailyUnion)
        except TypeError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` has an invalid type.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingWeeklyUnion)
        except TypeError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[list[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` has an invalid type.
        """
        try:
            tg.check_type("timing", timing, TimingOnceUnion)
        except TypeError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # A fixed point in time is modelled as a single-attempt cyclic job
            # with a zero period that starts at `timing`.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            A copy of the currently scheduled |Job|\ s.
        """
        # Copy under the lock: copying a set that another thread is mutating
        # can raise "RuntimeError: Set changed size during iteration".
        with self.__jobs_lock:
            return self.__jobs.copy()