Coverage for scheduler/asyncio/scheduler.py: 100%
126 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
"""
Implementation of an `asyncio` compatible in-process scheduler.

Author: Jendrik A. Potyka, Fabian A. Preiss
"""
7from __future__ import annotations
9import asyncio as aio
10import datetime as dt
11from asyncio.selector_events import BaseSelectorEventLoop
12from collections.abc import Iterable
13from logging import Logger
14from typing import Any, Callable, Coroutine, Optional
16import typeguard as tg
18from scheduler.asyncio.job import Job
19from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
20from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag
21from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
22from scheduler.base.timingtype import (
23 TimingCyclic,
24 TimingDailyUnion,
25 TimingOnceUnion,
26 TimingWeeklyUnion,
27)
28from scheduler.error import SchedulerError
29from scheduler.message import (
30 CYCLIC_TYPE_ERROR_MSG,
31 DAILY_TYPE_ERROR_MSG,
32 HOURLY_TYPE_ERROR_MSG,
33 MINUTELY_TYPE_ERROR_MSG,
34 ONCE_TYPE_ERROR_MSG,
35 WEEKLY_TYPE_ERROR_MSG,
36)
class Scheduler(BaseScheduler[Job, Callable[..., Coroutine[Any, Any, None]]]):
    r"""
    Implementation of an asyncio scheduler.

    This implementation enables the planning of |AioJob|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets and execution counts.

    Notes
    -----
    Due to the support of `datetime` objects, the |AioScheduler| is able to work
    with timezones.

    Parameters
    ----------
    loop : asyncio.selector_events.BaseSelectorEventLoop
        Set an AsyncIO event loop, default is the currently running event loop.
    tzinfo : datetime.tzinfo
        Set the timezone of the |AioScheduler|.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If no `loop` is given and no event loop is currently running.
    """

    def __init__(
        self,
        *,
        loop: Optional[BaseSelectorEventLoop] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        try:
            # Only fall back to the running loop when no loop was passed explicitly.
            self.__loop = loop if loop is not None else aio.get_running_loop()
        except RuntimeError:
            raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
        self.__tzinfo = tzinfo
        self.__tz_str = check_tzname(tzinfo=tzinfo)

        # Maps every scheduled job to the task supervising its execution.
        self._jobs: dict[Job, aio.Task[None]] = {}

    def __repr__(self) -> str:
        return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
            ", ".join((repr(elem) for elem in (self.__tzinfo,))),
            ", ".join([repr(job) for job in sorted(self.jobs)]),
        )

    def __str__(self) -> str:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())

        # Job table (we join two of the Job._repr() fields into one)
        # columns
        c_align = ("<", "<", "<", "<", ">", ">")
        c_width = (8, 16, 19, 12, 9, 13)
        c_name = (
            "type",
            "function / alias",
            "due at",
            "tzinfo",
            "due in",
            "attempts",
        )
        # Build one positional `str.format` field per column, e.g. "{0:<8}".
        # The doubled braces emit literal braces around the index and spec.
        form = [
            f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]
        if self.__tz_str is None:
            # Without a timezone the "tzinfo" column is dropped. The fields use
            # explicit indices, so the remaining columns still pick the right
            # argument even though all six values are passed to `format`.
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        job_table = fstring.format(*c_name)
        job_table += fstring.format(*("-" * width for width in c_width))
        for job in sorted(self.jobs):
            row = job._str()
            entries = (
                row[0],
                str_cutoff(row[1] + row[2], c_width[1], False),
                row[3],
                str_cutoff(row[4] or "", c_width[3], False),
                str_cutoff(row[5], c_width[4], True),
                str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
            )
            job_table += fstring.format(*entries)

        return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the meta information shown above the job table in `__str__`."""
        headings = [
            f"tzinfo={self.__tz_str}",
            f"#jobs={len(self._jobs)}",
        ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)

        # Every job gets its own supervising task on the scheduler's loop.
        task = self.__loop.create_task(self.__supervise_job(job))
        self._jobs[job] = task

        return job

    async def __supervise_job(self, job: Job) -> None:
        """
        Sleep until the `Job` is due, execute it and repeat while attempts remain.

        Once the `Job` has no attempts remaining it is removed from the `Scheduler`.
        """
        try:
            reference_dt = dt.datetime.now(tz=self.__tzinfo)
            while job.has_attempts_remaining:
                sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
                await aio.sleep(sleep_seconds)

                await job._exec(logger=self._logger)  # pylint: disable=protected-access

                reference_dt = dt.datetime.now(tz=self.__tzinfo)
                job._calc_next_exec(reference_dt)  # pylint: disable=protected-access
        except aio.CancelledError:  # TODO asyncio does not trigger this exception in pytest, why?
            # raised, when `task.cancel()` in `delete_job` was run
            pass  # pragma: no cover
        else:
            self.delete_job(job)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        The task supervising the `Job` is cancelled.

        Parameters
        ----------
        job : Job
            |AioJob| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |AioJob| of the argument is not scheduled.
        """
        # Keep the `try` body minimal: only the lookup can raise `KeyError`.
        try:
            task: aio.Task[None] = self._jobs.pop(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None
        task.cancel()

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |AioJob|\ s.
        any_tag : bool
            False: To delete a |AioJob| all tags have to match.
            True: To delete a |AioJob| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |AioJob|\ s.
        """
        # Work on a snapshot: `delete_job` mutates `self._jobs` while we iterate.
        all_jobs: set[Job] = set(self._jobs)
        jobs_to_delete: set[Job]

        if not tags:
            jobs_to_delete = all_jobs
        else:
            jobs_to_delete = select_jobs_by_tag(all_jobs, tags, any_tag)

        for job in jobs_to_delete:
            self.delete_job(job)

        return len(jobs_to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags to filter scheduled |AioJob|\ s.
            If no tags are given all |AioJob|\ s are returned.
        any_tag : bool
            False: To match a |AioJob| all tags have to match.
            True: To match a |AioJob| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        if not tags:
            return self.jobs
        return select_jobs_by_tag(self.jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(
        self, timing: TimingCyclic, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |AioJob|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(
        self, timing: TimingWeeklyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |AioJob|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., Coroutine[Any, Any, None]],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |AioJob|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |AioJob|.
        tags : Optional[Iterable[str]]
            The tags of the |AioJob|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err

        # Normalise once so both branches hand the same type (a `set`) to the
        # job, whatever iterable (or `None`) the caller supplied.
        job_tags: set[str] = set(tags) if tags else set()

        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a single-attempt cyclic job
            # with a zero interval that starts at the requested datetime.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=job_tags,
                alias=alias,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=job_tags,
            alias=alias,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        # Return a copy so callers cannot mutate the scheduler's bookkeeping.
        return set(self._jobs)