Coverage for scheduler/threading/scheduler.py: 100%

164 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-12-10 21:31 +0000

1""" 

2Implementation of a `threading` compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8import queue 

9import threading 

10from logging import Logger 

11from typing import Any, Callable, Optional 

12 

13import typeguard as tg 

14 

15from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

16from scheduler.base.job import BaseJob 

17from scheduler.base.scheduler import ( 

18 BaseScheduler, 

19 deprecated, 

20 select_jobs_by_tag, 

21) 

22from scheduler.base.scheduler_util import check_tzname, create_job_instance, str_cutoff 

23from scheduler.base.timingtype import ( 

24 TimingCyclic, 

25 TimingDailyUnion, 

26 TimingOnceUnion, 

27 TimingWeeklyUnion, 

28) 

29from scheduler.error import SchedulerError 

30from scheduler.message import ( 

31 CYCLIC_TYPE_ERROR_MSG, 

32 DAILY_TYPE_ERROR_MSG, 

33 HOURLY_TYPE_ERROR_MSG, 

34 MINUTELY_TYPE_ERROR_MSG, 

35 ONCE_TYPE_ERROR_MSG, 

36 TZ_ERROR_MSG, 

37 WEEKLY_TYPE_ERROR_MSG, 

38) 

39from scheduler.prioritization import linear_priority_function 

40from scheduler.threading.job import Job 

41 

42 

def _exec_job_worker(que: queue.Queue[Job], logger: Logger):
    """
    Execute the jobs in ``que`` one after another until the queue is empty.

    Used as the target of the worker threads started by
    ``Scheduler.__exec_jobs``. Several workers may drain the same queue
    concurrently; each one terminates as soon as it finds the queue empty.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the jobs to execute. It is only read non-blockingly.
    logger : logging.Logger
        Logger handed through to ``Job._exec``.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            return
        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        finally:
            # Acknowledge the item even if ``_exec`` raises, so a caller
            # blocked in ``que.join()`` is not left waiting on it forever.
            que.task_done()

53 

54 

class Scheduler(BaseScheduler):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |Scheduler|. Every |Job| passed via `jobs`
        must use the same timezone.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`. 0 (default) means no limit.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : Optional[set[Job]]
        A collection of job instances. The collection is copied, so later
        changes to the caller's set do not affect the |Scheduler|.
    n_threads : int
        The number of worker threads. 0 for one thread per executed |Job|,
        default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If a |Job| in `jobs` uses a timezone different from `tzinfo`.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, BaseJob, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[set[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        # Reentrant lock: __repr__/__str__ hold it while calling helpers
        # (``self.jobs``, ``__headings``) that acquire it again.
        self.__jobs_lock = threading.RLock()
        # Copy, so the scheduler never shares (and mutates) the caller's set.
        self.__jobs: set[Job] = set(jobs) if jobs else set()
        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads
        self.__tz_str = check_tzname(tzinfo=tzinfo)

    def __repr__(self) -> str:
        with self.__jobs_lock:
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(
                ", ".join(
                    (
                        repr(elem)
                        for elem in (
                            self.__max_exec,
                            self.__tzinfo,
                            self.__priority_function,
                        )
                    )
                ),
                ", ".join([repr(job) for job in sorted(self.jobs)]),
            )

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._str() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            # One positional placeholder per column, e.g. "{0:<8}"; the doubled
            # braces produce literal braces in the f-string output.
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            if self.__tz_str is None:
                # Drop the tzinfo column. The remaining placeholders keep their
                # indices, so the tzinfo value passed to format() is ignored.
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            job_table = fstring.format(*c_name) + fstring.format(
                *("-" * width for width in c_width)
            )
            for job in sorted(self.jobs):
                row = job._str()
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                job_table += fstring.format(*entries)

            return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the ``key=value`` fields shown in the first line of ``str()``."""
        with self.__jobs_lock:
            # Callables such as functools.partial have no ``__name__``.
            priority_name = getattr(
                self.__priority_function, "__name__", repr(self.__priority_function)
            )
            headings = [
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={self.__tz_str}",
                f"priority_function={priority_name}",
                f"#jobs={len(self.__jobs)}",
            ]
        return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """
        Encapsulate the `Job` and add the `Scheduler`'s timezone.

        The new |Job| is only registered if it has attempts remaining; it is
        returned in either case.
        """
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute `jobs` on worker threads, then reschedule them relative to `ref_dt`.

        |Job|\\ s without remaining attempts are removed afterwards.

        Returns
        -------
        int
            Number of |Job|\\ s handed to the workers.
        """
        n_jobs = len(jobs)

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # n_threads == 0 means one worker per job. Never start more workers than
        # there are jobs; surplus ones would find the queue empty and exit.
        n_workers = n_jobs if self.__n_threads == 0 else min(self.__n_threads, n_jobs)
        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(
                target=_exec_job_worker, args=(que, self._BaseScheduler__logger)
            )
            worker.daemon = True
            worker.start()
            workers.append(worker)

        # A worker only exits after finding the queue empty (or if executing a
        # job raises), so joining all workers waits for every job that was
        # picked up. Unlike ``que.join()`` this cannot block forever when a
        # worker dies before acknowledging its item with ``task_done()``.
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                # ``discard`` rather than ``delete_job``: the job may already have
                # been removed while it ran (e.g. by its own callback).
                with self.__jobs_lock:
                    self.__jobs.discard(job)

        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers
            and the execution limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        if force_exec_all:
            with self.__jobs_lock:
                all_jobs = list(self.__jobs)
            return self.__exec_jobs(all_jobs, ref_dt)

        # collect the current priority for all jobs
        job_priority: dict[Job, float] = {}
        with self.__jobs_lock:
            n_jobs = len(self.__jobs)
            for job in self.__jobs:
                delta_seconds = job.timedelta(ref_dt).total_seconds()
                job_priority[job] = self.__priority_function(
                    -delta_seconds,
                    job,
                    self.__max_exec,
                    n_jobs,
                )
        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            # ``not tags`` covers both None and an empty set (an empty set is
            # never equal to ``{}``, which is an empty dict).
            if not tags:
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            # ``not tags`` covers both None and an empty set.
            if not tags:
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs):
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be omitted; just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[list[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ):
        r"""
        Schedule a oneshot `Job`.

        A `datetime.datetime` `timing` is scheduled as a cyclic |Job| with a
        zero interval that starts at `timing`; any other `timing` uses the job
        type mapped to its Python type. In both cases only one attempt is made.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[list[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=tags,
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            A copy of the currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            return self.__jobs.copy()