Source code for scheduler.asyncio.scheduler

"""
Implementation of a `asyncio` compatible in-process scheduler.

Author: Jendrik A. Potyka, Fabian A. Preiss
"""

from __future__ import annotations

import asyncio as aio
import datetime as dt
from logging import Logger
from typing import Any, Callable, Optional, cast

import typeguard as tg

from scheduler.asyncio.job import Job
from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
from scheduler.base.scheduler import (
    BaseJob,
    BaseScheduler,
    _warn_deprecated_delay,
    select_jobs_by_tag,
)
from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
from scheduler.base.timingtype import (
    TimingCyclic,
    TimingDailyUnion,
    TimingOnceUnion,
    TimingWeeklyUnion,
)
from scheduler.error import SchedulerError
from scheduler.message import (
    CYCLIC_TYPE_ERROR_MSG,
    DAILY_TYPE_ERROR_MSG,
    HOURLY_TYPE_ERROR_MSG,
    MINUTELY_TYPE_ERROR_MSG,
    ONCE_TYPE_ERROR_MSG,
    WEEKLY_TYPE_ERROR_MSG,
)


[docs]class Scheduler(BaseScheduler): r""" Implementation of an asyncio scheduler. This implementation enables the planning of |AioJob|\ s depending on time cycles, fixed times, weekdays, dates, offsets and execution counts. Notes ----- Due to the support of `datetime` objects, the |AioScheduler| is able to work with timezones. Parameters ---------- loop : asyncio.selector_events.BaseSelectorEventLoop Set a AsyncIO event loop, default is the global event loop tzinfo : datetime.tzinfo Set the timezone of the |AioScheduler|. logger : Optional[logging.Logger] A custom Logger instance. """ def __init__( self, *, loop: Optional[aio.selector_events.BaseSelectorEventLoop] = None, tzinfo: Optional[dt.tzinfo] = None, logger: Optional[Logger] = None, ): super().__init__(logger=logger) try: self.__loop = loop if loop else aio.get_running_loop() except RuntimeError: raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None self.__tzinfo = tzinfo self.__tz_str = check_tzname(tzinfo=tzinfo) self.__jobs: dict[Job, aio.Task[None]] = {} def __repr__(self) -> str: return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format( ", ".join((repr(elem) for elem in (self.__tzinfo,))), ", ".join([repr(job) for job in sorted(self.jobs)]), ) def __str__(self) -> str: # Scheduler meta heading scheduler_headings = "{0}, {1}\n\n".format(*self.__headings()) # Job table (we join two of the Job._repr() fields into one) # columns c_align = ("<", "<", "<", "<", ">", ">") c_width = (8, 16, 19, 12, 9, 13) c_name = ( "type", "function / alias", "due at", "tzinfo", "due in", "attempts", ) form = [ f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width)) ] if self.__tz_str is None: form = form[:3] + form[4:] fstring = " ".join(form) + "\n" job_table = fstring.format(*c_name) job_table += fstring.format(*("-" * width for width in c_width)) for job in sorted(self.jobs): row = job._str() entries = ( row[0], str_cutoff(row[1] + row[2], c_width[1], False), row[3], str_cutoff(row[4] or "", c_width[3], False), str_cutoff(row[5], c_width[4], True), str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True), ) job_table += fstring.format(*entries) return scheduler_headings + job_table def __headings(self) -> list[str]: headings = [ f"tzinfo={self.__tz_str}", f"#jobs={len(self.__jobs)}", ] return headings def __schedule( self, **kwargs, ) -> Job: """Encapsulate the `Job` and add the `Scheduler`'s timezone.""" job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs) task = self.__loop.create_task(self.__supervise_job(job)) self.__jobs[job] = task return job async def __supervise_job(self, job: Job) -> None: try: reference_dt = dt.datetime.now(tz=self.__tzinfo) while job.has_attempts_remaining: sleep_seconds: float = job.timedelta(reference_dt).total_seconds() await aio.sleep(sleep_seconds) await job._exec( logger=self._BaseScheduler__logger ) # pylint: disable=protected-access reference_dt = dt.datetime.now(tz=self.__tzinfo) job._calc_next_exec(reference_dt) # pylint: disable=protected-access except aio.CancelledError: # TODO asyncio does not trigger this exception in pytest, why? # raised, when `task.cancel()` in `delete_job` was run pass # pragma: no cover else: self.delete_job(job)
[docs] def delete_job(self, job: Job) -> None: """ Delete a `Job` from the `Scheduler`. Parameters ---------- job : Job |AioJob| instance to delete. Raises ------ SchedulerError Raises if the |AioJob| of the argument is not scheduled. """ try: task: aio.Task[None] = self.__jobs.pop(job) _: bool = task.cancel() except KeyError: raise SchedulerError("An unscheduled Job can not be deleted!") from None
[docs] def delete_jobs( self, tags: Optional[set[str]] = None, any_tag: bool = False, ) -> int: r""" Delete a set of |AioJob|\ s from the |AioScheduler| by tags. If no tags or an empty set of tags are given defaults to the deletion of all |AioJob|\ s. Parameters ---------- tags : Optional[set[str]] Set of tags to identify target |AioJob|\ s. any_tag : bool False: To delete a |AioJob| all tags have to match. True: To delete a |AioJob| at least one tag has to match. """ all_jobs: set[Job] = set(self.__jobs.keys()) jobs_to_delete: set[Job] if tags is None or tags == {}: jobs_to_delete = all_jobs else: jobs_to_delete = cast( set[Job], select_jobs_by_tag(cast(set[BaseJob], all_jobs), tags, any_tag) ) for job in jobs_to_delete: self.delete_job(job) return len(jobs_to_delete)
[docs] def get_jobs( self, tags: Optional[set[str]] = None, any_tag: bool = False, ) -> set[Job]: r""" Get a set of |AioJob|\ s from the |AioScheduler| by tags. If no tags or an empty set of tags are given defaults to returning all |AioJob|\ s. Parameters ---------- tags : set[str] Tags to filter scheduled |AioJob|\ s. If no tags are given all |AioJob|\ s are returned. any_tag : bool False: To match a |AioJob| all tags have to match. True: To match a |AioJob| at least one tag has to match. Returns ------- set[Job] Currently scheduled |AioJob|\ s. """ if tags is None or tags == {}: return self.jobs return cast(set[Job], select_jobs_by_tag(cast(set[BaseJob], self.jobs), tags, any_tag))
[docs] def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job: r""" Schedule a cyclic `Job`. Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects to schedule a cyclic |AioJob|. Parameters ---------- timing : TimingTypeCyclic Desired execution time. handle : Callable[..., None] Handle to a callback function. Returns ------- Job Instance of a scheduled |AioJob|. Other Parameters ---------------- **kwargs |AioJob| properties, optional `kwargs` are used to specify |AioJob| properties. Here is a list of available |AioJob| properties: .. include:: ../_assets/aio_kwargs.rst """ _warn_deprecated_delay(**kwargs) try: tg.check_type("timing", timing, TimingCyclic) except TypeError as err: raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)
[docs] def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job: r""" Schedule a minutely `Job`. Use a `datetime.time` object or a `list` of `datetime.time` objects to schedule a |AioJob| every minute. Notes ----- If given a `datetime.time` object with a non zero hour or minute property, these information will be ignored. Parameters ---------- timing : TimingDailyUnion Desired execution time(s). handle : Callable[..., None] Handle to a callback function. Returns ------- Job Instance of a scheduled |AioJob|. Other Parameters ---------------- **kwargs |AioJob| properties, optional `kwargs` are used to specify |AioJob| properties. Here is a list of available |AioJob| properties: .. include:: ../_assets/aio_kwargs.rst """ _warn_deprecated_delay(**kwargs) try: tg.check_type("timing", timing, TimingDailyUnion) except TypeError as err: raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)
[docs] def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job: r""" Schedule an hourly `Job`. Use a `datetime.time` object or a `list` of `datetime.time` objects to schedule a |AioJob| every hour. Notes ----- If given a `datetime.time` object with a non zero hour property, this information will be ignored. Parameters ---------- timing : TimingDailyUnion Desired execution time(s). handle : Callable[..., None] Handle to a callback function. Returns ------- Job Instance of a scheduled |AioJob|. Other Parameters ---------------- **kwargs |AioJob| properties, optional `kwargs` are used to specify |AioJob| properties. Here is a list of available |AioJob| properties: .. include:: ../_assets/aio_kwargs.rst """ _warn_deprecated_delay(**kwargs) try: tg.check_type("timing", timing, TimingDailyUnion) except TypeError as err: raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)
[docs] def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job: r""" Schedule a daily `Job`. Use a `datetime.time` object or a `list` of `datetime.time` objects to schedule a |AioJob| every day. Parameters ---------- timing : TimingDailyUnion Desired execution time(s). handle : Callable[..., None] Handle to a callback function. Returns ------- Job Instance of a scheduled |AioJob|. Other Parameters ---------------- **kwargs |AioJob| properties, optional `kwargs` are used to specify |AioJob| properties. Here is a list of available |AioJob| properties: .. include:: ../_assets/aio_kwargs.rst """ _warn_deprecated_delay(**kwargs) try: tg.check_type("timing", timing, TimingDailyUnion) except TypeError as err: raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)
[docs] def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job: r""" Schedule a weekly `Job`. Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly recurring |AioJob|. Combine multiple desired `tuples` in a `list`. If the planed execution time is `00:00` the `datetime.time` object can be ignored, just pass a `Weekday` without a `tuple`. Parameters ---------- timing : TimingWeeklyUnion Desired execution time(s). handle : Callable[..., None] Handle to a callback function. Returns ------- Job Instance of a scheduled |AioJob|. Other Parameters ---------------- **kwargs |AioJob| properties, optional `kwargs` are used to specify |AioJob| properties. Here is a list of available |AioJob| properties: .. include:: ../_assets/aio_kwargs.rst """ _warn_deprecated_delay(**kwargs) try: tg.check_type("timing", timing, TimingWeeklyUnion) except TypeError as err: raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)
[docs] def once( self, timing: TimingOnceUnion, handle: Callable[..., None], *, args: tuple[Any] = None, kwargs: Optional[dict[str, Any]] = None, tags: Optional[list[str]] = None, alias: str = None, ) -> Job: r""" Schedule a oneshot `Job`. Parameters ---------- timing : TimingOnceUnion Desired execution time. handle : Callable[..., None] Handle to a callback function. args : tuple[Any] Positional argument payload for the function handle within a |AioJob|. kwargs : Optional[dict[str, Any]] Keyword arguments payload for the function handle within a |AioJob|. tags : Optional[set[str]] The tags of the |AioJob|. alias : Optional[str] Overwrites the function handle name in the string representation. Returns ------- Job Instance of a scheduled |AioJob|. """ try: tg.check_type("timing", timing, TimingOnceUnion) except TypeError as err: raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err if isinstance(timing, dt.datetime): return self.__schedule( job_type=JobType.CYCLIC, timing=dt.timedelta(), handle=handle, args=args, kwargs=kwargs, max_attempts=1, tags=tags, alias=alias, delay=False, start=timing, ) return self.__schedule( job_type=JOB_TYPE_MAPPING[type(timing)], timing=timing, handle=handle, args=args, kwargs=kwargs, max_attempts=1, tags=tags, alias=alias, )
@property def jobs(self) -> set[Job]: r""" Get the set of all `Job`\ s. Returns ------- set[Job] Currently scheduled |AioJob|\ s. """ return set(self.__jobs.keys())