Coverage for scheduler/base/scheduler.py: 100%

39 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-03-19 22:07 +0000

1""" 

2Implementation of a `BaseScheduler`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import warnings 

8from abc import ABC, abstractmethod 

9from logging import Logger, getLogger 

10from typing import Any, Callable, Optional 

11 

12from scheduler.base.job import BaseJob, BaseJobType 

13from scheduler.base.timingtype import ( 

14 TimingCyclic, 

15 TimingDailyUnion, 

16 TimingOnceUnion, 

17 TimingWeeklyUnion, 

18) 

19 

20LOGGER = getLogger("scheduler") 

21 

22 

23def select_jobs_by_tag( 

24 jobs: set[BaseJobType], 

25 tags: set[str], 

26 any_tag: bool, 

27) -> set[BaseJobType]: 

28 r""" 

29 Select |BaseJob|\ s by matching `tags`. 

30 

31 Parameters 

32 ---------- 

33 jobs : set[BaseJob] 

34 Unfiltered set of |BaseJob|\ s. 

35 tags : set[str] 

36 Tags to filter |BaseJob|\ s. 

37 any_tag : bool 

38 False: To match a |BaseJob| all tags have to match. 

39 True: To match a |BaseJob| at least one tag has to match. 

40 

41 Returns 

42 ------- 

43 set[BaseJob] 

44 Selected |BaseJob|\ s. 

45 """ 

46 if any_tag: 

47 return {job for job in jobs if tags & job.tags} 

48 return {job for job in jobs if tags <= job.tags} 

49 

50 

51def _warn_deprecated_delay(delay: Optional[bool] = None, **kwargs): 

52 if delay is not None: 

53 warnings.warn( 

54 ( 

55 "Using the `delay` argument is deprecated and will " 

56 "be removed in the next minor release." 

57 ), 

58 DeprecationWarning, 

59 stacklevel=3, 

60 ) 

61 

62 

63class BaseScheduler(ABC): # NOTE maybe a typing Protocol class is better than an ABC class 

64 """ 

65 Interface definition of an abstract scheduler. 

66 

67 Author: Jendrik A. Potyka, Fabian A. Preiss 

68 """ 

69 

70 __logger: Logger 

71 

72 def __init__(self, logger: Optional[Logger] = None) -> None: 

73 self.__logger = logger if logger else LOGGER 

74 

75 @abstractmethod 

76 def delete_job(self, job: BaseJob) -> None: 

77 """Delete a |BaseJob| from the `BaseScheduler`.""" 

78 

79 @abstractmethod 

80 def delete_jobs( 

81 self, 

82 tags: Optional[set[str]] = None, 

83 any_tag: bool = False, 

84 ) -> int: 

85 r"""Delete a set of |BaseJob|\ s from the `BaseScheduler` by tags.""" 

86 

87 @abstractmethod 

88 def get_jobs( 

89 self, 

90 tags: Optional[set[str]] = None, 

91 any_tag: bool = False, 

92 ) -> set[BaseJob]: 

93 r"""Get a set of |BaseJob|\ s from the `BaseScheduler` by tags.""" 

94 

95 @abstractmethod 

96 def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> BaseJob: 

97 """Schedule a cyclic |BaseJob|.""" 

98 

99 @abstractmethod 

100 def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob: 

101 """Schedule a minutely |BaseJob|.""" 

102 

103 @abstractmethod 

104 def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob: 

105 """Schedule an hourly |BaseJob|.""" 

106 

107 @abstractmethod 

108 def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> BaseJob: 

109 """Schedule a daily |BaseJob|.""" 

110 

111 @abstractmethod 

112 def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> BaseJob: 

113 """Schedule a weekly |BaseJob|.""" 

114 

115 @abstractmethod 

116 def once( 

117 self, 

118 timing: TimingOnceUnion, 

119 handle: Callable[..., None], 

120 *, 

121 args: tuple[Any] = None, 

122 kwargs: Optional[dict[str, Any]] = None, 

123 tags: Optional[list[str]] = None, 

124 ) -> BaseJob: 

125 """Schedule a oneshot |BaseJob|.""" 

126 

127 @property 

128 @abstractmethod 

129 def jobs(self) -> set[BaseJob]: 

130 r"""Get the set of all |BaseJob|\ s."""