Coverage for scheduler/base/definition.py: 100%
13 statements
« prev ^ index » next coverage.py v7.0.4, created at 2023-03-19 22:07 +0000
« prev ^ index » next coverage.py v7.0.4, created at 2023-03-19 22:07 +0000
1"""
Basic definitions for an abstract `BaseJob` and `BaseScheduler`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8from enum import Enum, auto
10from scheduler.base.timingtype import _TimingCyclicList, _TimingDailyList, _TimingWeeklyList
11from scheduler.message import (
12 CYCLIC_TYPE_ERROR_MSG,
13 DAILY_TYPE_ERROR_MSG,
14 HOURLY_TYPE_ERROR_MSG,
15 MINUTELY_TYPE_ERROR_MSG,
16 WEEKLY_TYPE_ERROR_MSG,
17)
18from scheduler.trigger import Friday, Monday, Saturday, Sunday, Thursday, Tuesday, Wednesday
class JobType(Enum):
    """Indicate the `JobType` of a |BaseJob|."""

    # Values are generated by ``auto()``; members are distinguished by
    # identity/name, so the concrete integers carry no meaning of their own.
    CYCLIC = auto()
    MINUTELY = auto()
    HOURLY = auto()
    DAILY = auto()
    WEEKLY = auto()
# Lookup from a timing class to the `JobType` it stands for.
# Keys are classes (not instances): `dt.timedelta`, `dt.time` and the seven
# weekday classes imported from `scheduler.trigger`.
# NOTE: no key maps to `JobType.MINUTELY` or `JobType.HOURLY`; `dt.time`
# resolves to `JobType.DAILY` only.
JOB_TYPE_MAPPING = {
    dt.timedelta: JobType.CYCLIC,
    dt.time: JobType.DAILY,
    Monday: JobType.WEEKLY,
    Tuesday: JobType.WEEKLY,
    Wednesday: JobType.WEEKLY,
    Thursday: JobType.WEEKLY,
    Friday: JobType.WEEKLY,
    Saturday: JobType.WEEKLY,
    Sunday: JobType.WEEKLY,
}
# Per-`JobType` settings, one entry for every member of `JobType`:
#   "type": a timing list type from `scheduler.base.timingtype`
#   "err":  the matching error message constant from `scheduler.message`
# MINUTELY, HOURLY and DAILY share `_TimingDailyList` and differ only in
# their error message.
# NOTE(review): presumably "err" is raised when a timing argument does not
# match "type" -- confirm against the callers of this mapping.
JOB_TIMING_TYPE_MAPPING = {
    JobType.CYCLIC: {
        "type": _TimingCyclicList,
        "err": CYCLIC_TYPE_ERROR_MSG,
    },
    JobType.MINUTELY: {
        "type": _TimingDailyList,
        "err": MINUTELY_TYPE_ERROR_MSG,
    },
    JobType.HOURLY: {
        "type": _TimingDailyList,
        "err": HOURLY_TYPE_ERROR_MSG,
    },
    JobType.DAILY: {
        "type": _TimingDailyList,
        "err": DAILY_TYPE_ERROR_MSG,
    },
    JobType.WEEKLY: {
        "type": _TimingWeeklyList,
        "err": WEEKLY_TYPE_ERROR_MSG,
    },
}