Coverage for scheduler/asyncio/scheduler.py: 100%
124 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:46 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:46 +0000
1"""
2Implementation of a `asyncio` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import asyncio as aio
10import datetime as dt
11from logging import Logger
12from typing import Any, Callable, Optional, cast
14import typeguard as tg
16from scheduler.asyncio.job import Job
17from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
18from scheduler.base.scheduler import (
19 BaseJob,
20 BaseScheduler,
21 deprecated,
22 select_jobs_by_tag,
23)
24from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
25from scheduler.base.timingtype import (
26 TimingCyclic,
27 TimingDailyUnion,
28 TimingOnceUnion,
29 TimingWeeklyUnion,
30)
31from scheduler.error import SchedulerError
32from scheduler.message import (
33 CYCLIC_TYPE_ERROR_MSG,
34 DAILY_TYPE_ERROR_MSG,
35 HOURLY_TYPE_ERROR_MSG,
36 MINUTELY_TYPE_ERROR_MSG,
37 ONCE_TYPE_ERROR_MSG,
38 WEEKLY_TYPE_ERROR_MSG,
39)
42class Scheduler(BaseScheduler):
43 r"""
44 Implementation of an asyncio scheduler.
46 This implementation enables the planning of |AioJob|\ s depending on time
47 cycles, fixed times, weekdays, dates, offsets and execution counts.
49 Notes
50 -----
51 Due to the support of `datetime` objects, the |AioScheduler| is able to work
52 with timezones.
54 Parameters
55 ----------
56 loop : asyncio.selector_events.BaseSelectorEventLoop
57 Set a AsyncIO event loop, default is the global event loop
58 tzinfo : datetime.tzinfo
59 Set the timezone of the |AioScheduler|.
60 logger : Optional[logging.Logger]
61 A custom Logger instance.
62 """
64 def __init__(
65 self,
66 *,
67 loop: Optional[aio.selector_events.BaseSelectorEventLoop] = None,
68 tzinfo: Optional[dt.tzinfo] = None,
69 logger: Optional[Logger] = None,
70 ):
71 super().__init__(logger=logger)
72 try:
73 self.__loop = loop if loop else aio.get_running_loop()
74 except RuntimeError:
75 raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
76 self.__tzinfo = tzinfo
77 self.__tz_str = check_tzname(tzinfo=tzinfo)
79 self.__jobs: dict[Job, aio.Task[None]] = {}
81 def __repr__(self) -> str:
82 return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
83 ", ".join((repr(elem) for elem in (self.__tzinfo,))),
84 ", ".join([repr(job) for job in sorted(self.jobs)]),
85 )
87 def __str__(self) -> str:
88 # Scheduler meta heading
89 scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())
91 # Job table (we join two of the Job._repr() fields into one)
92 # columns
93 c_align = ("<", "<", "<", "<", ">", ">")
94 c_width = (8, 16, 19, 12, 9, 13)
95 c_name = (
96 "type",
97 "function / alias",
98 "due at",
99 "tzinfo",
100 "due in",
101 "attempts",
102 )
103 form = [
104 f"{ {idx}:{align}{width}} " for idx, (align, width) in enumerate(zip(c_align, c_width))
105 ]
106 if self.__tz_str is None:
107 form = form[:3] + form[4:]
109 fstring = " ".join(form) + "\n"
110 job_table = fstring.format(*c_name)
111 job_table += fstring.format(*("-" * width for width in c_width))
112 for job in sorted(self.jobs):
113 row = job._str()
114 entries = (
115 row[0],
116 str_cutoff(row[1] + row[2], c_width[1], False),
117 row[3],
118 str_cutoff(row[4] or "", c_width[3], False),
119 str_cutoff(row[5], c_width[4], True),
120 str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
121 )
122 job_table += fstring.format(*entries)
124 return scheduler_headings + job_table
126 def __headings(self) -> list[str]:
127 headings = [
128 f"tzinfo={self.__tz_str}",
129 f"#jobs={len(self.__jobs)}",
130 ]
131 return headings
133 def __schedule(
134 self,
135 **kwargs,
136 ) -> Job:
137 """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
138 job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
140 task = self.__loop.create_task(self.__supervise_job(job))
141 self.__jobs[job] = task
143 return job
145 async def __supervise_job(self, job: Job) -> None:
146 try:
147 reference_dt = dt.datetime.now(tz=self.__tzinfo)
148 while job.has_attempts_remaining:
149 sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
150 await aio.sleep(sleep_seconds)
152 await job._exec(
153 logger=self._BaseScheduler__logger
154 ) # pylint: disable=protected-access
156 reference_dt = dt.datetime.now(tz=self.__tzinfo)
157 job._calc_next_exec(reference_dt) # pylint: disable=protected-access
158 except aio.CancelledError: # TODO asyncio does not trigger this exception in pytest, why?
159 # raised, when `task.cancel()` in `delete_job` was run
160 pass # pragma: no cover
161 else:
162 self.delete_job(job)
164 def delete_job(self, job: Job) -> None:
165 """
166 Delete a `Job` from the `Scheduler`.
168 Parameters
169 ----------
170 job : Job
171 |AioJob| instance to delete.
173 Raises
174 ------
175 SchedulerError
176 Raises if the |AioJob| of the argument is not scheduled.
177 """
178 try:
179 task: aio.Task[None] = self.__jobs.pop(job)
180 _: bool = task.cancel()
181 except KeyError:
182 raise SchedulerError("An unscheduled Job can not be deleted!") from None
184 def delete_jobs(
185 self,
186 tags: Optional[set[str]] = None,
187 any_tag: bool = False,
188 ) -> int:
189 r"""
190 Delete a set of |AioJob|\ s from the |AioScheduler| by tags.
192 If no tags or an empty set of tags are given defaults to the deletion
193 of all |AioJob|\ s.
195 Parameters
196 ----------
197 tags : Optional[set[str]]
198 Set of tags to identify target |AioJob|\ s.
199 any_tag : bool
200 False: To delete a |AioJob| all tags have to match.
201 True: To delete a |AioJob| at least one tag has to match.
202 """
203 all_jobs: set[Job] = set(self.__jobs.keys())
204 jobs_to_delete: set[Job]
206 if tags is None or tags == {}:
207 jobs_to_delete = all_jobs
208 else:
209 jobs_to_delete = cast(
210 set[Job], select_jobs_by_tag(cast(set[BaseJob], all_jobs), tags, any_tag)
211 )
213 for job in jobs_to_delete:
214 self.delete_job(job)
216 return len(jobs_to_delete)
218 def get_jobs(
219 self,
220 tags: Optional[set[str]] = None,
221 any_tag: bool = False,
222 ) -> set[Job]:
223 r"""
224 Get a set of |AioJob|\ s from the |AioScheduler| by tags.
226 If no tags or an empty set of tags are given defaults to returning
227 all |AioJob|\ s.
229 Parameters
230 ----------
231 tags : set[str]
232 Tags to filter scheduled |AioJob|\ s.
233 If no tags are given all |AioJob|\ s are returned.
234 any_tag : bool
235 False: To match a |AioJob| all tags have to match.
236 True: To match a |AioJob| at least one tag has to match.
238 Returns
239 -------
240 set[Job]
241 Currently scheduled |AioJob|\ s.
242 """
243 if tags is None or tags == {}:
244 return self.jobs
245 return cast(set[Job], select_jobs_by_tag(cast(set[BaseJob], self.jobs), tags, any_tag))
247 @deprecated(["delay"])
248 def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
249 r"""
250 Schedule a cyclic `Job`.
252 Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
253 to schedule a cyclic |AioJob|.
255 Parameters
256 ----------
257 timing : TimingTypeCyclic
258 Desired execution time.
259 handle : Callable[..., None]
260 Handle to a callback function.
262 Returns
263 -------
264 Job
265 Instance of a scheduled |AioJob|.
267 Other Parameters
268 ----------------
269 **kwargs
270 |AioJob| properties, optional
272 `kwargs` are used to specify |AioJob| properties.
274 Here is a list of available |AioJob| properties:
276 .. include:: ../_assets/aio_kwargs.rst
277 """
278 try:
279 tg.check_type(timing, TimingCyclic)
280 except tg.TypeCheckError as err:
281 raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
282 return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)
284 @deprecated(["delay"])
285 def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
286 r"""
287 Schedule a minutely `Job`.
289 Use a `datetime.time` object or a `list` of `datetime.time` objects
290 to schedule a |AioJob| every minute.
292 Notes
293 -----
294 If given a `datetime.time` object with a non zero hour or minute property, these
295 information will be ignored.
297 Parameters
298 ----------
299 timing : TimingDailyUnion
300 Desired execution time(s).
301 handle : Callable[..., None]
302 Handle to a callback function.
304 Returns
305 -------
306 Job
307 Instance of a scheduled |AioJob|.
309 Other Parameters
310 ----------------
311 **kwargs
312 |AioJob| properties, optional
314 `kwargs` are used to specify |AioJob| properties.
316 Here is a list of available |AioJob| properties:
318 .. include:: ../_assets/aio_kwargs.rst
319 """
320 try:
321 tg.check_type(timing, TimingDailyUnion)
322 except tg.TypeCheckError as err:
323 raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
324 return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)
326 @deprecated(["delay"])
327 def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
328 r"""
329 Schedule an hourly `Job`.
331 Use a `datetime.time` object or a `list` of `datetime.time` objects
332 to schedule a |AioJob| every hour.
334 Notes
335 -----
336 If given a `datetime.time` object with a non zero hour property, this information
337 will be ignored.
339 Parameters
340 ----------
341 timing : TimingDailyUnion
342 Desired execution time(s).
343 handle : Callable[..., None]
344 Handle to a callback function.
346 Returns
347 -------
348 Job
349 Instance of a scheduled |AioJob|.
351 Other Parameters
352 ----------------
353 **kwargs
354 |AioJob| properties, optional
356 `kwargs` are used to specify |AioJob| properties.
358 Here is a list of available |AioJob| properties:
360 .. include:: ../_assets/aio_kwargs.rst
361 """
362 try:
363 tg.check_type(timing, TimingDailyUnion)
364 except tg.TypeCheckError as err:
365 raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
366 return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)
368 @deprecated(["delay"])
369 def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
370 r"""
371 Schedule a daily `Job`.
373 Use a `datetime.time` object or a `list` of `datetime.time` objects
374 to schedule a |AioJob| every day.
376 Parameters
377 ----------
378 timing : TimingDailyUnion
379 Desired execution time(s).
380 handle : Callable[..., None]
381 Handle to a callback function.
383 Returns
384 -------
385 Job
386 Instance of a scheduled |AioJob|.
388 Other Parameters
389 ----------------
390 **kwargs
391 |AioJob| properties, optional
393 `kwargs` are used to specify |AioJob| properties.
395 Here is a list of available |AioJob| properties:
397 .. include:: ../_assets/aio_kwargs.rst
398 """
399 try:
400 tg.check_type(timing, TimingDailyUnion)
401 except tg.TypeCheckError as err:
402 raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
403 return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)
405 @deprecated(["delay"])
406 def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
407 r"""
408 Schedule a weekly `Job`.
410 Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
411 recurring |AioJob|. Combine multiple desired `tuples` in
412 a `list`. If the planed execution time is `00:00` the `datetime.time` object
413 can be ignored, just pass a `Weekday` without a `tuple`.
415 Parameters
416 ----------
417 timing : TimingWeeklyUnion
418 Desired execution time(s).
419 handle : Callable[..., None]
420 Handle to a callback function.
422 Returns
423 -------
424 Job
425 Instance of a scheduled |AioJob|.
427 Other Parameters
428 ----------------
429 **kwargs
430 |AioJob| properties, optional
432 `kwargs` are used to specify |AioJob| properties.
434 Here is a list of available |AioJob| properties:
436 .. include:: ../_assets/aio_kwargs.rst
437 """
438 try:
439 tg.check_type(timing, TimingWeeklyUnion)
440 except tg.TypeCheckError as err:
441 raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
442 return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)
444 def once(
445 self,
446 timing: TimingOnceUnion,
447 handle: Callable[..., None],
448 *,
449 args: tuple[Any] = None,
450 kwargs: Optional[dict[str, Any]] = None,
451 tags: Optional[list[str]] = None,
452 alias: str = None,
453 ) -> Job:
454 r"""
455 Schedule a oneshot `Job`.
457 Parameters
458 ----------
459 timing : TimingOnceUnion
460 Desired execution time.
461 handle : Callable[..., None]
462 Handle to a callback function.
463 args : tuple[Any]
464 Positional argument payload for the function handle within a |AioJob|.
465 kwargs : Optional[dict[str, Any]]
466 Keyword arguments payload for the function handle within a |AioJob|.
467 tags : Optional[set[str]]
468 The tags of the |AioJob|.
469 alias : Optional[str]
470 Overwrites the function handle name in the string representation.
472 Returns
473 -------
474 Job
475 Instance of a scheduled |AioJob|.
476 """
477 try:
478 tg.check_type(timing, TimingOnceUnion)
479 except tg.TypeCheckError as err:
480 raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
481 if isinstance(timing, dt.datetime):
482 return self.__schedule(
483 job_type=JobType.CYCLIC,
484 timing=dt.timedelta(),
485 handle=handle,
486 args=args,
487 kwargs=kwargs,
488 max_attempts=1,
489 tags=tags,
490 alias=alias,
491 delay=False,
492 start=timing,
493 )
494 return self.__schedule(
495 job_type=JOB_TYPE_MAPPING[type(timing)],
496 timing=timing,
497 handle=handle,
498 args=args,
499 kwargs=kwargs,
500 max_attempts=1,
501 tags=tags,
502 alias=alias,
503 )
505 @property
506 def jobs(self) -> set[Job]:
507 r"""
508 Get the set of all `Job`\ s.
510 Returns
511 -------
512 set[Job]
513 Currently scheduled |AioJob|\ s.
514 """
515 return set(self.__jobs.keys())