Task Scheduler
- parallelism.task_scheduler(tasks, *, processes=None, threads=None, system_processor=100, system_memory=100, graphics_processor=100, graphics_memory=100)[source]
The task_scheduler function orchestrates the concurrent execution of multiple tasks, distributing them among processes and threads while enforcing resource limits. It lets developers manage and schedule tasks for improved performance without writing process- and thread-coordination code by hand.
- Parameters:
tasks (tuple of ScheduledTask) –
A tuple containing instances of ScheduledTask representing the tasks to be executed concurrently.
processes (int, default os.cpu_count()) –
Specifies the total number of parallel processes available for executing all tasks collectively.
threads (int, default os.cpu_count()) –
Specifies the total number of parallel threads available for executing all tasks collectively.
system_processor (int or float, default 100) –
Maximum allowed system processor usage (percentage).
system_memory (int or float, default 100) –
Maximum allowed system memory usage (percentage).
graphics_processor (int or float, default 100) –
Maximum allowed graphics processor usage (percentage).
graphics_memory (int or float, default 100) –
Maximum allowed graphics memory usage (percentage).
- Returns:
An instance of the SchedulerResult class that encapsulates the outcomes and statistics of task execution. This result provides insights into execution times, elapsed times, exceptions, and return values.
- Return type:
SchedulerResult
- class parallelism.core.scheduler_result.SchedulerResult[source]
The SchedulerResult class is a specialized container that inherits from the collections.namedtuple class, designed to hold and manage information related to task scheduling and execution results. It encapsulates data about execution times, elapsed times, exceptions, and return values for various tasks.
from parallelism import scheduled_task, task_scheduler

def func1():
    return 12345

def func2():
    raise Exception('Custom exception message')

st1 = scheduled_task(executor=..., name='st1', target=func1, ..., continual=True)
st2 = scheduled_task(executor=..., name='st2', target=func2, ..., continual=True)

ts = task_scheduler(tasks=(st1, st2), ...)
- property execution_time
A dictionary where each key represents a task name, and the corresponding value is the timestamp (datetime.datetime) when the task was started.
>>> ts.execution_time
{
    'st1': datetime.datetime(%Y, %m, %d, %H, %M, %S, %f),
    'st2': datetime.datetime(%Y, %m, %d, %H, %M, %S, %f),
}
- property elapsed_time
A dictionary where each key represents a task name, and the corresponding value is the time in seconds (float) it took for the task to complete.
>>> ts.elapsed_time
{
    'st1': <time in seconds (float)>,
    'st2': <time in seconds (float)>,
}
- property raise_exception
A dictionary where each key represents a task name, and the corresponding value is a RaiseException object that contains information about any exceptions raised during task execution. The RaiseException object has two properties: exception (an Exception object) and traceback (a str containing the exception’s traceback).
>>> ts.raise_exception
{
    'st2': RaiseException(
        exception=Exception('Custom exception message'),
        traceback=<content of traceback (str)>,
    ),
}
- property return_value
A dictionary where each key represents a task name, and the corresponding value is the return value (typing.Any) from the task’s execution after completion. Note: continual=True is required.
>>> ts.return_value
{
    'st1': 12345,
}
- class parallelism.core.raise_exception.RaiseException[source]
The RaiseException class is used within the SchedulerResult class to store information about exceptions raised during task execution.
from parallelism import scheduled_task, task_scheduler

def func(a, b):
    return a / b

st = scheduled_task(executor=..., name='st', target=func, args=(5, 0), ...)

ts = task_scheduler(tasks=(st,), ...)
- property exception
The exception object that was raised during task execution.
>>> ts.raise_exception.get('st').exception
ZeroDivisionError('division by zero')
- property traceback
A string representation of the traceback associated with the raised exception.
>>> ts.raise_exception.get('st').traceback
Traceback (most recent call last):
  File ".../site-packages/parallelism/core/handlers/function_handler.py", line ..., in __call__
    self.proxy['return_value'] = self.target(*args, **kwargs)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../main.py", line ..., in func
    return a / b
           ^^^^^
ZeroDivisionError: division by zero
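The traceback string stored by the scheduler is a standard formatted Python traceback. As a library-independent illustration (this sketch does not use the parallelism package), the standard-library traceback.format_exc() function produces the same kind of string when an exception is caught:

```python
import traceback

def div(a, b):
    return a / b

# Catch the exception and capture its formatted traceback, the way a
# scheduler might store both the exception object and the traceback text.
try:
    div(5, 0)
except ZeroDivisionError as exc:
    captured_exception = exc
    captured_traceback = traceback.format_exc()

print(type(captured_exception).__name__)        # ZeroDivisionError
print('division by zero' in captured_traceback)  # True
```

Storing the traceback as a string (rather than the live traceback object) keeps the result picklable, which matters when exceptions must cross process boundaries.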
Examples
# Built-in modules
from multiprocessing import Process
from threading import Thread
# Third-party libraries
from parallelism import scheduled_task, task_scheduler
Code flow
This code demonstrates parallel task execution with the parallelism module. It creates and schedules process and thread tasks whose inputs depend on earlier tasks' return values, while task_scheduler caps the number of concurrent processes and threads.
>>> def func1(a, b, c):
... print(f'func1(a={a}, b={b}, c={c})\n', end='')
... return a + b + c
...
>>> def func2(x):
... print(f'func2(x={x})\n', end='')
... return x
...
>>> p1 = scheduled_task(Process, 'p1', func1, args=(1, 2, 3))
>>> p2 = scheduled_task(Process, 'p2', func1, args=(4, 5, 6))
>>> p3 = scheduled_task(Process, 'p3', func1, args=(7, 8, 9))
>>> t1 = scheduled_task(Thread, 't1', func2, kwargs={'x': p1.return_value})
>>> t2 = scheduled_task(Thread, 't2', func2, kwargs={'x': p2.return_value})
>>> t3 = scheduled_task(Thread, 't3', func2, kwargs={'x': p3.return_value})
>>> p4 = scheduled_task(Process, 'p4', func1, args=(t1.return_value, t2.return_value, t3.return_value))
>>> t4 = scheduled_task(Thread, 't4', func2, kwargs={'x': p4.return_value})
>>> s1 = task_scheduler(tasks=(p1, p2, p3, t1, t2, t3, p4, t4), processes=2, threads=2)
func1(a=1, b=2, c=3) # Task 'p1' has been started
func1(a=4, b=5, c=6) # Task 'p2' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 'p1' ran approximately ... nanoseconds
func1(a=7, b=8, c=9) # Task 'p3' has been started
func2(x=6) # Task 't1' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 'p2' ran approximately ... nanoseconds
func2(x=15) # Task 't2' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 'p3' ran approximately ... nanoseconds
func2(x=24) # Task 't3' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 't1' ran approximately ... nanoseconds
TIMESTAMP [INFO] [parallelism:PID:TID] - 't2' ran approximately ... nanoseconds
TIMESTAMP [INFO] [parallelism:PID:TID] - 't3' ran approximately ... nanoseconds
func1(a=6, b=15, c=24) # Task 'p4' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 'p4' ran approximately ... nanoseconds
func2(x=45) # Task 't4' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 't4' ran approximately ... nanoseconds
Results
This example demonstrates how to use task_scheduler to manage concurrent task execution and retrieve outcomes. It schedules both process and thread tasks while the scheduler enforces process, thread, and resource limits, then retrieves execution details and results for inspection.
>>> def func1(a, b):
... print(f'func1(a={a}, b={b})\n', end='')
... return a / b
...
>>> def func2(x):
... print(f'func2(x={x})\n', end='')
... return x
...
>>> p1 = scheduled_task(Process, 'p1', func1, (5, 2))
>>> p2 = scheduled_task(Process, 'p2', func1, (6, 0))
>>> p3 = scheduled_task(Process, 'p3', func1, (6, 0), processes=2)
>>> p4 = scheduled_task(Process, 'p4', func1, (6, 0), system_processor=12.6, system_memory=34.7)
>>> p5 = scheduled_task(Process, 'p5', func1, (7, 2))
>>> t1 = scheduled_task(Thread, 't1', func2, (p1.return_value,), continual=True)
>>> t2 = scheduled_task(Thread, 't2', func2, (p2.return_value,), continual=True)
>>> t3 = scheduled_task(Thread, 't3', func2, (p3.return_value,), continual=True)
>>> t4 = scheduled_task(Thread, 't4', func2, (p4.return_value,), continual=True)
>>> t5 = scheduled_task(Thread, 't5', func2, (p5.return_value,), continual=True)
>>> s1 = task_scheduler(tasks=(p1, p2, p3, p4, p5, t1, t2, t3, t4, t5), processes=2, threads=2, system_processor=10, system_memory=25)
TIMESTAMP [WARNING] [parallelism:PID:TID] - 'p3' is being canceled, due to lack of 1 process
TIMESTAMP [WARNING] [parallelism:PID:TID] - 'p4' is being canceled, due to lack of 2.6% CPU and 9.7% RAM
func1(a=5, b=2) # Task 'p1' has been started
func1(a=6, b=0) # Task 'p2' has been started
TIMESTAMP [WARNING] [parallelism:PID:TID] - 't3' is being canceled, due to task 'p3'
TIMESTAMP [WARNING] [parallelism:PID:TID] - 't4' is being canceled, due to task 'p4'
TIMESTAMP [INFO] [parallelism:PID:TID] - 'p1' ran approximately ... nanoseconds
func1(a=7, b=2) # Task 'p5' has been started
func2(x=2.5) # Task 't1' has been started
TIMESTAMP [ERROR] [parallelism:PID:TID] - 'p2' ran approximately ... microseconds - ZeroDivisionError('division by zero')
TIMESTAMP [WARNING] [parallelism:PID:TID] - 't2' is being canceled, due to task 'p2'
TIMESTAMP [INFO] [parallelism:PID:TID] - 'p5' ran approximately ... nanoseconds
func2(x=3.5) # Task 't5' has been started
TIMESTAMP [INFO] [parallelism:PID:TID] - 't1' ran approximately ... nanoseconds
TIMESTAMP [INFO] [parallelism:PID:TID] - 't5' ran approximately ... nanoseconds
>>> s1.execution_time
{
'p3': datetime.datetime(...),
'p4': datetime.datetime(...),
'p1': datetime.datetime(...),
'p2': datetime.datetime(...),
't3': datetime.datetime(...),
't4': datetime.datetime(...),
'p5': datetime.datetime(...),
't1': datetime.datetime(...),
't2': datetime.datetime(...),
't5': datetime.datetime(...),
}
>>> s1.elapsed_time
{
'p1': float(...),
'p2': float(...),
'p5': float(...),
't1': float(...),
't5': float(...),
}
>>> s1.raise_exception
{
'p3': WorkerError("'p3' has been canceled", (1, 0)),
'p4': ResourceError("'p4' has been canceled", (2.6, 9.7, 0, 0)),
'p2': ZeroDivisionError('division by zero'),
't3': DependencyError("'t3' has been canceled", ('p3',)),
't4': DependencyError("'t4' has been canceled", ('p4',)),
't2': DependencyError("'t2' has been canceled", ('p2',)),
}
>>> s1.return_value
{
't1': 2.5,
't5': 3.5,
}