#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy
import numbers
import warnings
import itertools as it
from collections import namedtuple
import numpy as np
from graphdot.graph import Graph
from graphdot.util import Timer
from graphdot.util.iterable import fold_like, flatten, replace
from graphdot.util.pretty_tuple import pretty_tuple
from ._backend_factory import backend_factory
from .starting_probability import StartingProbability, Uniform, Adhoc
class MarginalizedGraphKernel:
"""Implements the random walk-based graph similarity kernel as proposed in:
Kashima, H., Tsuda, K., & Inokuchi, A. (2003).
Marginalized kernels between labeled graphs. *In Proceedings of the 20th
international conference on machine learning (ICML-03)* (pp. 321-328).
Parameters
----------
node_kernel: microkernel
A microkernel that computes the similarity between individual nodes.
edge_kernel: microkernel
A microkernel that computes the similarity between individual edges.
p: positive number (default=1.0) or :py:class:`StartingProbability`
The starting probability of the random walk on each node. Must be
either a positive number or a concrete subclass instance of
:py:class:`StartingProbability`.
q: float in (0, 1)
The probability for the random walk to stop during each step.
q_bounds: pair of floats
The lower and upper bound that the stopping probability can vary during
hyperparameter optimization.
eps: float
The step size used for finite difference approximation of the gradient.
Only used for nodal matrices (``nodal=True``).
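ftol: float
Convergence tolerance forwarded to the backend solver.
gtol: float
Convergence tolerance forwarded to the backend solver for gradient
evaluation.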
dtype: numpy dtype
The data type of the kernel matrix to be returned.
backend: 'auto' or 'cuda' or an instance of
:py:class:`graphdot.kernel.marginalized.Backend`.
The computing engine that solves the marginalized graph kernel's
generalized Laplacian equation.
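Example
-------
A minimal construction sketch, assuming the microkernels from
:py:mod:`graphdot.microkernel`; the ``element`` and ``length``
attribute names are illustrative and must match the actual node and
edge attributes of the graphs.
>>> from graphdot.microkernel import (
...     TensorProduct, SquareExponential, KroneckerDelta
... )
>>> kernel = MarginalizedGraphKernel(
...     node_kernel=TensorProduct(element=KroneckerDelta(0.5)),
...     edge_kernel=TensorProduct(length=SquareExponential(0.1)),
...     q=0.05
... )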
"""
trait_t = namedtuple(
'Traits', 'diagonal, symmetric, nodal, lmin, eval_gradient'
)
@classmethod
def traits(cls, diagonal=False, symmetric=False, nodal=False, lmin=0,
eval_gradient=False):
traits = cls.trait_t(
diagonal, symmetric, nodal, lmin, eval_gradient
)
return traits
def __init__(self, node_kernel, edge_kernel, p=1.0, q=0.01,
q_bounds=(1e-4, 1 - 1e-4), eps=1e-2, ftol=1e-8, gtol=1e-6,
dtype=np.float64, backend='auto'):
self.node_kernel = node_kernel
self.edge_kernel = edge_kernel
self.p = self._get_starting_probability(p)
self.q = q
self.q_bounds = q_bounds
self.eps = eps
self.ftol = ftol
self.gtol = gtol
self.element_dtype = dtype
self.backend = backend_factory(backend)
if self.node_kernel.minmax[0] <= 0 or self.node_kernel.minmax[1] > 1:
warnings.warn(
'Node kernel value range should be within (0, 1], '
f'got {self.node_kernel.minmax} for {self.node_kernel}. '
'This will not be allowed in a future version. '
'Consider adding a small constant or using the `.normalized` '
'attribute of the kernel.',
DeprecationWarning
)
if self.edge_kernel.minmax[0] < 0 or self.edge_kernel.minmax[1] > 1:
warnings.warn(
'Edge kernel value range must be within [0, 1], '
f'got {self.edge_kernel.minmax} for {self.edge_kernel}. '
'This will not be allowed in a future version. '
'Consider adding a small constant or using the `.normalized` '
'attribute of the kernel.',
DeprecationWarning
)
def _get_starting_probability(self, p):
if isinstance(p, StartingProbability):
return p
elif isinstance(p, tuple) and len(p) == 2:
f, expr = p
if callable(f) and isinstance(expr, str):
return Adhoc(f, expr)
else:
raise ValueError(
'An ad hoc starting probability must be specified as a '
'(callable, C++ expression) pair.'
)
elif isinstance(p, numbers.Number):
if p > 0:
return Uniform(p)
else:
raise ValueError(f'Starting probability must be positive, got {p}.')
else:
raise ValueError(f'Unknown starting probability: {p}')
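# A hedged sketch of the accepted forms of ``p`` (the callable and the
# C++ expression of the ad hoc form below are illustrative
# placeholders):
#     p=2.0                          -> Uniform(2.0)
#     p=Uniform(2.0)                 -> used as-is
#     p=(lambda nodes: ..., '2.0f')  -> Adhoc(callable, expression)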
def __call__(self, X, Y=None, eval_gradient=False, nodal=False, lmin=0,
timing=False):
"""Compute pairwise similarity matrix between graphs
Parameters
----------
X: list of N graphs
The graphs must all have the same node and edge attributes.
Y: None or list of M graphs
The graphs must all have the same node and edge attributes.
eval_gradient: bool
If True, computes the gradient of the kernel matrix with respect
to hyperparameters and returns it alongside the kernel matrix.
nodal: bool
If True, returns node-wise similarities; otherwise, returns
graph-wise similarities.
lmin: 0 or 1
Number of steps to skip in each random walk path before similarity
is computed.
(lmin + 1) corresponds to the starting value of l in the summation
of Eq. 1 in Tang & de Jong, 2019 https://doi.org/10.1063/1.5078640
(or the first unnumbered equation in Section 3.3 of Kashima, Tsuda,
and Inokuchi, 2003).
Returns
-------
kernel_matrix: ndarray
If Y is None, returns a square matrix containing pairwise
similarities between the graphs in X; otherwise, returns a matrix
containing similarities across the graphs in X and Y.
gradient: ndarray
The gradient of the kernel matrix with respect to kernel
hyperparameters. Only returned if eval_gradient is True.
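Example
-------
A hedged usage sketch; ``kernel`` is an instance constructed as in
the class-level example, and ``graphs`` is a list of
:py:class:`graphdot.Graph` objects with unified attribute types.
>>> R = kernel(graphs)                          # N-by-N matrix
>>> R, dR = kernel(graphs, eval_gradient=True)  # matrix and gradient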
"""
timer = Timer()
backend = self.backend
traits = self.traits(
symmetric=Y is None,
nodal=nodal,
lmin=lmin,
eval_gradient=eval_gradient
)
''' assert graph attributes are compatible with each other '''
all_graphs = list(it.chain(X, Y)) if Y is not None else X
pred_or_tuple = Graph.has_unified_types(all_graphs)
if pred_or_tuple is not True:
group, first, second = pred_or_tuple
raise TypeError(
f'The two graphs have mismatching {group} attributes or '
'attribute types. If the attributes match in name but differ '
'in type, try `Graph.unify_datatype` as an automatic fix.\n'
f'First graph: {first}\n'
f'Second graph: {second}\n'
)
''' generate jobs '''
timer.tic('generating jobs')
if traits.symmetric:
i, j = np.triu_indices(len(X))
i, j = i.astype(np.uint32), j.astype(np.uint32)
else:
i, j = np.indices((len(X), len(Y)), dtype=np.uint32)
j += len(X)
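# Pack each (i, j) pair of graph indices into a single structured
# record so that the backend receives the job list as one flat array.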
jobs = backend.array(
np.column_stack((i.ravel(), j.ravel()))
.ravel()
.view(np.dtype([('i', np.uint32), ('j', np.uint32)]))
)
timer.toc('generating jobs')
''' create output buffer '''
timer.tic('creating output buffer')
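# `starts` holds prefix-sum offsets that delimit each graph's block in
# the flattened output: cumulative node counts for nodal output, one
# entry per graph otherwise.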
if traits.symmetric:
starts = backend.zeros(len(X) + 1, dtype=np.uint32)
if traits.nodal is True:
sizes = np.array([len(g.nodes) for g in X], dtype=np.uint32)
np.cumsum(sizes, out=starts[1:])
n_nodes_X = int(starts[-1])
output_shape = (n_nodes_X, n_nodes_X)
else:
starts[:] = np.arange(len(X) + 1)
output_shape = (len(X), len(X))
else:
starts = backend.zeros(len(X) + len(Y) + 1, dtype=np.uint32)
if traits.nodal is True:
sizes = np.array([len(g.nodes) for g in X]
+ [len(g.nodes) for g in Y],
dtype=np.uint32)
np.cumsum(sizes, out=starts[1:])
n_nodes_X = int(starts[len(X)])
starts[len(X):] -= n_nodes_X
n_nodes_Y = int(starts[-1])
output_shape = (n_nodes_X, n_nodes_Y)
else:
starts[:len(X)] = np.arange(len(X))
starts[len(X):] = np.arange(len(Y) + 1)
output_shape = (len(X), len(Y))
gramian = backend.empty(int(np.prod(output_shape)), np.float32)
if traits.eval_gradient is True:
gradient = backend.empty(
self.n_dims * int(np.prod(output_shape)), np.float32
)
else:
gradient = None
timer.toc('creating output buffer')
''' call GPU kernel '''
timer.tic('calling GPU kernel (overall)')
backend(
np.concatenate((X, Y)) if Y is not None else X,
self.node_kernel,
self.edge_kernel,
self.p,
self.q,
self.eps,
self.ftol,
self.gtol,
jobs,
starts,
gramian,
gradient,
output_shape[0],
output_shape[1],
self.n_dims,
traits,
timer,
)
timer.toc('calling GPU kernel (overall)')
''' collect result '''
timer.tic('collecting result')
gramian = gramian.reshape(*output_shape, order='F')
if gradient is not None:
gradient = gradient.reshape(
(*output_shape, self.n_dims), order='F'
)[:, :, self.active_theta_mask]
timer.toc('collecting result')
if timing:
timer.report(unit='ms')
timer.reset()
if traits.eval_gradient is True:
return (
gramian.astype(self.element_dtype),
gradient.astype(self.element_dtype)
)
else:
return gramian.astype(self.element_dtype)
def diag(self, X, eval_gradient=False, nodal=False, lmin=0,
active_theta_only=True, timing=False):
"""Compute the self-similarities for a list of graphs
Parameters
----------
X: list of N graphs
The graphs must all have the same node and edge attributes.
eval_gradient: bool
If True, computes the gradient of the kernel matrix with respect
to hyperparameters and returns it alongside the kernel matrix.
nodal: bool or 'block'
If True, returns a vector containing nodal self-similarities; if
False, returns a vector containing the graphs' overall
self-similarities; if 'block', returns a list of square matrices
that form a block-diagonal matrix, where each diagonal block
contains the pairwise nodal similarities within a graph.
lmin: 0 or 1
Number of steps to skip in each random walk path before similarity
is computed.
(lmin + 1) corresponds to the starting value of l in the summation
of Eq. 1 in Tang & de Jong, 2019 https://doi.org/10.1063/1.5078640
(or the first unnumbered equation in Section 3.3 of Kashima, Tsuda,
and Inokuchi, 2003).
active_theta_only: bool
Whether or not to return only gradients with regard to the
non-fixed hyperparameters.
Returns
-------
diagonal: numpy.array or list of np.array(s)
If nodal=True, returns a vector containing nodal self-similarities;
if nodal=False, returns a vector containing the graphs' overall
self-similarities; if nodal='block', returns a list of square
matrices, each being a pairwise nodal similarity matrix within a
graph.
gradient: ndarray
The gradient of the kernel matrix with respect to kernel
hyperparameters. Only returned if eval_gradient is True.
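Example
-------
A hedged sketch of cosine-style kernel normalization using ``diag``;
``kernel`` and ``graphs`` are assumed as in the class-level example.
>>> import numpy as np
>>> R = kernel(graphs)
>>> d = kernel.diag(graphs)
>>> R_normalized = R / np.sqrt(np.outer(d, d))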
"""
timer = Timer()
backend = self.backend
traits = self.traits(
diagonal=True,
nodal=nodal,
lmin=lmin,
eval_gradient=eval_gradient
)
''' assert graph attributes are compatible with each other '''
pred_or_tuple = Graph.has_unified_types(X)
if pred_or_tuple is not True:
group, first, second = pred_or_tuple
raise TypeError(
f'The two graphs have mismatching {group} attributes or '
'attribute types. '
'If the attribute names do match, then try to unify data '
'types automatically with `Graph.unify_datatype`.\n'
f'First graph: {first}\n'
f'Second graph: {second}\n'
)
''' generate jobs '''
timer.tic('generating jobs')
i = np.arange(len(X), dtype=np.uint32)
jobs = backend.array(
np.column_stack((i, i))
.ravel()
.view(np.dtype([('i', np.uint32), ('j', np.uint32)]))
)
timer.toc('generating jobs')
''' create output buffer '''
timer.tic('creating output buffer')
starts = backend.zeros(len(X) + 1, dtype=np.uint32)
if nodal is True:
sizes = np.array([len(g.nodes) for g in X], dtype=np.uint32)
np.cumsum(sizes, out=starts[1:])
output_length = int(starts[-1])
elif nodal is False:
starts[:] = np.arange(len(X) + 1)
output_length = len(X)
elif nodal == 'block':
sizes = np.array([len(g.nodes) for g in X], dtype=np.uint32)
np.cumsum(sizes**2, out=starts[1:])
output_length = int(starts[-1])
else:
raise ValueError("Invalid 'nodal' option '%s'" % nodal)
gramian = backend.empty(output_length, np.float32)
if traits.eval_gradient is True:
gradient = backend.empty(self.n_dims * output_length, np.float32)
else:
gradient = None
timer.toc('creating output buffer')
''' call GPU kernel '''
timer.tic('calling GPU kernel (overall)')
backend(
X,
self.node_kernel,
self.edge_kernel,
self.p,
self.q,
self.eps,
self.ftol,
self.gtol,
jobs,
starts,
gramian,
gradient,
output_length,
1,
self.n_dims,
traits,
timer,
)
timer.toc('calling GPU kernel (overall)')
''' post processing '''
timer.tic('collecting result')
if gradient is not None:
gradient = gradient.reshape(
(output_length, self.n_dims), order='F'
)
if active_theta_only:
gradient = gradient[:, self.active_theta_mask]
if nodal == 'block':
retval = [gramian[s:s + n**2].reshape(n, n)
for s, n in zip(starts[:-1], sizes)]
elif traits.eval_gradient is True:
retval = (
gramian.astype(self.element_dtype),
gradient.astype(self.element_dtype)
)
else:
retval = gramian.astype(self.element_dtype)
timer.toc('collecting result')
if timing:
timer.report(unit='ms')
timer.reset()
return retval
"""⭣⭣⭣⭣⭣ scikit-learn interoperability methods ⭣⭣⭣⭣⭣"""
def is_stationary(self):
return False
@property
def requires_vector_input(self):
return False
@property
def hyperparameters(self):
'''A hierarchical representation of all the kernel hyperparameters.'''
return pretty_tuple(
'MarginalizedGraphKernel',
['starting_probability', 'stopping_probability', 'node_kernel',
'edge_kernel']
)(self.p.theta,
self.q,
self.node_kernel.theta,
self.edge_kernel.theta)
@property
def flat_hyperparameters(self):
return np.fromiter(flatten(self.hyperparameters), np.float64)
@property
def hyperparameter_bounds(self):
return pretty_tuple(
'GraphKernelHyperparameterBounds',
['starting_probability', 'stopping_probability', 'node_kernel',
'edge_kernel']
)(self.p.bounds,
self.q_bounds,
self.node_kernel.bounds,
self.edge_kernel.bounds)
@property
def n_dims(self):
'Number of hyperparameters including both optimizable and fixed ones.'
return len(self.flat_hyperparameters)
@property
def active_theta_mask(self):
lower, upper = np.reshape(
np.fromiter(
flatten( # flatten the newly introduced (nan, nan) tuples
replace( # 'fixed' -> (nan, nan)
flatten(self.hyperparameter_bounds), # tree -> list
'fixed',
(np.nan, np.nan)
)
),
dtype=np.float64
),
(2, -1),
order='F'
)
inactive = np.isnan(lower) | np.isnan(upper) | (lower == upper)
return ~inactive
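# A hedged illustration of the mask semantics: a hyperparameter is
# active unless its bounds are 'fixed' or degenerate, e.g. bounds of
# ('fixed', (0.1, 10.0), (1.0, 1.0)) would yield (False, True, False).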
@property
def theta(self):
'''The logarithms of a flattened array of kernel hyperparameters,
excluding those declared as 'fixed' or those with equal lower and
upper bounds.'''
return np.log(self.flat_hyperparameters[self.active_theta_mask])
@theta.setter
def theta(self, value):
hypers = np.log(self.flat_hyperparameters)
hypers[self.active_theta_mask] = value
(self.p.theta,
self.q,
self.node_kernel.theta,
self.edge_kernel.theta
) = fold_like(np.exp(hypers), self.hyperparameters)
@property
def bounds(self):
'''The logarithms of a reshaped X-by-2 array of kernel hyperparameter
bounds, excluding those declared as 'fixed' or those with equal lower
and upper bounds.'''
return np.log(
np.fromiter(
flatten(
replace(
flatten(self.hyperparameter_bounds),
'fixed',
(np.nan, np.nan)
)
),
np.float64
).reshape(-1, 2, order='C')[self.active_theta_mask, :]
)
def clone_with_theta(self, theta):
clone = copy.deepcopy(self)
clone.theta = theta
return clone
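# A hedged round-trip sketch of the scikit-learn-style interface, for
# any `kernel` instance with at least one active hyperparameter:
#     kernel2 = kernel.clone_with_theta(kernel.theta)
#     assert np.allclose(kernel2.theta, kernel.theta)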