Scheduled Task

parallelism.scheduled_task(executor, name, target, args=None, kwargs=None, *, dependencies=None, priority=None, processes=0, threads=0, system_processor=0, system_memory=0, graphics_processor=0, graphics_memory=0, continual=False)

The scheduled_task function creates a task for the task scheduler to run in a parallel computing environment. It controls how the task is executed, which tasks it depends on, its priority, its estimated resource usage, and whether its result is stored.

Parameters:
  • executor (type of multiprocessing.Process or threading.Thread) –
    The execution unit for the task: either multiprocessing.Process or threading.Thread.

  • name (str) –
    A unique identifier for the task, used to distinguish and track it.

  • target (callable) –
    The function the task scheduler invokes when the task runs.

  • args (tuple, optional) –
    Positional arguments passed to the target function.

  • kwargs (dict, optional) –
    Keyword arguments passed to the target function.

  • dependencies (tuple of ScheduledTask, optional) –
    Tasks that must complete before this task starts.

  • priority (int or float, optional) –
    Priority of the task relative to other tasks. Lower values indicate higher priority.

  • processes (int, default 0) –
    The number of processes allocated by the target function.

  • threads (int, default 0) –
    The number of threads allocated by the target function.

  • system_processor (int or float, default 0) –
    Estimated system processor usage, as a percentage.

  • system_memory (int or float, default 0) –
    Estimated system memory usage, as a percentage.

  • graphics_processor (int or float, default 0) –
    Estimated graphics processor usage, as a percentage.

  • graphics_memory (int or float, default 0) –
    Estimated graphics memory usage, as a percentage.

  • continual (bool, default False) –
    Whether the task scheduler stores the result of the task after completion. If True, the result is stored for later access.

Returns:

A scheduled task instance with configured properties, ready for execution within the task scheduler.

Return type:

ScheduledTask

class parallelism.core.scheduled_task.ScheduledTask

The ScheduledTask class, a named tuple type built on collections.namedtuple, represents a scheduled task along with its associated return value. The return value is accessed in a deferred manner through the ReturnValue object.

property return_value

A reference to the ReturnValue object associated with this scheduled task. The ReturnValue object provides deferred access to the actual return value of the task.

class parallelism.core.return_value.ReturnValue

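The ReturnValue class represents a deferred reference to the return value of a scheduled task. It implements the __call__, __getattribute__, and __getitem__ methods so that calls, attribute lookups, and indexing can be written against the return value before it exists. Each such expression evaluates to a ReturnValue; the actual value is produced only at runtime.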

from parallelism import scheduled_task

def func1(reverse=False):
   return 54321 if reverse else 12345

def func2():
   return func1

def func3():
   return {'a': 123, 'b': 45}

def func4():
   return [1, 2, 3, 4, 5]

st1 = scheduled_task(executor=..., name='st1', target=func1, ...)
st2 = scheduled_task(executor=..., name='st2', target=func2, ...)
st3 = scheduled_task(executor=..., name='st3', target=func3, ...)
st4 = scheduled_task(executor=..., name='st4', target=func4, ...)

Accessing a Return Value:

>>> st1.return_value
ReturnValue(task=ScheduledTask(executor=..., name='st1', target='__main__.func1', ...))
# At runtime: 12345

__call__(self, *args, **kwargs)

Calls the actual return value of the scheduled task with the given arguments. Use this when the task returns a callable, as func2 does.

Invoking the Return Value as a Callable:

>>> st2.return_value()
ReturnValue(task=ScheduledTask(executor=..., name='st2', target='__main__.func2', ...))
# At runtime: 12345

Invoking the Return Value with Parameters:

>>> st2.return_value(reverse=True)
ReturnValue(task=ScheduledTask(executor=..., name='st2', target='__main__.func2', ...))
# At runtime: 54321

__getattribute__(self, name)

Retrieves an attribute or method of the actual return value. This method allows access to properties and methods of the return value without directly referencing it.

Accessing a Specific Key in the Return Dictionary:

>>> st3.return_value.get('b')
ReturnValue(task=ScheduledTask(executor=..., name='st3', target='__main__.func3', ...))
# At runtime: 45

__getitem__(self, key)

Retrieves an item from the actual return value using the provided key. This method allows accessing elements of the return value, such as lists, dictionaries, etc.

Accessing an Element by Index:

>>> st4.return_value[2]
ReturnValue(task=ScheduledTask(executor=..., name='st4', target='__main__.func4', ...))
# At runtime: 3

Accessing a Slice of Elements:

>>> st4.return_value[3:]
ReturnValue(task=ScheduledTask(executor=..., name='st4', target='__main__.func4', ...))
# At runtime: [4, 5]
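
The deferred access described above can be pictured as a proxy that records each operation and replays it once the real value exists. The following is a minimal sketch under stated assumptions, not the library's implementation: the Deferred class, its resolve method, and the source callable are hypothetical names, and it uses __getattr__ rather than __getattribute__ to keep the example short.

```python
class Deferred:
    """Hypothetical deferred reference; a sketch, not the library's ReturnValue."""

    def __init__(self, source, operations=()):
        self._source = source          # zero-argument callable that yields the real value
        self._operations = operations  # operations recorded so far, replayed in order

    def _record(self, operation):
        # Return a new Deferred so the original reference stays reusable
        return Deferred(self._source, self._operations + (operation,))

    def __call__(self, *args, **kwargs):
        return self._record(lambda value: value(*args, **kwargs))

    def __getattr__(self, name):
        # Only reached for names that are not found on Deferred itself
        if name.startswith('_'):
            raise AttributeError(name)
        return self._record(lambda value: getattr(value, name))

    def __getitem__(self, key):
        return self._record(lambda value: value[key])

    def resolve(self):
        """Fetch the real value and replay every recorded operation on it."""
        value = self._source()
        for operation in self._operations:
            value = operation(value)
        return value


def func1(reverse=False):
    return 54321 if reverse else 12345


# Each expression builds another Deferred; nothing runs until resolve()
called = Deferred(lambda: func1)(reverse=True)
looked_up = Deferred(lambda: {'a': 123, 'b': 45}).get('b')
sliced = Deferred(lambda: [1, 2, 3, 4, 5])[3:]

print(called.resolve(), looked_up.resolve(), sliced.resolve())
# prints: 54321 45 [4, 5]
```

Recording operations instead of executing them is what lets an expression such as a slice or a method call be written before the task has run.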

Examples

# Built-in modules
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Process
from threading import Thread

# Third-party libraries
from parallelism import scheduled_task

Executor

Running Different Concurrent Environments:

In this example, scheduled_task is used to create a scheduled task named p that runs as a multiprocessing.Process, and another task named t that runs as a threading.Thread. Both tasks execute the same function, func.

>>> def func():
...     pass
...
>>> p = scheduled_task(Process, 'p', func)
>>> t = scheduled_task(Thread, 't', func)

Args & Kwargs

Passing Arguments and Dependencies:

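This example passes arguments to tasks. p computes 1 + 2 + 3 from a mix of positional and keyword arguments, and t receives the return value of p as its x argument. Because p.return_value is a deferred reference, the actual value is supplied at runtime.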

>>> def func1(a, b, c):
...     return a + b + c
...
>>> def func2(x):
...     print(x)
...
>>> p = scheduled_task(Process, 'p', func1, args=(1, 2), kwargs={'c': 3})
>>> t = scheduled_task(Thread, 't', func2, kwargs={'x': p.return_value})
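
To illustrate what passing p.return_value as an argument implies, the sketch below swaps placeholders for stored results just before a target is called. It is an illustration under assumptions: Pending, resolve_arguments, and the results dictionary are hypothetical names, and the library's own substitution mechanism is not shown in this document.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pending:
    """Hypothetical placeholder naming the task whose result is wanted."""
    task_name: str


def resolve_arguments(args, kwargs, results):
    """Replace each Pending placeholder with the stored result of its task."""
    def swap(value):
        return results[value.task_name] if isinstance(value, Pending) else value

    return tuple(swap(a) for a in args), {k: swap(v) for k, v in kwargs.items()}


def func1(a, b, c):
    return a + b + c


# Pretend task 'p' has already run and its result was recorded
results = {'p': func1(1, 2, c=3)}

# Task 't' was declared with a placeholder for the result of 'p'
args, kwargs = resolve_arguments((), {'x': Pending('p')}, results)
print(kwargs)  # prints: {'x': 6}
```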

Dependencies

Managing Task Dependencies:

This example shows task dependencies. t reads the file that p writes, so it lists p in dependencies and starts only after p has completed.

>>> def func1(path, text):
...     with open(path, 'w') as file:
...         file.write(text)
...
>>> def func2(path):
...     with open(path, 'r') as file:
...         return file.read()
...
>>> p = scheduled_task(Process, 'p', func1, kwargs={'path': 'example.txt', 'text': 'Hello, World!'})
>>> t = scheduled_task(Thread, 't', func2, kwargs={'path': 'example.txt'}, dependencies=(p,))
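
The ordering that dependencies impose can be sketched with the standard library's graphlib module (Python 3.9 or later). This only illustrates the idea of running a task after the tasks it depends on; how the task scheduler orders tasks internally is not described here, and the graph dictionary is a hypothetical stand-in for the dependencies tuples.

```python
from graphlib import TopologicalSorter

# Map each task name to the set of task names it depends on
graph = {
    'p': set(),   # p has no dependencies
    't': {'p'},   # t must wait for p
}

# static_order() yields every task after all of its dependencies
order = list(TopologicalSorter(graph).static_order())
print(order)  # prints: ['p', 't']
```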

Priority

Setting Task Priority:

In this scenario, tasks are assigned priorities. Because lower values indicate higher priority, t (priority=1) takes precedence over p (priority=2), which affects their order of execution.

>>> def func():
...     pass
...
>>> p = scheduled_task(Process, 'p', func, priority=2)
>>> t = scheduled_task(Thread, 't', func, priority=1)
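
The "lower value runs first" rule matches the behavior of a min-heap. The sketch below uses the standard library's heapq to show the resulting order; it is an assumption-laden illustration, not the task scheduler's actual queue, and the tie_breaker counter is a hypothetical detail added so that equal priorities keep their insertion order.

```python
import heapq
from itertools import count

ready = []             # min-heap of (priority, insertion index, task name)
tie_breaker = count()  # keeps insertion order among equal priorities

for priority, name in [(2, 'p'), (1, 't')]:
    heapq.heappush(ready, (priority, next(tie_breaker), name))

# Popping a min-heap returns the smallest priority value first
order = [heapq.heappop(ready)[2] for _ in range(len(ready))]
print(order)  # prints: ['t', 'p']
```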

Processes & Threads

Balancing Processes and Threads:

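Here, the processes and threads parameters declare how many processes and threads the target function allocates. func creates a pool of 2 worker processes and a pool of 4 worker threads, so the task is declared with processes=2 and threads=4.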

>>> def func():
...     with ProcessPoolExecutor(max_workers=2) as executor:
...         pass
...     with ThreadPoolExecutor(max_workers=4) as executor:
...         pass
...
>>> p = scheduled_task(Process, 'p', func, processes=2, threads=4)

Processor & Memory

Resource Usage Estimation:

The system_processor, system_memory, graphics_processor, and graphics_memory parameters specify the estimated resource usage of each task, which the task scheduler uses when allocating resources.

Each task below estimates 60.5% system processor and 75% system memory usage. Because the combined estimates (121% processor, 150% memory) exceed the available resources, the two tasks cannot run concurrently.

>>> def func():
...     pass
...
>>> p = scheduled_task(Process, 'p', func, system_processor=60.5, system_memory=75)
>>> t = scheduled_task(Thread, 't', func, system_processor=60.5, system_memory=75)
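
The arithmetic behind this conclusion can be sketched as a simple budget check. The fits_together helper and the fixed 100% limit are assumptions made for illustration; the real scheduler may also account for the current system load, which this sketch ignores.

```python
def fits_together(tasks, limit=100.0):
    """Return True if the summed estimates stay within `limit` for every resource."""
    resources = ('system_processor', 'system_memory')
    return all(
        sum(task.get(resource, 0) for task in tasks) <= limit
        for resource in resources
    )


# Estimates copied from the example above
p = {'system_processor': 60.5, 'system_memory': 75}
t = {'system_processor': 60.5, 'system_memory': 75}

print(fits_together([p]))     # prints: True
print(fits_together([p, t]))  # prints: False (121% processor, 150% memory)
```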

Continual

Storing Task Return Values:

This example illustrates how to manage stored return values. p’s return value is stored due to the continual=True parameter, while t’s return value is not stored.

>>> def func():
...     return 123
...
>>> p = scheduled_task(Process, 'p', func, continual=True)
>>> t = scheduled_task(Thread, 't', func, continual=False)