Coverage for scheduler/base/definition.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.0.4, created at 2023-12-10 20:46 +0000

1""" 

2Basic definitions for a abstract `BaseJob` and `BaseScheduler`. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8from enum import Enum, auto 

9 

10from scheduler.base.timingtype import _TimingCyclicList, _TimingDailyList, _TimingWeeklyList 

11from scheduler.message import ( 

12 CYCLIC_TYPE_ERROR_MSG, 

13 DAILY_TYPE_ERROR_MSG, 

14 HOURLY_TYPE_ERROR_MSG, 

15 MINUTELY_TYPE_ERROR_MSG, 

16 WEEKLY_TYPE_ERROR_MSG, 

17) 

18from scheduler.trigger import Friday, Monday, Saturday, Sunday, Thursday, Tuesday, Wednesday 

19 

20 

21class JobType(Enum): 

22 """Indicate the `JobType` of a |BaseJob|.""" 

23 

24 CYCLIC = auto() 

25 MINUTELY = auto() 

26 HOURLY = auto() 

27 DAILY = auto() 

28 WEEKLY = auto() 

29 

30 

31JOB_TYPE_MAPPING = { 

32 dt.timedelta: JobType.CYCLIC, 

33 dt.time: JobType.DAILY, 

34 Monday: JobType.WEEKLY, 

35 Tuesday: JobType.WEEKLY, 

36 Wednesday: JobType.WEEKLY, 

37 Thursday: JobType.WEEKLY, 

38 Friday: JobType.WEEKLY, 

39 Saturday: JobType.WEEKLY, 

40 Sunday: JobType.WEEKLY, 

41} 

42 

43JOB_TIMING_TYPE_MAPPING = { 

44 JobType.CYCLIC: { 

45 "type": _TimingCyclicList, 

46 "err": CYCLIC_TYPE_ERROR_MSG, 

47 }, 

48 JobType.MINUTELY: { 

49 "type": _TimingDailyList, 

50 "err": MINUTELY_TYPE_ERROR_MSG, 

51 }, 

52 JobType.HOURLY: { 

53 "type": _TimingDailyList, 

54 "err": HOURLY_TYPE_ERROR_MSG, 

55 }, 

56 JobType.DAILY: { 

57 "type": _TimingDailyList, 

58 "err": DAILY_TYPE_ERROR_MSG, 

59 }, 

60 JobType.WEEKLY: { 

61 "type": _TimingWeeklyList, 

62 "err": WEEKLY_TYPE_ERROR_MSG, 

63 }, 

64}