Coverage for scheduler/asyncio/job.py: 100%
15 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:15 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-12-10 20:15 +0000
1"""
2Implementation of job for the `asyncio` scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9from logging import Logger
11from scheduler.base.job import BaseJob
class Job(BaseJob):
    r"""
    |AioJob| class that ties the scheduling times to the callback to execute.

    Parameters
    ----------
    job_type : JobType
        Indicator which defines which calculations have to be used.
    timing : TimingWeekly
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.
    args : tuple[Any]
        Positional argument payload for the function handle within a |AioJob|.
    kwargs : Optional[dict[str, Any]]
        Keyword arguments payload for the function handle within a |AioJob|.
    max_attempts : Optional[int]
        Number of times the |AioJob| will be executed where ``0 <=> inf``.
        A |AioJob| with no free attempt will be deleted.
    tags : Optional[set[str]]
        The tags of the |AioJob|.
    delay : Optional[bool]
        *Deprecated*: If ``True`` wait with the execution for the next scheduled time.
    start : Optional[datetime.datetime]
        Set the reference `datetime.datetime` stamp the |AioJob|
        will be scheduled against. Default value is `datetime.datetime.now()`.
    stop : Optional[datetime.datetime]
        Define a point in time after which a |AioJob| will be stopped
        and deleted.
    skip_missing : Optional[bool]
        If ``True`` a |AioJob| will only schedule its newest planned
        execution and drop older ones.
    alias : Optional[str]
        Overwrites the function handle name in the string representation.
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |AioScheduler| the |AioJob|
        is scheduled in.

    Returns
    -------
    Job
        Instance of a scheduled |AioJob|.
    """

    # The name-mangled ``_BaseJob__*`` attributes live on the base class, which
    # pylint cannot resolve from here.
    # pylint: disable=no-member invalid-name

    async def _exec(self, logger: Logger):
        """
        Call the stored handle with its payload, await the result and count the attempt.

        Parameters
        ----------
        logger : Logger
            Logger that receives the traceback when awaiting the handle's
            result raises an `Exception`.

        Notes
        -----
        An `Exception` raised while awaiting is logged and counted as a failed
        attempt instead of being re-raised. Anything that is not an `Exception`
        subclass propagates, and the attempt counter is then left untouched.

        NOTE(review): the handle is called *before* the ``try`` block, so an
        exception raised by the call itself (rather than while awaiting its
        result) is neither logged nor counted here - confirm this is intended.
        """
        handle = self._BaseJob__handle
        positional = self._BaseJob__args
        keyword = self._BaseJob__kwargs
        awaitable = handle(*positional, **keyword)
        try:
            await awaitable
        except Exception as err:
            logger.exception("Unhandled exception `%s` in `%r`!", err, self)
            self._BaseJob__failed_attempts += 1
        # Reached on success and after a handled failure alike.
        self._BaseJob__attempts += 1

    # pylint: enable=no-member invalid-name

    def __repr__(self) -> str:
        """Return the qualified class name wrapping the comma-joined `_repr()` fields."""
        joined = ", ".join(self._repr())
        return f"scheduler.asyncio.job.Job({joined})"