Coverage for scheduler/threading/scheduler.py: 100%

164 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-01-09 20:37 +0000

1""" 

2Implementation of a `threading` compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8import queue 

9import threading 

10from logging import Logger 

11from typing import Any, Callable, Optional 

12 

13import typeguard as tg 

14 

15from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

16from scheduler.base.job import BaseJob 

17from scheduler.base.scheduler import BaseScheduler, _warn_deprecated_delay, select_jobs_by_tag 

18from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff 

19from scheduler.base.timingtype import ( 

20 TimingCyclic, 

21 TimingDailyUnion, 

22 TimingOnceUnion, 

23 TimingWeeklyUnion, 

24) 

25from scheduler.error import SchedulerError 

26from scheduler.message import ( 

27 CYCLIC_TYPE_ERROR_MSG, 

28 DAILY_TYPE_ERROR_MSG, 

29 HOURLY_TYPE_ERROR_MSG, 

30 MINUTELY_TYPE_ERROR_MSG, 

31 ONCE_TYPE_ERROR_MSG, 

32 TZ_ERROR_MSG, 

33 WEEKLY_TYPE_ERROR_MSG, 

34) 

35from scheduler.prioritization import linear_priority_function 

36from scheduler.threading.job import Job 

37 

38 

39def _exec_job_worker(que: queue.Queue[Job], logger: Logger): 

40 running = True 

41 while running: 

42 try: 

43 job = que.get(block=False) 

44 except queue.Empty: 

45 running = False 

46 else: 

47 job._exec(logger=logger) # pylint: disable=protected-access 

48 que.task_done() 

49 

50 

class Scheduler(BaseScheduler):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : datetime.tzinfo
        Set the timezone of the |Scheduler|.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : set[Job]
        A collection of job instances.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, BaseJob, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[set[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        # Re-entrant lock: methods holding it call other methods/properties
        # (e.g. `jobs`) that acquire it again.
        self.__jobs_lock = threading.RLock()
        self.__jobs: set[Job] = jobs or set()
        # Every pre-existing job has to share the scheduler's timezone.
        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        with self.__jobs_lock:
            settings = ", ".join(
                repr(elem)
                for elem in (
                    self.__max_exec,
                    self.__tzinfo,
                    self.__priority_function,
                )
            )
            job_reprs = ", ".join([repr(job) for job in sorted(self.jobs)])
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(settings, job_reprs)

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._repr() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            # Without a timezone name the "tzinfo" column (index 3) is dropped;
            # the remaining fields keep their explicit positional indices.
            if self.__tz_str is None:
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            job_table = fstring.format(*c_name) + fstring.format(
                *("-" * width for width in c_width)
            )
            for job in sorted(self.jobs):
                row = job._str()
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                job_table += fstring.format(*entries)

            return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Build the meta information entries shown above the job table."""
        with self.__jobs_lock:
            headings = [
                # A `max_exec` of 0 means "no limit" and is displayed as inf.
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        # Only jobs that can still be executed are stored in the scheduler.
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute the given jobs on worker threads and update their schedule.

        Parameters
        ----------
        jobs : list[Job]
            The jobs to execute.
        ref_dt : datetime.datetime
            Reference time used to calculate the next execution of each job.

        Returns
        -------
        int
            Number of jobs handed to the workers.
        """
        n_jobs = len(jobs)
        if n_jobs == 0:
            return 0

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # `n_threads == 0` means one worker per job. More workers than jobs
        # would only start threads that find the queue empty, so cap the count.
        n_workers = min(self.__n_threads, n_jobs) if self.__n_threads else n_jobs

        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(
                target=_exec_job_worker, args=(que, self._BaseScheduler__logger)
            )
            worker.daemon = True
            worker.start()
            workers.append(worker)

        que.join()
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            # Jobs that used up all their attempts leave the scheduler.
            if not job.has_attempts_remaining:
                self.delete_job(job)

        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers
            and the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        if force_exec_all:
            # Snapshot under the lock: iterating the set while another thread
            # adds or removes a job would raise a RuntimeError.
            with self.__jobs_lock:
                all_jobs = list(self.__jobs)
            return self.__exec_jobs(all_jobs, ref_dt)

        # collect the current priority for all jobs
        job_priority: dict[Job, float] = {}
        with self.__jobs_lock:
            n_jobs = len(self.__jobs)
            for job in self.__jobs:
                delta_seconds = job.timedelta(ref_dt).total_seconds()
                job_priority[job] = self.__priority_function(
                    -delta_seconds,
                    job,
                    self.__max_exec,
                    n_jobs,
                )
        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers both `None` and an empty set. A comparison
            # with `{}` (an empty dict) is never true for a set.
            if not tags:
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            # `not tags` covers both `None` and an empty set. A comparison
            # with `{}` (an empty dict) is never true for a set.
            if not tags:
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingCyclic)
        except TypeError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingDailyUnion)
        except TypeError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingDailyUnion)
        except TypeError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingDailyUnion)
        except TypeError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be omitted, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        _warn_deprecated_delay(**kwargs)
        try:
            tg.check_type("timing", timing, TimingWeeklyUnion)
        except TypeError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        # NOTE(review): annotated as a list while the docstring speaks of a
        # set of tags - confirm which container the Job expects.
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[set[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        try:
            tg.check_type("timing", timing, TimingOnceUnion)
        except TypeError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a cyclic job with an empty
            # cycle, a single attempt and the datetime as its start.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        # Every other timing type is mapped onto its job type and limited to
        # a single attempt.
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        # Copy under the (re-entrant) lock so a concurrent add/remove cannot
        # interfere with the copy.
        with self.__jobs_lock:
            return self.__jobs.copy()