#!/usr/bin/env python
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from collections import OrderedDict
from itertools import starmap
import operator
import numpy as np
import sympy as sy
from sympy.utilities.lambdify import lambdify
from graphdot.codegen import Template
from graphdot.codegen.sympy_printer import cudacxxcode
from graphdot.codegen.cpptool import cpptype
from graphdot.util.pretty_tuple import pretty_tuple
class MicroKernel(ABC):
    '''The abstract base class for all microkernels.

    A microkernel maps a pair of features to a scalar similarity value and
    can be composed with other microkernels via ``+``, ``*``, and ``**``.
    '''

    @property
    @abstractmethod
    def name(self):
        '''Name of the kernel.'''

    @property
    def normalized(self):
        r'''A normalized version of the original kernel using the dot product
        formula: :py:math:`k^\mathrm{normalized}(i, j) =
        \frac{k(i, j)}{\sqrt{k(i, i) k(j, j)}}`.'''
        return Normalize(self)

    @abstractmethod
    def __call__(self, i, j, jac=False):
        '''Evaluates the kernel.

        Parameters
        ----------
        i, j: feature
            Inputs to the kernel.
        jac: Boolean
            Whether or not to return the gradient of the kernel with respect
            to kernel hyperparameters alongside the kernel value.

        Returns
        -------
        k_ij: scalar
            The value of the kernel as evaluated on i and j.
        jacobian: 1D ndarray
            The gradient of the kernel with regard to hyperparameters.
        '''

    @abstractmethod
    def __repr__(self):
        '''Evaluating the representation of a kernel should create an exact
        instance of the kernel itself.'''

    @abstractmethod
    def gen_expr(self, x, y, theta_scope=''):
        '''Generate the C++ expression for evaluating the kernel and its
        partial derivatives.

        Parameters
        ----------
        x, y: str
            Name of the input variables.
        theta_scope: str
            The scope in which the hyperparameters is located.

        Returns
        -------
        expr: str
            A C++ expression that evaluates the kernel.
        jac_expr: list of strs
            C++ expressions that evaluate the derivative of the kernel.
        '''

    @property
    @abstractmethod
    def theta(self):
        '''A tuple of all the kernel hyperparameters.'''

    @theta.setter
    @abstractmethod
    def theta(self, value):
        '''Method for setting the kernel hyperparameters from a tuple.'''

    @property
    @abstractmethod
    def bounds(self):
        '''A list of 2-tuples for the lower and upper bounds of each kernel
        hyperparameter.'''

    @property
    @abstractmethod
    def minmax(self):
        '''A 2-tuple of the minimum and maximum values that the kernel could
        take.'''

    def _assert_bounds(self, hyp, bounds):
        # A valid bound is either an explicit (lower, upper) 2-tuple or the
        # literal string 'fixed', which excludes the hyperparameter from
        # optimization.
        if not ((isinstance(bounds, tuple) and len(bounds) == 2)
                or bounds == 'fixed'):
            raise ValueError(
                f'Bounds for hyperparameter {hyp} of kernel {self.name} must '
                f'be a 2-tuple or "fixed": {bounds} provided.'
            )

    @staticmethod
    def from_sympy(
        name, desc, expr, vars, *hyperparameter_specs, minmax=(0, 1)
    ):
        '''Create a pairwise kernel class from a SymPy expression.

        Parameters
        ----------
        name: str
            The name of the kernel. Must be a valid Python identifier.
        desc: str
            A human-readable description of the kernel. Will be used to build
            the docstring of the returned kernel class.
        expr: str or SymPy expression
            Expression of the kernel in SymPy format.
        vars: 2-tuple of str or SymPy symbols
            The input variables of the kernel as shown up in the expression.
            A kernel must have exactly 2 input variables. All other symbols
            that show up in its expression should be regarded as
            hyperparameters.
        hyperparameter_specs: list of hyperparameter specifications in one of
        the formats below:

            | symbol,
            | (symbol,),
            | (symbol, dtype),
            | (symbol, dtype, description),
            | (symbol, dtype, lower_bound, upper_bound),
            | (symbol, dtype, lower_bound, upper_bound, description),

            If a default set of lower and upper bounds are not defined here,
            then it must be specified explicitly during kernel object
            creation, using arguments as specified in the kernel class's
            docstring.
        '''
        return _from_sympy(
            name, desc, expr, vars, *hyperparameter_specs, minmax=minmax
        )

    def __add__(self, k):
        r"""Implements the additive kernel composition semantics, i.e.
        expression ``k1 + k2`` creates
        :math:`k_+(a, b) = k_1(a, b) + k_2(a, b)`"""
        return MicroKernelExpr.add(self, k)

    def __radd__(self, k):
        return MicroKernelExpr.add(k, self)

    def __mul__(self, k):
        r"""Implements the multiplicative kernel composition semantics, i.e.
        expression ``k1 * k2`` creates
        :math:`k_\times(a, b) = k_1(a, b) \times k_2(a, b)`"""
        return MicroKernelExpr.mul(self, k)

    def __rmul__(self, k):
        return MicroKernelExpr.mul(k, self)

    def __pow__(self, c):
        r"""Implements the exponentiation semantics, i.e. expression
        ``k1**c`` creates :math:`k_{exp}(a, b) = k_1(a, b)^c`, where `c`
        must be a scalar constant or a Constant microkernel."""
        return MicroKernelExpr.pow(self, c)
class MicroKernelExpr(MicroKernel):
    '''Base class for binary composition expressions of two microkernels.

    Concrete subclasses are created on-the-fly by the :py:meth:`add`,
    :py:meth:`mul`, and :py:meth:`pow` factory methods.
    '''

    @property
    @abstractmethod
    def opstr(self):
        # The infix operator symbol ('+', '*', '**') used by __repr__.
        pass

    def __init__(self, k1, k2):
        self.k1 = k1
        self.k2 = k2

    def __repr__(self):
        return f'{repr(self.k1)} {self.opstr} {repr(self.k2)}'

    @property
    def theta(self):
        return pretty_tuple(self.name, ['lhs', 'rhs'])(
            self.k1.theta, self.k2.theta
        )

    @theta.setter
    def theta(self, seq):
        self.k1.theta = seq[0]
        self.k2.theta = seq[1]

    @property
    def bounds(self):
        return (self.k1.bounds, self.k2.bounds)

    @staticmethod
    def add(k1, k2):
        '''Compose the additive kernel k1(i, j) + k2(i, j). Scalar operands
        are promoted to Constant microkernels.'''
        k1 = Constant(k1) if np.isscalar(k1) else k1
        k2 = Constant(k2) if np.isscalar(k2) else k2

        @cpptype(k1=k1.dtype, k2=k2.dtype)
        class Add(MicroKernelExpr):

            @property
            def opstr(self):
                return '+'

            @property
            def name(self):
                return 'Add'

            def __call__(self, i, j, jac=False):
                if jac is True:
                    f1, J1 = self.k1(i, j, True)
                    f2, J2 = self.k2(i, j, True)
                    # The Jacobian of a sum is the *concatenation* of the
                    # operand Jacobians, mirroring Add.gen_expr and
                    # Multiply.__call__. Note: `J1 + J2` on ndarray Jacobians
                    # would perform elementwise addition instead.
                    return (f1 + f2, np.concatenate([J1, J2]))
                else:
                    return self.k1(i, j, False) + self.k2(i, j, False)

            def gen_expr(self, x, y, theta_scope=''):
                f1, J1 = self.k1.gen_expr(x, y, theta_scope + 'k1.')
                f2, J2 = self.k2.gen_expr(x, y, theta_scope + 'k2.')
                return (f'({f1} + {f2})', J1 + J2)

            @property
            def minmax(self):
                # Use the instance attributes (not the enclosing closure) so
                # the property remains correct if k1/k2 are ever reassigned.
                return tuple(starmap(
                    operator.add, zip(self.k1.minmax, self.k2.minmax)
                ))

        return Add(k1, k2)

    @staticmethod
    def mul(k1, k2):
        '''Compose the multiplicative kernel k1(i, j) * k2(i, j). Scalar
        operands are promoted to Constant microkernels.'''
        k1 = Constant(k1) if np.isscalar(k1) else k1
        k2 = Constant(k2) if np.isscalar(k2) else k2

        @cpptype(k1=k1.dtype, k2=k2.dtype)
        class Multiply(MicroKernelExpr):

            @property
            def opstr(self):
                return '*'

            @property
            def name(self):
                return 'Multiply'

            def __call__(self, i, j, jac=False):
                if jac is True:
                    f1, J1 = self.k1(i, j, True)
                    f2, J2 = self.k2(i, j, True)
                    # Product rule: d(f1*f2) = f2*dJ1 ++ f1*dJ2.
                    return (
                        f1 * f2,
                        np.array(
                            [j1 * f2 for j1 in J1] + [f1 * j2 for j2 in J2]
                        )
                    )
                else:
                    return self.k1(i, j, False) * self.k2(i, j, False)

            def gen_expr(self, x, y, theta_scope=''):
                f1, J1 = self.k1.gen_expr(x, y, theta_scope + 'k1.')
                f2, J2 = self.k2.gen_expr(x, y, theta_scope + 'k2.')
                return (
                    f'({f1} * {f2})',
                    [f'({j1} * {f2})' for j1 in J1] +
                    [f'({f1} * {j2})' for j2 in J2]
                )

            @property
            def minmax(self):
                # Use the instance attributes (not the enclosing closure) so
                # the property remains correct if k1/k2 are ever reassigned.
                return tuple(starmap(
                    operator.mul, zip(self.k1.minmax, self.k2.minmax)
                ))

        return Multiply(k1, k2)

    @staticmethod
    def pow(k1, c):
        '''Compose the exponentiation kernel k1(i, j)**c, where the exponent
        c must be a scalar or a Constant microkernel.'''
        if np.isscalar(c):
            k2 = Constant(c)
        elif isinstance(c, MicroKernel) and c.name == 'Constant':
            k2 = c
        else:
            raise ValueError(
                f'Exponent must be a constant or constant microkernel, '
                f'got {c} instead.'
            )

        @cpptype(k1=k1.dtype, k2=k2.dtype)
        class Exponentiation(MicroKernelExpr):

            @property
            def opstr(self):
                return '**'

            @property
            def name(self):
                return 'Exponentiation'

            def __call__(self, i, j, jac=False):
                if jac is True:
                    # d(x^y) / dx = y * x^(y - 1)
                    # d(x^y) / dy = x^y * log(x)
                    f1, J1 = self.k1(i, j, True)
                    f2, J2 = self.k2(i, j, True)
                    return (
                        f1**f2,
                        np.array(
                            [f2 * f1**(f2 - 1) * j1 for j1 in J1] +
                            [f1**f2 * np.log(f1) * j2 for j2 in J2]
                        )
                    )
                else:
                    return self.k1(i, j, False)**self.k2(i, j, False)

            def gen_expr(self, x, y, theta_scope=''):
                f1, J1 = self.k1.gen_expr(x, y, theta_scope + 'k1.')
                f2, J2 = self.k2.gen_expr(x, y, theta_scope + 'k2.')
                return (
                    f'__powf({f1}, {f2})',
                    [f'({f2} * __powf({f1}, {f2} - 1) * {j})' for j in J1] +
                    [f'(__powf({f1}, {f2}) * __logf({f1}) * {j})' for j in J2]
                )

            @property
            def minmax(self):
                # NOTE(review): (min1**min2, max1**max2) is only a valid
                # range for nonnegative bases and exponents — presumably
                # guaranteed by microkernel semantics; confirm upstream.
                return tuple(starmap(
                    operator.pow, zip(self.k1.minmax, self.k2.minmax)
                ))

        return Exponentiation(k1, k2)
def Constant(c, c_bounds='fixed'):
    r"""Creates a no-op microkernel that returns a constant value,
    i.e. :math:`k_\mathrm{c}(\cdot, \cdot) \equiv constant`. This kernel is
    often multiplied with other microkernels as an adjustable weight.

    Parameters
    ----------
    c: float > 0
        The constant value.
    c_bounds: 2-tuple of floats or 'fixed'
        Lower and upper bounds of `c` with respect to hyperparameter
        optimization. The default 'fixed' excludes `c` from training.
    """
    @cpptype(c=np.float32)
    class ConstantKernel(MicroKernel):

        @property
        def name(self):
            return 'Constant'

        def __init__(self, c, c_bounds):
            self.c = float(c)
            self.c_bounds = c_bounds
            self._assert_bounds('c', c_bounds)

        def __call__(self, i, j, jac=False):
            # The kernel value is independent of the inputs; its gradient
            # with respect to c is identically 1.
            if jac is True:
                return self.c, np.ones(1)
            else:
                return self.c

        def __repr__(self):
            return f'{self.name}({self.c})'

        def gen_expr(self, x, y, theta_scope=''):
            return f'{theta_scope}c', ['1.0f']

        @property
        def theta(self):
            return pretty_tuple(
                self.name,
                ['c']
            )(self.c)

        @theta.setter
        def theta(self, seq):
            self.c = seq[0]

        @property
        def bounds(self):
            return (self.c_bounds,)

        @property
        def minmax(self):
            return (self.c, self.c)

    return ConstantKernel(c, c_bounds)
def Normalize(kernel: MicroKernel):
    r"""Normalize the value range of a microkernel to [0, 1] using the cosine
    of angle formula: :math:`k_{normalized}(x, y) = \frac{k(x, y)}
    {\sqrt{k(x, x) k(y, y)}}`.

    Parameters
    ----------
    kernel:
        The microkernel to be normalized.
    """
    if kernel.name == 'Normalize':
        # avoid repeated normalization
        return kernel
    else:
        @cpptype(kernel=kernel.dtype)
        class Normalized(MicroKernel):

            @property
            def name(self):
                return 'Normalize'

            def __init__(self, kernel):
                self.kernel = kernel

            def __call__(self, X, Y, jac=False):
                if jac is True:
                    Fxx, Jxx = self.kernel(X, X, jac=True)
                    Fxy, Jxy = self.kernel(X, Y, jac=True)
                    Fyy, Jyy = self.kernel(Y, Y, jac=True)
                    if Fxx > 0 and Fyy > 0:
                        # Quotient rule applied to Fxy / sqrt(Fxx * Fyy).
                        return (
                            Fxy * (Fxx * Fyy)**-0.5,
                            (Jxy * (Fxx * Fyy)**-0.5
                             - (0.5 * Fxy * (Fxx * Fyy)**-1.5
                                * (Jxx * Fyy + Fxx * Jyy)))
                        )
                    else:
                        # Degenerate self-similarity: define value and
                        # gradient as zero rather than dividing by zero.
                        return (0.0, np.zeros_like(Jxy))
                else:
                    Fxx = self.kernel(X, X)
                    Fxy = self.kernel(X, Y)
                    Fyy = self.kernel(Y, Y)
                    if Fxx > 0 and Fyy > 0:
                        return Fxy * (Fxx * Fyy)**-0.5
                    else:
                        return 0.0

            def __repr__(self):
                return f'{self.name}({repr(self.kernel)})'

            def gen_expr(self, x, y, theta_scope=''):
                # Wrap the inner kernel's expression in device-side
                # normalize()/normalize_jacobian() helpers, passing it as a
                # lambda over placeholder arguments _1, _2.
                F, J = self.kernel.gen_expr(
                    '_1', '_2', theta_scope + 'kernel.'
                )
                f = Template(
                    r'''normalize(
                        [&](auto _1, auto _2){return ${f};},
                        ${x}, ${y}
                    )'''
                ).render(
                    x=x, y=y, f=F
                )
                template = Template(
                    r'''normalize_jacobian(
                        [&](auto _1, auto _2){return ${f};},
                        [&](auto _1, auto _2){return ${j};},
                        ${x},
                        ${y}
                    )'''
                )
                jacobian = [template.render(x=x, y=y, f=F, j=j) for j in J]
                return f, jacobian

            @property
            def theta(self):
                return self.kernel.theta

            @theta.setter
            def theta(self, seq):
                self.kernel.theta = seq

            @property
            def bounds(self):
                return self.kernel.bounds

            @property
            def minmax(self):
                # NOTE(review): assumes the inner kernel's maximum is
                # positive; hi == 0 would divide by zero — confirm upstream.
                lo, hi = self.kernel.minmax
                return (lo / hi, 1)

        return Normalized(kernel)
def _from_sympy(name, desc, expr, vars, *hyperparameter_specs, minmax=(0, 1)):
    '''Create a microkernel class from a SymPy expression.

    Parameters
    ----------
    name: str
        The name of the microkernel. Must be a valid Python identifier.
    desc: str
        A human-readable description of the microkernel. Will be used to build
        the docstring of the returned microkernel class.
    expr: str or SymPy expression
        Expression of the microkernel in SymPy format.
    vars: 2-tuple of str or SymPy symbols
        The input variables of the microkernel as shown up in the expression.
        A microkernel must have exactly 2 input variables. All other symbols
        that show up in its expression should be regarded as
        hyperparameters.
    hyperparameter_specs: list of hyperparameter specifications in one of
    the formats below:

        | symbol,
        | (symbol,),
        | (symbol, dtype),
        | (symbol, dtype, description),
        | (symbol, dtype, lower_bound, upper_bound),
        | (symbol, dtype, lower_bound, upper_bound, description),

        If a default set of lower and upper bounds are not defined here,
        then it must be specified explicitly during microkernel object
        creation, using arguments as specified in the microkernel class's
        docstring.
    minmax: a 2-tuple of floats
        The minimum and maximum value that the kernel can output.
    '''
    assert(isinstance(name, str) and name.isidentifier())

    # parse expression
    if isinstance(expr, str):
        expr = sy.sympify(expr)

    # check input variables
    if len(vars) != 2:
        raise ValueError('A microkernel must have exactly two variables')
    vars = [sy.Symbol(v) if isinstance(v, str) else v for v in vars]

    # parse the list of hyperparameters; each spec must match exactly one
    # of the accepted formats, hence a single if/elif chain. (A bare,
    # non-iterable symbol must be handled first since len() would fail.)
    hyperdefs = OrderedDict()
    for spec in hyperparameter_specs:
        if not hasattr(spec, '__iter__'):
            symbol = spec
            hyperdefs[symbol] = dict(dtype=np.dtype(np.float32))
        elif len(spec) == 1:
            symbol = spec[0]
            hyperdefs[symbol] = dict(dtype=np.dtype(np.float32))
        elif len(spec) == 2:
            symbol, dtype = spec
            hyperdefs[symbol] = dict(dtype=np.dtype(dtype))
        elif len(spec) == 3:
            symbol, dtype, doc = spec
            hyperdefs[symbol] = dict(dtype=np.dtype(dtype), doc=doc)
        elif len(spec) == 4:
            symbol, dtype, lb, ub = spec
            hyperdefs[symbol] = dict(dtype=np.dtype(dtype),
                                     bounds=(lb, ub))
        elif len(spec) == 5:
            symbol, dtype, lb, ub, doc = spec
            hyperdefs[symbol] = dict(dtype=np.dtype(dtype),
                                     bounds=(lb, ub),
                                     doc=doc)
        else:
            raise ValueError(
                'Invalid hyperparameter specification, '
                'must be one of\n'
                '(symbol)\n'
                '(symbol, dtype)\n'
                '(symbol, dtype, doc)\n'
                '(symbol, dtype, lb, ub)\n'
                '(symbol, dtype, lb, ub, doc)\n'
            )

    # create microkernel class
    class CppType(type(MicroKernel)):
        # Expose the structured dtype on the class itself, not just on
        # instances.
        @property
        def dtype(cls):
            return cls._dtype

    class uKernel(MicroKernel, metaclass=CppType):
        _expr = expr
        _vars = vars
        _hyperdefs = hyperdefs
        _dtype = np.dtype([(k, v['dtype']) for k, v in hyperdefs.items()],
                          align=True)

        @property
        def name(self):
            return name

        def __init__(self, *args, **kwargs):
            self._theta_values = values = OrderedDict()
            self._theta_bounds = bounds = OrderedDict()

            # Positional arguments fill hyperparameters in declaration
            # order; keyword arguments override them.
            for symbol, value in zip(self._hyperdefs, args):
                values[symbol] = value

            for symbol in self._hyperdefs:
                try:
                    values[symbol] = kwargs[symbol]
                except KeyError:
                    if symbol not in values:
                        raise KeyError(
                            f'Hyperparameter {symbol} not provided '
                            f'for {self.name}'
                        )

                try:
                    bounds[symbol] = kwargs['%s_bounds' % symbol]
                except KeyError:
                    try:
                        bounds[symbol] = self._hyperdefs[symbol]['bounds']
                    except KeyError:
                        raise KeyError(
                            f'Bounds for hyperparameter {symbol} of '
                            f'microkernel {self.name} not set, and '
                            f'no defaults were given.'
                        )
                self._assert_bounds(symbol, bounds[symbol])

        # @cached_property
        @property
        def _vars_and_hypers(self):
            # Lambdify argument order: the two inputs, then hyperparameters.
            if not hasattr(self, '_vars_and_hypers_cached'):
                self._vars_and_hypers_cached = [
                    *self._vars, *self._hyperdefs.keys()
                ]
            return self._vars_and_hypers_cached

        # @cached_property
        @property
        def _fun(self):
            # Lazily compiled numeric evaluator of the kernel expression.
            if not hasattr(self, '_fun_cached'):
                self._fun_cached = lambdify(
                    self._vars_and_hypers,
                    self._expr
                )
            return self._fun_cached

        # @cached_property
        @property
        def _jac(self):
            # Lazily compiled partial derivatives, one per hyperparameter.
            if not hasattr(self, '_jac_cached'):
                self._jac_cached = [
                    lambdify(self._vars_and_hypers, sy.diff(expr, h))
                    for h in self._hyperdefs
                ]
            return self._jac_cached

        def __call__(self, x1, x2, jac=False):
            if jac is True:
                return (
                    self._fun(x1, x2, *self.theta),
                    np.array([j(x1, x2, *self.theta) for j in self._jac])
                )
            else:
                return self._fun(x1, x2, *self.theta)

        def __repr__(self):
            return Template('${cls}(${theta, }, ${bounds, })').render(
                cls=self.name,
                theta=[f'{n}={v}' for n, v in self._theta_values.items()],
                bounds=[f'{n}_bounds={v}'
                        for n, v in self._theta_bounds.items()]
            )

        def gen_expr(self, x, y, theta_scope=''):
            # Substitute input variables and scoped hyperparameter names
            # into the CUDA C++ rendering of the expression.
            nmap = {
                str(self._vars[0]): x,
                str(self._vars[1]): y,
                **{t: theta_scope + t for t in self._hyperdefs}
            }
            return (
                cudacxxcode(self._expr, nmap),
                [cudacxxcode(sy.diff(self._expr, h), nmap)
                 for h in self._hyperdefs]
            )

        @property
        def dtype(self):
            return self._dtype

        @property
        def state(self):
            return tuple(self._theta_values.values())

        @property
        def theta(self):
            return pretty_tuple(
                self.name,
                self._theta_values.keys()
            )(**self._theta_values)

        @theta.setter
        def theta(self, seq):
            assert(len(seq) == len(self._theta_values))
            for theta, value in zip(self._hyperdefs, seq):
                self._theta_values[theta] = value

        @property
        def bounds(self):
            return tuple(self._theta_bounds.values())

        @property
        def minmax(self):
            return minmax

    # furnish doc strings
    param_docs = [
        Template(
            '${name}: ${type}\n'
            '    ${desc\n    }\n'
            '${name}_bounds: tuple or "fixed"\n'
            '    Lower and upper bounds of `${name}` with respect to '
            'hyperparameter optimization. If "fixed", the hyperparameter will '
            'not be optimized during training.'
        ).render(
            name=name,
            type=hdef['dtype'],
            desc=[s.strip() for s in hdef.get('doc', '').split('\n')]
        ) for name, hdef in hyperdefs.items()
    ]
    uKernel.__doc__ = Template(
        '${desc}\n'
        '\n'
        'Parameters\n'
        '----------\n'
        '${param_docs\n}',
        escape=False
    ).render(
        desc='\n'.join([s.strip() for s in desc.split('\n')]),
        param_docs=param_docs
    )

    return uKernel