Coverage for scheduler/base/scheduler.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-12-10 21:31 +0000

1""" 

2Implementation of a `BaseScheduler`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import warnings 

8from abc import ABC, abstractmethod 

9from functools import wraps 

10from logging import Logger, getLogger 

11from typing import Any, Callable, List, Optional 

12 

13from scheduler.base.job import BaseJob, BaseJobType 

14from scheduler.base.timingtype import ( 

15 TimingCyclic, 

16 TimingDailyUnion, 

17 TimingOnceUnion, 

18 TimingWeeklyUnion, 

19) 

20 

21LOGGER = getLogger("scheduler") 

22 

23 

def select_jobs_by_tag(
    jobs: set[BaseJobType],
    tags: set[str],
    any_tag: bool,
) -> set[BaseJobType]:
    r"""
    Filter a set of |BaseJob|\ s by comparing their tags with `tags`.

    Parameters
    ----------
    jobs : set[BaseJob]
        Candidate |BaseJob|\ s to filter.
    tags : set[str]
        Tags the candidates are compared against.
    any_tag : bool
        True: a |BaseJob| is selected if it shares at least one tag with `tags`.
        False: a |BaseJob| is selected only if it carries every tag in `tags`.

    Returns
    -------
    set[BaseJob]
        The |BaseJob|\ s fulfilling the chosen matching rule.
    """
    selected = set()
    for candidate in jobs:
        if any_tag:
            # A non-empty intersection means at least one tag is shared.
            is_match = tags & candidate.tags
        else:
            # Subset test: every requested tag must be present on the job.
            is_match = tags <= candidate.tags
        if is_match:
            selected.add(candidate)
    return selected

50 

51 

def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Build a decorator that warns about the use of deprecated arguments.

    For every name in `fields` that is passed to the decorated function as a
    keyword argument with a value other than `None`, a `DeprecationWarning`
    is emitted. The call is then forwarded unchanged, so the deprecated
    argument still reaches the decorated function. Only keyword arguments are
    inspected; values passed positionally do not trigger a warning.

    Parameters
    ----------
    fields : List[str]
        Names of the function arguments that are deprecated.

    Returns
    -------
    Callable[[Callable[..., Any]], Callable[..., Any]]
        Decorator wrapping a function with the deprecation check.

    Examples
    --------
    @deprecated(['old_arg'])
    def some_function(new_arg, old_arg=None):
        pass

    Calling `some_function(new_arg=5, old_arg=3)` emits a deprecation warning for 'old_arg'.
    """

    def decorate(func):
        @wraps(func)
        def checked_call(*args, **kwargs):
            for name in fields:
                if kwargs.get(name) is None:
                    # Argument absent or explicitly None: nothing deprecated in use.
                    continue
                # The argument stays in kwargs; the warning is purely informational.
                message = (
                    f"Using the `{name}` argument is deprecated and will "
                    "be removed in the next minor release."
                )
                warnings.warn(message, DeprecationWarning, stacklevel=3)
            return func(*args, **kwargs)

        return checked_call

    return decorate

89 

90 

class BaseScheduler(ABC):  # NOTE maybe a typing Protocol class is better than an ABC class
    """
    Abstract interface every scheduler implementation has to provide.

    Concrete schedulers implement the job management methods
    (`delete_job`, `delete_jobs`, `get_jobs`, `jobs`) and the scheduling
    methods (`cyclic`, `minutely`, `hourly`, `daily`, `weekly`, `once`).

    Author: Jendrik A. Potyka, Fabian A. Preiss
    """

    # Logger used by the scheduler; name-mangled, so private to this class.
    __logger: Logger

    def __init__(self, logger: Optional[Logger] = None) -> None:
        # Fall back to the module-level "scheduler" logger when none is given.
        self.__logger = logger or LOGGER

    @abstractmethod
    def delete_job(self, job: BaseJob) -> None:
        """Remove a single |BaseJob| from the `BaseScheduler`."""

    @abstractmethod
    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""Remove the |BaseJob|\ s selected by tags from the `BaseScheduler`."""

    @abstractmethod
    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[BaseJob]:
        r"""Return the |BaseJob|\ s of the `BaseScheduler` selected by tags."""

    @abstractmethod
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule `handle` as a cyclic |BaseJob|."""

    @abstractmethod
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule `handle` as a minutely |BaseJob|."""

    @abstractmethod
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule `handle` as an hourly |BaseJob|."""

    @abstractmethod
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule `handle` as a daily |BaseJob|."""

    @abstractmethod
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> BaseJob:
        """Schedule `handle` as a weekly |BaseJob|."""

    @abstractmethod
    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        # NOTE(review): `tuple[Any]` types a 1-tuple; `tuple[Any, ...]` may be
        # intended - confirm against the subclass overrides before changing.
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
    ) -> BaseJob:
        """Schedule `handle` as a oneshot |BaseJob|."""

    @property
    @abstractmethod
    def jobs(self) -> set[BaseJob]:
        r"""Return the set of all |BaseJob|\ s."""