Source code for scheduler.asyncio.scheduler

"""
Implementation of an `asyncio`-compatible in-process scheduler.

Author: Jendrik A. Potyka, Fabian A. Preiss
"""

from __future__ import annotations

import asyncio as aio
import datetime as dt
from asyncio.selector_events import BaseSelectorEventLoop
from collections.abc import Iterable
from logging import Logger
from typing import Any, Callable, Coroutine, Optional

import typeguard as tg

from scheduler.asyncio.job import Job
from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag
from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
from scheduler.base.timingtype import (
    TimingCyclic,
    TimingDailyUnion,
    TimingOnceUnion,
    TimingWeeklyUnion,
)
from scheduler.error import SchedulerError
from scheduler.message import (
    CYCLIC_TYPE_ERROR_MSG,
    DAILY_TYPE_ERROR_MSG,
    HOURLY_TYPE_ERROR_MSG,
    MINUTELY_TYPE_ERROR_MSG,
    ONCE_TYPE_ERROR_MSG,
    WEEKLY_TYPE_ERROR_MSG,
)


class Scheduler(BaseScheduler[Job, Callable[..., Coroutine[Any, Any, None]]]):
    r"""
    Implementation of an asyncio scheduler.

    This implementation enables the planning of |AioJob|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets and execution counts.

    Notes
    -----
    Due to the support of `datetime` objects, the |AioScheduler| is able to work
    with timezones.

    Every scheduled |AioJob| is supervised by its own `asyncio.Task`, created on
    the event loop the |AioScheduler| was constructed with.

    Parameters
    ----------
    loop : Optional[asyncio.AbstractEventLoop]
        Event loop used to create the supervising tasks. Defaults to the
        currently running event loop.
    tzinfo : datetime.tzinfo
        Set the timezone of the |AioScheduler|.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        Raised if no `loop` is given and no event loop is currently running.
    """

    def __init__(
        self,
        *,
        loop: Optional[aio.AbstractEventLoop] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        if loop is None:
            # Only the lookup of the running loop can raise the RuntimeError,
            # so it is the only statement guarded by the try block.
            try:
                loop = aio.get_running_loop()
            except RuntimeError:
                raise SchedulerError(
                    "The asyncio Scheduler requires a running event loop."
                ) from None
        self.__loop = loop
        self.__tzinfo = tzinfo
        self.__tz_str = check_tzname(tzinfo=tzinfo)
        # Maps every scheduled job to the task that supervises it.
        self._jobs: dict[Job, aio.Task[None]] = {}

    def __repr__(self) -> str:
        return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
            repr(self.__tzinfo),
            ", ".join(repr(job) for job in sorted(self.jobs)),
        )

    def __str__(self) -> str:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())

        # Job table (we join two of the Job._repr() fields into one)
        # columns
        c_align = ("<", "<", "<", "<", ">", ">")
        c_width = (8, 16, 19, 12, 9, 13)
        c_name = (
            "type",
            "function / alias",
            "due at",
            "tzinfo",
            "due in",
            "attempts",
        )
        form = [
            f"{{{idx}:{align}{width}}}"
            for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]
        if self.__tz_str is None:
            # Without a timezone the "tzinfo" column is dropped. The remaining
            # fields keep their explicit indices, so the rows below can still
            # be passed with all six entries.
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        table_lines = [
            fstring.format(*c_name),
            fstring.format(*("-" * width for width in c_width)),
        ]
        for job in sorted(self.jobs):
            row = job._str()
            entries = (
                row[0],
                str_cutoff(row[1] + row[2], c_width[1], False),
                row[3],
                str_cutoff(row[4] or "", c_width[3], False),
                str_cutoff(row[5], c_width[4], True),
                str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
            )
            table_lines.append(fstring.format(*entries))
        return scheduler_headings + "".join(table_lines)

    def __headings(self) -> list[str]:
        """Return the meta information shown above the job table."""
        headings = [
            f"tzinfo={self.__tz_str}",
            f"#jobs={len(self._jobs)}",
        ]
        return headings

    @staticmethod
    def __check_timing(timing: Any, expected_type: Any, error_msg: str) -> None:
        """Raise a `SchedulerError` with `error_msg` if `timing` fails the type check."""
        try:
            tg.check_type(timing, expected_type)
        except tg.TypeCheckError as err:
            raise SchedulerError(error_msg) from err

    def __schedule(
        self,
        **kwargs: Any,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)

        task = self.__loop.create_task(self.__supervise_job(job))
        self._jobs[job] = task

        return job

    async def __supervise_job(self, job: Job) -> None:
        """Sleep until the `Job` is due, execute it and repeat while attempts remain."""
        try:
            reference_dt = dt.datetime.now(tz=self.__tzinfo)
            while job.has_attempts_remaining:
                sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
                await aio.sleep(sleep_seconds)

                await job._exec(logger=self._logger)  # pylint: disable=protected-access

                reference_dt = dt.datetime.now(tz=self.__tzinfo)
                job._calc_next_exec(reference_dt)  # pylint: disable=protected-access
        except aio.CancelledError:  # TODO asyncio does not trigger this exception in pytest, why?
            # raised, when `task.cancel()` in `delete_job` was run
            pass  # pragma: no cover
        else:
            # All attempts are used up: remove the job from the scheduler.
            self.delete_job(job)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        The task supervising the |AioJob| is cancelled.

        Parameters
        ----------
        job : Job
            |AioJob| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |AioJob| of the argument is not scheduled.
        """
        try:
            task: aio.Task[None] = self._jobs.pop(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None
        task.cancel()

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |AioJob|\ s.
        any_tag : bool
            False: To delete a |AioJob| all tags have to match.
            True: To delete a |AioJob| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |AioJob|\ s.
        """
        all_jobs: set[Job] = set(self._jobs.keys())
        jobs_to_delete: set[Job]

        if tags is None or tags == set():
            jobs_to_delete = all_jobs
        else:
            jobs_to_delete = select_jobs_by_tag(all_jobs, tags, any_tag)

        for job in jobs_to_delete:
            self.delete_job(job)

        return len(jobs_to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags to filter scheduled |AioJob|\ s.
            If no tags are given all |AioJob|\ s are returned.
        any_tag : bool
            False: To match a |AioJob| all tags have to match.
            True: To match a |AioJob| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        if tags is None or tags == set():
            return self.jobs
        return select_jobs_by_tag(self.jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(
        self, timing: TimingCyclic, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |AioJob|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            Raised if `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingCyclic, CYCLIC_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property,
        this information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            Raised if `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingDailyUnion, MINUTELY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property,
        this information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            Raised if `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingDailyUnion, HOURLY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            Raised if `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingDailyUnion, DAILY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(
        self, timing: TimingWeeklyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |AioJob|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            Raised if `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingWeeklyUnion, WEEKLY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., Coroutine[Any, Any, None]],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |AioJob|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |AioJob|.
        tags : Optional[Iterable[str]]
            The tags of the |AioJob|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            Raised if `timing` does not match `TimingOnceUnion`.
        """
        self.__check_timing(timing, TimingOnceUnion, ONCE_TYPE_ERROR_MSG)

        # Normalise the tags once so that both scheduling paths hand the same
        # container type (a fresh `set`) to the job, whatever iterable was given.
        job_tags: set[str] = set(tags) if tags else set()

        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a zero-length cyclic job
            # that starts at `timing` and runs exactly once.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=job_tags,
                alias=alias,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=job_tags,
            alias=alias,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        return set(self._jobs.keys())