Coverage for scheduler/base/scheduler.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2024-06-09 19:18 +0000

1""" 

2Implementation of a `BaseScheduler`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import warnings 

8from abc import ABC, abstractmethod 

9from collections.abc import Iterable 

10from functools import wraps 

11from logging import Logger, getLogger 

12from typing import Any, Callable, Generic, List, Optional, TypeVar 

13 

14from scheduler.base.job import BaseJobType 

15from scheduler.base.timingtype import ( 

16 TimingCyclic, 

17 TimingDailyUnion, 

18 TimingOnceUnion, 

19 TimingWeeklyUnion, 

20) 

21 

22# TODO: 

23# import sys 

24# if sys.version_info < (3, 10): 

25# from typing_extensions import ParamSpec 

26# else: 

27# from typing import ParamSpec 

28 

29 

30LOGGER = getLogger("scheduler") 

31 

32 

33def select_jobs_by_tag( 

34 jobs: set[BaseJobType], 

35 tags: set[str], 

36 any_tag: bool, 

37) -> set[BaseJobType]: 

38 r""" 

39 Select |BaseJob|\ s by matching `tags`. 

40 

41 Parameters 

42 ---------- 

43 jobs : set[BaseJob] 

44 Unfiltered set of |BaseJob|\ s. 

45 tags : set[str] 

46 Tags to filter |BaseJob|\ s. 

47 any_tag : bool 

48 False: To match a |BaseJob| all tags have to match. 

49 True: To match a |BaseJob| at least one tag has to match. 

50 

51 Returns 

52 ------- 

53 set[BaseJob] 

54 Selected |BaseJob|\ s. 

55 """ 

56 if any_tag: 

57 return {job for job in jobs if tags & job.tags} 

58 return {job for job in jobs if tags <= job.tags} 

59 

60 

61def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]: 

62 """ 

63 Decorator for marking specified function arguments as deprecated. 

64 

65 Parameters 

66 ---------- 

67 fields : List[str] 

68 A list of strings representing the names of the function arguments that are deprecated. 

69 

70 Examples 

71 -------- 

72 @deprecated(['old_arg']) 

73 def some_function(new_arg, old_arg=None): 

74 pass 

75 

76 Calling `some_function(new_arg=5, old_arg=3)` generates a deprecation warning for using 'old_arg'. 

77 """ 

78 

79 def wrapper(func: Callable[..., Any]) -> Callable[..., Any]: 

80 @wraps(func) 

81 def real_wrapper(*args: tuple[Any, ...], **kwargs: dict[str, Any]) -> Any: 

82 for f in fields: 

83 if f in kwargs and kwargs[f] is not None: 

84 # keep it in kwargs 

85 warnings.warn( 

86 ( 

87 f"Using the `{f}` argument is deprecated and will " 

88 "be removed in the next minor release." 

89 ), 

90 DeprecationWarning, 

91 stacklevel=3, 

92 ) 

93 return func(*args, **kwargs) 

94 

95 return real_wrapper 

96 

97 return wrapper 

98 

99 

100T = TypeVar("T", bound=Callable[[], Any]) 

101 

102 

103class BaseScheduler( 

104 ABC, Generic[BaseJobType, T] 

105): # NOTE maybe a typing Protocol class is better than an ABC class 

106 """ 

107 Interface definition of an abstract scheduler. 

108 

109 Author: Jendrik A. Potyka, Fabian A. Preiss 

110 """ 

111 

112 _logger: Logger 

113 

114 def __init__(self, logger: Optional[Logger] = None) -> None: 

115 self._logger = logger if logger else LOGGER 

116 

117 @abstractmethod 

118 def delete_job(self, job: BaseJobType) -> None: 

119 """Delete a |BaseJob| from the `BaseScheduler`.""" 

120 

121 @abstractmethod 

122 def delete_jobs( 

123 self, 

124 tags: Optional[set[str]] = None, 

125 any_tag: bool = False, 

126 ) -> int: 

127 r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags.""" 

128 

129 @abstractmethod 

130 def get_jobs( 

131 self, 

132 tags: Optional[set[str]] = None, 

133 any_tag: bool = False, 

134 ) -> set[BaseJobType]: 

135 r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags.""" 

136 

137 @abstractmethod 

138 def cyclic(self, timing: TimingCyclic, handle: T, **kwargs) -> BaseJobType: 

139 """Schedule a cyclic |BaseJob|.""" 

140 

141 @abstractmethod 

142 def minutely(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType: 

143 """Schedule a minutely |BaseJob|.""" 

144 

145 @abstractmethod 

146 def hourly(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType: 

147 """Schedule an hourly |BaseJob|.""" 

148 

149 @abstractmethod 

150 def daily(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType: 

151 """Schedule a daily |BaseJob|.""" 

152 

153 @abstractmethod 

154 def weekly(self, timing: TimingWeeklyUnion, handle: T, **kwargs) -> BaseJobType: 

155 """Schedule a weekly |BaseJob|.""" 

156 

157 @abstractmethod 

158 def once( 

159 self, 

160 timing: TimingOnceUnion, 

161 handle: T, 

162 *, 

163 args: Optional[tuple[Any]] = None, 

164 kwargs: Optional[dict[str, Any]] = None, 

165 tags: Optional[Iterable[str]] = None, 

166 alias: Optional[str] = None, 

167 ) -> BaseJobType: 

168 """Schedule a oneshot |BaseJob|.""" 

169 

170 @property 

171 @abstractmethod 

172 def jobs(self) -> set[BaseJobType]: 

173 r"""Get the set of all |BaseJob|\ s."""