Coverage for scheduler/asyncio/scheduler.py: 100%

124 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-03-19 22:07 +0000

1""" 

2Implementation of an `asyncio` compatible in-process scheduler.

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio as aio 

10import datetime as dt 

11from logging import Logger 

12from typing import Any, Callable, Optional, cast 

13 

14import typeguard as tg 

15 

16from scheduler.asyncio.job import Job 

17from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

18from scheduler.base.scheduler import ( 

19 BaseJob, 

20 BaseScheduler, 

21 _warn_deprecated_delay, 

22 select_jobs_by_tag, 

23) 

24from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff 

25from scheduler.base.timingtype import ( 

26 TimingCyclic, 

27 TimingDailyUnion, 

28 TimingOnceUnion, 

29 TimingWeeklyUnion, 

30) 

31from scheduler.error import SchedulerError 

32from scheduler.message import ( 

33 CYCLIC_TYPE_ERROR_MSG, 

34 DAILY_TYPE_ERROR_MSG, 

35 HOURLY_TYPE_ERROR_MSG, 

36 MINUTELY_TYPE_ERROR_MSG, 

37 ONCE_TYPE_ERROR_MSG, 

38 WEEKLY_TYPE_ERROR_MSG, 

39) 

40 

41 

class Scheduler(BaseScheduler):
    r"""
    Implementation of an asyncio scheduler.

    This implementation enables the planning of |AioJob|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets and execution counts.

    Notes
    -----
    Due to the support of `datetime` objects, the |AioScheduler| is able to work
    with timezones.

    Every scheduled |AioJob| is supervised by its own `asyncio.Task`, created
    on the event loop of the |AioScheduler|.

    Parameters
    ----------
    loop : asyncio.selector_events.BaseSelectorEventLoop
        Event loop the supervising tasks are created on. Defaults to the
        currently running event loop.
    tzinfo : datetime.tzinfo
        Set the timezone of the |AioScheduler|.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If no `loop` is given and there is no running event loop.
    """

    def __init__(
        self,
        *,
        loop: Optional[aio.selector_events.BaseSelectorEventLoop] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        try:
            # Test for `None` explicitly instead of relying on the truthiness
            # of the loop object.
            self.__loop = loop if loop is not None else aio.get_running_loop()
        except RuntimeError:
            raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
        self.__tzinfo = tzinfo
        self.__tz_str = check_tzname(tzinfo=tzinfo)

        # Maps every scheduled job to the task that supervises it.
        self.__jobs: dict[Job, aio.Task[None]] = {}

    def __repr__(self) -> str:
        jobs_repr = ", ".join(repr(job) for job in sorted(self.jobs))
        return (
            f"scheduler.asyncio.scheduler.Scheduler({self.__tzinfo!r}, jobs={{{jobs_repr}}})"
        )

    def __str__(self) -> str:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())

        # Job table (we join two of the Job._str() fields into one)
        # columns
        c_align = ("<", "<", "<", "<", ">", ">")
        c_width = (8, 16, 19, 12, 9, 13)
        c_name = (
            "type",
            "function / alias",
            "due at",
            "tzinfo",
            "due in",
            "attempts",
        )
        form = [
            f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]
        if self.__tz_str is None:
            # Without a timezone the "tzinfo" column is dropped. The placeholders
            # are indexed, so the unused positional argument is simply ignored.
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        lines = [
            fstring.format(*c_name),
            fstring.format(*("-" * width for width in c_width)),
        ]
        for job in sorted(self.jobs):
            row = job._str()  # pylint: disable=protected-access
            lines.append(
                fstring.format(
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                )
            )

        return scheduler_headings + "".join(lines)

    def __headings(self) -> list[str]:
        """Return the meta information shown above the job table."""
        return [
            f"tzinfo={self.__tz_str}",
            f"#jobs={len(self.__jobs)}",
        ]

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)

        # Each job gets its own supervising task on the scheduler's loop.
        task = self.__loop.create_task(self.__supervise_job(job))
        self.__jobs[job] = task

        return job

    async def __supervise_job(self, job: Job) -> None:
        """
        Run a `Job` until it has no attempts remaining.

        Sleeps until the job is due, executes it and calculates the next
        execution time. Once the attempts are exhausted the job is removed
        from the `Scheduler`.
        """
        try:
            reference_dt = dt.datetime.now(tz=self.__tzinfo)
            while job.has_attempts_remaining:
                sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
                await aio.sleep(sleep_seconds)

                await job._exec(
                    logger=self._BaseScheduler__logger
                )  # pylint: disable=protected-access

                reference_dt = dt.datetime.now(tz=self.__tzinfo)
                job._calc_next_exec(reference_dt)  # pylint: disable=protected-access
        except aio.CancelledError:  # TODO asyncio does not trigger this exception in pytest, why?
            # raised, when `task.cancel()` in `delete_job` was run
            pass  # pragma: no cover
        else:
            # The job may already have been removed (e.g. by its own handle
            # during the final attempt); deleting it a second time would raise
            # a `SchedulerError` inside this task.
            if job in self.__jobs:
                self.delete_job(job)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        The task supervising the |AioJob| is cancelled.

        Parameters
        ----------
        job : Job
            |AioJob| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |AioJob| of the argument is not scheduled.
        """
        try:
            task: aio.Task[None] = self.__jobs.pop(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None
        task.cancel()

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |AioJob|\ s.
        any_tag : bool
            False: To delete a |AioJob| all tags have to match.
            True: To delete a |AioJob| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |AioJob|\ s.
        """
        # Snapshot of the jobs, `delete_job` mutates the underlying dict.
        all_jobs: set[Job] = set(self.__jobs)
        jobs_to_delete: set[Job]

        # `not tags` covers `None` and every empty container. The former
        # `tags == {}` compared against an empty dict, which an empty set
        # never equals.
        if not tags:
            jobs_to_delete = all_jobs
        else:
            jobs_to_delete = cast(
                set[Job], select_jobs_by_tag(cast(set[BaseJob], all_jobs), tags, any_tag)
            )

        for job in jobs_to_delete:
            self.delete_job(job)

        return len(jobs_to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |AioJob|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |AioJob|\ s.
            If no tags are given all |AioJob|\ s are returned.
        any_tag : bool
            False: To match a |AioJob| all tags have to match.
            True: To match a |AioJob| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        # `not tags` covers `None` and every empty container. The former
        # `tags == {}` compared against an empty dict, which an empty set
        # never equals.
        if not tags:
            return self.jobs
        return cast(set[Job], select_jobs_by_tag(cast(set[BaseJob], self.jobs), tags, any_tag))

    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |AioJob|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |AioJob|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be omitted, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |AioJob|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |AioJob|.
        tags : Optional[list[str]]
            The tags of the |AioJob|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        # NOTE(review): `tags` is annotated as a list here while `get_jobs` and
        # `delete_jobs` take a set of tags -- confirm which container the
        # `Job` expects.
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # A fixed point in time is expressed as a cyclic job with an empty
            # cycle that starts at `timing` and runs exactly once.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        return set(self.__jobs)