Coverage for scheduler/base/scheduler.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-21 13:55 +0000

1"""Implementation of a `BaseScheduler`. 

2 

3Author: Jendrik A. Potyka, Fabian A. Preiss 

4""" 

5 

6import warnings 

7from abc import ABC, abstractmethod 

8from collections.abc import Iterable 

9from functools import wraps 

10from logging import Logger, getLogger 

11from typing import Any, Callable, Generic, List, Optional, TypeVar 

12 

13from scheduler.base.job import BaseJobType 

14from scheduler.base.timingtype import ( 

15 TimingCyclic, 

16 TimingDailyUnion, 

17 TimingOnceUnion, 

18 TimingWeeklyUnion, 

19) 

20 

21# TODO: 

22# import sys 

23# if sys.version_info < (3, 10): 

24# from typing_extensions import ParamSpec 

25# else: 

26# from typing import ParamSpec 

27 

28 

29LOGGER = getLogger("scheduler") 

30 

31 

32def select_jobs_by_tag( 

33 jobs: set[BaseJobType], 

34 tags: set[str], 

35 any_tag: bool, 

36) -> set[BaseJobType]: 

37 r""" 

38 Select |BaseJob|\ s by matching `tags`. 

39 

40 Parameters 

41 ---------- 

42 jobs : set[BaseJob] 

43 Unfiltered set of |BaseJob|\ s. 

44 tags : set[str] 

45 Tags to filter |BaseJob|\ s. 

46 any_tag : bool 

47 False: To match a |BaseJob| all tags have to match. 

48 True: To match a |BaseJob| at least one tag has to match. 

49 

50 Returns 

51 ------- 

52 set[BaseJob] 

53 Selected |BaseJob|\ s. 

54 """ 

55 if any_tag: 

56 return {job for job in jobs if tags & job.tags} 

57 return {job for job in jobs if tags <= job.tags} 

58 

59 

60def deprecated(fields: List[str]) -> Callable[[Callable[..., Any]], Callable[..., Any]]: 

61 """ 

62 Decorator for marking specified function arguments as deprecated. 

63 

64 Parameters 

65 ---------- 

66 fields : List[str] 

67 A list of strings representing the names of the function arguments that are deprecated. 

68 

69 Examples 

70 -------- 

71 .. code-block:: python 

72 

73 @deprecated(["old_arg"]) 

74 def some_function(new_arg, old_arg=None): 

75 pass 

76 

77 Calling `some_function(new_arg=5, old_arg=3)` generates a deprecation warning for using 'old_arg'. 

78 """ 

79 

80 def wrapper(func: Callable[..., Any]) -> Callable[..., Any]: 

81 @wraps(func) 

82 def real_wrapper(*args: tuple[Any, ...], **kwargs: dict[str, Any]) -> Any: 

83 for f in fields: 

84 if f in kwargs and kwargs[f] is not None: 

85 # keep it in kwargs 

86 warnings.warn( 

87 ( 

88 f"Using the `{f}` argument is deprecated and will " 

89 "be removed in the next minor release." 

90 ), 

91 DeprecationWarning, 

92 stacklevel=3, 

93 ) 

94 return func(*args, **kwargs) 

95 

96 return real_wrapper 

97 

98 return wrapper 

99 

100 

101T = TypeVar("T", bound=Callable[[], Any]) 

102 

103 

104class BaseScheduler( 

105 ABC, Generic[BaseJobType, T] 

106): # NOTE maybe a typing Protocol class is better than an ABC class 

107 """ 

108 Interface definition of an abstract scheduler. 

109 

110 Author: Jendrik A. Potyka, Fabian A. Preiss 

111 """ 

112 

113 _logger: Logger 

114 

115 def __init__(self, logger: Optional[Logger] = None) -> None: 

116 self._logger = logger if logger else LOGGER 

117 

118 @abstractmethod 

119 def delete_job(self, job: BaseJobType) -> None: 

120 """Delete a |BaseJob| from the `BaseScheduler`.""" 

121 

122 @abstractmethod 

123 def delete_jobs( 

124 self, 

125 tags: Optional[set[str]] = None, 

126 any_tag: bool = False, 

127 ) -> int: 

128 r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags.""" 

129 

130 @abstractmethod 

131 def get_jobs( 

132 self, 

133 tags: Optional[set[str]] = None, 

134 any_tag: bool = False, 

135 ) -> set[BaseJobType]: 

136 r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags.""" 

137 

138 @abstractmethod 

139 def cyclic(self, timing: TimingCyclic, handle: T, **kwargs) -> BaseJobType: 

140 """Schedule a cyclic |BaseJob|.""" 

141 

142 @abstractmethod 

143 def minutely(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType: 

144 """Schedule a minutely |BaseJob|.""" 

145 

146 @abstractmethod 

147 def hourly(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType: 

148 """Schedule an hourly |BaseJob|.""" 

149 

150 @abstractmethod 

151 def daily(self, timing: TimingDailyUnion, handle: T, **kwargs) -> BaseJobType: 

152 """Schedule a daily |BaseJob|.""" 

153 

154 @abstractmethod 

155 def weekly(self, timing: TimingWeeklyUnion, handle: T, **kwargs) -> BaseJobType: 

156 """Schedule a weekly |BaseJob|.""" 

157 

158 @abstractmethod 

159 def once( 

160 self, 

161 timing: TimingOnceUnion, 

162 handle: T, 

163 *, 

164 args: Optional[tuple[Any]] = None, 

165 kwargs: Optional[dict[str, Any]] = None, 

166 tags: Optional[Iterable[str]] = None, 

167 alias: Optional[str] = None, 

168 ) -> BaseJobType: 

169 """Schedule a oneshot |BaseJob|.""" 

170 

171 @property 

172 @abstractmethod 

173 def jobs(self) -> set[BaseJobType]: 

174 r"""Get the set of all |BaseJob|\ s."""