Coverage for scheduler/base/job.py: 100%

146 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-12-10 20:15 +0000

1""" 

2Implementation of a `BaseJob`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import datetime as dt 

10import warnings 

11from abc import ABC, abstractmethod 

12from logging import Logger 

13from typing import Any, Callable, Optional, TypeVar, cast 

14 

15from scheduler.base.definition import JobType 

16from scheduler.base.job_timer import JobTimer 

17from scheduler.base.job_util import ( 

18 check_duplicate_effective_timings, 

19 check_timing_tzinfo, 

20 get_pending_timer, 

21 prettify_timedelta, 

22 sane_timing_types, 

23 set_start_check_stop_tzinfo, 

24 standardize_timing_format, 

25) 

26from scheduler.base.timingtype import TimingJobUnion 

27 

28 

29class BaseJob(ABC): 

30 """Abstract definition basic interface for a job class.""" 

31 

32 __type: JobType 

33 __timing: TimingJobUnion 

34 __handle: Callable[..., None] 

35 __args: tuple[Any, ...] 

36 __kwargs: dict[str, Any] 

37 __max_attempts: int 

38 __tags: set[str] 

39 __delay: bool 

40 __start: Optional[dt.datetime] 

41 __stop: Optional[dt.datetime] 

42 __skip_missing: bool 

43 __alias: Optional[str] 

44 __tzinfo: Optional[dt.tzinfo] 

45 __logger: Logger 

46 

47 __mark_delete: bool 

48 __attempts: int 

49 __failed_attempts: int 

50 __pending_timer: JobTimer 

51 __timers: list[JobTimer] 

52 

53 def __init__( 

54 self, 

55 job_type: JobType, 

56 timing: TimingJobUnion, 

57 handle: Callable[..., None], 

58 *, 

59 args: Optional[tuple[Any]] = None, 

60 kwargs: Optional[dict[str, Any]] = None, 

61 max_attempts: int = 0, 

62 tags: Optional[set[str]] = None, 

63 delay: bool = True, 

64 start: Optional[dt.datetime] = None, 

65 stop: Optional[dt.datetime] = None, 

66 skip_missing: bool = False, 

67 alias: str = None, 

68 tzinfo: Optional[dt.tzinfo] = None, 

69 ): 

70 timing = standardize_timing_format(job_type, timing) 

71 

72 sane_timing_types(job_type, timing) 

73 check_timing_tzinfo(job_type, timing, tzinfo) 

74 check_duplicate_effective_timings(job_type, timing, tzinfo) 

75 

76 self.__start = set_start_check_stop_tzinfo(start, stop, tzinfo) 

77 

78 self.__type = job_type 

79 self.__timing = timing # pylint: disable=unused-private-member 

80 # NOTE: https://github.com/python/mypy/issues/708 

81 # https://github.com/python/mypy/issues/2427 

82 self.__handle = handle # type: ignore 

83 self.__args = () if args is None else args 

84 self.__kwargs = {} if kwargs is None else kwargs.copy() 

85 self.__max_attempts = max_attempts 

86 self.__tags = set() if tags is None else tags.copy() 

87 self.__delay = delay 

88 self.__stop = stop 

89 self.__skip_missing = skip_missing 

90 self.__alias = alias 

91 self.__tzinfo = tzinfo 

92 

93 # self.__mark_delete will be set to True if the new Timer would be in future 

94 # relativ to the self.__stop variable 

95 self.__mark_delete = False 

96 self.__attempts = 0 

97 self.__failed_attempts = 0 

98 

99 # create JobTimers 

100 self.__timers = [JobTimer(job_type, tim, self.__start, skip_missing) for tim in timing] 

101 self.__pending_timer = get_pending_timer(self.__timers) 

102 

103 if self.__stop is not None: 

104 if self.__pending_timer.datetime > self.__stop: 

105 self.__mark_delete = True 

106 

107 def __lt__(self, other: BaseJob): 

108 return self.datetime < other.datetime 

109 

110 def _calc_next_exec(self, ref_dt: dt.datetime) -> None: 

111 """ 

112 Calculate the next estimated execution `datetime.datetime` of the `Job`. 

113 

114 Parameters 

115 ---------- 

116 ref_dt : datetime.datetime 

117 Reference time stamp to which the |BaseJob| calculates 

118 it's next execution. 

119 """ 

120 if self.__skip_missing: 

121 for timer in self.__timers: 

122 if (timer.datetime - ref_dt).total_seconds() <= 0: 

123 timer.calc_next_exec(ref_dt) 

124 else: 

125 self.__pending_timer.calc_next_exec(ref_dt) 

126 self.__pending_timer = get_pending_timer(self.__timers) 

127 if self.__stop is not None and self.__pending_timer.datetime > self.__stop: 

128 self.__mark_delete = True 

129 

130 def _repr(self) -> tuple[str, ...]: 

131 return tuple( 

132 repr(elem) 

133 for elem in ( 

134 self.__type, 

135 self.__timing, 

136 self.__handle, 

137 self.__args, 

138 self.__kwargs, 

139 self.__max_attempts, 

140 self.__delay, 

141 self.__start, 

142 self.__stop, 

143 self.__skip_missing, 

144 self.__alias, 

145 self.tzinfo, 

146 ) 

147 ) 

148 

149 @abstractmethod 

150 def __repr__(self) -> str: 

151 raise NotImplementedError() # pragma: no cover 

152 

153 def _str( 

154 self, 

155 ) -> tuple[str, ...]: 

156 """Return the objects relevant for readable string representation.""" 

157 dt_timedelta = self.timedelta(dt.datetime.now(self.tzinfo)) 

158 if self.alias is not None: 

159 f_args = "" 

160 elif hasattr(self.handle, "__code__"): 

161 f_args = "(..)" if self.handle.__code__.co_nlocals else "()" 

162 else: 

163 f_args = "(?)" 

164 return ( 

165 self.type.name if self.max_attempts != 1 else "ONCE", 

166 self.handle.__qualname__ if self.alias is None else self.alias, 

167 f_args, 

168 str(self.datetime)[:19], 

169 str(self.datetime.tzname()), 

170 prettify_timedelta(dt_timedelta), 

171 str(self.attempts), 

172 str(float("inf") if self.max_attempts == 0 else self.max_attempts), 

173 ) 

174 

175 def __str__(self) -> str: 

176 return "{0}, {1}{2}, at={3}, tz={4}, in={5}, #{6}/{7}".format(*self._str()) 

177 

178 def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta: 

179 """ 

180 Get the `datetime.timedelta` until the next execution of this `Job`. 

181 

182 Parameters 

183 ---------- 

184 dt_stamp : Optional[datetime.datetime] 

185 Time to be compared with the planned execution time to determine the time difference. 

186 

187 Returns 

188 ------- 

189 timedelta 

190 `datetime.timedelta` to the next execution. 

191 """ 

192 if dt_stamp is None: 

193 dt_stamp = dt.datetime.now(self.__tzinfo) 

194 if not self.__delay and self.__attempts == 0: 

195 return cast(dt.datetime, self.__start) - dt_stamp 

196 return self.__pending_timer.timedelta(dt_stamp) 

197 

198 @property 

199 def datetime(self) -> dt.datetime: 

200 """ 

201 Give the `datetime.datetime` object for the planed execution. 

202 

203 Returns 

204 ------- 

205 datetime.datetime 

206 Execution `datetime.datetime` stamp. 

207 """ 

208 if not self.__delay and self.__attempts == 0: 

209 return cast(dt.datetime, self.__start) 

210 return self.__pending_timer.datetime 

211 

212 @property 

213 def type(self) -> JobType: 

214 """ 

215 Return the `JobType` of the `Job` instance. 

216 

217 Returns 

218 ------- 

219 JobType 

220 :class:`~scheduler.job.JobType` of the |BaseJob|. 

221 """ 

222 return self.__type 

223 

224 @property 

225 def handle(self) -> Callable[..., None]: 

226 """ 

227 Get the callback function. 

228 

229 Returns 

230 ------- 

231 handle 

232 Callback function. 

233 """ 

234 return self.__handle 

235 

236 @property 

237 def args(self) -> tuple[Any, ...]: 

238 r""" 

239 Get the positional arguments of the function handle within a `Job`. 

240 

241 .. warning:: When running |BaseJob|\ s in parallel threads, 

242 be sure to implement possible side effects of parameter accessing in a 

243 thread safe manner. 

244 

245 Returns 

246 ------- 

247 tuple[Any] 

248 The payload arguments to pass to the function handle within a 

249 |BaseJob|. 

250 """ 

251 return self.__args 

252 

253 @property 

254 def kwargs(self) -> dict[str, Any]: 

255 r""" 

256 Get the keyword arguments of the function handle within a `Job`. 

257 

258 .. warning:: When running |BaseJob|\ s in parallel threads, 

259 be sure to implement possible side effects of parameter accessing in a 

260 thread safe manner. 

261 

262 Returns 

263 ------- 

264 dict[str, Any] 

265 The payload arguments to pass to the function handle within a 

266 |BaseJob|. 

267 """ 

268 return self.__kwargs 

269 

270 @property 

271 def max_attempts(self) -> int: 

272 """ 

273 Get the execution limit for a `Job`. 

274 

275 Returns 

276 ------- 

277 int 

278 Max execution attempts. 

279 """ 

280 return self.__max_attempts 

281 

282 @property 

283 def tags(self) -> set[str]: 

284 r""" 

285 Get the tags of a `Job`. 

286 

287 Returns 

288 ------- 

289 set[str] 

290 The tags of a |BaseJob|. 

291 """ 

292 return self.__tags.copy() 

293 

294 @property 

295 def delay(self) -> bool: 

296 """ 

297 *Deprecated*: Return ``True`` if the first `Job` execution will wait for the next scheduled time. 

298 

299 Returns 

300 ------- 

301 bool 

302 If ``True`` wait with the execution for the next scheduled time. If ``False`` 

303 the first execution will target the time of `Job.start`. 

304 """ 

305 warnings.warn( 

306 ( 

307 "Using the `delay` property is deprecated and will " 

308 "be removed in the next minor release." 

309 ), 

310 DeprecationWarning, 

311 stacklevel=2, 

312 ) 

313 return self.__delay 

314 

315 @property 

316 def start(self) -> Optional[dt.datetime]: 

317 """ 

318 Get the timestamp at which the `JobTimer` starts. 

319 

320 Returns 

321 ------- 

322 Optional[datetime.datetime] 

323 The start datetime stamp. 

324 """ 

325 return self.__start 

326 

327 @property 

328 def stop(self) -> Optional[dt.datetime]: 

329 """ 

330 Get the timestamp after which no more executions of the `Job` should be scheduled. 

331 

332 Returns 

333 ------- 

334 Optional[datetime.datetime] 

335 The stop datetime stamp. 

336 """ 

337 return self.__stop 

338 

339 @property 

340 def skip_missing(self) -> bool: 

341 """ 

342 Return ``True`` if `Job` will only schedule it's newest planned execution. 

343 

344 Returns 

345 ------- 

346 bool 

347 If ``True`` a |BaseJob| will only schedule it's newest planned 

348 execution and drop older ones. 

349 """ 

350 return self.__skip_missing 

351 

352 @property 

353 def alias(self) -> Optional[str]: 

354 r""" 

355 Get the alias of the `Job`. 

356 

357 Returns 

358 ------- 

359 Optional[str] 

360 Alias of the |BaseJob|. 

361 """ 

362 return self.__alias 

363 

364 @property 

365 def tzinfo(self) -> Optional[dt.tzinfo]: 

366 r""" 

367 Get the timezone of the `Job`'s next execution. 

368 

369 Returns 

370 ------- 

371 Optional[datetime.tzinfo] 

372 Timezone of the |BaseJob|\ s next execution. 

373 """ 

374 return self.datetime.tzinfo 

375 

376 @property 

377 def _tzinfo(self) -> Optional[dt.tzinfo]: 

378 """ 

379 Get the timezone of the `Scheduler` in which the `Job` is living. 

380 

381 Returns 

382 ------- 

383 Optional[datetime.tzinfo] 

384 Timezone of the |BaseJob|. 

385 """ 

386 return self.__tzinfo 

387 

388 @property 

389 def attempts(self) -> int: 

390 """ 

391 Get the number of executions for a `Job`. 

392 

393 Returns 

394 ------- 

395 int 

396 Execution attempts. 

397 """ 

398 return self.__attempts 

399 

400 @property 

401 def failed_attempts(self) -> int: 

402 """ 

403 Get the number of failed executions for a `Job`. 

404 

405 Returns 

406 ------- 

407 int 

408 Failed execution attempts. 

409 """ 

410 return self.__failed_attempts 

411 

412 @property 

413 def has_attempts_remaining(self) -> bool: 

414 """ 

415 Check if a `Job` has remaining attempts. 

416 

417 This function will return True if the |BaseJob| has open 

418 execution counts and the stop argument is not in the past relative to the 

419 next planed execution. 

420 

421 Returns 

422 ------- 

423 bool 

424 True if the |BaseJob| has execution attempts. 

425 """ 

426 if self.__mark_delete: 

427 return False 

428 if self.__max_attempts == 0: 

429 return True 

430 return self.__attempts < self.__max_attempts 

431 

432 

433BaseJobType = TypeVar("BaseJobType", bound=BaseJob)