Coverage for scheduler/base/job.py: 100%

147 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-21 13:55 +0000

1""" 

2Implementation of a `BaseJob`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import datetime as dt 

10import warnings 

11from abc import ABC, abstractmethod 

12from logging import Logger 

13from typing import Any, Callable, Generic, Optional, TypeVar, cast 

14 

15from scheduler.base.definition import JobType 

16from scheduler.base.job_timer import JobTimer 

17from scheduler.base.job_util import ( 

18 check_duplicate_effective_timings, 

19 check_timing_tzinfo, 

20 get_pending_timer, 

21 prettify_timedelta, 

22 sane_timing_types, 

23 set_start_check_stop_tzinfo, 

24 standardize_timing_format, 

25) 

26from scheduler.base.timingtype import TimingJobUnion 

27 

T = TypeVar("T", bound=Callable[[], Any])


class BaseJob(ABC, Generic[T]):
    """
    Abstract definition of the basic interface for a job class.

    A job bundles a callback (``handle``) with its call arguments, its timing
    information and the bookkeeping (attempt counters, stop mark) that is
    needed to decide when, and whether, it is executed again.  Concrete
    subclasses have to provide ``__repr__``.
    """

    __type: JobType
    __timing: TimingJobUnion
    __handle: T
    __args: tuple[Any, ...]
    __kwargs: dict[str, Any]
    __max_attempts: int
    __tags: set[str]
    __delay: bool
    __start: Optional[dt.datetime]
    __stop: Optional[dt.datetime]
    __skip_missing: bool
    __alias: Optional[str]
    __tzinfo: Optional[dt.tzinfo]
    __logger: Logger

    __mark_delete: bool
    __attempts: int
    __failed_attempts: int
    __pending_timer: JobTimer
    __timers: list[JobTimer]

    def __init__(
        self,
        job_type: JobType,
        timing: TimingJobUnion,
        handle: T,
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        max_attempts: int = 0,
        tags: Optional[set[str]] = None,
        delay: bool = True,
        start: Optional[dt.datetime] = None,
        stop: Optional[dt.datetime] = None,
        skip_missing: bool = False,
        alias: Optional[str] = None,
        tzinfo: Optional[dt.tzinfo] = None,
    ):
        """
        Validate the timing information and set up the timers of the job.

        Parameters
        ----------
        job_type : JobType
            Type of the job; it is handed to the timing checks and to every
            `JobTimer` created for the job.
        timing : TimingJobUnion
            Timing information. It is normalised with
            `standardize_timing_format` and one `JobTimer` is created per
            element of the normalised value.
        handle : Callable
            Callback function.
        args : Optional[tuple[Any, ...]]
            Positional arguments for the callback; ``None`` is stored as an
            empty tuple.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments for the callback; a shallow copy is stored,
            ``None`` is stored as an empty dictionary.
        max_attempts : int
            Execution limit; ``0`` means no limit.
        tags : Optional[set[str]]
            Tags of the job; a copy is stored, ``None`` is stored as an
            empty set.
        delay : bool
            If ``False`` the first execution targets the start time stamp
            instead of the next scheduled time.
        start : Optional[datetime.datetime]
            Requested start time stamp. The value actually stored is the one
            returned by `set_start_check_stop_tzinfo`.
        stop : Optional[datetime.datetime]
            Time stamp after which no more executions should be scheduled.
        skip_missing : bool
            If ``True`` only the newest planned execution is scheduled.
        alias : Optional[str]
            Name used in the string representation instead of the name of
            the callback.
        tzinfo : Optional[datetime.tzinfo]
            Timezone of the `Scheduler` the job is living in.
        """
        timing = standardize_timing_format(job_type, timing)

        sane_timing_types(job_type, timing)
        check_timing_tzinfo(job_type, timing, tzinfo)
        check_duplicate_effective_timings(job_type, timing, tzinfo)

        self.__start = set_start_check_stop_tzinfo(start, stop, tzinfo)

        self.__type = job_type
        self.__timing = timing  # pylint: disable=unused-private-member
        # NOTE: https://github.com/python/mypy/issues/708
        #       https://github.com/python/mypy/issues/2427
        self.__handle = handle
        self.__args = () if args is None else args
        self.__kwargs = {} if kwargs is None else kwargs.copy()
        self.__max_attempts = max_attempts
        self.__tags = set() if tags is None else tags.copy()
        self.__delay = delay
        self.__stop = stop
        self.__skip_missing = skip_missing
        self.__alias = alias
        self.__tzinfo = tzinfo

        # self.__mark_delete will be set to True if the new timer would be in
        # the future relative to the self.__stop variable
        self.__mark_delete = False
        self.__attempts = 0
        self.__failed_attempts = 0

        # create JobTimers
        self.__timers = [JobTimer(job_type, tim, self.__start, skip_missing) for tim in timing]
        self.__pending_timer = get_pending_timer(self.__timers)

        if self.__stop is not None and self.__pending_timer.datetime > self.__stop:
            self.__mark_delete = True

    def __lt__(self, other: BaseJob[T]) -> bool:
        """Order jobs by the time stamp of their next planned execution."""
        return self.datetime < other.datetime

    def _calc_next_exec(self, ref_dt: dt.datetime) -> None:
        """
        Calculate the next estimated execution `datetime.datetime` of the `Job`.

        Parameters
        ----------
        ref_dt : datetime.datetime
            Reference time stamp to which the |BaseJob| calculates
            its next execution.
        """
        if self.__skip_missing:
            # advance every timer that is not in the future of the reference
            for timer in self.__timers:
                if (timer.datetime - ref_dt).total_seconds() <= 0:
                    timer.calc_next_exec(ref_dt)
        else:
            self.__pending_timer.calc_next_exec(ref_dt)
        self.__pending_timer = get_pending_timer(self.__timers)
        if self.__stop is not None and self.__pending_timer.datetime > self.__stop:
            self.__mark_delete = True

    def _repr(self) -> tuple[str, ...]:
        """Return the ``repr`` of every element of the job's representation (tags are not included)."""
        return tuple(
            repr(elem)
            for elem in (
                self.__type,
                self.__timing,
                self.__handle,
                self.__args,
                self.__kwargs,
                self.__max_attempts,
                self.__delay,
                self.__start,
                self.__stop,
                self.__skip_missing,
                self.__alias,
                self.tzinfo,
            )
        )

    @abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()  # pragma: no cover

    @staticmethod
    def _handle_name(handle: Any) -> str:
        """
        Return a printable name for a callback.

        Callables such as instances defining ``__call__`` or
        `functools.partial` objects carry no ``__qualname__`` attribute; for
        those the qualified name of their type is used instead of raising an
        `AttributeError`.

        Parameters
        ----------
        handle : Any
            The callback to name.

        Returns
        -------
        str
            ``handle.__qualname__`` if available, otherwise
            ``type(handle).__qualname__``.
        """
        name = getattr(handle, "__qualname__", None)
        if name is None:
            return type(handle).__qualname__
        return cast(str, name)

    def _str(
        self,
    ) -> tuple[str, ...]:
        """Return the objects relevant for readable string representation."""
        dt_timedelta = self.timedelta(dt.datetime.now(self.tzinfo))
        if self.alias is not None:
            f_args = ""
        elif hasattr(self.handle, "__code__"):
            f_args = "(..)" if self.handle.__code__.co_nlocals else "()"
        else:
            f_args = "(?)"
        return (
            self.type.name if self.max_attempts != 1 else "ONCE",
            self._handle_name(self.handle) if self.alias is None else self.alias,
            f_args,
            str(self.datetime)[:19],
            str(self.datetime.tzname()),
            prettify_timedelta(dt_timedelta),
            str(self.attempts),
            str(float("inf") if self.max_attempts == 0 else self.max_attempts),
        )

    def __str__(self) -> str:
        return "{0}, {1}{2}, at={3}, tz={4}, in={5}, #{6}/{7}".format(*self._str())

    def timedelta(self, dt_stamp: Optional[dt.datetime] = None) -> dt.timedelta:
        """
        Get the `datetime.timedelta` until the next execution of this `Job`.

        Parameters
        ----------
        dt_stamp : Optional[datetime.datetime]
            Time to be compared with the planned execution time to determine
            the time difference. If ``None``, the current time in the
            timezone of the job is used.

        Returns
        -------
        timedelta
            `datetime.timedelta` to the next execution.
        """
        if dt_stamp is None:
            dt_stamp = dt.datetime.now(self.__tzinfo)
        # a job without delay targets its start stamp for the first execution
        if not self.__delay and self.__attempts == 0:
            return cast(dt.datetime, self.__start) - dt_stamp
        return self.__pending_timer.timedelta(dt_stamp)

    @property
    def datetime(self) -> dt.datetime:
        """
        Give the `datetime.datetime` object for the planned execution.

        Returns
        -------
        datetime.datetime
            Execution `datetime.datetime` stamp.
        """
        # a job without delay targets its start stamp for the first execution
        if not self.__delay and self.__attempts == 0:
            return cast(dt.datetime, self.__start)
        return self.__pending_timer.datetime

    @property
    def type(self) -> JobType:
        """
        Return the `JobType` of the `Job` instance.

        Returns
        -------
        JobType
            :class:`~scheduler.job.JobType` of the |BaseJob|.
        """
        return self.__type

    @property
    def handle(self) -> T:
        """
        Get the callback function.

        Returns
        -------
        handle
            Callback function.
        """
        return self.__handle

    @property
    def args(self) -> tuple[Any, ...]:
        r"""
        Get the positional arguments of the function handle within a `Job`.

        .. warning:: When running |BaseJob|\ s in parallel threads,
            be sure to implement possible side effects of parameter accessing in a
            thread safe manner.

        Returns
        -------
        tuple[Any]
            The payload arguments to pass to the function handle within a
            |BaseJob|.
        """
        return self.__args

    @property
    def kwargs(self) -> dict[str, Any]:
        r"""
        Get the keyword arguments of the function handle within a `Job`.

        .. warning:: When running |BaseJob|\ s in parallel threads,
            be sure to implement possible side effects of parameter accessing in a
            thread safe manner.

        Returns
        -------
        dict[str, Any]
            The payload arguments to pass to the function handle within a
            |BaseJob|.
        """
        return self.__kwargs

    @property
    def max_attempts(self) -> int:
        """
        Get the execution limit for a `Job`.

        Returns
        -------
        int
            Max execution attempts.
        """
        return self.__max_attempts

    @property
    def tags(self) -> set[str]:
        r"""
        Get the tags of a `Job`.

        Returns
        -------
        set[str]
            A copy of the tags of a |BaseJob|.
        """
        return self.__tags.copy()

    @property
    def delay(self) -> bool:
        """
        *Deprecated*: Return ``True`` if the first `Job` execution will wait for the next scheduled time.

        Returns
        -------
        bool
            If ``True`` wait with the execution for the next scheduled time. If ``False``
            the first execution will target the time of `Job.start`.
        """
        warnings.warn(
            (
                "Using the `delay` property is deprecated and will "
                "be removed in the next minor release."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        return self.__delay

    @property
    def start(self) -> Optional[dt.datetime]:
        """
        Get the timestamp at which the `JobTimer` starts.

        Returns
        -------
        Optional[datetime.datetime]
            The start datetime stamp.
        """
        return self.__start

    @property
    def stop(self) -> Optional[dt.datetime]:
        """
        Get the timestamp after which no more executions of the `Job` should be scheduled.

        Returns
        -------
        Optional[datetime.datetime]
            The stop datetime stamp.
        """
        return self.__stop

    @property
    def skip_missing(self) -> bool:
        """
        Return ``True`` if `Job` will only schedule its newest planned execution.

        Returns
        -------
        bool
            If ``True`` a |BaseJob| will only schedule its newest planned
            execution and drop older ones.
        """
        return self.__skip_missing

    @property
    def alias(self) -> Optional[str]:
        r"""
        Get the alias of the `Job`.

        Returns
        -------
        Optional[str]
            Alias of the |BaseJob|.
        """
        return self.__alias

    @property
    def tzinfo(self) -> Optional[dt.tzinfo]:
        r"""
        Get the timezone of the `Job`'s next execution.

        Returns
        -------
        Optional[datetime.tzinfo]
            Timezone of the |BaseJob|\ s next execution.
        """
        return self.datetime.tzinfo

    @property
    def _tzinfo(self) -> Optional[dt.tzinfo]:
        """
        Get the timezone of the `Scheduler` in which the `Job` is living.

        Returns
        -------
        Optional[datetime.tzinfo]
            Timezone of the |BaseJob|.
        """
        return self.__tzinfo

    @property
    def attempts(self) -> int:
        """
        Get the number of executions for a `Job`.

        Returns
        -------
        int
            Execution attempts.
        """
        return self.__attempts

    @property
    def failed_attempts(self) -> int:
        """
        Get the number of failed executions for a `Job`.

        Returns
        -------
        int
            Failed execution attempts.
        """
        return self.__failed_attempts

    @property
    def has_attempts_remaining(self) -> bool:
        """
        Check if a `Job` has remaining attempts.

        This function will return True if the |BaseJob| has open
        execution counts and the stop argument is not in the past relative to the
        next planned execution.

        Returns
        -------
        bool
            True if the |BaseJob| has execution attempts.
        """
        if self.__mark_delete:
            return False
        # a limit of zero means the job may run indefinitely
        if self.__max_attempts == 0:
            return True
        return self.__attempts < self.__max_attempts

433 

434 

# Type variable bound to ``BaseJob[Any]``: it stands for any concrete job
# class derived from ``BaseJob``, whatever the type of its callback.
BaseJobType = TypeVar("BaseJobType", bound=BaseJob[Any])