from collections.__init__ import OrderedDict
from copy import copy
from textwrap import dedent, indent

try:
    # The container ABCs live in collections.abc; their aliases in the ``collections``
    # namespace were removed in Python 3.10.
    from collections.abc import Iterable, Mapping
except ImportError:
    from collections import Iterable, Mapping

import numpy as np
import pyopencl as cl
import tatsu

from mot.configuration import CLRuntimeInfo
from mot.lib.kernel_data import KernelData, Scalar, Array, Zeros
from mot.lib.utils import is_scalar, get_float_type_def, split_cl_function, split_in_batches
# Module metadata: author, creation date, maintainer, contact address and licence.
__author__ = 'Robbert Harms'
__date__ = '2017-08-31'
__maintainer__ = 'Robbert Harms'
__email__ = 'robbert.harms@maastrichtuniversity.nl'
__licence__ = 'LGPL v3'
class CLCodeObject:
    """Base interface for objects that can provide CL source code."""

    def get_cl_code(self):
        """Return the CL code of this object and of all its dependencies, with include guards.

        Returns:
            str: the CL code, suitable for inclusion in a kernel.

        Raises:
            NotImplementedError: always; implementing classes have to override this method.
        """
        raise NotImplementedError()
class CLFunction(CLCodeObject):
    """Base interface describing a single CL function.

    Every method raises :class:`NotImplementedError`; implementing classes override them.
    """

    def get_return_type(self):
        """Return the CL type of the value this function returns.

        Returns:
            str: the return type in CL naming (for example ``double``, ``int`` or ``double4``).
        """
        raise NotImplementedError()

    def get_cl_function_name(self):
        """Return the name under which this CL function is called.

        Returns:
            str: the name of this CL function.
        """
        raise NotImplementedError()

    def get_parameters(self):
        """Return the parameters of this CL function.

        Returns:
            list of :class:`mot.lib.cl_function.CLFunctionParameter`: the parameters, in the same order
                as they appear in the CL function.
        """
        raise NotImplementedError()

    def get_signature(self):
        """Return the CL signature of this function.

        Returns:
            str: the CL code of the signature of this function.
        """
        raise NotImplementedError()

    def get_cl_code(self):
        """Return the code of this function and of all its dependencies, with include guards.

        Returns:
            str: the CL code, suitable for inclusion in a kernel.
        """
        raise NotImplementedError()

    def get_cl_body(self):
        """Return the CL code that makes up the body of this function.

        Returns:
            str: the CL code of the function body.
        """
        raise NotImplementedError()

    def evaluate(self, inputs, nmr_instances, use_local_reduction=False, cl_runtime_info=None):
        """Evaluate this function once for every set of input parameters.

        Dots in parameter names are converted to underscores for use in the CL kernel.

        Args:
            inputs (Iterable[Union(ndarray, mot.lib.utils.KernelData)]
                    or Mapping[str: Union(ndarray, mot.lib.utils.KernelData)]): the input data for each
                parameter of the CL function. Either an iterable holding one value per parameter, or a
                mapping holding a value for every parameter name. Every input must either be a scalar or
                have the same length in the first dimension as the other inputs. Elements may be raw
                ndarrays or KernelData objects; raw ndarrays are loaded read/write by default.
            nmr_instances (int): the number of parallel processes to run.
            use_local_reduction (boolean): set to True to use local memory reduction while evaluating this
                function. In that case the global size (given by ``nmr_instances``) is multiplied by the
                work group sizes.
            cl_runtime_info (mot.configuration.CLRuntimeInfo): the runtime information for execution.

        Returns:
            ndarray: the values returned by the function, or None if the function has a void return type.
        """
        raise NotImplementedError()

    def get_dependencies(self):
        """Return the dependencies of this function.

        Returns:
            list[CLFunction]: the functions this function depends on.
        """
        raise NotImplementedError()
class SimpleCLCodeObject(CLCodeObject):
    """Code object wrapping a fixed piece of CL code, for example type definitions."""

    def __init__(self, cl_code):
        """Store the CL code that this object hands out.

        Args:
            cl_code (str): the CL code to be included in the kernel.
        """
        self._cl_code = cl_code

    def get_cl_code(self):
        """Return the CL code exactly as it was provided at construction.

        Returns:
            str: the stored CL code.
        """
        return self._cl_code
class SimpleCLFunction(CLFunction):
    """A CL function assembled from a return type, a name, a parameter list and a body."""

    def __init__(self, return_type, cl_function_name, parameter_list, cl_body, dependencies=None):
        """A simple implementation of a CL function.

        Args:
            return_type (str): the CL return type of the function
            cl_function_name (string): The name of the CL function
            parameter_list (list or tuple): This either contains instances of
                :class:`CLFunctionParameter` or strings from which to form the function parameters.
            cl_body (str): the body of the CL code for this function.
            dependencies (Iterable[CLCodeObject]): The CL code objects this function depends on,
                these will be prepended to the CL code generated by this function.
        """
        super().__init__()
        self._return_type = return_type
        self._function_name = cl_function_name
        self._parameter_list = self._resolve_parameters(parameter_list)
        self._cl_body = cl_body
        self._dependencies = dependencies or []

    @classmethod
    def from_string(cls, cl_function, dependencies=()):
        """Parse the given CL function into a SimpleCLFunction object.

        Args:
            cl_function (str): the function we wish to turn into an object
            dependencies (list or tuple of CLLibrary): The list of CL libraries this function depends on

        Returns:
            SimpleCLFunction: the function object constructed from the parsed return type, name,
                parameter list and body.
        """
        return_type, function_name, parameter_list, body = split_cl_function(cl_function)
        # NOTE(review): this always builds a plain SimpleCLFunction, also when called on a subclass.
        # Kept as-is since subclasses may have a different constructor signature -- confirm.
        return SimpleCLFunction(return_type, function_name, parameter_list, body, dependencies=dependencies)

    def get_cl_function_name(self):
        """Return the name of the CL function as given at construction."""
        return self._function_name

    def get_return_type(self):
        """Return the CL return type as given at construction."""
        return self._return_type

    def get_parameters(self):
        """Return the list of resolved :class:`CLFunctionParameter` objects."""
        return self._parameter_list

    def get_signature(self):
        """Return the forward declaration ``<return type> <name>(<parameters>);`` of this function."""
        return '{return_type} {cl_function_name}({parameters});'.format(
            return_type=self.get_return_type(),
            cl_function_name=self.get_cl_function_name(),
            parameters=', '.join(self._get_parameter_signatures()))

    def get_cl_code(self):
        """Return the code of all dependencies followed by this function wrapped in an include guard.

        The include guard is named ``INCLUDE_GUARD_<function name>``.

        Returns:
            str: The CL code for inclusion in a kernel.
        """
        template = ('\n'
                    '{dependencies}\n'
                    '#ifndef {inclusion_guard_name}\n'
                    '#define {inclusion_guard_name}\n'
                    '{code}\n'
                    '#endif // {inclusion_guard_name}\n')
        return dedent(template.format(
            dependencies=indent(self._get_cl_dependency_code(), ' ' * 4 * 3),
            inclusion_guard_name='INCLUDE_GUARD_{}'.format(self.get_cl_function_name()),
            code=indent('\n' + self._get_function_source() + '\n', ' ' * 4 * 3)))

    def get_cl_body(self):
        """Return the CL body as given at construction."""
        return self._cl_body

    def evaluate(self, inputs, nmr_instances, use_local_reduction=False, cl_runtime_info=None):
        """Wrap the inputs in kernel data objects and run this function using :func:`apply_cl_function`.

        Args:
            inputs (Iterable or Mapping): either one value per parameter (in parameter order), or a mapping
                from parameter name to value. Values can be None, KernelData objects, scalars or arrays.
            nmr_instances (int): the number of parallel processes to run.
            use_local_reduction (boolean): forwarded to :func:`apply_cl_function`.
            cl_runtime_info (mot.configuration.CLRuntimeInfo): forwarded to :func:`apply_cl_function`.

        Returns:
            whatever :func:`apply_cl_function` returns for this function and the wrapped inputs.

        Raises:
            ValueError: if an iterable of inputs does not have one element per parameter, or if one or
                more parameters have no corresponding input.
        """
        parameters = self.get_parameters()

        def get_data_object(param, value):
            """Convert a single raw input value to the kernel data object for the given parameter."""
            if value is None:
                return Scalar(0)
            if isinstance(value, KernelData):
                return value

            if param.is_vector_type:
                squeezed = np.squeeze(value)
                # A scalar or single element input squeezes to a 0-d array which has no first
                # dimension to inspect; without this guard the shape lookup raised an IndexError.
                if np.ndim(squeezed) > 0 and squeezed.shape[0] == 3:
                    return Scalar(value, ctype=param.ctype)

            references_memory = param.is_pointer_type or param.is_array_type
            if is_scalar(value):
                if not references_memory:
                    return Scalar(value)
                # pointer and array parameters need one element per instance
                data = np.ones(nmr_instances) * value
            else:
                data = value

            if references_memory:
                return Array(data, ctype=param.ctype, mode='rw')
            return Array(data, ctype=param.ctype, mode='r', as_scalar=True)

        if isinstance(inputs, Iterable) and not isinstance(inputs, Mapping):
            inputs = list(inputs)
            if len(inputs) != len(parameters):
                raise ValueError('The length of the input list ({}), does not equal '
                                 'the number of parameters ({})'.format(len(inputs), len(parameters)))
            inputs = dict(zip([param.name for param in parameters], inputs))

        names = [param.name for param in parameters]
        missing_names = [name for name in names if name not in inputs]
        if missing_names:
            raise ValueError('Some parameters are missing an input value, '
                             'required parameters are: {}, missing inputs are: {}'.format(names, missing_names))

        # the kernel data is keyed on the parameter names with dots replaced by underscores
        kernel_data = {param.name.replace('.', '_'): get_data_object(param, inputs[param.name])
                       for param in parameters}

        return apply_cl_function(self, kernel_data, nmr_instances,
                                 use_local_reduction=use_local_reduction, cl_runtime_info=cl_runtime_info)

    def get_dependencies(self):
        """Return the dependencies as stored at construction."""
        return self._dependencies

    def _get_function_source(self):
        """Format the return type, name, parameters and body as a CL function definition.

        Returns:
            str: the CL code of this function only, without dependencies and without include guard.
        """
        template = '\n{return_type} {cl_function_name}({parameters}){{\n{body}\n}}\n'
        return dedent(template.format(
            return_type=self.get_return_type(),
            cl_function_name=self.get_cl_function_name(),
            parameters=', '.join(self._get_parameter_signatures()),
            body=indent(dedent(self._cl_body), ' ' * 4 * 4)))

    def _get_parameter_signatures(self):
        """Get the signature of the parameters for the CL function declaration.

        Dots in the parameter names are replaced by underscores.

        Returns:
            list: the signatures of the parameters for the use in the CL code.
        """
        return [p.get_renamed(p.name.replace('.', '_')).get_declaration() for p in self.get_parameters()]

    def _get_cl_dependency_code(self):
        """Get the CL code of all the dependencies.

        Returns:
            str: the concatenated CL code of the dependencies, each followed by a newline.
        """
        return ''.join(d.get_cl_code() + "\n" for d in self._dependencies)

    @staticmethod
    def _resolve_parameters(parameter_list):
        """Turn every non-parameter element of the list into a :class:`SimpleCLFunctionParameter`.

        Args:
            parameter_list (list or tuple): :class:`CLFunctionParameter` instances and/or declarations.

        Returns:
            list: the parameters as :class:`CLFunctionParameter` objects, in the given order.
        """
        return [param if isinstance(param, CLFunctionParameter) else SimpleCLFunctionParameter(param)
                for param in parameter_list]

    def __str__(self):
        return self._get_function_source()

    # NOTE(review): equality is purely type based while the hash is derived from the default repr.
    # Objects that compare equal can therefore have different hashes, which breaks the usual
    # __eq__/__hash__ contract. The behaviour is left untouched because callers may rely on either
    # side of it -- confirm the intended semantics before changing.
    def __hash__(self):
        return hash(self.__repr__())

    def __eq__(self, other):
        return type(self) == type(other)

    def __ne__(self, other):
        return type(self) != type(other)
class CLFunctionParameter:
    """Base interface describing a single parameter of a CL function.

    Every member raises :class:`NotImplementedError`; implementing classes override them.
    """

    @property
    def name(self):
        """The name of this parameter.

        Returns:
            str: the parameter name.
        """
        raise NotImplementedError()

    def get_declaration(self):
        """Return the complete CL declaration of this parameter.

        Returns:
            str: the declaration of this data type.
        """
        raise NotImplementedError()

    @property
    def ctype(self):
        """The ctype of this parameter.

        For a data type of ``float4*`` this is ``float4``.

        Returns:
            str: the full ctype of this data type.
        """
        raise NotImplementedError()

    @property
    def address_space(self):
        """The address space of this data declaration.

        Returns:
            str: one of ``global``, ``local``, ``constant`` or ``private``.
        """
        raise NotImplementedError()

    @property
    def basic_ctype(self):
        """The data type stripped of its vector and pointer additions.

        For a full data ctype of ``float4*`` this is ``float``.

        Returns:
            str: the raw CL data type.
        """
        raise NotImplementedError()

    @property
    def is_vector_type(self):
        """Whether this data type is a vector type (such as double4, float2 or int8).

        Returns:
            boolean: True for a vector type, False otherwise.
        """
        raise NotImplementedError()

    @property
    def vector_length(self):
        """The length of the vector type, or None if this is not a vector type.

        Returns:
            int: the vector length (4 for the data type ``float4``).
        """
        raise NotImplementedError()

    @property
    def is_pointer_type(self):
        """Whether this parameter is a pointer type (has a ``*`` appended).

        Returns:
            boolean: True for a pointer type, False otherwise.
        """
        raise NotImplementedError()

    @property
    def nmr_pointers(self):
        """The number of asterisks (pointer references) of this data type.

        For the data type ``float**`` this is 2.

        Returns:
            int: the number of pointer asterisks in the data type.
        """
        raise NotImplementedError()

    @property
    def array_sizes(self):
        """The dimensions of this array type.

        For the data type ``float[10][5]`` this is ``(10, 5)``.

        Returns:
            Tuple[int]: the sizes of the arrays.
        """
        raise NotImplementedError()

    @property
    def is_array_type(self):
        """Whether this parameter is an array type (such as ``float[3]`` or ``int[10][5]``).

        Returns:
            boolean: True for an array type, False otherwise.
        """
        raise NotImplementedError()

    def get_renamed(self, name):
        """Return a copy of this parameter carrying a different name.

        Args:
            name (str): the new name of the parameter.

        Returns:
            cls: a copy of the current type but with the new name.
        """
        raise NotImplementedError()
# Grammar for a single CL function parameter declaration, for example ``global const float4* restrict x``.
# A raw string is used because the grammar contains regular expression escapes (``\w``, ``\d``, ``\[``, ...)
# which are invalid escape sequences in a regular string literal.
_cl_data_type_parser = tatsu.compile(r'''
result = [address_space] {type_qualifiers}* ctype {pointer_star}* {pointer_qualifiers}* name {array_size}*;
address_space = ['__'] ('local' | 'global' | 'constant' | 'private');
type_qualifiers = 'const' | 'volatile';
basic_ctype = ?'(unsigned )?\w[\w]*[a-zA-Z]';
vector_type_length = '2' | '3' | '4' | '8' | '16';
ctype = basic_ctype [vector_type_length];
pointer_star = '*';
pointer_qualifiers = 'const' | 'restrict';
name = /[\w\_\-\.]+/;
array_size = /\[\d+\]/;
''')
class SimpleCLFunctionParameter(CLFunctionParameter):
    """Function parameter whose properties are parsed from a CL declaration string."""

    def __init__(self, declaration):
        """Creates a new function parameter for the CL functions.

        The declaration is parsed with the module level ``_cl_data_type_parser``; the semantic actions
        below store every recognized element on this instance.

        Args:
            declaration (str): the declaration of this parameter. For example ``global int foo``.

        Raises:
            ValueError: if a type qualifier or a pointer qualifier is present more than once.
        """
        self._address_space = None
        self._type_qualifiers = []
        self._basic_ctype = ''
        self._vector_type_length = None
        self._nmr_pointer_stars = 0
        self._pointer_qualifiers = []
        self._name = ''
        self._array_sizes = []

        # the semantic actions have their own ``self``, so they reach this instance through the closure
        param = self

        class Semantics:
            """Parser callbacks, one per grammar rule, recording the parsed elements on ``param``."""

            def type_qualifiers(self, ast):
                if ast in param._type_qualifiers:
                    raise ValueError('The pre-type qualifier "{}" is present multiple times.'.format(ast))
                param._type_qualifiers.append(ast)
                return ast

            def address_space(self, ast):
                # joins the optional '__' prefix and the address space keyword
                param._address_space = ''.join(ast)
                return ''.join(ast)

            def basic_ctype(self, ast):
                param._basic_ctype = ast
                return ast

            def vector_type_length(self, ast):
                param._vector_type_length = int(ast)
                return ast

            def pointer_star(self, ast):
                param._nmr_pointer_stars += 1
                return ast

            def pointer_qualifiers(self, ast):
                if ast in param._pointer_qualifiers:
                    # previously this reported a "pre-type qualifier", which names the wrong element
                    raise ValueError('The pointer qualifier "{}" is present multiple times.'.format(ast))
                param._pointer_qualifiers.append(ast)
                return ast

            def name(self, ast):
                param._name = ast
                return ast

            def array_size(self, ast):
                # strip the enclosing brackets of for example "[10]"
                param._array_sizes.append(int(ast[1:-1]))
                return ast

        _cl_data_type_parser.parse(declaration, semantics=Semantics())

    @property
    def name(self):
        return self._name

    def get_renamed(self, name):
        """Get a copy of the current parameter but then with a new name.

        Args:
            name (str): the new name for this parameter

        Returns:
            SimpleCLFunctionParameter: a copy of this parameter carrying the new name.
        """
        new_param = copy(self)
        new_param._name = name
        # copy() is shallow; give the copy its own lists such that the two parameters do not share
        # mutable state
        new_param._type_qualifiers = list(self._type_qualifiers)
        new_param._pointer_qualifiers = list(self._pointer_qualifiers)
        new_param._array_sizes = list(self._array_sizes)
        return new_param

    def get_declaration(self):
        """Rebuild the CL declaration from the parsed elements.

        The elements are emitted in the order: address space, type qualifiers, ctype, pointer stars,
        pointer qualifiers, name and array sizes.

        Returns:
            str: the declaration for this data type.
        """
        pieces = []
        if self._address_space:
            pieces.append(str(self._address_space) + ' ')
        if self._type_qualifiers:
            pieces.append(' '.join(self._type_qualifiers) + ' ')
        pieces.append(str(self.ctype))
        pieces.append('*' * self._nmr_pointer_stars)
        if self._pointer_qualifiers:
            pieces.append(' ' + ' '.join(self._pointer_qualifiers) + ' ')
        pieces.append(' ' + self._name)
        pieces.extend('[{}]'.format(size) for size in self._array_sizes)
        return ''.join(pieces)

    @property
    def ctype(self):
        if self._vector_type_length is not None:
            return '{}{}'.format(self._basic_ctype, self._vector_type_length)
        return self._basic_ctype

    @property
    def address_space(self):
        # a declaration without explicit address space is reported as private
        return self._address_space or 'private'

    @property
    def basic_ctype(self):
        return self._basic_ctype

    @property
    def is_vector_type(self):
        return self._vector_type_length is not None

    @property
    def vector_length(self):
        return self._vector_type_length

    @property
    def is_pointer_type(self):
        return self._nmr_pointer_stars > 0

    @property
    def nmr_pointers(self):
        return self._nmr_pointer_stars

    @property
    def array_sizes(self):
        return self._array_sizes

    @property
    def is_array_type(self):
        return len(self.array_sizes) > 0
def apply_cl_function(cl_function, kernel_data, nmr_instances, use_local_reduction=False, cl_runtime_info=None):
    """Run the given function/procedure on the given set of data.

    This wraps the given CL function in a kernel call and executes that for every data instance using
    the provided kernel data. The work is processed in batches which are divided over one worker per
    CL environment.

    If the function has a non-void return type, a ``_results`` element is added to the provided
    ``kernel_data`` dictionary to collect the return values.

    Args:
        cl_function (mot.lib.cl_function.CLFunction): the function to run on the datasets.
        kernel_data (dict[str: mot.lib.kernel_data.KernelData]): the data to use as input to the function.
        nmr_instances (int): the number of parallel threads to run (used as ``global_size``)
        use_local_reduction (boolean): set this to True if you want to use local memory reduction in
            your CL procedure. If this is set to True we will multiply the global size (given by the
            nmr_instances) by the work group sizes.
        cl_runtime_info (mot.configuration.CLRuntimeInfo): the runtime information, defaults to a new
            ``CLRuntimeInfo()``.

    Returns:
        the data of the ``_results`` kernel data element if the function has a non-void return type,
        else None.

    Raises:
        ValueError: if one or more of the function parameters have no element in the kernel data.
    """
    cl_runtime_info = cl_runtime_info or CLRuntimeInfo()
    cl_environments = cl_runtime_info.cl_environments

    names = [param.name for param in cl_function.get_parameters()]
    missing_names = [name for name in names if name not in kernel_data]
    if missing_names:
        raise ValueError('Some parameters are missing an input value, '
                         'required parameters are: {}, missing inputs are: {}'.format(names, missing_names))

    return_type = cl_function.get_return_type()
    has_return_value = return_type != 'void'
    if has_return_value:
        kernel_data['_results'] = Zeros((nmr_instances,), return_type)

    workers = [_ProcedureWorker(cl_environment, cl_runtime_info.compile_flags, cl_function, kernel_data,
                                cl_runtime_info.double_precision, use_local_reduction)
               for cl_environment in cl_environments]

    def enqueue_batch(batch_size, offset):
        """Divide one batch over the workers, wait for all of them and return the next offset."""
        # every worker gets an equal share, the last worker additionally takes the remainder
        items_per_worker = [batch_size // len(cl_environments) for _ in range(len(cl_environments) - 1)]
        items_per_worker.append(batch_size - sum(items_per_worker))

        for worker, nmr_items in zip(workers, items_per_worker):
            worker.calculate(offset, offset + nmr_items)
            offset += nmr_items
            worker.cl_queue.flush()

        for worker in workers:
            worker.cl_queue.finish()

        return offset

    # Use an integer batch size (this used to be the float ``1e4 * len(workers)``), such that the
    # batch sizes and offsets derived from it remain integers.
    max_batch_size = int(1e4) * len(workers)

    total_offset = 0
    for batch_start, batch_end in split_in_batches(nmr_instances, max_batch_size):
        total_offset = enqueue_batch(batch_end - batch_start, total_offset)

    if has_return_value:
        return kernel_data['_results'].get_data()
    return None
class _ProcedureWorker:
    """Builds the kernel wrapping a CL function and runs it on one CL environment."""

    def __init__(self, cl_environment, compile_flags, cl_function,
                 kernel_data, double_precision, use_local_reduction):
        """Prepare the kernel data, compile the kernel and load the kernel inputs.

        Args:
            cl_environment: the CL environment providing the ``context``, ``queue`` and ``device``.
            compile_flags: the flags handed to the kernel build, joined by spaces.
            cl_function: the CL function to wrap in the ``run_procedure`` kernel.
            kernel_data (dict): the kernel data elements by name; stored sorted on name.
            double_precision (boolean): if True the float type is set to float64, else to float32.
            use_local_reduction (boolean): if False the workgroup size is fixed to 1.
        """
        self._cl_environment = cl_environment
        self._cl_context = cl_environment.context
        self._cl_queue = cl_environment.queue
        self._cl_function = cl_function
        self._kernel_data = OrderedDict(sorted(kernel_data.items()))
        self._double_precision = double_precision
        self._use_local_reduction = use_local_reduction

        self._mot_float_dtype = np.float64 if double_precision else np.float32
        for element in self._kernel_data.values():
            element.set_mot_float_dtype(self._mot_float_dtype)

        self._kernel = self._build_kernel(self._get_kernel_source(), compile_flags)

        preferred_multiple = self._kernel.run_procedure.get_work_group_info(
            cl.kernel_work_group_info.PREFERRED_WORK_GROUP_SIZE_MULTIPLE,
            self._cl_environment.device)
        self._workgroup_size = preferred_multiple if self._use_local_reduction else 1

        self._kernel_inputs = {}
        for name, element in self._kernel_data.items():
            self._kernel_inputs[name] = element.get_kernel_inputs(self._cl_context, self._workgroup_size)

    @property
    def cl_environment(self):
        """Get the used CL environment.

        Returns:
            cl_environment (CLEnvironment): the CL environment this worker was created with.
        """
        return self._cl_environment

    @property
    def cl_queue(self):
        """Get the queue this worker enqueues its computations on.

        The load balancing routine uses this queue to flush and finish the computations.

        Returns:
            pyopencl queues: the queue used by this worker
        """
        return self._cl_queue

    def calculate(self, range_start, range_end):
        """Enqueue the kernel for the given range of instances and enqueue the data readouts.

        Args:
            range_start (int): the index of the first instance to process
            range_end (int): the index one past the last instance to process
        """
        nmr_problems = range_end - range_start

        run_procedure = self._kernel.run_procedure
        run_procedure.set_scalar_arg_dtypes(self.get_scalar_arg_dtypes())

        # flatten the inputs of all data elements, in the (sorted) order of the kernel data
        kernel_args = [kernel_input
                       for name in self._kernel_data
                       for kernel_input in self._kernel_inputs[name]]

        global_size = (int(nmr_problems * self._workgroup_size),)
        local_size = (int(self._workgroup_size),)
        run_procedure(self._cl_queue, global_size, local_size, *kernel_args,
                      global_offset=(int(range_start * self._workgroup_size),))

        for name, element in self._kernel_data.items():
            element.enqueue_readouts(self._cl_queue, self._kernel_inputs[name], range_start, range_end)

    def _build_kernel(self, kernel_source, compile_flags=()):
        """Convenience function for building the kernel for this worker.

        Args:
            kernel_source (str): the kernel source to use for building the kernel
            compile_flags (Iterable[str]): the build options, joined by spaces

        Returns:
            cl.Program: a compiled CL kernel
        """
        program = cl.Program(self._cl_context, kernel_source)
        return program.build(' '.join(compile_flags))

    def _get_kernel_source(self):
        """Generate the source of the ``run_procedure`` kernel around the CL function.

        The source consists of the float type definition, the type definitions of the kernel data, the
        code of the CL function and the kernel itself. The kernel initializes the variables, calls the
        function (storing a non-void result in ``__results[gid]``) and runs the post function callbacks.

        Returns:
            str: the complete kernel source
        """
        function = self._cl_function
        assignment = '__results[gid] = ' if function.get_return_type() != 'void' else ''

        variable_inits = []
        function_call_inputs = []
        post_function_callbacks = []
        for parameter in function.get_parameters():
            element = self._kernel_data[parameter.name]
            call_args = (parameter.name, '_' + parameter.name, 'gid', parameter.address_space)

            variable_inits.append(element.initialize_variable(*call_args))
            function_call_inputs.append(element.get_function_call_input(*call_args))
            post_function_callbacks.append(element.post_function_callback(*call_args))

        source_parts = [
            get_float_type_def(self._double_precision),
            '\n'.join(element.get_type_definitions() for element in self._kernel_data.values()),
            function.get_cl_code(),
            '\n__kernel void run_procedure(' + ",\n".join(self._get_kernel_arguments()) + '){\n',
            'ulong gid = (ulong)(get_global_id(0) / get_local_size(0));\n',
            '\n'.join(variable_inits) + '\n',
            assignment + ' ' + function.get_cl_function_name() + '(' + ', '.join(function_call_inputs) + ');\n',
            '\n'.join(post_function_callbacks) + '\n',
            '}\n',
        ]
        return ''.join(source_parts)

    def _get_kernel_arguments(self):
        """Get the list of kernel arguments for loading the kernel data elements into the kernel.

        The elements are visited in the sorted order of the kernel data; every kernel parameter name is
        the element name prefixed with an underscore.

        Returns:
            list of str: the list of parameter definitions
        """
        return [declaration
                for name, element in self._kernel_data.items()
                for declaration in element.get_kernel_parameters('_' + name)]

    def get_scalar_arg_dtypes(self):
        """Get the location and types of the input scalars.

        Returns:
            list: for every kernel input element either None if the data is a buffer or the numpy data
                type if it is a scalar.
        """
        return [dtype
                for element in self._kernel_data.values()
                for dtype in element.get_scalar_arg_dtypes()]