Coverage for scheduler/asyncio/scheduler.py: 100%
124 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-03-19 22:04 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-03-19 22:04 +0000
1"""
2Implementation of a `asyncio` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import asyncio as aio
10import datetime as dt
11from logging import Logger
12from typing import Any, Callable, Optional, cast
14import typeguard as tg
16from scheduler.asyncio.job import Job
17from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
18from scheduler.base.scheduler import (
19 BaseJob,
20 BaseScheduler,
21 _warn_deprecated_delay,
22 select_jobs_by_tag,
23)
24from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
25from scheduler.base.timingtype import (
26 TimingCyclic,
27 TimingDailyUnion,
28 TimingOnceUnion,
29 TimingWeeklyUnion,
30)
31from scheduler.error import SchedulerError
32from scheduler.message import (
33 CYCLIC_TYPE_ERROR_MSG,
34 DAILY_TYPE_ERROR_MSG,
35 HOURLY_TYPE_ERROR_MSG,
36 MINUTELY_TYPE_ERROR_MSG,
37 ONCE_TYPE_ERROR_MSG,
38 WEEKLY_TYPE_ERROR_MSG,
39)
42class Scheduler(BaseScheduler):
43 r"""
44 Implementation of an asyncio scheduler.
46 This implementation enables the planning of |AioJob|\ s depending on time
47 cycles, fixed times, weekdays, dates, offsets and execution counts.
49 Notes
50 -----
51 Due to the support of `datetime` objects, the |AioScheduler| is able to work
52 with timezones.
54 Parameters
55 ----------
56 loop : asyncio.selector_events.BaseSelectorEventLoop
57 Set a AsyncIO event loop, default is the global event loop
58 tzinfo : datetime.tzinfo
59 Set the timezone of the |AioScheduler|.
60 logger : Optional[logging.Logger]
61 A custom Logger instance.
62 """
64 def __init__(
65 self,
66 *,
67 loop: Optional[aio.selector_events.BaseSelectorEventLoop] = None,
68 tzinfo: Optional[dt.tzinfo] = None,
69 logger: Optional[Logger] = None,
70 ):
71 super().__init__(logger=logger)
72 try:
73 self.__loop = loop if loop else aio.get_running_loop()
74 except RuntimeError:
75 raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
76 self.__tzinfo = tzinfo
77 self.__tz_str = check_tzname(tzinfo=tzinfo)
79 self.__jobs: dict[Job, aio.Task[None]] = {}
81 def __repr__(self) -> str:
82 return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
83 ", ".join((repr(elem) for elem in (self.__tzinfo,))),
84 ", ".join([repr(job) for job in sorted(self.jobs)]),
85 )
87 def __str__(self) -> str:
88 # Scheduler meta heading
89 scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())
91 # Job table (we join two of the Job._repr() fields into one)
92 # columns
93 c_align = ("<", "<", "<", "<", ">", ">")
94 c_width = (8, 16, 19, 12, 9, 13)
95 c_name = (
96 "type",
97 "function / alias",
98 "due at",
99 "tzinfo",
100 "due in",
101 "attempts",
102 )
103 form = [
104 f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width))
105 ]
106 if self.__tz_str is None:
107 form = form[:3] + form[4:]
109 fstring = " ".join(form) + "\n"
110 job_table = fstring.format(*c_name)
111 job_table += fstring.format(*("-" * width for width in c_width))
112 for job in sorted(self.jobs):
113 row = job._str()
114 entries = (
115 row[0],
116 str_cutoff(row[1] + row[2], c_width[1], False),
117 row[3],
118 str_cutoff(row[4] or "", c_width[3], False),
119 str_cutoff(row[5], c_width[4], True),
120 str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
121 )
122 job_table += fstring.format(*entries)
124 return scheduler_headings + job_table
126 def __headings(self) -> list[str]:
127 headings = [
128 f"tzinfo={self.__tz_str}",
129 f"#jobs={len(self.__jobs)}",
130 ]
131 return headings
133 def __schedule(
134 self,
135 **kwargs,
136 ) -> Job:
137 """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
138 job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
140 task = self.__loop.create_task(self.__supervise_job(job))
141 self.__jobs[job] = task
143 return job
145 async def __supervise_job(self, job: Job) -> None:
146 try:
147 reference_dt = dt.datetime.now(tz=self.__tzinfo)
148 while job.has_attempts_remaining:
149 sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
150 await aio.sleep(sleep_seconds)
152 await job._exec(
153 logger=self._BaseScheduler__logger
154 ) # pylint: disable=protected-access
156 reference_dt = dt.datetime.now(tz=self.__tzinfo)
157 job._calc_next_exec(reference_dt) # pylint: disable=protected-access
158 except aio.CancelledError: # TODO asyncio does not trigger this exception in pytest, why?
159 # raised, when `task.cancel()` in `delete_job` was run
160 pass # pragma: no cover
161 else:
162 self.delete_job(job)
164 def delete_job(self, job: Job) -> None:
165 """
166 Delete a `Job` from the `Scheduler`.
168 Parameters
169 ----------
170 job : Job
171 |AioJob| instance to delete.
173 Raises
174 ------
175 SchedulerError
176 Raises if the |AioJob| of the argument is not scheduled.
177 """
178 try:
179 task: aio.Task[None] = self.__jobs.pop(job)
180 _: bool = task.cancel()
181 except KeyError:
182 raise SchedulerError("An unscheduled Job can not be deleted!") from None
184 def delete_jobs(
185 self,
186 tags: Optional[set[str]] = None,
187 any_tag: bool = False,
188 ) -> int:
189 r"""
190 Delete a set of |AioJob|\ s from the |AioScheduler| by tags.
192 If no tags or an empty set of tags are given defaults to the deletion
193 of all |AioJob|\ s.
195 Parameters
196 ----------
197 tags : Optional[set[str]]
198 Set of tags to identify target |AioJob|\ s.
199 any_tag : bool
200 False: To delete a |AioJob| all tags have to match.
201 True: To delete a |AioJob| at least one tag has to match.
202 """
203 all_jobs: set[Job] = set(self.__jobs.keys())
204 jobs_to_delete: set[Job]
206 if tags is None or tags == {}:
207 jobs_to_delete = all_jobs
208 else:
209 jobs_to_delete = cast(
210 set[Job], select_jobs_by_tag(cast(set[BaseJob], all_jobs), tags, any_tag)
211 )
213 for job in jobs_to_delete:
214 self.delete_job(job)
216 return len(jobs_to_delete)
218 def get_jobs(
219 self,
220 tags: Optional[set[str]] = None,
221 any_tag: bool = False,
222 ) -> set[Job]:
223 r"""
224 Get a set of |AioJob|\ s from the |AioScheduler| by tags.
226 If no tags or an empty set of tags are given defaults to returning
227 all |AioJob|\ s.
229 Parameters
230 ----------
231 tags : set[str]
232 Tags to filter scheduled |AioJob|\ s.
233 If no tags are given all |AioJob|\ s are returned.
234 any_tag : bool
235 False: To match a |AioJob| all tags have to match.
236 True: To match a |AioJob| at least one tag has to match.
238 Returns
239 -------
240 set[Job]
241 Currently scheduled |AioJob|\ s.
242 """
243 if tags is None or tags == {}:
244 return self.jobs
245 return cast(set[Job], select_jobs_by_tag(cast(set[BaseJob], self.jobs), tags, any_tag))
247 def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
248 r"""
249 Schedule a cyclic `Job`.
251 Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
252 to schedule a cyclic |AioJob|.
254 Parameters
255 ----------
256 timing : TimingTypeCyclic
257 Desired execution time.
258 handle : Callable[..., None]
259 Handle to a callback function.
261 Returns
262 -------
263 Job
264 Instance of a scheduled |AioJob|.
266 Other Parameters
267 ----------------
268 **kwargs
269 |AioJob| properties, optional
271 `kwargs` are used to specify |AioJob| properties.
273 Here is a list of available |AioJob| properties:
275 .. include:: ../_assets/aio_kwargs.rst
276 """
277 _warn_deprecated_delay(**kwargs)
278 try:
279 tg.check_type(timing, TimingCyclic)
280 except tg.TypeCheckError as err:
281 raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
282 return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)
284 def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
285 r"""
286 Schedule a minutely `Job`.
288 Use a `datetime.time` object or a `list` of `datetime.time` objects
289 to schedule a |AioJob| every minute.
291 Notes
292 -----
293 If given a `datetime.time` object with a non zero hour or minute property, these
294 information will be ignored.
296 Parameters
297 ----------
298 timing : TimingDailyUnion
299 Desired execution time(s).
300 handle : Callable[..., None]
301 Handle to a callback function.
303 Returns
304 -------
305 Job
306 Instance of a scheduled |AioJob|.
308 Other Parameters
309 ----------------
310 **kwargs
311 |AioJob| properties, optional
313 `kwargs` are used to specify |AioJob| properties.
315 Here is a list of available |AioJob| properties:
317 .. include:: ../_assets/aio_kwargs.rst
318 """
319 _warn_deprecated_delay(**kwargs)
320 try:
321 tg.check_type(timing, TimingDailyUnion)
322 except tg.TypeCheckError as err:
323 raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
324 return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)
326 def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
327 r"""
328 Schedule an hourly `Job`.
330 Use a `datetime.time` object or a `list` of `datetime.time` objects
331 to schedule a |AioJob| every hour.
333 Notes
334 -----
335 If given a `datetime.time` object with a non zero hour property, this information
336 will be ignored.
338 Parameters
339 ----------
340 timing : TimingDailyUnion
341 Desired execution time(s).
342 handle : Callable[..., None]
343 Handle to a callback function.
345 Returns
346 -------
347 Job
348 Instance of a scheduled |AioJob|.
350 Other Parameters
351 ----------------
352 **kwargs
353 |AioJob| properties, optional
355 `kwargs` are used to specify |AioJob| properties.
357 Here is a list of available |AioJob| properties:
359 .. include:: ../_assets/aio_kwargs.rst
360 """
361 _warn_deprecated_delay(**kwargs)
362 try:
363 tg.check_type(timing, TimingDailyUnion)
364 except tg.TypeCheckError as err:
365 raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
366 return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)
368 def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
369 r"""
370 Schedule a daily `Job`.
372 Use a `datetime.time` object or a `list` of `datetime.time` objects
373 to schedule a |AioJob| every day.
375 Parameters
376 ----------
377 timing : TimingDailyUnion
378 Desired execution time(s).
379 handle : Callable[..., None]
380 Handle to a callback function.
382 Returns
383 -------
384 Job
385 Instance of a scheduled |AioJob|.
387 Other Parameters
388 ----------------
389 **kwargs
390 |AioJob| properties, optional
392 `kwargs` are used to specify |AioJob| properties.
394 Here is a list of available |AioJob| properties:
396 .. include:: ../_assets/aio_kwargs.rst
397 """
398 _warn_deprecated_delay(**kwargs)
399 try:
400 tg.check_type(timing, TimingDailyUnion)
401 except tg.TypeCheckError as err:
402 raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
403 return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)
405 def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
406 r"""
407 Schedule a weekly `Job`.
409 Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
410 recurring |AioJob|. Combine multiple desired `tuples` in
411 a `list`. If the planed execution time is `00:00` the `datetime.time` object
412 can be ignored, just pass a `Weekday` without a `tuple`.
414 Parameters
415 ----------
416 timing : TimingWeeklyUnion
417 Desired execution time(s).
418 handle : Callable[..., None]
419 Handle to a callback function.
421 Returns
422 -------
423 Job
424 Instance of a scheduled |AioJob|.
426 Other Parameters
427 ----------------
428 **kwargs
429 |AioJob| properties, optional
431 `kwargs` are used to specify |AioJob| properties.
433 Here is a list of available |AioJob| properties:
435 .. include:: ../_assets/aio_kwargs.rst
436 """
437 _warn_deprecated_delay(**kwargs)
438 try:
439 tg.check_type(timing, TimingWeeklyUnion)
440 except tg.TypeCheckError as err:
441 raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
442 return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)
444 def once(
445 self,
446 timing: TimingOnceUnion,
447 handle: Callable[..., None],
448 *,
449 args: tuple[Any] = None,
450 kwargs: Optional[dict[str, Any]] = None,
451 tags: Optional[list[str]] = None,
452 alias: str = None,
453 ) -> Job:
454 r"""
455 Schedule a oneshot `Job`.
457 Parameters
458 ----------
459 timing : TimingOnceUnion
460 Desired execution time.
461 handle : Callable[..., None]
462 Handle to a callback function.
463 args : tuple[Any]
464 Positional argument payload for the function handle within a |AioJob|.
465 kwargs : Optional[dict[str, Any]]
466 Keyword arguments payload for the function handle within a |AioJob|.
467 tags : Optional[set[str]]
468 The tags of the |AioJob|.
469 alias : Optional[str]
470 Overwrites the function handle name in the string representation.
472 Returns
473 -------
474 Job
475 Instance of a scheduled |AioJob|.
476 """
477 try:
478 tg.check_type(timing, TimingOnceUnion)
479 except tg.TypeCheckError as err:
480 raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
481 if isinstance(timing, dt.datetime):
482 return self.__schedule(
483 job_type=JobType.CYCLIC,
484 timing=dt.timedelta(),
485 handle=handle,
486 args=args,
487 kwargs=kwargs,
488 max_attempts=1,
489 tags=tags,
490 alias=alias,
491 delay=False,
492 start=timing,
493 )
494 return self.__schedule(
495 job_type=JOB_TYPE_MAPPING[type(timing)],
496 timing=timing,
497 handle=handle,
498 args=args,
499 kwargs=kwargs,
500 max_attempts=1,
501 tags=tags,
502 alias=alias,
503 )
505 @property
506 def jobs(self) -> set[Job]:
507 r"""
508 Get the set of all `Job`\ s.
510 Returns
511 -------
512 set[Job]
513 Currently scheduled |AioJob|\ s.
514 """
515 return set(self.__jobs.keys())