# Source code for prosemble.models.dk_matrix_lvq

"""
Differentiating Kernel GMLVQ (DKGMLVQ).

GMLVQ with the exponential kernel and adaptive matrix
:math:`\\hat\\Lambda = \\hat\\Omega \\hat\\Omega^T`:

.. math::

    \\kappa_{\\exp}(x, w, \\hat\\Lambda) = \\exp(x^T \\hat\\Lambda w)

.. math::

    d_\\kappa^2(x, w) = \\exp(x^T \\hat\\Lambda x)
                      + \\exp(w^T \\hat\\Lambda w)
                      - 2 \\exp(x^T \\hat\\Lambda w)

References
----------
.. [1] Villmann, T., Haase, S., & Kaden, M. (2015). Kernelized vector
       quantization in gradient-descent learning. Neurocomputing.
"""

import jax
import jax.numpy as jnp
from jax import jit
import numpy as np

from prosemble.models.prototype_base import SupervisedPrototypeModel
from prosemble.core.competitions import wtac
from prosemble.core.initializers import identity_omega_init
from prosemble.core.kernel import exponential_kernel_distance_squared


@jit
def _predict_dkgmlvq_jit(X, prototypes, omega_hat, proto_labels):
    """Predict labels from exponential-kernel distances (JIT-compiled).

    Evaluates ``exponential_kernel_distance_squared`` for ``X`` against
    ``prototypes`` under ``omega_hat`` and hands the result, together with
    ``proto_labels``, to ``wtac``; the output of ``wtac`` is returned as is.
    """
    return wtac(
        exponential_kernel_distance_squared(X, prototypes, omega_hat),
        proto_labels,
    )


class DKGMLVQ(SupervisedPrototypeModel):
    """Differentiating Kernel GMLVQ with Exponential Kernel.

    Learns a global transformation matrix :math:`\\hat\\Omega`
    (d x latent_dim) such that distances are computed via the
    exponential kernel:

    .. math::

        \\kappa_{\\exp}(x, w) = \\exp(x^T \\hat\\Lambda w), \\quad
        \\hat\\Lambda = \\hat\\Omega \\hat\\Omega^T

    .. math::

        d_\\kappa^2(x, w) = \\exp(x^T \\hat\\Lambda x)
                          + \\exp(w^T \\hat\\Lambda w)
                          - 2 \\exp(x^T \\hat\\Lambda w)

    Note: :math:`\\kappa(v, v) \\neq 1` for the exponential kernel,
    so the full three-term distance formula is used.

    Parameters
    ----------
    latent_dim : int, optional
        Dimensionality of the transformation. If None, uses input dim.
    omega_hat_scale : float
        Scale factor for omega_hat initialization. Default: 0.1.
        Smaller values prevent exp overflow at initialization.
    beta : float
        Transfer function steepness.
    n_prototypes_per_class : int
        Number of prototypes per class.
    max_iter : int
        Maximum training iterations.
    lr : float
        Learning rate.
    epsilon : float
        Convergence threshold on loss change.
    random_seed : int
        Random seed for reproducibility.
    distance_fn : callable, optional
        Forwarded unchanged to the base class. The loss, prediction and
        rejection distances defined in this class always use the
        exponential kernel distance and do not consult this argument.
    optimizer : str or optax optimizer, optional
        Optimizer name ('adam', 'sgd') or optax GradientTransformation.
    transfer_fn : callable, optional
        Transfer function for loss shaping (default: identity).
    margin : float
        Margin for loss computation.
    callbacks : list, optional
        List of Callback objects.
    use_scan : bool
        If True (default), use jax.lax.scan for training.
    batch_size : int, optional
        Mini-batch size. If None, use full-batch training.
    lr_scheduler : str or optax.Schedule, optional
        Learning rate schedule.
    lr_scheduler_kwargs : dict, optional
        Keyword arguments for the learning rate scheduler.
    prototypes_initializer : str or callable, optional
        How to initialize prototypes.
    patience : int, optional
        Epochs with no improvement before stopping.
    restore_best : bool
        If True, restore best parameters after training.
    class_weight : dict or 'balanced', optional
        Weights for each class.
    gradient_accumulation_steps : int, optional
        Accumulate gradients over this many steps.
    ema_decay : float, optional
        Exponential moving average decay for parameters.
    freeze_params : list of str, optional
        Parameter group names to freeze.
    lookahead : dict, optional
        Lookahead optimizer wrapper configuration.
    mixed_precision : str or None, optional
        Compute dtype for mixed precision training.
    **kwargs
        Additional keyword arguments forwarded to the base class
        constructor.

    References
    ----------
    .. [1] Villmann, T., Haase, S., & Kaden, M. (2015). Kernelized vector
           quantization in gradient-descent learning. Neurocomputing.

    See Also
    --------
    SupervisedPrototypeModel : Full list of base parameters.
    """

    def __init__(self, latent_dim=None, omega_hat_scale=0.1, beta=10.0,
                 n_prototypes_per_class=1, max_iter=100, lr=0.01,
                 epsilon=1e-6, random_seed=42, distance_fn=None,
                 optimizer='adam', transfer_fn=None, margin=0.0,
                 callbacks=None, use_scan=True, batch_size=None,
                 lr_scheduler=None, lr_scheduler_kwargs=None,
                 prototypes_initializer=None, patience=None,
                 restore_best=False, class_weight=None,
                 gradient_accumulation_steps=None, ema_decay=None,
                 freeze_params=None, lookahead=None, mixed_precision=None,
                 **kwargs):
        super().__init__(
            n_prototypes_per_class=n_prototypes_per_class,
            max_iter=max_iter,
            lr=lr,
            epsilon=epsilon,
            random_seed=random_seed,
            distance_fn=distance_fn,
            optimizer=optimizer,
            transfer_fn=transfer_fn,
            margin=margin,
            callbacks=callbacks,
            use_scan=use_scan,
            batch_size=batch_size,
            lr_scheduler=lr_scheduler,
            lr_scheduler_kwargs=lr_scheduler_kwargs,
            prototypes_initializer=prototypes_initializer,
            patience=patience,
            restore_best=restore_best,
            class_weight=class_weight,
            gradient_accumulation_steps=gradient_accumulation_steps,
            ema_decay=ema_decay,
            freeze_params=freeze_params,
            lookahead=lookahead,
            mixed_precision=mixed_precision,
            **kwargs,
        )
        self.latent_dim = latent_dim
        self.omega_hat_scale = omega_hat_scale
        self.beta = beta
        # Learned transformation matrix; stays None until results are
        # extracted after training (or restored via _set_fitted_arrays).
        self.omega_hat_ = None

    def _get_resume_params(self, params):
        """Store the current ``omega_hat_`` in ``params`` and return it.

        ``params`` is modified in place under the key ``'omega_hat'``.
        """
        params['omega_hat'] = self.omega_hat_
        return params

    def _init_state(self, X, y, key):
        """Build the initial training state and parameter dict.

        Returns
        -------
        tuple
            ``(state, params, proto_labels)`` where ``params`` holds the
            ``'prototypes'`` and ``'omega_hat'`` entries that are handed
            to the optimizer.
        """
        n_features = X.shape[1]
        # A falsy latent_dim (None or 0) falls back to the input dimension.
        latent_dim = self.latent_dim or n_features

        # The key is still split so prototype initialization receives the
        # same sub-key as before; the second sub-key is not needed.
        proto_key, _ = jax.random.split(key)
        prototypes, proto_labels = self._init_prototypes(
            X, y, self.n_prototypes_per_class, proto_key
        )

        omega_hat = self.omega_hat_scale * identity_omega_init(
            n_features, latent_dim
        )

        params = {'prototypes': prototypes, 'omega_hat': omega_hat}
        opt_state = self._optimizer.init(params)

        from prosemble.models.prototype_base import SupervisedState
        state = SupervisedState(
            prototypes=prototypes,
            opt_state=opt_state,
            loss=jnp.array(float('inf')),
            iteration=0,
            converged=False,
        )
        return state, params, proto_labels

    def _compute_loss(self, params, X, y, proto_labels):
        """Return the GLVQ loss computed on exponential kernel distances."""
        prototypes = params['prototypes']
        omega_hat = params['omega_hat']
        distances = exponential_kernel_distance_squared(
            X, prototypes, omega_hat
        )

        from prosemble.core.losses import glvq_loss_with_transfer
        return glvq_loss_with_transfer(
            distances, y, proto_labels,
            transfer_fn=self.transfer_fn,
            margin=self.margin,
            beta=self.beta,
        )

    def _extract_results(self, params, proto_labels, loss_history, n_iter,
                         **kwargs):
        """Run the base-class extraction, then keep the learned omega_hat."""
        super()._extract_results(params, proto_labels, loss_history, n_iter,
                                 **kwargs)
        self.omega_hat_ = params['omega_hat']

    @property
    def omega_hat_matrix(self):
        """Return the learned :math:`\\hat\\Omega` matrix.

        Raises
        ------
        ValueError
            If ``omega_hat_`` has not been set yet.
        """
        if self.omega_hat_ is None:
            raise ValueError("Model not fitted.")
        return self.omega_hat_

    @property
    def lambda_hat_matrix(self):
        """Return :math:`\\hat\\Lambda = \\hat\\Omega \\hat\\Omega^T`.

        Raises
        ------
        ValueError
            If ``omega_hat_`` has not been set yet.
        """
        if self.omega_hat_ is None:
            raise ValueError("Model not fitted.")
        return self.omega_hat_ @ self.omega_hat_.T

    def _compute_distances_for_rejection(self, X):
        """Exponential kernel distances for reject option."""
        return exponential_kernel_distance_squared(
            X, self.prototypes_, self.omega_hat_
        )

    def predict(self, X):
        """Predict using learned exponential kernel distance.

        ``X`` is converted to a float32 JAX array before the JIT-compiled
        prediction helper is called.
        """
        self._check_fitted()
        X = jnp.asarray(X, dtype=jnp.float32)
        return _predict_dkgmlvq_jit(
            X, self.prototypes_, self.omega_hat_, self.prototype_labels_
        )

    def _get_quantizable_attrs(self):
        """Extend the base attribute list with ``'omega_hat_'`` when set."""
        attrs = super()._get_quantizable_attrs()
        if self.omega_hat_ is not None:
            attrs.append('omega_hat_')
        return attrs

    def _get_fitted_arrays(self):
        """Extend the base arrays with ``omega_hat_`` as a NumPy array."""
        arrays = super()._get_fitted_arrays()
        if self.omega_hat_ is not None:
            arrays['omega_hat_'] = np.asarray(self.omega_hat_)
        return arrays

    def _set_fitted_arrays(self, arrays):
        """Restore base arrays and, if present, ``omega_hat_`` as a JAX array."""
        super()._set_fitted_arrays(arrays)
        if 'omega_hat_' in arrays:
            self.omega_hat_ = jnp.asarray(arrays['omega_hat_'])

    def _get_hyperparams(self):
        """Extend the base hyperparameters with this model's own settings.

        ``latent_dim`` is only included when it was explicitly provided.
        """
        hp = super()._get_hyperparams()
        hp['beta'] = self.beta
        hp['omega_hat_scale'] = self.omega_hat_scale
        if self.latent_dim is not None:
            hp['latent_dim'] = self.latent_dim
        return hp