Custom Prioritization#

We previously discussed the default Job prioritization behaviour in the Job Prioritization example.

By default the priority in Scheduler is computed using the linear_priority_function(), where \mathtt{time\_delta} is defined as the difference between the current time (\mathtt{now}) and the planned execution time (\mathtt{next\_exec}) with \mathtt{time\_delta}=\mathtt{now}-\mathtt{next\_exec}. The default linear_priority_function() implements the prioritization using the following formula:

\left(\mathtt{time\_delta},\mathtt{weight}\right)\ {\mapsto}\begin{cases} 0 & :\ \mathtt{time\_delta}<0\\ {\left(\mathtt{time\_delta}+1\right)}\cdot\mathtt{weight} & :\ \mathtt{time\_delta}\geq0 \end{cases}

Note

By default Jobs with a priority value smaller or equal to zero are not executed by the exec_jobs() method of the Scheduler.

Some applications require customized prioritization models (e.g. with quadratic or exponential behaviour). Each Scheduler instance supports a custom implementation of the prioritization function.

Note

The custom prioritization functions implemented in this guide are directly available from prioritization.

Constant Weight Prioritization#

In this example we are going to implement a priority function without the time linear behaviour of the default linear_priority_function() with:

\left(\mathtt{time\_delta},\mathtt{weight}\right)\ {\mapsto}\begin{cases} 0 & :\ \mathtt{time\_delta}<0\\ \mathtt{weight} & :\ \mathtt{time\_delta}\geq0 \end{cases}

The Scheduler expects a prioritization function of the signature Callable[[float, Job, int, int], float]. The custom prioritization function is available in prioritization as constant_weight_prioritization().

import scheduler


def constant_weight_prioritization(
    time_delta: float, job: scheduler.threading.job.Job, max_exec: int, job_count: int
) -> float:
    """Interprete the Job's weight as its priority"""
    _ = max_exec
    _ = job_count
    if time_delta < 0:
        return 0
    return job.weight

Instantiate the Scheduler with the custom priority function.

>>> import datetime as dt

>>> from scheduler import Scheduler
>>> import scheduler.prioritization as sp

>>> now = dt.datetime.now()
>>> schedule = Scheduler(max_exec=3, priority_function=sp.constant_weight_prioritization)

Schedule some Jobs at different points in the past with distinct weights:

>>> for delayed_by, weight in ((2, 1), (3, 2), (1, 3), (4, 4)):
...     exec_time = now - dt.timedelta(seconds=delayed_by)
...     job = schedule.once(
...         exec_time,
...         print,
...         kwargs={"end": f"{weight = }; {delayed_by = }s\n"},
...         weight=weight,
...     )
...

Note how the columns due in and weight in the following table reflect the definitions of our Jobs.

>>> print(schedule)  
max_exec=3, tzinfo=None, priority_function=constant_weight_prioritization, #jobs=4

type     function / alias due at                 due in      attempts weight
-------- ---------------- ------------------- --------- ------------- ------
ONCE     print(?)         2021-07-02 20:43:47  -0:00:04           0/1      4
ONCE     print(?)         2021-07-02 20:43:48  -0:00:03           0/1      2
ONCE     print(?)         2021-07-02 20:43:49  -0:00:02           0/1      1
ONCE     print(?)         2021-07-02 20:43:50  -0:00:01           0/1      3

In contrast to the second the example in Default behaviour the time delay is not taken into consideration in the execution order of the Jobs.

>>> exec_count = schedule.exec_jobs()
weight = 4; delayed_by = 4s
weight = 3; delayed_by = 1s
weight = 2; delayed_by = 3s

Due to the Scheduler’s limit on the execution count argument max_exec, the Job with the lowest weight is still residing in the Scheduler.

>>> print(schedule)  
max_exec=3, tzinfo=None, priority_function=constant_weight_prioritization, #jobs=1

type     function / alias due at                 due in      attempts weight
-------- ---------------- ------------------- --------- ------------- ------
ONCE     print(?)         2021-07-02 21:07:17  -0:00:02           0/1      1

Uniform Random Prioritization#

This example demonstrates, how the priority function can be used to implement behaviours resembling more of a load balancer than a classical scheduler.

The following function implementation interprets the weight of a Job as a probability for it’s execution using the uniformly distributed random number generator random.random(). With random.random() generating values in the interval [0,1), the Job’s weights of 0, 0.3 and 1 would be interpreted as a probabilities of 0%, 30% and 100%.

Warning

In contrast to a regular scheduler the following example completely disregards the time element.

The Scheduler expects a prioritization function of the signature Callable[[float, Job, int, int], float]. The custom prioritization function is available in util as random_priority_function().

import random

import scheduler


def random_priority_function(
    time: float, job: scheduler.threading.job.Job, max_exec: int, job_count: int
) -> float:
    """
    Generate random priority values from weigths.

    .. warning:: Not suitable for security relevant purposes.

    The priority generator will return 1 if the random number
    is lower then the |Job|'s weight, otherwise it will return 0.
    """
    _ = time
    _ = max_exec
    _ = job_count
    if random.random() < job.weight:
        return 1
    return 0

Now instantiate a Scheduler with the custom random_priority_function. Then create some generic Jobs with probabilities from 0% to 100%:

>>> import datetime as dt

>>> from scheduler import Scheduler
>>> import scheduler.prioritization as sp

>>> schedule = Scheduler(priority_function=sp.random_priority_function)

>>> jobs = {}
>>> for percentage in range(0, 101, 10):
...     jobs[percentage] = schedule.cyclic(
...         dt.timedelta(),
...         lambda: None,
...         weight=0.01 * percentage,
...     )
...

We can verify that the expected number of Jobs with the given probabilities are scheduled:

>>> print(schedule)  
max_exec=inf, tzinfo=None, priority_function=random_priority_function, #jobs=11

type     function / alias due at                 due in      attempts weight
-------- ---------------- ------------------- --------- ------------- ------
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.0
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.1
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.2
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.3
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.4
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.5
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.6
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf 0.700#
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.8
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    0.9
CYCLIC   <lambda>()       2021-07-02 23:43:38  -0:00:00         0/inf    1.0

For the next step we run a small statistical experiment and perform 10k executions with the Scheduler.

>>> total_counts = 10_000
>>> for _ in range(total_counts):
...     exec_count = schedule.exec_jobs()
...

Utilizing the attempts() property we can observe the number of executions. For direct comparision with the target probabilities we normalize the results by the total counts. If everything is behaving correctly we would expect the results to approach the target probabilities with for increasing total counts.

>>> for percentage, job in jobs.items():  
...     print("{:>3} {:>5.1f}".format(percentage, 100 * job.attempts / total_counts))
...
  0   0.0
 10  10.2
 20  19.9
 30  30.1
 40  39.4
 50  49.7
 60  59.3
 70  70.3
 80  79.8
 90  90.5
100 100.0

The results in this experiment conform to what one would expect using an underlying uniformly distributed random variable.