Source code for graphdot.model.gaussian_process.nystrom

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import warnings
import numpy as np
from graphdot.util.printer import markdown as mprint
import graphdot.linalg.low_rank as lr
from graphdot.linalg.spectral import powerh
from .base import GaussianProcessRegressorBase


class LowRankApproximateGPR(GaussianProcessRegressorBase):
    r"""Accelerated Gaussian process regression (GPR) using the Nystrom
    low-rank approximation.

    Parameters
    ----------
    kernel: kernel instance
        The covariance function of the GP.
    alpha: float > 0
        Value added to the diagonal of the core matrix during fitting. Larger
        values correspond to an increased noise level in the observations. A
        practical use of this parameter is to prevent potential numerical
        stability issues during fitting and to ensure that the core matrix is
        always positive definite in the presence of duplicate entries and/or
        round-off error.
    beta: float > 0
        Cutoff value on the singular values for the spectral pseudoinverse
        of the low-rank kernel matrix.
    optimizer: one of (str, True, None, callable)
        A string or callable that represents one of the optimizers usable in
        the scipy.optimize.minimize method. If None, no hyperparameter
        optimization will be carried out during fitting. If True, the
        optimizer will default to L-BFGS-B.
    normalize_y: boolean
        Whether to normalize the target values y so that their mean and
        variance become 0 and 1, respectively. Recommended for cases where
        zero-mean, unit-variance kernels are used. The normalization will be
        reversed when the GP predictions are returned.
    regularization: '+' or 'additive' or '*' or 'multiplicative'
        Determines the method of regularization. If '+' or 'additive',
        ``alpha`` is added to the diagonals of the kernel matrix. If '*' or
        'multiplicative', a factor of ``1 + alpha`` will be multiplied with
        each diagonal element.
    kernel_options: dict, optional
        A dictionary of additional options to be passed along when applying
        the kernel to data.
    """

    def __init__(self, kernel, alpha=1e-7, beta=1e-7, optimizer=None,
                 normalize_y=False, regularization='+', kernel_options={}):
        super().__init__(
            kernel,
            normalize_y=normalize_y,
            regularization=regularization,
            kernel_options=kernel_options
        )
        self.alpha = alpha
        self.beta = beta
        self.optimizer = optimizer
        if optimizer is True:
            self.optimizer = 'L-BFGS-B'

    @property
    def C(self):
        '''The core sample set for constructing the subspace for low-rank
        approximation.'''
        try:
            return self._C
        except AttributeError:
            raise AttributeError(
                'Core samples do not exist. Please provide using fit().'
            )

    @C.setter
    def C(self, C):
        self._C = C

    def _corespace(self, C=None, Kcc=None):
        assert(C is None or Kcc is None)
        if Kcc is None:
            Kcc = self._gramian(self.alpha, C)
        try:
            return powerh(Kcc, -0.5, return_symmetric=False)
        except np.linalg.LinAlgError:
            warnings.warn(
                'Core matrix singular, try to increase `alpha`.\n'
                'Now falling back to use a pseudoinverse.'
            )
            try:
                return powerh(Kcc, -0.5, rcond=self.beta, mode='clamp',
                              return_symmetric=False)
            except np.linalg.LinAlgError:
                raise np.linalg.LinAlgError(
                    'The core matrix is likely corrupted with NaNs and Infs '
                    'because a pseudoinverse could not be computed.'
                )
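    # Illustrative note (added; not part of the original module): `powerh`
    # with exponent -0.5 computes an inverse square root of the regularized
    # core matrix, i.e. a factor R with R @ R.T ~= Kcc^{-1}. A minimal NumPy
    # sketch of the same idea, assuming `Kcc` is symmetric positive definite:
    #
    #     w, V = np.linalg.eigh(Kcc)
    #     R = V * w**-0.5                            # scale columns by 1/sqrt(w)
    #     np.allclose(R @ R.T, np.linalg.inv(Kcc))   # -> True
    #
    # This factor maps the cross kernel into the core subspace, so that the
    # full kernel is approximated as Kxc @ Kcc^{-1} @ Kcx = F @ F.T with
    # F = Kxc @ R, which is how `fit` below builds its low-rank model.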
    def fit(self, C, X, y, loss='likelihood', tol=1e-5, repeat=1,
            theta_jitter=1.0, verbose=False):
        """Train a low-rank approximate GPR model. If the `optimizer`
        argument was set while initializing the GPR object, the
        hyperparameters of the kernel will be optimized using the specified
        loss function.

        Parameters
        ----------
        C: list of objects or feature vectors.
            The core set that defines the subspace of low-rank approximation.
        X: list of objects or feature vectors.
            Input values of the training data.
        y: 1D array
            Output/target values of the training data.
        loss: 'likelihood' or 'loocv'
            The loss function to be minimized during training. Could be
            either 'likelihood' (negative log-likelihood) or 'loocv'
            (mean-square leave-one-out cross validation error).
        tol: float
            Tolerance for termination.
        repeat: int
            Repeat the hyperparameter optimization by the specified number
            of times and return the best result.
        theta_jitter: float
            Standard deviation of the random noise added to the initial
            logscale hyperparameters across repeated optimization runs.
        verbose: bool
            Whether or not to print out the optimization progress and
            outcome.

        Returns
        -------
        self: LowRankApproximateGPR
            returns an instance of self.
        """
        self.C = C
        self.X = X
        self.y = y

        '''hyperparameter optimization'''
        if self.optimizer:

            if loss == 'likelihood':
                objective = self.log_marginal_likelihood
            elif loss == 'loocv':
                raise NotImplementedError(
                    '(ง๑ •̀_•́)ง LOOCV training not ready yet.'
                )

            def xgen(n):
                x0 = self.kernel.theta.copy()
                yield x0
                yield from x0 + theta_jitter * np.random.randn(n - 1, len(x0))

            opt = self._hyper_opt(
                method=self.optimizer,
                fun=lambda theta, objective=objective: objective(
                    theta, eval_gradient=True, clone_kernel=False,
                    verbose=verbose
                ),
                xgen=xgen(repeat), tol=tol, verbose=verbose
            )
            if verbose:
                print(f'Optimization result:\n{opt}')

            if opt.success:
                self.kernel.theta = opt.x
            else:
                raise RuntimeError(
                    f'Training using the {loss} loss did not converge, got:\n'
                    f'{opt}'
                )

        '''build and store GPR model'''
        self.Kcc_rsqrt = self._corespace(C=self._C)
        self.Kxc = self._gramian(None, self._X, self._C)[self._y_mask, :]
        self.Fxc = self.Kxc @ self.Kcc_rsqrt

        self.Kinv = lr.dot(self.Fxc, rcond=self.beta, mode='clamp').pinv()
        self.Ky = self.Kinv @ self._y

        return self
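    # Usage sketch (added for illustration; `my_kernel`, `core`, `X`, `y`
    # are placeholder names, not graphdot objects):
    #
    #     >>> gpr = LowRankApproximateGPR(kernel=my_kernel, alpha=1e-7,
    #     ...                             optimizer=True, normalize_y=True)
    #     >>> gpr.fit(core, X, y, tol=1e-4, repeat=3, verbose=True)
    #
    # With `optimizer=True`, fit() minimizes the negative log-likelihood
    # loss with L-BFGS-B, restarting `repeat` times from jittered initial
    # hyperparameters and keeping the best result.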
    def predict(self, Z, return_std=False, return_cov=False):
        """Predict using the trained GPR model.

        Parameters
        ----------
        Z: list of objects or feature vectors.
            Input values of the unknown data.
        return_std: boolean
            If True, the standard deviations of the predictions at the query
            points are returned along with the mean.
        return_cov: boolean
            If True, the covariance of the predictions at the query points
            is returned along with the mean.

        Returns
        -------
        ymean: 1D array
            Mean of the predictive distribution at query points.
        std: 1D array
            Standard deviation of the predictive distribution at query
            points.
        cov: 2D matrix
            Covariance of the predictive distribution at query points.
        """
        if not hasattr(self, 'Kinv'):
            raise RuntimeError('Model not trained.')

        Kzc = self._gramian(None, Z, self._C)
        Fzc = Kzc @ self.Kcc_rsqrt
        Kzx = lr.dot(Fzc, self.Fxc.T)

        ymean = Kzx @ self.Ky * self._ystd + self._ymean
        if return_std is True:
            Kzz = self._gramian(self.alpha, Z, diag=True)
            std = np.sqrt(
                np.maximum(Kzz - (Kzx @ self.Kinv @ Kzx.T).diagonal(), 0)
            )
            return (ymean, std * self._ystd)
        elif return_cov is True:
            Kzz = self._gramian(self.alpha, Z)
            cov = np.maximum(Kzz - (Kzx @ self.Kinv @ Kzx.T).todense(), 0)
            return (ymean, cov * self._ystd**2)
        else:
            return ymean
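    # Usage sketch (added for illustration; `Z` is a placeholder query set):
    #
    #     >>> ymean = gpr.predict(Z)
    #     >>> ymean, ystd = gpr.predict(Z, return_std=True)
    #
    # The predictive variance is evaluated in the low-rank subspace as
    # diag(Kzz) - diag(Kzx @ Kinv @ Kxz) and clipped at zero before taking
    # the square root to guard against round-off.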
    def predict_loocv(self, Z, z, return_std=False, method='auto'):
        """Compute the leave-one-out cross validation prediction of the
        given data.

        Parameters
        ----------
        Z: list of objects or feature vectors.
            Input values of the unknown data.
        z: 1D array
            Target values of the training data.
        return_std: boolean
            If True, the standard deviations of the predictions at the query
            points are returned along with the mean.
        method: 'auto' or 'ridge-like' or 'gpr-like'
            Selects the algorithm used for the fast evaluation of the
            leave-one-out cross validation without explicitly training one
            model per sample. 'ridge-like' tends to be more stable with a
            smaller core size (that is not rank-deficient), while 'gpr-like'
            tends to be more stable with a larger core size. By default, the
            option is 'auto' and the function will choose a method based on
            an analysis of the eigenspectrum of the dataset.

        Returns
        -------
        ymean: 1D array
            Leave-one-out mean of the predictive distribution at query
            points.
        std: 1D array
            Leave-one-out standard deviation of the predictive distribution
            at query points.
        """
        assert(len(Z) == len(z))
        z = np.asarray(z)
        if self.normalize_y is True:
            z_mean, z_std = np.mean(z), np.std(z)
            z = (z - z_mean) / z_std
        else:
            z_mean, z_std = 0, 1

        if not hasattr(self, 'Kcc_rsqrt'):
            raise RuntimeError('Model not trained.')
        Kzc = self._gramian(None, Z, self._C)

        Cov = Kzc.T @ Kzc
        Cov.flat[::len(self._C) + 1] += self.alpha
        Cov_rsqrt, eigvals = powerh(
            Cov, -0.5, return_symmetric=False, return_eigvals=True
        )

        # if an eigenvalue is smaller than alpha, it would have been negative
        # in the unregularized Cov matrix
        if method == 'auto':
            if eigvals.min() > self.alpha:
                method = 'ridge-like'
            else:
                method = 'gpr-like'

        if method == 'ridge-like':
            P = Kzc @ Cov_rsqrt
            L = lr.dot(P, P.T)
            zstar = z - (z - L @ z) / (1 - L.diagonal())
            if return_std is True:
                raise NotImplementedError(
                    'LOOCV std using the ridge-like method is not ready yet.'
                )
        elif method == 'gpr-like':
            F = Kzc @ self.Kcc_rsqrt
            Kinv = lr.dot(F, rcond=self.beta, mode='clamp').pinv()
            zstar = z - (Kinv @ z) / Kinv.diagonal()
            if return_std is True:
                std = np.sqrt(1 / np.maximum(Kinv.diagonal(), 1e-14))
        else:
            raise RuntimeError(f'Unknown method {method} for predict_loocv.')

        if return_std is True:
            return (zstar * z_std + z_mean, std * z_std)
        else:
            return zstar * z_std + z_mean
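    # Usage sketch (added for illustration): a typical call evaluates the
    # leave-one-out predictions on the training set itself, avoiding the
    # cost of retraining one model per held-out sample:
    #
    #     >>> zstar = gpr.predict_loocv(X, y)
    #     >>> loocv_rmse = np.sqrt(np.mean((zstar - y)**2))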
    def log_marginal_likelihood(self, theta=None, C=None, X=None, y=None,
                                eval_gradient=False, clone_kernel=True,
                                verbose=False):
        """Returns the log-marginal likelihood of a given set of log-scale
        hyperparameters.

        Parameters
        ----------
        theta: array-like
            Kernel hyperparameters for which the log-marginal likelihood is
            to be evaluated. If None, the current hyperparameters will be
            used.
        C: list of objects or feature vectors.
            The core set that defines the subspace of low-rank approximation.
            If None, `self.C` will be used.
        X: list of objects or feature vectors.
            Input values of the training data. If None, `self.X` will be
            used.
        y: 1D array
            Output/target values of the training data. If None, `self.y`
            will be used.
        eval_gradient: boolean
            If True, the gradient of the log-marginal likelihood with respect
            to the kernel hyperparameters at position theta will be returned
            alongside.
        clone_kernel: boolean
            If True, the kernel is copied so that probing with theta does not
            alter the trained kernel. If False, the kernel hyperparameters
            will be modified in-place.
        verbose: boolean
            If True, the log-likelihood value and its components will be
            printed to the screen.

        Returns
        -------
        log_likelihood: float
            Log-marginal likelihood of theta for training data.
        log_likelihood_gradient: 1D array
            Gradient of the log-marginal likelihood with respect to the
            kernel hyperparameters at position theta. Only returned when
            eval_gradient is True.
        """
        theta = theta if theta is not None else self.kernel.theta
        C = C if C is not None else self._C
        X = X if X is not None else self._X
        if y is not None:
            y_mask, y = self.mask(y)
        else:
            y = self._y
            y_mask = self._y_mask

        if clone_kernel is True:
            kernel = self.kernel.clone_with_theta(theta)
        else:
            kernel = self.kernel
            kernel.theta = theta

        t_kernel = time.perf_counter()

        if eval_gradient is True:
            Kxc, d_Kxc = self._gramian(None, X, C, kernel=kernel, jac=True)
            Kcc, d_Kcc = self._gramian(self.alpha, C, kernel=kernel, jac=True)
            Kxc, d_Kxc = Kxc[y_mask, :], d_Kxc[y_mask, :, :]
        else:
            Kxc = self._gramian(None, X, C, kernel=kernel)
            Kcc = self._gramian(self.alpha, C, kernel=kernel)
            Kxc = Kxc[y_mask, :]

        t_kernel = time.perf_counter() - t_kernel

        t_linalg = time.perf_counter()

        Kcc_rsqrt = self._corespace(Kcc=Kcc)
        F = Kxc @ Kcc_rsqrt
        K = lr.dot(F, rcond=self.beta, mode='clamp')
        K_inv = K.pinv()

        logdet = K.logdet()
        Ky = K_inv @ y
        yKy = y @ Ky
        logP = yKy + logdet

        if eval_gradient is True:
            D_theta = np.zeros_like(theta)
            K_inv2 = K_inv**2
            for i, t in enumerate(theta):
                d_F = d_Kxc[:, :, i] @ Kcc_rsqrt
                d_K = lr.dot(F, d_F.T) + lr.dot(d_F, F.T) - lr.dot(
                    F @ Kcc_rsqrt.T @ d_Kcc[:, :, i], Kcc_rsqrt @ F.T
                )
                d_logdet = (K_inv @ d_K).trace()
                d_Kinv_part = K_inv2 @ d_K - K_inv2 @ d_K @ (K @ K_inv)
                d_Kinv = d_Kinv_part + d_Kinv_part.T - K_inv @ d_K @ K_inv
                d_yKy = d_Kinv.quadratic(y, y)
                D_theta[i] = (d_logdet + d_yKy) * np.exp(t)
            retval = (logP, D_theta)
        else:
            retval = logP

        t_linalg = time.perf_counter() - t_linalg

        if verbose:
            mprint.table(
                ('logP', '%12.5g', yKy + logdet),
                # D_theta only exists when eval_gradient is True
                ('dlogP', '%12.5g',
                 np.linalg.norm(D_theta) if eval_gradient is True else np.nan),
                ('y^T.K.y', '%12.5g', yKy),
                ('log|K| ', '%12.5g', logdet),
                ('Cond(K)', '%12.5g', K.cond()),
                ('GPU time', '%10.2g', t_kernel),
                ('CPU time', '%10.2g', t_linalg),
            )

        return retval
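    # Illustrative note (added): the value returned here is
    # y^T K^{-1} y + log|K|, which grows with the negative log marginal
    # likelihood and is the quantity minimized by fit() when
    # loss='likelihood'. A probing call that leaves the trained kernel
    # untouched might look like:
    #
    #     >>> value, grad = gpr.log_marginal_likelihood(
    #     ...     gpr.kernel.theta, eval_gradient=True, clone_kernel=True)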
    # def squared_loocv_error(self, theta=None, X=None, y=None,
    #                         eval_gradient=False, clone_kernel=True,
    #                         verbose=False):
    #     """Returns the squared LOOCV error of a given set of log-scale
    #     hyperparameters.
    #     Parameters
    #     ----------
    #     theta: array-like
    #         Kernel hyperparameters for which the log-marginal likelihood is
    #         to be evaluated. If None, the current hyperparameters will be
    #         used.
    #     X: list of objects or feature vectors.
    #         Input values of the training data. If None, the data saved by
    #         fit() will be used.
    #     y: 1D array
    #         Output/target values of the training data. If None, the data
    #         saved by fit() will be used.
    #     eval_gradient: boolean
    #         If True, the gradient of the log-marginal likelihood with
    #         respect to the kernel hyperparameters at position theta will be
    #         returned alongside.
    #     clone_kernel: boolean
    #         If True, the kernel is copied so that probing with theta does
    #         not alter the trained kernel. If False, the kernel
    #         hyperparameters will be modified in-place.
    #     verbose: boolean
    #         If True, the log-likelihood value and its components will be
    #         printed to the screen.
    #     Returns
    #     -------
    #     squared_error: float
    #         Squared LOOCV error of theta for training data.
    #     squared_error_gradient: 1D array
    #         Gradient of the Squared LOOCV error with respect to the kernel
    #         hyperparameters at position theta. Only returned when
    #         eval_gradient is True.
    #     """
    #     raise RuntimeError('Not implemented')
    #     theta = theta if theta is not None else self.kernel.theta
    #     X = X if X is not None else self._X
    #     y = y if y is not None else self._y
    #     if clone_kernel is True:
    #         kernel = self.kernel.clone_with_theta(theta)
    #     else:
    #         kernel = self.kernel
    #         kernel.theta = theta
    #     t_kernel = time.perf_counter()
    #     if eval_gradient is True:
    #         K, dK = self._gramian(X, kernel=kernel, jac=True)
    #     else:
    #         K = self._gramian(X, kernel=kernel)
    #     t_kernel = time.perf_counter() - t_kernel
    #     t_linalg = time.perf_counter()
    #     L = CholSolver(K)
    #     Kinv = L(np.eye(len(X)))
    #     Kinv_diag = Kinv.diagonal()
    #     Ky = Kinv @ y
    #     e = Ky / Kinv_diag
    #     squared_error = 0.5 * np.sum(e**2)
    #     t_linalg = time.perf_counter() - t_linalg
    #     if eval_gradient is True:
    #         D_theta = np.zeros_like(theta)
    #         for i, t in enumerate(theta):
    #             dk = dK[:, :, i]
    #             KdK = Kinv @ dk
    #             D_theta[i] = (
    #                 - (e / Kinv_diag) @ (KdK @ Ky)
    #                 + (e**2 / Kinv_diag) @ (KdK @ Kinv).diagonal()
    #             ) * np.exp(t)
    #         if verbose:
    #             mprint.table(
    #                 ('Sq.Err.', '%12.5g', squared_error),
    #                 ('logdet(K)', '%12.5g', np.prod(np.linalg.slogdet(K))),
    #                 ('Norm(dK)', '%12.5g', np.linalg.norm(D_theta)),
    #                 ('Cond(K)', '%12.5g', np.linalg.cond(K)),
    #                 ('t_GPU (s)', '%10.2g', t_kernel),
    #                 ('t_CPU (s)', '%10.2g', t_linalg),
    #             )
    #         return squared_error, D_theta
    #     else:
    #         return squared_error
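

if __name__ == '__main__':
    # Illustrative sketch (added; not part of the original module): a plain
    # NumPy demonstration of the Nystrom identity Kxx ~= Kxc Kcc^{-1} Kcx
    # that underlies this class. The kernel, data, and core-set sizes below
    # are ad hoc and exist only for this demonstration.
    rng = np.random.default_rng(0)

    def rbf(A, B):
        """Squared-exponential kernel matrix between two 1D sample sets."""
        return np.exp(-0.5 * (A[:, None] - B[None, :])**2)

    X = rng.uniform(-3, 3, 200)   # full sample set
    C = rng.uniform(-3, 3, 20)    # core (inducing) samples

    Kxx = rbf(X, X)
    Kxc = rbf(X, C)
    Kcc = rbf(C, C) + 1e-7 * np.eye(len(C))   # additively regularized core

    # Nystrom approximation of the full Gram matrix.
    K_nystrom = Kxc @ np.linalg.solve(Kcc, Kxc.T)
    rel_err = np.linalg.norm(Kxx - K_nystrom) / np.linalg.norm(Kxx)
    print(f'Relative Frobenius error of the Nystrom approximation: '
          f'{rel_err:.3e}')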