Coverage for scheduler/asyncio/job.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-21 13:55 +0000

1""" 

2Implementation of job for the `asyncio` scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9from logging import Logger 

10from typing import Any, Callable, Coroutine 

11 

12from scheduler.base.job import BaseJob 

13 

14 

15class Job(BaseJob[Callable[..., Coroutine[Any, Any, None]]]): 

16 r""" 

17 |AioJob| class bundling time and callback function methods. 

18 

19 Parameters 

20 ---------- 

21 job_type : JobType 

22 Indicator which defines which calculations has to be used. 

23 timing : TimingWeekly 

24 Desired execution time(s). 

25 handle : Callable[..., None] 

26 Handle to a callback function. 

27 args : tuple[Any] 

28 Positional argument payload for the function handle within a |AioJob|. 

29 kwargs : Optional[dict[str, Any]] 

30 Keyword arguments payload for the function handle within a |AioJob|. 

31 max_attempts : Optional[int] 

32 Number of times the |AioJob| will be executed where ``0 <=> inf``. 

33 A |AioJob| with no free attempt will be deleted. 

34 tags : Optional[set[str]] 

35 The tags of the |AioJob|. 

36 delay : Optional[bool] 

37 *Deprecated*: If ``True`` wait with the execution for the next scheduled time. 

38 start : Optional[datetime.datetime] 

39 Set the reference `datetime.datetime` stamp the |AioJob| 

40 will be scheduled against. Default value is `datetime.datetime.now()`. 

41 stop : Optional[datetime.datetime] 

42 Define a point in time after which a |AioJob| will be stopped 

43 and deleted. 

44 skip_missing : Optional[bool] 

45 If ``True`` a |AioJob| will only schedule it's newest planned 

46 execution and drop older ones. 

47 alias : Optional[str] 

48 Overwrites the function handle name in the string representation. 

49 tzinfo : Optional[datetime.tzinfo] 

50 Set the timezone of the |AioScheduler| the |AioJob| 

51 is scheduled in. 

52 

53 Returns 

54 ------- 

55 Job 

56 Instance of a scheduled |AioJob|. 

57 """ 

58 

59 # pylint: disable=no-member invalid-name 

60 

61 async def _exec(self, logger: Logger) -> None: 

62 coroutine = self._BaseJob__handle(*self._BaseJob__args, **self._BaseJob__kwargs) # type: ignore 

63 try: 

64 await coroutine 

65 except Exception: 

66 logger.exception("Unhandled exception in `%r`!", self) 

67 self._BaseJob__failed_attempts += 1 # type: ignore 

68 self._BaseJob__attempts += 1 # type: ignore 

69 

70 # pylint: enable=no-member invalid-name 

71 

72 def __repr__(self) -> str: 

73 params: tuple[str, ...] = self._repr() 

74 return f"scheduler.asyncio.job.Job({', '.join(params)})"