Coverage for scheduler/asyncio/scheduler.py: 100%

124 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-12-10 22:04 +0000

1""" 

2Implementation of an `asyncio` compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio as aio 

10import datetime as dt 

11from logging import Logger 

12from typing import Any, Callable, Optional, cast 

13 

14import typeguard as tg 

15 

16from scheduler.asyncio.job import Job 

17from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

18from scheduler.base.scheduler import ( 

19 BaseJob, 

20 BaseScheduler, 

21 deprecated, 

22 select_jobs_by_tag, 

23) 

24from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff 

25from scheduler.base.timingtype import ( 

26 TimingCyclic, 

27 TimingDailyUnion, 

28 TimingOnceUnion, 

29 TimingWeeklyUnion, 

30) 

31from scheduler.error import SchedulerError 

32from scheduler.message import ( 

33 CYCLIC_TYPE_ERROR_MSG, 

34 DAILY_TYPE_ERROR_MSG, 

35 HOURLY_TYPE_ERROR_MSG, 

36 MINUTELY_TYPE_ERROR_MSG, 

37 ONCE_TYPE_ERROR_MSG, 

38 WEEKLY_TYPE_ERROR_MSG, 

39) 

40 

41 

class Scheduler(BaseScheduler):
    r"""
    Implementation of an asyncio scheduler.

    This implementation enables the planning of |AioJob|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets and execution counts.

    Notes
    -----
    Due to the support of `datetime` objects, the |AioScheduler| is able to work
    with timezones.

    Parameters
    ----------
    loop : asyncio.selector_events.BaseSelectorEventLoop
        Set an asyncio event loop, defaults to the currently running event loop.
    tzinfo : datetime.tzinfo
        Set the timezone of the |AioScheduler|.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If no `loop` is given and there is no running event loop.
    """

    def __init__(
        self,
        *,
        loop: Optional[aio.selector_events.BaseSelectorEventLoop] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        try:
            self.__loop = loop if loop else aio.get_running_loop()
        except RuntimeError:
            # `get_running_loop` raises RuntimeError outside of a running loop;
            # translate it into the package's own error type.
            raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
        self.__tzinfo = tzinfo
        self.__tz_str = check_tzname(tzinfo=tzinfo)

        # Maps every scheduled job to the task that supervises its execution.
        self.__jobs: dict[Job, aio.Task[None]] = {}

    def __repr__(self) -> str:
        return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
            repr(self.__tzinfo),
            ", ".join(repr(job) for job in sorted(self.jobs)),
        )

    def __str__(self) -> str:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())

        # Job table (we join two of the Job._repr() fields into one)
        # columns
        c_align = ("<", "<", "<", "<", ">", ">")
        c_width = (8, 16, 19, 12, 9, 13)
        c_name = (
            "type",
            "function / alias",
            "due at",
            "tzinfo",
            "due in",
            "attempts",
        )
        # One indexed placeholder per column, e.g. "{0:<8}". The explicit
        # index allows dropping a column below without shifting the others.
        form = [
            f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]
        if self.__tz_str is None:
            # Without a timezone the "tzinfo" column (index 3) is omitted.
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        job_table = fstring.format(*c_name)
        job_table += fstring.format(*("-" * width for width in c_width))
        for job in sorted(self.jobs):
            row = job._str()
            entries = (
                row[0],
                str_cutoff(row[1] + row[2], c_width[1], False),
                row[3],
                str_cutoff(row[4] or "", c_width[3], False),
                str_cutoff(row[5], c_width[4], True),
                str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
            )
            job_table += fstring.format(*entries)

        return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the meta information shown above the job table."""
        return [
            f"tzinfo={self.__tz_str}",
            f"#jobs={len(self.__jobs)}",
        ]

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)

        # Each job is driven by its own supervising task on the event loop.
        task = self.__loop.create_task(self.__supervise_job(job))
        self.__jobs[job] = task

        return job

    async def __supervise_job(self, job: Job) -> None:
        """Sleep until the `Job` is due, execute it and repeat while attempts remain."""
        try:
            reference_dt = dt.datetime.now(tz=self.__tzinfo)
            while job.has_attempts_remaining:
                sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
                await aio.sleep(sleep_seconds)

                await job._exec(
                    logger=self._BaseScheduler__logger
                )  # pylint: disable=protected-access

                reference_dt = dt.datetime.now(tz=self.__tzinfo)
                job._calc_next_exec(reference_dt)  # pylint: disable=protected-access
        except aio.CancelledError:  # TODO asyncio does not trigger this exception in pytest, why?
            # raised, when `task.cancel()` in `delete_job` was run
            pass  # pragma: no cover
        else:
            # No attempts remaining: remove the finished job from the scheduler.
            self.delete_job(job)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |AioJob| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |AioJob| of the argument is not scheduled.
        """
        # Only the lookup can raise KeyError, so keep the try body minimal.
        try:
            task: aio.Task[None] = self.__jobs.pop(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None
        task.cancel()

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |AioJob|\ s.
        any_tag : bool
            False: To delete a |AioJob| all tags have to match.
            True: To delete a |AioJob| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |AioJob|\ s.
        """
        all_jobs: set[Job] = set(self.__jobs.keys())
        jobs_to_delete: set[Job]

        # `not tags` covers both `None` and an empty set. Comparing against
        # `{}` (an empty dict) would never match an empty set.
        if not tags:
            jobs_to_delete = all_jobs
        else:
            jobs_to_delete = cast(
                set[Job], select_jobs_by_tag(cast(set[BaseJob], all_jobs), tags, any_tag)
            )

        for job in jobs_to_delete:
            self.delete_job(job)

        return len(jobs_to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |AioJob|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |AioJob|\ s.
            If no tags are given all |AioJob|\ s are returned.
        any_tag : bool
            False: To match a |AioJob| all tags have to match.
            True: To match a |AioJob| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        # `not tags` covers both `None` and an empty set. Comparing against
        # `{}` (an empty dict) would never match an empty set.
        if not tags:
            return self.jobs
        return cast(set[Job], select_jobs_by_tag(cast(set[BaseJob], self.jobs), tags, any_tag))

    @deprecated(["delay"])
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |AioJob|.

        Parameters
        ----------
        timing : TimingTypeCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the `TimingCyclic` type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the `TimingDailyUnion` type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the `TimingDailyUnion` type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the `TimingDailyUnion` type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |AioJob|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be omitted, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the `TimingWeeklyUnion` type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |AioJob|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |AioJob|.
        tags : Optional[list[str]]
            The tags of the |AioJob|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the `TimingOnceUnion` type.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a cyclic job with a zero
            # period that starts at `timing` and is limited to one attempt.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                delay=False,
                start=timing,
            )
        # Every other timing type selects its job type via the mapping.
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        return set(self.__jobs.keys())