from __future__ import annotations
from typing import NamedTuple, TYPE_CHECKING
from parallelism.core.executors.process_executor import ProcessExecutor
from parallelism.core.executors.thread_executor import ThreadExecutor
from parallelism.core.return_value import ReturnValue
if TYPE_CHECKING:
from typing import Any, Callable, Dict, Tuple, Type, Union
__all__ = ('ScheduledTask',)
[docs]class ScheduledTask(NamedTuple):
executor: Union[
ProcessExecutor,
ThreadExecutor,
Type[ProcessExecutor],
Type[ThreadExecutor],
]
name: str
target: Callable[..., Any]
args: Tuple[Any, ...]
kwargs: Dict[str, Any]
dependencies: Tuple[ScheduledTask, ...]
priority: Union[int, float]
processes: int
threads: int
system_processor: Union[int, float]
system_memory: Union[int, float]
graphics_processor: Union[int, float]
graphics_memory: Union[int, float]
continual: bool
initialized: bool
def __hash__(self) -> int:
return hash(self.name)
def __eq__(self, other: ScheduledTask) -> bool:
return type(self) is type(other) and self.name == other.name
def __repr__(self) -> str:
parameters = (
'executor={!r}'.format(self.reformat_executor),
'name={!r}'.format(self.name),
'target={!r}'.format(self.reformat_target),
'args={!r}'.format(self.amount_of_args),
'kwargs={!r}'.format(self.amount_of_kwargs),
'dependencies={!r}'.format(self.amount_of_dependencies),
'priority={!r}'.format(self.priority),
'processes={!r}'.format(self.processes),
'threads={!r}'.format(self.threads),
'system_processor={!r}'.format(self.system_processor),
'system_memory={!r}'.format(self.system_memory),
'graphics_processor={!r}'.format(self.graphics_processor),
'graphics_memory={!r}'.format(self.graphics_memory),
'continual={!r}'.format(self.continual),
)
parameters = ', '.join(parameters)
return f'{self.__class__.__name__}({parameters})'
@property
def reformat_executor(self) -> str:
executor = self.executor
if isinstance(self.executor, (ProcessExecutor, ThreadExecutor)):
executor = executor.__class__
return executor.__base__.__name__
@property
def reformat_target(self) -> str:
target = self.target
module = getattr(target, '__module__')
qualified_name = getattr(target, '__qualname__', repr(target))
return f'{module}.{qualified_name}'
@property
def amount_of_args(self) -> int:
return len(self.args)
@property
def amount_of_kwargs(self) -> int:
return len(self.kwargs)
@property
def amount_of_dependencies(self) -> int:
return len(self.dependencies)
@property
def depends_on_dependencies(self) -> Tuple[ScheduledTask, ...]:
return tuple(task for task in self.dependencies)
@property
def depends_on_parameters(self) -> Tuple[ScheduledTask, ...]:
tasks = {}
for parameter in list(self.args) + list(self.kwargs.values()):
if isinstance(parameter, ReturnValue):
task = getattr(parameter, ':task')
if task not in tasks:
tasks[task] = None
return tuple(tasks.keys())
@property
def return_value(self) -> ReturnValue:
return ReturnValue(task=self)