Coverage for scheduler/asyncio/scheduler.py: 100%

126 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2024-06-09 19:18 +0000

1""" 

2Implementation of an `asyncio`-compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio as aio 

10import datetime as dt 

11from asyncio.selector_events import BaseSelectorEventLoop 

12from collections.abc import Iterable 

13from logging import Logger 

14from typing import Any, Callable, Coroutine, Optional 

15 

16import typeguard as tg 

17 

18from scheduler.asyncio.job import Job 

19from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

20from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag 

21from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff 

22from scheduler.base.timingtype import ( 

23 TimingCyclic, 

24 TimingDailyUnion, 

25 TimingOnceUnion, 

26 TimingWeeklyUnion, 

27) 

28from scheduler.error import SchedulerError 

29from scheduler.message import ( 

30 CYCLIC_TYPE_ERROR_MSG, 

31 DAILY_TYPE_ERROR_MSG, 

32 HOURLY_TYPE_ERROR_MSG, 

33 MINUTELY_TYPE_ERROR_MSG, 

34 ONCE_TYPE_ERROR_MSG, 

35 WEEKLY_TYPE_ERROR_MSG, 

36) 

37 

38 

class Scheduler(BaseScheduler[Job, Callable[..., Coroutine[Any, Any, None]]]):
    r"""
    Implementation of an asyncio scheduler.

    This implementation enables the planning of |AioJob|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets and execution counts.

    Notes
    -----
    Due to the support of `datetime` objects, the |AioScheduler| is able to work
    with timezones.

    Parameters
    ----------
    loop : asyncio.selector_events.BaseSelectorEventLoop
        Set an asyncio event loop, default is the currently running event loop.
    tzinfo : datetime.tzinfo
        Set the timezone of the |AioScheduler|.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If no `loop` is given and no event loop is currently running.
    """

    def __init__(
        self,
        *,
        loop: Optional[BaseSelectorEventLoop] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        try:
            # Fall back to the loop of the calling coroutine when none is given.
            self.__loop = loop if loop else aio.get_running_loop()
        except RuntimeError:
            raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
        self.__tzinfo = tzinfo
        self.__tz_str = check_tzname(tzinfo=tzinfo)

        # Maps every scheduled job to the task that supervises it.
        self._jobs: dict[Job, aio.Task[None]] = {}

    def __repr__(self) -> str:
        return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
            ", ".join((repr(elem) for elem in (self.__tzinfo,))),
            ", ".join([repr(job) for job in sorted(self.jobs)]),
        )

    def __str__(self) -> str:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())

        # Job table (we join two of the Job._str() fields into one)
        # columns
        c_align = ("<", "<", "<", "<", ">", ">")
        c_width = (8, 16, 19, 12, 9, 13)
        c_name = (
            "type",
            "function / alias",
            "due at",
            "tzinfo",
            "due in",
            "attempts",
        )
        # Build one `str.format` placeholder per column, e.g. "{0:<8}".
        # The doubled braces emit literal braces; the inner ones interpolate.
        form = [
            f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]
        if self.__tz_str is None:
            # Drop the "tzinfo" column; the remaining placeholders keep their
            # positional indices, so the argument tuples need no adjustment.
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        job_table = fstring.format(*c_name)
        job_table += fstring.format(*("-" * width for width in c_width))
        for job in sorted(self.jobs):
            row = job._str()  # pylint: disable=protected-access
            entries = (
                row[0],
                str_cutoff(row[1] + row[2], c_width[1], False),
                row[3],
                str_cutoff(row[4] or "", c_width[3], False),
                str_cutoff(row[5], c_width[4], True),
                str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
            )
            job_table += fstring.format(*entries)

        return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the meta information shown above the job table."""
        headings = [
            f"tzinfo={self.__tz_str}",
            f"#jobs={len(self._jobs)}",
        ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)

        # Each job is driven by its own supervising task on the scheduler's loop.
        task = self.__loop.create_task(self.__supervise_job(job))
        self._jobs[job] = task

        return job

    async def __supervise_job(self, job: Job) -> None:
        """
        Run a `Job` until it has no attempts left, then remove it.

        Sleeps until the job is due, executes it and recalculates the next
        execution, repeating while the job has attempts remaining.
        """
        try:
            reference_dt = dt.datetime.now(tz=self.__tzinfo)
            while job.has_attempts_remaining:
                sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
                await aio.sleep(sleep_seconds)

                await job._exec(logger=self._logger)  # pylint: disable=protected-access

                reference_dt = dt.datetime.now(tz=self.__tzinfo)
                job._calc_next_exec(reference_dt)  # pylint: disable=protected-access
        except aio.CancelledError:  # TODO asyncio does not trigger this exception in pytest, why?
            # raised, when `task.cancel()` in `delete_job` was run
            pass  # pragma: no cover
        else:
            # All attempts are used up, the job is no longer scheduled.
            self.delete_job(job)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |AioJob| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |AioJob| of the argument is not scheduled.
        """
        # Only the lookup can raise `KeyError`, so only it is guarded.
        try:
            task: aio.Task[None] = self._jobs.pop(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None
        task.cancel()

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |AioJob|\ s.
        any_tag : bool
            False: To delete a |AioJob| all tags have to match.
            True: To delete a |AioJob| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |AioJob|\ s.
        """
        # Work on a snapshot, `delete_job` mutates `self._jobs` while we iterate.
        all_jobs: set[Job] = set(self._jobs.keys())
        jobs_to_delete: set[Job]

        if tags is None or tags == set():
            jobs_to_delete = all_jobs
        else:
            jobs_to_delete = select_jobs_by_tag(all_jobs, tags, any_tag)

        for job in jobs_to_delete:
            self.delete_job(job)

        return len(jobs_to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |AioJob|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |AioJob|\ s.
            If no tags are given all |AioJob|\ s are returned.
        any_tag : bool
            False: To match a |AioJob| all tags have to match.
            True: To match a |AioJob| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        if tags is None or tags == set():
            return self.jobs
        return select_jobs_by_tag(self.jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(
        self, timing: TimingCyclic, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |AioJob|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(
        self, timing: TimingWeeklyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |AioJob|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., Coroutine[Any, Any, None]],
        *,
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.
        args : tuple[Any]
            Positional argument payload for the function handle within a |AioJob|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |AioJob|.
        tags : Optional[Iterable[str]]
            The tags of the |AioJob|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err

        # Normalise once so both branches hand the same type (a `set`) to the
        # job, whatever iterable (or `None`) the caller supplied.
        job_tags: set[str] = set(tags) if tags else set()

        if isinstance(timing, dt.datetime):
            # A fixed point in time is modelled as a zero-length cyclic job
            # that starts at `timing` and runs exactly once.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=job_tags,
                alias=alias,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=job_tags,
            alias=alias,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        # A fresh set, so callers cannot mutate the internal job mapping.
        return set(self._jobs)