Coverage for scheduler/threading/scheduler.py: 100%

168 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2024-06-09 19:18 +0000

1""" 

2Implementation of a `threading` compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8import queue 

9import threading 

10from collections.abc import Iterable 

11from logging import Logger 

12from typing import Any, Callable, Optional 

13 

14import typeguard as tg 

15 

16from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

17from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag 

18from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff 

19from scheduler.base.timingtype import ( 

20 TimingCyclic, 

21 TimingDailyUnion, 

22 TimingOnceUnion, 

23 TimingWeeklyUnion, 

24) 

25from scheduler.error import SchedulerError 

26from scheduler.message import ( 

27 CYCLIC_TYPE_ERROR_MSG, 

28 DAILY_TYPE_ERROR_MSG, 

29 HOURLY_TYPE_ERROR_MSG, 

30 MINUTELY_TYPE_ERROR_MSG, 

31 ONCE_TYPE_ERROR_MSG, 

32 TZ_ERROR_MSG, 

33 WEEKLY_TYPE_ERROR_MSG, 

34) 

35from scheduler.prioritization import linear_priority_function 

36from scheduler.threading.job import Job 

37 

38 

def _exec_job_worker(que: queue.Queue[Job], logger: Logger) -> None:
    """
    Execute queued jobs in the calling thread until the queue is empty.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the jobs to execute. It is read without blocking, so
        the worker returns as soon as no item is left.
    logger : Logger
        Logger instance handed to every executed job.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            # Nothing left to process, let the worker terminate.
            return
        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        finally:
            # Every successful `get()` has to be balanced, even if the job
            # raised. Otherwise a `que.join()` of the caller never returns.
            que.task_done()

49 

50 

51class Scheduler(BaseScheduler[Job, Callable[..., None]]): 

52 r""" 

53 Implementation of a scheduler for callback functions. 

54 

55 This implementation enables the planning of |Job|\ s depending on time 

56 cycles, fixed times, weekdays, dates, offsets, execution counts and weights. 

57 

58 Notes 

59 ----- 

60 Due to the support of `datetime` objects, `scheduler` is able to work 

61 with timezones. 

62 

63 Parameters 

64 ---------- 

65 tzinfo : datetime.tzinfo 

66 Set the timezone of the |Scheduler|. 

67 max_exec : int 

68 Limits the number of overdue |Job|\ s that can be executed 

69 by calling function `Scheduler.exec_jobs()`. 

70 priority_function : Callable[[float, Job, int, int], float] 

71 A function handle to compute the priority of a |Job| depending 

72 on the time it is overdue and its respective weight. Defaults to a linear 

73 priority function. 

74 jobs : set[Job] 

75 A collection of job instances. 

76 n_threads : int 

77 The number of worker threads. 0 for unlimited, default 1. 

78 logger : Optional[logging.Logger] 

79 A custom Logger instance. 

80 """ 

81 

82 def __init__( 

83 self, 

84 *, 

85 max_exec: int = 0, 

86 tzinfo: Optional[dt.tzinfo] = None, 

87 priority_function: Callable[ 

88 [float, Job, int, int], 

89 float, 

90 ] = linear_priority_function, 

91 jobs: Optional[Iterable[Job]] = None, 

92 n_threads: int = 1, 

93 logger: Optional[Logger] = None, 

94 ): 

95 super().__init__(logger=logger) 

96 self.__max_exec = max_exec 

97 self.__tzinfo = tzinfo 

98 self.__priority_function = priority_function 

99 self.__jobs_lock = threading.RLock() 

100 if not jobs: 

101 self.__jobs = set() 

102 elif isinstance(jobs, set): 

103 self.__jobs = jobs 

104 else: 

105 self.__jobs = set(jobs) 

106 

107 for job in self.__jobs: 

108 if job._tzinfo != self.__tzinfo: 

109 raise SchedulerError(TZ_ERROR_MSG) 

110 

111 self.__n_threads = n_threads 

112 self.__tz_str = check_tzname(tzinfo=tzinfo) 

113 

114 def __repr__(self) -> str: 

115 with self.__jobs_lock: 

116 return "scheduler.Scheduler({0}, jobs={{{1}}})".format( 

117 ", ".join( 

118 ( 

119 repr(elem) 

120 for elem in ( 

121 self.__max_exec, 

122 self.__tzinfo, 

123 self.__priority_function, 

124 ) 

125 ) 

126 ), 

127 ", ".join([repr(job) for job in sorted(self.jobs)]), 

128 ) 

129 

130 def __str__(self) -> str: 

131 with self.__jobs_lock: 

132 # Scheduler meta heading 

133 scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings()) 

134 

135 # Job table (we join two of the Job._repr() fields into one) 

136 # columns 

137 c_align = ("<", "<", "<", "<", ">", ">", ">") 

138 c_width = (8, 16, 19, 12, 9, 13, 6) 

139 c_name = ( 

140 "type", 

141 "function / alias", 

142 "due at", 

143 "tzinfo", 

144 "due in", 

145 "attempts", 

146 "weight", 

147 ) 

148 form = [ 

149 f"{ {idx}:{align}{width}} " 

150 for idx, (align, width) in enumerate(zip(c_align, c_width)) 

151 ] 

152 if self.__tz_str is None: 

153 form = form[:3] + form[4:] 

154 

155 fstring = " ".join(form) + "\n" 

156 job_table = fstring.format(*c_name) + fstring.format( 

157 *("-" * width for width in c_width) 

158 ) 

159 for job in sorted(self.jobs): 

160 row = job._str() 

161 entries = ( 

162 row[0], 

163 str_cutoff(row[1] + row[2], c_width[1], False), 

164 row[3], 

165 str_cutoff(row[4] or "", c_width[3], False), 

166 str_cutoff(row[5], c_width[4], True), 

167 str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True), 

168 str_cutoff(f"{job.weight}", c_width[6], True), 

169 ) 

170 job_table += fstring.format(*entries) 

171 

172 return scheduler_headings + job_table 

173 

174 def __headings(self) -> list[str]: 

175 with self.__jobs_lock: 

176 headings = [ 

177 f"max_exec={self.__max_exec if self.__max_exec else float('inf')}", 

178 f"tzinfo={self.__tz_str}", 

179 f"priority_function={self.__priority_function.__name__}", 

180 f"#jobs={len(self.__jobs)}", 

181 ] 

182 return headings 

183 

184 def __schedule( 

185 self, 

186 **kwargs, 

187 ) -> Job: 

188 """Encapsulate the `Job` and add the `Scheduler`'s timezone.""" 

189 job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs) 

190 if job.has_attempts_remaining: 

191 with self.__jobs_lock: 

192 self.__jobs.add(job) 

193 return job 

194 

195 def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int: 

196 n_jobs = len(jobs) 

197 

198 que: queue.Queue[Job] = queue.Queue() 

199 for job in jobs: 

200 que.put(job) 

201 

202 workers = [] 

203 for _ in range(self.__n_threads or n_jobs): 

204 worker = threading.Thread(target=_exec_job_worker, args=(que, self._logger)) 

205 worker.daemon = True 

206 worker.start() 

207 workers.append(worker) 

208 

209 que.join() 

210 for worker in workers: 

211 worker.join() 

212 

213 for job in jobs: 

214 job._calc_next_exec(ref_dt) # pylint: disable=protected-access 

215 if not job.has_attempts_remaining: 

216 self.delete_job(job) 

217 

218 return n_jobs 

219 

220 def exec_jobs(self, force_exec_all: bool = False) -> int: 

221 r""" 

222 Execute scheduled `Job`\ s. 

223 

224 By default executes the |Job|\ s that are overdue. 

225 

226 |Job|\ s are executed in order of their priority 

227 :ref:`examples.weights`. If the |Scheduler| instance 

228 has a limit on the job execution counts per call of 

229 :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument, 

230 |Job|\ s of lower priority might not get executed when 

231 competing |Job|\ s are overdue. 

232 

233 Parameters 

234 ---------- 

235 force_exec_all : bool 

236 Ignore the both - the status of the |Job| timers 

237 as well as the execution limit of the |Scheduler| 

238 

239 Returns 

240 ------- 

241 int 

242 Number of executed |Job|\ s. 

243 """ 

244 ref_dt = dt.datetime.now(tz=self.__tzinfo) 

245 

246 if force_exec_all: 

247 return self.__exec_jobs(list(self.__jobs), ref_dt) 

248 # collect the current priority for all jobs 

249 

250 job_priority: dict[Job, float] = {} 

251 n_jobs = len(self.__jobs) 

252 with self.__jobs_lock: 

253 for job in self.__jobs: 

254 delta_seconds = job.timedelta(ref_dt).total_seconds() 

255 job_priority[job] = self.__priority_function( 

256 -delta_seconds, 

257 job, 

258 self.__max_exec, 

259 n_jobs, 

260 ) 

261 # sort the jobs by priority 

262 sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True) # type: ignore 

263 # filter jobs by max_exec and priority greater zero 

264 filtered_jobs = [ 

265 job 

266 for idx, job in enumerate(sorted_jobs) 

267 if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0 

268 ] 

269 return self.__exec_jobs(filtered_jobs, ref_dt) 

270 

271 def delete_job(self, job: Job) -> None: 

272 """ 

273 Delete a `Job` from the `Scheduler`. 

274 

275 Parameters 

276 ---------- 

277 job : Job 

278 |Job| instance to delete. 

279 

280 Raises 

281 ------ 

282 SchedulerError 

283 Raises if the |Job| of the argument is not scheduled. 

284 """ 

285 try: 

286 with self.__jobs_lock: 

287 self.__jobs.remove(job) 

288 except KeyError: 

289 raise SchedulerError("An unscheduled Job can not be deleted!") from None 

290 

291 def delete_jobs( 

292 self, 

293 tags: Optional[set[str]] = None, 

294 any_tag: bool = False, 

295 ) -> int: 

296 r""" 

297 Delete a set of |Job|\ s from the |Scheduler| by tags. 

298 

299 If no tags or an empty set of tags are given defaults to the deletion 

300 of all |Job|\ s. 

301 

302 Parameters 

303 ---------- 

304 tags : Optional[set[str]] 

305 Set of tags to identify target |Job|\ s. 

306 any_tag : bool 

307 False: To delete a |Job| all tags have to match. 

308 True: To deleta a |Job| at least one tag has to match. 

309 """ 

310 with self.__jobs_lock: 

311 if tags is None or tags == set(): 

312 n_jobs = len(self.__jobs) 

313 self.__jobs = set() 

314 return n_jobs 

315 

316 to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag) 

317 

318 self.__jobs = self.__jobs - to_delete 

319 return len(to_delete) 

320 

321 def get_jobs( 

322 self, 

323 tags: Optional[set[str]] = None, 

324 any_tag: bool = False, 

325 ) -> set[Job]: 

326 r""" 

327 Get a set of |Job|\ s from the |Scheduler| by tags. 

328 

329 If no tags or an empty set of tags are given defaults to returning 

330 all |Job|\ s. 

331 

332 Parameters 

333 ---------- 

334 tags : set[str] 

335 Tags to filter scheduled |Job|\ s. 

336 If no tags are given all |Job|\ s are returned. 

337 any_tag : bool 

338 False: To match a |Job| all tags have to match. 

339 True: To match a |Job| at least one tag has to match. 

340 

341 Returns 

342 ------- 

343 set[Job] 

344 Currently scheduled |Job|\ s. 

345 """ 

346 with self.__jobs_lock: 

347 if tags is None or tags == set(): 

348 return self.__jobs.copy() 

349 return select_jobs_by_tag(self.__jobs, tags, any_tag) 

350 

351 @deprecated(["delay"]) 

352 def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job: 

353 r""" 

354 Schedule a cyclic `Job`. 

355 

356 Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects 

357 to schedule a cyclic |Job|. 

358 

359 Parameters 

360 ---------- 

361 timing : TimingTypeCyclic 

362 Desired execution time. 

363 handle : Callable[..., None] 

364 Handle to a callback function. 

365 

366 Returns 

367 ------- 

368 Job 

369 Instance of a scheduled |Job|. 

370 

371 Other Parameters 

372 ---------------- 

373 **kwargs 

374 |Job| properties, optional 

375 

376 `kwargs` are used to specify |Job| properties. 

377 

378 Here is a list of available |Job| properties: 

379 

380 .. include:: ../_assets/kwargs.rst 

381 """ 

382 try: 

383 tg.check_type(timing, TimingCyclic) 

384 except tg.TypeCheckError as err: 

385 raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err 

386 return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs) 

387 

388 @deprecated(["delay"]) 

389 def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job: 

390 r""" 

391 Schedule a minutely `Job`. 

392 

393 Use a `datetime.time` object or a `list` of `datetime.time` objects 

394 to schedule a |Job| every minute. 

395 

396 Notes 

397 ----- 

398 If given a `datetime.time` object with a non zero hour or minute property, these 

399 information will be ignored. 

400 

401 Parameters 

402 ---------- 

403 timing : TimingDailyUnion 

404 Desired execution time(s). 

405 handle : Callable[..., None] 

406 Handle to a callback function. 

407 

408 Returns 

409 ------- 

410 Job 

411 Instance of a scheduled |Job|. 

412 

413 Other Parameters 

414 ---------------- 

415 **kwargs 

416 |Job| properties, optional 

417 

418 `kwargs` are used to specify |Job| properties. 

419 

420 Here is a list of available |Job| properties: 

421 

422 .. include:: ../_assets/kwargs.rst 

423 """ 

424 try: 

425 tg.check_type(timing, TimingDailyUnion) 

426 except tg.TypeCheckError as err: 

427 raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err 

428 return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs) 

429 

430 @deprecated(["delay"]) 

431 def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job: 

432 r""" 

433 Schedule an hourly `Job`. 

434 

435 Use a `datetime.time` object or a `list` of `datetime.time` objects 

436 to schedule a |Job| every hour. 

437 

438 Notes 

439 ----- 

440 If given a `datetime.time` object with a non zero hour property, this information 

441 will be ignored. 

442 

443 Parameters 

444 ---------- 

445 timing : TimingDailyUnion 

446 Desired execution time(s). 

447 handle : Callable[..., None] 

448 Handle to a callback function. 

449 

450 Returns 

451 ------- 

452 Job 

453 Instance of a scheduled |Job|. 

454 

455 Other Parameters 

456 ---------------- 

457 **kwargs 

458 |Job| properties, optional 

459 

460 `kwargs` are used to specify |Job| properties. 

461 

462 Here is a list of available |Job| properties: 

463 

464 .. include:: ../_assets/kwargs.rst 

465 """ 

466 try: 

467 tg.check_type(timing, TimingDailyUnion) 

468 except tg.TypeCheckError as err: 

469 raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err 

470 return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs) 

471 

472 @deprecated(["delay"]) 

473 def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job: 

474 r""" 

475 Schedule a daily `Job`. 

476 

477 Use a `datetime.time` object or a `list` of `datetime.time` objects 

478 to schedule a |Job| every day. 

479 

480 Parameters 

481 ---------- 

482 timing : TimingDailyUnion 

483 Desired execution time(s). 

484 handle : Callable[..., None] 

485 Handle to a callback function. 

486 

487 Returns 

488 ------- 

489 Job 

490 Instance of a scheduled |Job|. 

491 

492 Other Parameters 

493 ---------------- 

494 **kwargs 

495 |Job| properties, optional 

496 

497 `kwargs` are used to specify |Job| properties. 

498 

499 Here is a list of available |Job| properties: 

500 

501 .. include:: ../_assets/kwargs.rst 

502 """ 

503 try: 

504 tg.check_type(timing, TimingDailyUnion) 

505 except tg.TypeCheckError as err: 

506 raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err 

507 return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs) 

508 

509 @deprecated(["delay"]) 

510 def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job: 

511 r""" 

512 Schedule a weekly `Job`. 

513 

514 Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly 

515 recurring |Job|. Combine multiple desired `tuples` in 

516 a `list`. If the planed execution time is `00:00` the `datetime.time` object 

517 can be ignored, just pass a `Weekday` without a `tuple`. 

518 

519 Parameters 

520 ---------- 

521 timing : TimingWeeklyUnion 

522 Desired execution time(s). 

523 handle : Callable[..., None] 

524 Handle to a callback function. 

525 

526 Returns 

527 ------- 

528 Job 

529 Instance of a scheduled |Job|. 

530 

531 Other Parameters 

532 ---------------- 

533 **kwargs 

534 |Job| properties, optional 

535 

536 `kwargs` are used to specify |Job| properties. 

537 

538 Here is a list of available |Job| properties: 

539 

540 .. include:: ../_assets/kwargs.rst 

541 """ 

542 try: 

543 tg.check_type(timing, TimingWeeklyUnion) 

544 except tg.TypeCheckError as err: 

545 raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err 

546 return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs) 

547 

548 def once( # pylint: disable=arguments-differ 

549 self, 

550 timing: TimingOnceUnion, 

551 handle: Callable[..., None], 

552 *, 

553 args: Optional[tuple[Any]] = None, 

554 kwargs: Optional[dict[str, Any]] = None, 

555 tags: Optional[Iterable[str]] = None, 

556 alias: Optional[str] = None, 

557 weight: float = 1, 

558 ) -> Job: 

559 r""" 

560 Schedule a oneshot `Job`. 

561 

562 Parameters 

563 ---------- 

564 timing : TimingOnceUnion 

565 Desired execution time. 

566 handle : Callable[..., None] 

567 Handle to a callback function. 

568 args : tuple[Any] 

569 Positional argument payload for the function handle within a |Job|. 

570 kwargs : Optional[dict[str, Any]] 

571 Keyword arguments payload for the function handle within a |Job|. 

572 tags : Optional[Iterable[str]] 

573 The tags of the |Job|. 

574 alias : Optional[str] 

575 Overwrites the function handle name in the string representation. 

576 weight : float 

577 Relative weight against other |Job|\ s. 

578 

579 Returns 

580 ------- 

581 Job 

582 Instance of a scheduled |Job|. 

583 """ 

584 try: 

585 tg.check_type(timing, TimingOnceUnion) 

586 except tg.TypeCheckError as err: 

587 raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err 

588 if isinstance(timing, dt.datetime): 

589 return self.__schedule( 

590 job_type=JobType.CYCLIC, 

591 timing=dt.timedelta(), 

592 handle=handle, 

593 args=args, 

594 kwargs=kwargs, 

595 max_attempts=1, 

596 tags=set(tags) if tags else set(), 

597 alias=alias, 

598 weight=weight, 

599 delay=False, 

600 start=timing, 

601 ) 

602 return self.__schedule( 

603 job_type=JOB_TYPE_MAPPING[type(timing)], 

604 timing=timing, 

605 handle=handle, 

606 args=args, 

607 kwargs=kwargs, 

608 max_attempts=1, 

609 tags=tags, 

610 alias=alias, 

611 weight=weight, 

612 ) 

613 

614 @property 

615 def jobs(self) -> set[Job]: 

616 r""" 

617 Get the set of all `Job`\ s. 

618 

619 Returns 

620 ------- 

621 set[Job] 

622 Currently scheduled |Job|\ s. 

623 """ 

624 return self.__jobs.copy()