Coverage for scheduler/base/definition.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2024-06-09 19:18 +0000

1""" 

2Basic definitions for an abstract `BaseJob` and `BaseScheduler`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8from enum import Enum, auto 

9 

10from scheduler.base.timingtype import ( 

11 _TimingCyclicList, 

12 _TimingDailyList, 

13 _TimingWeeklyList, 

14) 

15from scheduler.message import ( 

16 CYCLIC_TYPE_ERROR_MSG, 

17 DAILY_TYPE_ERROR_MSG, 

18 HOURLY_TYPE_ERROR_MSG, 

19 MINUTELY_TYPE_ERROR_MSG, 

20 WEEKLY_TYPE_ERROR_MSG, 

21) 

22from scheduler.trigger import ( 

23 Friday, 

24 Monday, 

25 Saturday, 

26 Sunday, 

27 Thursday, 

28 Tuesday, 

29 Wednesday, 

30) 

31 

32 

class JobType(Enum):
    """Indicate the `JobType` of a |BaseJob|.

    Members are numbered with `auto()` in declaration order, so their
    values run from 1 (`CYCLIC`) to 5 (`WEEKLY`).
    """

    # Selected by JOB_TYPE_MAPPING for `dt.timedelta` timings.
    CYCLIC = auto()
    # No entry in JOB_TYPE_MAPPING; paired with the daily timing list type
    # in JOB_TIMING_TYPE_MAPPING.
    MINUTELY = auto()
    # No entry in JOB_TYPE_MAPPING; paired with the daily timing list type
    # in JOB_TIMING_TYPE_MAPPING.
    HOURLY = auto()
    # Selected by JOB_TYPE_MAPPING for `dt.time` timings.
    DAILY = auto()
    # Selected by JOB_TYPE_MAPPING for every weekday trigger.
    WEEKLY = auto()

41 

42 

# Lookup table from a timing class to the `JobType` it stands for:
# `dt.timedelta` -> CYCLIC, `dt.time` -> DAILY, and each weekday trigger
# imported from `scheduler.trigger` -> WEEKLY.
# MINUTELY and HOURLY have no key here.
# NOTE(review): presumably indexed with the type of a timing argument --
# confirm against the callers.
JOB_TYPE_MAPPING = {
    dt.timedelta: JobType.CYCLIC,
    dt.time: JobType.DAILY,
    Monday: JobType.WEEKLY,
    Tuesday: JobType.WEEKLY,
    Wednesday: JobType.WEEKLY,
    Thursday: JobType.WEEKLY,
    Friday: JobType.WEEKLY,
    Saturday: JobType.WEEKLY,
    Sunday: JobType.WEEKLY,
}

54 

# Per-`JobType` record with two string keys:
#   "type" -- the timing list type from `scheduler.base.timingtype`
#   "err"  -- the matching `*_TYPE_ERROR_MSG` constant from `scheduler.message`
# MINUTELY, HOURLY and DAILY all share `_TimingDailyList`; only their error
# messages differ.
# NOTE(review): presumably used to type-check a job's timing argument and to
# pick the error text on mismatch -- confirm against the callers.
JOB_TIMING_TYPE_MAPPING = {
    JobType.CYCLIC: {
        "type": _TimingCyclicList,
        "err": CYCLIC_TYPE_ERROR_MSG,
    },
    JobType.MINUTELY: {
        "type": _TimingDailyList,
        "err": MINUTELY_TYPE_ERROR_MSG,
    },
    JobType.HOURLY: {
        "type": _TimingDailyList,
        "err": HOURLY_TYPE_ERROR_MSG,
    },
    JobType.DAILY: {
        "type": _TimingDailyList,
        "err": DAILY_TYPE_ERROR_MSG,
    },
    JobType.WEEKLY: {
        "type": _TimingWeeklyList,
        "err": WEEKLY_TYPE_ERROR_MSG,
    },
}