"""
Implementation of a `threading` compatible in-process scheduler.
Author: Jendrik A. Potyka, Fabian A. Preiss
"""
import datetime as dt
import queue
import threading
from logging import Logger
from typing import Any, Callable, Optional
import typeguard as tg
from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
from scheduler.base.job import BaseJob
from scheduler.base.scheduler import (
BaseScheduler,
_warn_deprecated_delay,
select_jobs_by_tag,
)
from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff
from scheduler.base.timingtype import (
TimingCyclic,
TimingDailyUnion,
TimingOnceUnion,
TimingWeeklyUnion,
)
from scheduler.error import SchedulerError
from scheduler.message import (
CYCLIC_TYPE_ERROR_MSG,
DAILY_TYPE_ERROR_MSG,
HOURLY_TYPE_ERROR_MSG,
MINUTELY_TYPE_ERROR_MSG,
ONCE_TYPE_ERROR_MSG,
TZ_ERROR_MSG,
WEEKLY_TYPE_ERROR_MSG,
)
from scheduler.prioritization import linear_priority_function
from scheduler.threading.job import Job
def _exec_job_worker(que: "queue.Queue[Job]", logger: Logger) -> None:
    """
    Drain a queue of jobs, executing each one in the calling thread.

    The worker polls the queue without blocking and returns as soon as the
    queue is found empty, so all jobs have to be enqueued before the worker
    is started.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the jobs to execute.
    logger : logging.Logger
        Logger that is handed to every executed job.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            return
        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        finally:
            # Always balance the `get()` above, even if the job raised;
            # otherwise a `que.join()` in the caller could block forever.
            que.task_done()
class Scheduler(BaseScheduler):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : datetime.tzinfo
        Set the timezone of the |Scheduler|.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`. 0 (default) for no limit.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : set[Job]
        A collection of job instances. Every |Job| has to share the
        timezone of the |Scheduler|.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If one of the given `jobs` has a timezone that differs from `tzinfo`.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, BaseJob, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[set[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        self.__jobs_lock = threading.RLock()
        # Copy the given collection so that later scheduling/deletion does not
        # mutate a set that is still owned by the caller.
        self.__jobs: set[Job] = set(jobs) if jobs else set()

        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:  # pylint: disable=protected-access
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        with self.__jobs_lock:
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(
                ", ".join(
                    repr(elem)
                    for elem in (
                        self.__max_exec,
                        self.__tzinfo,
                        self.__priority_function,
                    )
                ),
                ", ".join(repr(job) for job in sorted(self.__jobs)),
            )

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._str() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            # Without a timezone the "tzinfo" column (index 3) is dropped; the
            # remaining placeholders keep their explicit positional indices.
            if self.__tz_str is None:
                form = form[:3] + form[4:]
            fstring = " ".join(form) + "\n"

            lines = [
                fstring.format(*c_name),
                fstring.format(*("-" * width for width in c_width)),
            ]
            for job in sorted(self.__jobs):
                row = job._str()  # pylint: disable=protected-access
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                lines.append(fstring.format(*entries))
            return scheduler_headings + "".join(lines)

    def __headings(self) -> list[str]:
        """Collect the meta information shown above the job table."""
        with self.__jobs_lock:
            headings = [
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        # A job without remaining attempts is returned but never stored.
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute the given jobs on worker threads and update their schedule.

        Parameters
        ----------
        jobs : list[Job]
            The jobs to execute.
        ref_dt : datetime.datetime
            Reference time used to calculate the next execution of each job.

        Returns
        -------
        int
            Number of jobs handed to the workers.
        """
        n_jobs = len(jobs)
        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # `n_threads == 0` means one thread per job; more threads than jobs
        # would only start workers that find the queue empty right away.
        n_workers = min(self.__n_threads, n_jobs) if self.__n_threads else n_jobs
        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(
                target=_exec_job_worker,
                args=(que, self._BaseScheduler__logger),
                daemon=True,
            )
            worker.start()
            workers.append(worker)

        # All jobs are enqueued before the workers start and a worker only
        # returns once the queue is drained, so joining the workers is
        # sufficient. No additional `que.join()` is used, because it would
        # block forever if a worker terminated unexpectedly.
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                # `discard` instead of `delete_job`: the job may already have
                # been removed while it was executing.
                with self.__jobs_lock:
                    self.__jobs.discard(job)
        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both - the status of the |Job| timers
            as well as the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        # Take a consistent snapshot under the lock; priorities are computed
        # on the snapshot so the lock is not held while calling into the jobs
        # or the user supplied priority function.
        with self.__jobs_lock:
            candidates = list(self.__jobs)

        if force_exec_all:
            return self.__exec_jobs(candidates, ref_dt)

        # collect the current priority for all jobs
        n_jobs = len(candidates)
        job_priority: dict[Job, float] = {}
        for job in candidates:
            delta_seconds = job.timedelta(ref_dt).total_seconds()
            job_priority[job] = self.__priority_function(
                -delta_seconds,
                job,
                self.__max_exec,
                n_jobs,
            )

        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.__getitem__, reverse=True)

        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers both `None` and an empty set.
            if not tags:
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)
            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers both `None` and an empty set.
            if not tags:
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the type `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the type `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the type `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        # NOTE(review): annotated as a list while the docstring and the tag
        # based lookups use sets - confirm which type the Job expects.
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        A `datetime.datetime` timing is scheduled as a cyclic |Job| with a
        zero `datetime.timedelta` that starts at the given point in time.
        Every other timing is scheduled with the job type that
        `JOB_TYPE_MAPPING` assigns to the type of `timing`. In both cases
        the |Job| is limited to a single attempt.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : tuple[Any]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[set[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the type `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err

        if isinstance(timing, dt.datetime):
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        # Copy under the lock, consistent with `get_jobs`.
        with self.__jobs_lock:
            return self.__jobs.copy()