"""
Implementation of a `BaseScheduler`.
Author: Jendrik A. Potyka, Fabian A. Preiss
"""
import warnings
from abc import ABC, abstractmethod
from logging import Logger, getLogger
from typing import Any, Callable, Optional
from scheduler.base.job import BaseJob, BaseJobType
from scheduler.base.timingtype import (
TimingCyclic,
TimingDailyUnion,
TimingOnceUnion,
TimingWeeklyUnion,
)
# Package-wide fallback logger; `BaseScheduler.__init__` uses it when no logger is passed.
LOGGER = getLogger("scheduler")
def select_jobs_by_tag(
    jobs: set[BaseJobType],
    tags: set[str],
    any_tag: bool,
) -> set[BaseJobType]:
    r"""
    Select |BaseJob|\ s by matching `tags`.

    Parameters
    ----------
    jobs : set[BaseJob]
        Unfiltered set of |BaseJob|\ s.
    tags : set[str]
        Tags to filter |BaseJob|\ s.
    any_tag : bool
        False: To match a |BaseJob| all tags have to match.
        True: To match a |BaseJob| at least one tag has to match.

    Returns
    -------
    set[BaseJob]
        Selected |BaseJob|\ s.

    Notes
    -----
    With an empty `tags` set, ``any_tag=True`` selects nothing (an empty set
    never intersects), while ``any_tag=False`` selects every job (the empty
    set is a subset of every tag set).
    """
    if any_tag:
        # Non-empty intersection: at least one requested tag is on the job.
        return {job for job in jobs if tags & job.tags}
    # Subset test: every requested tag is on the job.
    return {job for job in jobs if tags <= job.tags}
def _warn_deprecated_delay(delay: Optional[bool] = None, **kwargs: Any) -> None:
    """
    Emit a `DeprecationWarning` when the deprecated `delay` argument is given.

    Parameters
    ----------
    delay : Optional[bool]
        Value of the deprecated argument. ``None`` means it was not supplied
        and no warning is raised; any other value (including ``False``)
        triggers the warning.
    **kwargs
        Accepted and ignored. NOTE(review): presumably present so callers can
        forward their complete keyword arguments unfiltered -- confirm.
    """
    if delay is None:
        return
    # stacklevel=3 attributes the warning to the caller of the function that
    # invoked this helper, rather than to the helper or that function itself.
    warnings.warn(
        (
            "Using the `delay` argument is deprecated and will "
            "be removed in the next minor release."
        ),
        DeprecationWarning,
        stacklevel=3,
    )
class BaseScheduler(ABC):  # NOTE maybe a typing Protocol class is better than an ABC class
    """
    Interface definition of an abstract scheduler.

    Concrete schedulers have to implement every abstract method and the
    `jobs` property declared here before they can be instantiated.

    Author: Jendrik A. Potyka, Fabian A. Preiss
    """

    # Double underscore: the attribute is name-mangled to
    # `_BaseScheduler__logger` and is not reachable as `self.__logger`
    # from within subclasses.
    __logger: Logger

    def __init__(self, logger: Optional[Logger] = None) -> None:
        """
        Store the logger of the scheduler.

        Parameters
        ----------
        logger : Optional[logging.Logger]
            Custom logger; the module-level `LOGGER` is used when omitted.
        """
        self.__logger = logger if logger else LOGGER

    @abstractmethod
    def delete_job(self, job: BaseJob) -> None:
        """Delete a |BaseJob| from the `BaseScheduler`."""

    @abstractmethod
    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags."""

    @abstractmethod
    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[BaseJob]:
        r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags."""

    @abstractmethod
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a cyclic |BaseJob|."""

    @abstractmethod
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a minutely |BaseJob|."""

    @abstractmethod
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule an hourly |BaseJob|."""

    @abstractmethod
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a daily |BaseJob|."""

    @abstractmethod
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule a weekly |BaseJob|."""

    @abstractmethod
    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        # Variable-length tuple that may be omitted; the former `tuple[Any]`
        # annotation described a 1-tuple and did not allow the `None` default.
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
    ) -> BaseJob:
        """Schedule a oneshot |BaseJob|."""

    @property
    @abstractmethod
    def jobs(self) -> set[BaseJob]:
        r"""Get the set of all |BaseJob|\ s."""