Coverage for scheduler/asyncio/job.py: 100%

15 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-03-19 22:07 +0000

1""" 

2Implementation of job for the `asyncio` scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9from logging import Logger 

10 

11from scheduler.base.job import BaseJob 

12 

13 

14class Job(BaseJob): 

15 r""" 

16 |AioJob| class bundling time and callback function methods. 

17 

18 Parameters 

19 ---------- 

20 job_type : JobType 

21 Indicator which defines which calculations has to be used. 

22 timing : TimingWeekly 

23 Desired execution time(s). 

24 handle : Callable[..., None] 

25 Handle to a callback function. 

26 args : tuple[Any] 

27 Positional argument payload for the function handle within a |AioJob|. 

28 kwargs : Optional[dict[str, Any]] 

29 Keyword arguments payload for the function handle within a |AioJob|. 

30 max_attempts : Optional[int] 

31 Number of times the |AioJob| will be executed where ``0 <=> inf``. 

32 A |AioJob| with no free attempt will be deleted. 

33 tags : Optional[set[str]] 

34 The tags of the |AioJob|. 

35 delay : Optional[bool] 

36 *Deprecated*: If ``True`` wait with the execution for the next scheduled time. 

37 start : Optional[datetime.datetime] 

38 Set the reference `datetime.datetime` stamp the |AioJob| 

39 will be scheduled against. Default value is `datetime.datetime.now()`. 

40 stop : Optional[datetime.datetime] 

41 Define a point in time after which a |AioJob| will be stopped 

42 and deleted. 

43 skip_missing : Optional[bool] 

44 If ``True`` a |AioJob| will only schedule it's newest planned 

45 execution and drop older ones. 

46 alias : Optional[str] 

47 Overwrites the function handle name in the string representation. 

48 tzinfo : Optional[datetime.tzinfo] 

49 Set the timezone of the |AioScheduler| the |AioJob| 

50 is scheduled in. 

51 

52 Returns 

53 ------- 

54 Job 

55 Instance of a scheduled |AioJob|. 

56 """ 

57 

58 # pylint: disable=no-member invalid-name 

59 

60 async def _exec(self, logger: Logger): 

61 coroutine = self._BaseJob__handle(*self._BaseJob__args, **self._BaseJob__kwargs) 

62 try: 

63 await coroutine 

64 except Exception as err: 

65 logger.error("Unhandled exception `%s` in `%r`!", err, self) 

66 self._BaseJob__failed_attempts += 1 

67 self._BaseJob__attempts += 1 

68 

69 # pylint: enable=no-member invalid-name 

70 

71 def __repr__(self) -> str: 

72 params: tuple[str, ...] = self._repr() 

73 return f"scheduler.asyncio.job.Job({', '.join(params)})"