Coverage for scheduler/asyncio/job.py: 100%
16 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
1"""
2Implementation of job for the `asyncio` scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9from logging import Logger
10from typing import Any, Callable, Coroutine
12from scheduler.base.job import BaseJob
class Job(BaseJob[Callable[..., Coroutine[Any, Any, None]]]):
    r"""
    |AioJob| class bundling time and callback function methods.

    Parameters
    ----------
    job_type : JobType
        Indicator which defines which calculations have to be used.
    timing : TimingWeekly
        Desired execution time(s).
    handle : Callable[..., Coroutine[Any, Any, None]]
        Handle to a coroutine callback function.
    args : tuple[Any]
        Positional argument payload for the function handle within a |AioJob|.
    kwargs : Optional[dict[str, Any]]
        Keyword arguments payload for the function handle within a |AioJob|.
    max_attempts : Optional[int]
        Number of times the |AioJob| will be executed where ``0 <=> inf``.
        A |AioJob| with no free attempt will be deleted.
    tags : Optional[set[str]]
        The tags of the |AioJob|.
    delay : Optional[bool]
        *Deprecated*: If ``True`` wait with the execution for the next scheduled time.
    start : Optional[datetime.datetime]
        Set the reference `datetime.datetime` stamp the |AioJob|
        will be scheduled against. Default value is `datetime.datetime.now()`.
    stop : Optional[datetime.datetime]
        Define a point in time after which a |AioJob| will be stopped
        and deleted.
    skip_missing : Optional[bool]
        If ``True`` a |AioJob| will only schedule its newest planned
        execution and drop older ones.
    alias : Optional[str]
        Overwrites the function handle name in the string representation.
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |AioScheduler| the |AioJob|
        is scheduled in.

    Returns
    -------
    Job
        Instance of a scheduled |AioJob|.
    """

    # The name-mangled ``_BaseJob__*`` attributes below belong to the base
    # class; they are accessed explicitly, hence the linter/type-checker waivers.
    # pylint: disable=no-member invalid-name

    async def _exec(self, logger: Logger) -> None:
        """
        Call the stored handle, await its coroutine and update the counters.

        Any `Exception` raised while calling the handle or while awaiting
        the resulting coroutine is logged via ``logger.exception`` and
        counted as a failed attempt instead of being propagated. The
        attempt counter is incremented in either case.

        Parameters
        ----------
        logger : Logger
            Logger that receives the traceback of a failed execution.
        """
        try:
            # Creating the coroutine happens inside the ``try`` on purpose:
            # calling the handle can already raise synchronously (e.g. a
            # ``TypeError`` for mismatched arguments) and such a failure has
            # to be logged and counted like one raised while awaiting.
            await self._BaseJob__handle(  # type: ignore
                *self._BaseJob__args,  # type: ignore
                **self._BaseJob__kwargs,  # type: ignore
            )
        except Exception:
            logger.exception("Unhandled exception in `%r`!", self)
            self._BaseJob__failed_attempts += 1  # type: ignore
        self._BaseJob__attempts += 1  # type: ignore

    # pylint: enable=no-member invalid-name

    def __repr__(self) -> str:
        """Return the representation built from the strings given by ``_repr()``."""
        params: tuple[str, ...] = self._repr()
        return f"scheduler.asyncio.job.Job({', '.join(params)})"