"""Implementation of essential functions and components for a `BaseJob`.Author: Jendrik A. Potyka, Fabian A. Preiss"""importdatetimeasdtfromtypingimportOptional,Union,castfromscheduler.base.jobimportBaseJobTypefromscheduler.base.timingtypeimport(TimingCyclic,TimingDailyUnion,TimingJobUnion,TimingWeeklyUnion,)fromscheduler.errorimportSchedulerError
[docs]defstr_cutoff(string:str,max_length:int,cut_tail:bool=False)->str:""" Abbreviate a string to a given length. The resulting string will carry an indicator if it's abbreviated, like ``stri#``. Parameters ---------- string : str String which is to be cut. max_length : int Max resulting string length. cut_tail : bool ``False`` for string abbreviation from the front, else ``True``. Returns ------- str Resulting string """ifmax_length<1:raiseValueError("max_length < 1 not allowed")iflen(string)>max_length:pos=max_length-1returnstring[:pos]+"#"ifcut_tailelse"#"+string[-pos:]returnstring
[docs]defcheck_tzname(tzinfo:Optional[dt.tzinfo])->Optional[str]:"""Composed of the datetime.datetime.tzname and the datetime._check_tzname methode."""iftzinfoisNone:returnNonename:Optional[str]=tzinfo.tzname(None)ifnotisinstance(name,str):raiseSchedulerError(f"tzinfo.tzname() must return None or string, not {type(name)}")returnname
[docs]defcreate_job_instance(job_class:type[BaseJobType],timing:Union[TimingCyclic,TimingDailyUnion,TimingWeeklyUnion],**kwargs,)->BaseJobType:"""Create a job instance from the given input parameters."""ifnotisinstance(timing,list):timing_list=cast(TimingJobUnion,[timing])else:timing_list=cast(TimingJobUnion,timing)returnjob_class(timing=timing_list,**kwargs,)