Coverage for scheduler/base/definition.py: 100%
13 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-21 13:55 +0000
1"""
2Basic definitions for an abstract `BaseJob` and `BaseScheduler`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8from enum import Enum, auto
10from scheduler.base.timingtype import (
11 _TimingCyclicList,
12 _TimingDailyList,
13 _TimingWeeklyList,
14)
15from scheduler.message import (
16 CYCLIC_TYPE_ERROR_MSG,
17 DAILY_TYPE_ERROR_MSG,
18 HOURLY_TYPE_ERROR_MSG,
19 MINUTELY_TYPE_ERROR_MSG,
20 WEEKLY_TYPE_ERROR_MSG,
21)
22from scheduler.trigger import (
23 Friday,
24 Monday,
25 Saturday,
26 Sunday,
27 Thursday,
28 Tuesday,
29 Wednesday,
30)
class JobType(Enum):
    """
    Indicate the `JobType` of a |BaseJob|.

    Member values are generated by `auto()`, so they are the consecutive
    integers 1..5 in declaration order. Keep the order stable if anything
    relies on the numeric values.
    """

    CYCLIC = auto()
    MINUTELY = auto()
    HOURLY = auto()
    DAILY = auto()
    WEEKLY = auto()
# Lookup table from a timing key to the `JobType` it selects.
# Keys are `dt.timedelta`, `dt.time` and the seven weekday names imported from
# `scheduler.trigger`; every weekday maps to `JobType.WEEKLY`.
# `JobType.MINUTELY` and `JobType.HOURLY` have no key in this table.
# NOTE(review): presumably looked up via the type of a timing argument --
# confirm against the callers.
JOB_TYPE_MAPPING = {
    dt.timedelta: JobType.CYCLIC,
    dt.time: JobType.DAILY,
    Monday: JobType.WEEKLY,
    Tuesday: JobType.WEEKLY,
    Wednesday: JobType.WEEKLY,
    Thursday: JobType.WEEKLY,
    Friday: JobType.WEEKLY,
    Saturday: JobType.WEEKLY,
    Sunday: JobType.WEEKLY,
}
# Per-`JobType` lookup table with two entries for each job type:
#   "type": the timing type object imported from `scheduler.base.timingtype`
#   "err":  the matching `*_TYPE_ERROR_MSG` constant from `scheduler.message`
# MINUTELY, HOURLY and DAILY share `_TimingDailyList` and differ only in the
# error message.
# NOTE(review): presumably "err" is reported when a timing value does not
# match "type" -- confirm against the callers.
JOB_TIMING_TYPE_MAPPING = {
    JobType.CYCLIC: {
        "type": _TimingCyclicList,
        "err": CYCLIC_TYPE_ERROR_MSG,
    },
    JobType.MINUTELY: {
        "type": _TimingDailyList,
        "err": MINUTELY_TYPE_ERROR_MSG,
    },
    JobType.HOURLY: {
        "type": _TimingDailyList,
        "err": HOURLY_TYPE_ERROR_MSG,
    },
    JobType.DAILY: {
        "type": _TimingDailyList,
        "err": DAILY_TYPE_ERROR_MSG,
    },
    JobType.WEEKLY: {
        "type": _TimingWeeklyList,
        "err": WEEKLY_TYPE_ERROR_MSG,
    },
}