"""Implementation of a `BaseScheduler`.
Author: Jendrik A. Potyka, Fabian A. Preiss
"""
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from functools import wraps
from logging import Logger, getLogger
from typing import Any, Callable, Generic, List, Optional, TypeVar
from scheduler.base.job import BaseJobType
from scheduler.base.timingtype import (
TimingCyclic,
TimingDailyUnion,
TimingOnceUnion,
TimingWeeklyUnion,
)
# TODO:
# import sys
# if sys.version_info < (3, 10):
#     from typing_extensions import ParamSpec
# else:
#     from typing import ParamSpec
# Module-level logger named "scheduler"; `BaseScheduler.__init__` falls back to
# it when no logger is supplied by the caller.
LOGGER = getLogger("scheduler")
def select_jobs_by_tag(
    jobs: set[BaseJobType],
    tags: set[str],
    any_tag: bool,
) -> set[BaseJobType]:
    r"""
    Filter a set of |BaseJob|\ s by comparing their tags against `tags`.

    Parameters
    ----------
    jobs : set[BaseJob]
        The |BaseJob|\ s to choose from.
    tags : set[str]
        The tags a |BaseJob| is compared against.
    any_tag : bool
        True: a |BaseJob| is selected as soon as it shares at least one
        tag with `tags`.
        False: a |BaseJob| is selected only if it carries every tag
        contained in `tags`.

    Returns
    -------
    set[BaseJob]
        A new set holding the |BaseJob|\ s that matched.
    """
    selected = set()
    for candidate in jobs:
        if any_tag:
            # A non-empty intersection means at least one tag is shared.
            is_match = bool(tags & candidate.tags)
        else:
            # Subset test: every requested tag must be on the job.
            is_match = tags <= candidate.tags
        if is_match:
            selected.add(candidate)
    return selected
def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator for marking specified function arguments as deprecated.

    Every call of the decorated function is inspected: for each name in
    `fields` that is passed as a keyword argument with a value other than
    `None`, a `DeprecationWarning` is emitted. The call itself is then
    forwarded to the decorated function with all arguments unchanged.

    Parameters
    ----------
    fields : List[str]
        A list of strings representing the names of the function arguments
        that are deprecated.

    Returns
    -------
    Callable[[Callable[..., Any]], Callable[..., Any]]
        A decorator that wraps a function with the deprecation check.

    Notes
    -----
    Only keyword usage is detected. A deprecated argument that is passed
    positionally does not trigger a warning.

    Examples
    --------
    .. code-block:: python

        @deprecated(["old_arg"])
        def some_function(new_arg, old_arg=None):
            pass

    Calling `some_function(new_arg=5, old_arg=3)` generates a deprecation
    warning for using 'old_arg'.
    """

    def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def real_wrapper(*args: Any, **kwargs: Any) -> Any:
            for field in fields:
                # An explicit `None` counts as "not used"; the argument stays
                # in kwargs either way and is passed on to `func`.
                if kwargs.get(field) is not None:
                    warnings.warn(
                        (
                            f"Using the `{field}` argument is deprecated and will "
                            "be removed in the next minor release."
                        ),
                        DeprecationWarning,
                        # Attribute the warning two frames above this wrapper
                        # (the caller of the function that called the
                        # decorated function).
                        stacklevel=3,
                    )
            return func(*args, **kwargs)

        return real_wrapper

    return wrapper
# Type variable for the `handle` parameter of the `BaseScheduler` scheduling
# methods; bound to callables that take no arguments.
T = TypeVar("T", bound=Callable[[], Any])
class BaseScheduler(
    ABC, Generic[BaseJobType, T]
):  # NOTE maybe a typing Protocol class is better than an ABC class
    """
    Abstract interface that concrete scheduler implementations have to fulfil.

    Subclasses implement the job management members (`delete_job`,
    `delete_jobs`, `get_jobs`, `jobs`) and the scheduling methods (`cyclic`,
    `minutely`, `hourly`, `daily`, `weekly`, `once`).

    Parameters
    ----------
    logger : Optional[logging.Logger]
        Logger stored on the instance. If omitted, the module-level
        ``LOGGER`` is used.

    Author: Jendrik A. Potyka, Fabian A. Preiss
    """

    _logger: Logger

    def __init__(self, logger: Optional[Logger] = None) -> None:
        # Use the caller's logger when one is given, otherwise the module default.
        self._logger = logger or LOGGER

    @abstractmethod
    def delete_job(self, job: BaseJobType) -> None:
        """Remove the given |BaseJob| from the `BaseScheduler`."""

    @abstractmethod
    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Remove |BaseJob|\ s from the `BaseScheduler`, selected by tags.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags used to select the |BaseJob|\ s; how `None` is treated is
            left to the implementation.
        any_tag : bool
            Flag controlling the tag matching mode.

        Returns
        -------
        int
            Integer result defined by the implementation.
        """

    @abstractmethod
    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[BaseJobType]:
        r"""
        Retrieve |BaseJob|\ s from the `BaseScheduler`, selected by tags.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags used to select the |BaseJob|\ s; how `None` is treated is
            left to the implementation.
        any_tag : bool
            Flag controlling the tag matching mode.

        Returns
        -------
        set[BaseJob]
            The selected |BaseJob|\ s.
        """

    @abstractmethod
    def cyclic(self, timing: TimingCyclic, handle: T, **kwargs) -> BaseJobType:
        """Create a cyclic |BaseJob| for `handle` and return it."""

    @abstractmethod
    def minutely(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a minutely |BaseJob| for `handle` and return it."""

    @abstractmethod
    def hourly(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create an hourly |BaseJob| for `handle` and return it."""

    @abstractmethod
    def daily(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a daily |BaseJob| for `handle` and return it."""

    @abstractmethod
    def weekly(self, timing: TimingWeeklyUnion, handle: T, **kwargs) -> BaseJobType:
        """Create a weekly |BaseJob| for `handle` and return it."""

    @abstractmethod
    def once(
        self,
        timing: TimingOnceUnion,
        handle: T,
        *,
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> BaseJobType:
        """
        Create a oneshot |BaseJob| for `handle` and return it.

        All parameters after `handle` are keyword-only.
        """

    @property
    @abstractmethod
    def jobs(self) -> set[BaseJobType]:
        r"""Set of all |BaseJob|\ s."""