Coverage for scheduler/threading/job.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-03-19 22:07 +0000

1""" 

2Implementation of job for the `threading` scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8import threading 

9from logging import Logger 

10from typing import Any, Callable, Optional 

11 

12from scheduler.base.definition import JobType 

13from scheduler.base.job import BaseJob 

14from scheduler.base.timingtype import TimingJobUnion 

15 

16 

class Job(BaseJob):
    r"""
    |Job| class bundling time and callback function methods.

    Thread-aware variant of `BaseJob`: every operation that reads or mutates
    the scheduling state is executed while holding a per-instance reentrant
    lock.

    Parameters
    ----------
    job_type : JobType
        Indicator which defines which calculations have to be used.
    timing : TimingJobUnion
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.
    args : Optional[tuple[Any, ...]]
        Positional argument payload for the function handle within a |Job|.
    kwargs : Optional[dict[str, Any]]
        Keyword arguments payload for the function handle within a |Job|.
    max_attempts : int
        Number of times the |Job| will be executed where ``0 <=> inf``.
        A |Job| with no free attempt will be deleted.
    tags : Optional[set[str]]
        The tags of the |Job|.
    delay : bool
        *Deprecated*: If ``True`` wait with the execution for the next scheduled time.
    start : Optional[datetime.datetime]
        Set the reference `datetime.datetime` stamp the |Job|
        will be scheduled against. Default value is `datetime.datetime.now()`.
    stop : Optional[datetime.datetime]
        Define a point in time after which a |Job| will be stopped
        and deleted.
    skip_missing : bool
        If ``True`` a |Job| will only schedule its newest planned
        execution and drop older ones.
    alias : Optional[str]
        Overwrites the function handle name in the string representation.
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |Scheduler| the |Job|
        is scheduled in.
    weight : float
        Relative `weight` against other |Job|\ s.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.
    """

    # Relative weight of this job; assigned once in ``__init__``.
    __weight: float
    # Reentrant lock guarding the state inherited from ``BaseJob``.
    __lock: threading.RLock

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        max_attempts: int = 0,
        tags: Optional[set[str]] = None,
        delay: bool = True,
        start: Optional[dt.datetime] = None,
        stop: Optional[dt.datetime] = None,
        skip_missing: bool = False,
        alias: Optional[str] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        weight: float = 1,
    ) -> None:
        # Everything except ``weight`` is forwarded unchanged to ``BaseJob``.
        super().__init__(
            job_type,
            timing,
            handle,
            args=args,
            kwargs=kwargs,
            max_attempts=max_attempts,
            tags=tags,
            delay=delay,
            start=start,
            stop=stop,
            skip_missing=skip_missing,
            alias=alias,
            tzinfo=tzinfo,
        )
        self.__lock = threading.RLock()
        self.__weight = weight

    # pylint: disable=no-member invalid-name

    def _exec(self, logger: Logger) -> None:
        """Execute the callback function."""
        # The ``_BaseJob__*`` names are the name-mangled private attributes of
        # ``BaseJob``; they are accessed directly, hence the pylint/type pragmas.
        with self.__lock:
            try:
                self._BaseJob__handle(*self._BaseJob__args, **self._BaseJob__kwargs)  # type: ignore
            except Exception as err:
                # An exception raised by the callback is logged and counted as
                # a failed attempt instead of being propagated to the caller.
                logger.error("Unhandled exception `%s` in `%r`!", err, self)
                self._BaseJob__failed_attempts += 1  # type: ignore
            # Every execution counts as an attempt, successful or not.
            self._BaseJob__attempts += 1  # type: ignore

    # pylint: enable=no-member invalid-name

    def _calc_next_exec(self, ref_dt: dt.datetime) -> None:
        # Same as the ``BaseJob`` implementation, but run under the job lock.
        with self.__lock:
            super()._calc_next_exec(ref_dt)

    def __repr__(self) -> str:
        # Take the parameter strings produced by ``_repr()`` and insert the
        # repr of the weight after the sixth entry.
        with self.__lock:
            params: tuple[str, ...] = self._repr()
        params_sum: str = ", ".join(params[:6] + (repr(self.__weight),) + params[6:])
        return f"scheduler.Job({params_sum})"

    def __str__(self) -> str:
        # Base string representation followed by the weight with three
        # significant digits.
        return f"{super().__str__()}, w={self.weight:.3g}"

    def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta:
        # Same as the ``BaseJob`` implementation, but run under the job lock.
        with self.__lock:
            return super().timedelta(dt_stamp)

    @property
    def datetime(self) -> dt.datetime:
        # Same as the ``BaseJob`` property, but read under the job lock.
        with self.__lock:
            return super().datetime

    @property
    def weight(self) -> float:
        """
        Return the weight of the `Job` instance.

        Returns
        -------
        float
            |Job| `weight`.
        """
        return self.__weight

    @property
    def has_attempts_remaining(self) -> bool:
        # Same as the ``BaseJob`` property, but read under the job lock.
        with self.__lock:
            return super().has_attempts_remaining