"""
Matrix Robust Soft LVQ (MRSLVQ) and Localized Matrix RSLVQ (LMRSLVQ).
RSLVQ with learned linear transformation(s) :math:`\\Omega` for metric adaptation.
MRSLVQ uses a single global :math:`\\Omega`; LMRSLVQ uses per-prototype
:math:`\\Omega_k`.
.. math::
d(x, w_k) = (x - w_k)^T \\Omega^T \\Omega (x - w_k)
Loss: :math:`-\\log(P(\\text{correct class}|x) / P(\\text{all}|x))` (RSLVQ objective).
References
----------
.. [1] Schneider, P., Biehl, M., & Hammer, B. (2009). Adaptive
Relevance Matrices in Learning Vector Quantization. Neural
Computation.
.. [2] Seo, S., & Obermayer, K. (2007). Soft Nearest Prototype
Classification. IEEE Trans. Neural Networks.
"""
import jax
import jax.numpy as jnp
import numpy as np
from jax import jit
from functools import partial
from prosemble.models.prototype_base import SupervisedPrototypeModel
from prosemble.core.competitions import wtac
from prosemble.core.initializers import identity_omega_init
from prosemble.core.losses import rslvq_loss
from prosemble.core.pooling import stratified_min_pooling
@jit
def _predict_mrslvq_jit(X, prototypes, omega, proto_labels):
"""JIT-compiled MRSLVQ prediction with learned :math:`\\Omega` metric."""
diff = X[:, None, :] - prototypes[None, :, :]
projected = jnp.einsum('npd,dl->npl', diff, omega)
distances = jnp.sum(projected ** 2, axis=2)
return wtac(distances, proto_labels)
@partial(jit, static_argnums=(4,))
def _predict_proba_mrslvq_jit(X, prototypes, omega, proto_labels, n_classes):
"""JIT-compiled MRSLVQ probability prediction."""
diff = X[:, None, :] - prototypes[None, :, :]
projected = jnp.einsum('npd,dl->npl', diff, omega)
distances = jnp.sum(projected ** 2, axis=2)
class_dists = stratified_min_pooling(distances, proto_labels, n_classes)
return jax.nn.softmax(-class_dists, axis=1)
@jit
def _predict_lmrslvq_jit(X, prototypes, omegas, proto_labels):
"""JIT-compiled LMRSLVQ prediction with per-prototype :math:`\\Omega` metrics."""
diff = X[:, None, :] - prototypes[None, :, :]
projected = jnp.einsum('npd,pdl->npl', diff, omegas)
distances = jnp.sum(projected ** 2, axis=2)
return wtac(distances, proto_labels)
@partial(jit, static_argnums=(4,))
def _predict_proba_lmrslvq_jit(X, prototypes, omegas, proto_labels, n_classes):
"""JIT-compiled LMRSLVQ probability prediction."""
diff = X[:, None, :] - prototypes[None, :, :]
projected = jnp.einsum('npd,pdl->npl', diff, omegas)
distances = jnp.sum(projected ** 2, axis=2)
class_dists = stratified_min_pooling(distances, proto_labels, n_classes)
return jax.nn.softmax(-class_dists, axis=1)
[docs]
class MRSLVQ(SupervisedPrototypeModel):
"""Matrix Robust Soft Learning Vector Quantization.
Combines the RSLVQ probabilistic loss with a learned global linear
mapping :math:`\\Omega` (d x latent_dim) for metric adaptation:
.. math::
d(x, w) = (x - w)^T \\Omega^T \\Omega (x - w)
The relevance matrix :math:`\\Lambda = \\Omega^T \\Omega` captures feature
correlations in the probabilistic framework.
Parameters
----------
sigma : float
Bandwidth of Gaussian mixture.
latent_dim : int, optional
Dimensionality of the latent space. If None, uses input dim.
rejection_confidence : float, optional
Minimum class probability for a confident prediction (0 to 1).
Samples below this threshold are rejected (labeled -1) when using
``predict_with_rejection()``. Default is None (no rejection).
n_prototypes_per_class : int
Number of prototypes per class.
max_iter : int
Maximum training iterations.
lr : float
Learning rate.
epsilon : float
Convergence threshold on loss change.
random_seed : int
Random seed for reproducibility.
distance_fn : callable, optional
Distance function (default: squared Euclidean).
optimizer : str or optax optimizer, optional
Optimizer name ('adam', 'sgd') or optax GradientTransformation.
Default: 'adam'.
transfer_fn : callable, optional
Transfer function for loss shaping (default: identity).
margin : float
Margin for loss computation.
callbacks : list, optional
List of Callback objects.
use_scan : bool
If True (default), use jax.lax.scan for training (faster, JIT-compiled,
but runs all max_iter iterations even after convergence).
If False, use a Python for-loop with true early stopping (no wasted
compute after convergence, but slower per iteration).
batch_size : int, optional
Mini-batch size. If None (default), use full-batch training.
When set, each epoch iterates over shuffled mini-batches of this size.
lr_scheduler : str or optax.Schedule, optional
Learning rate schedule. Supported strings: 'exponential_decay',
'cosine_decay', 'warmup_cosine_decay', 'warmup_exponential_decay',
'warmup_constant', 'polynomial', 'linear', 'piecewise_constant',
'sgdr'. Or pass a custom optax.Schedule. Default: None.
lr_scheduler_kwargs : dict, optional
Keyword arguments passed to the learning rate scheduler
(e.g. ``decay_rate``, ``transition_steps``). Default: None.
prototypes_initializer : str or callable, optional
How to initialize prototypes. Supported strings: 'stratified_random'
(default), 'class_mean', 'class_conditional_mean', 'stratified_noise',
'random_normal', 'uniform', 'zeros', 'ones', 'fill_value'.
Or pass a callable ``(X, y, n_per_class, key) -> (protos, labels)``.
patience : int, optional
Number of consecutive epochs with no improvement before stopping.
If None (default), stops after a single non-improving step (epsilon
check). Requires use_scan=False for true early stopping.
restore_best : bool
If True, restore the parameters that achieved the lowest loss
(or validation loss if validation data is provided). Default: False.
class_weight : dict or 'balanced', optional
Weights for each class. Dict maps class label to weight, e.g.
{0: 1.0, 1: 2.0, 2: 1.5}. 'balanced' auto-computes weights
inversely proportional to class frequencies. Default: None (uniform).
gradient_accumulation_steps : int, optional
Accumulate gradients over this many steps before applying an update.
Effective batch size = batch_size * gradient_accumulation_steps.
Default: None (no accumulation).
ema_decay : float, optional
Exponential moving average decay for parameters (0 < ema_decay < 1).
After training, model parameters are replaced with EMA-smoothed values.
Typical values: 0.999, 0.9999. Default: None (no EMA).
freeze_params : list of str, optional
List of parameter group names to freeze (zero gradients).
E.g. ['backbone'] to freeze the backbone and only train prototypes.
Default: None (all parameters trainable).
lookahead : dict, optional
Enable lookahead optimizer wrapper. Dict with keys:
- 'sync_period': int (default 6) — sync every k steps
- 'slow_step_size': float (default 0.5) — interpolation factor
Default: None (no lookahead).
mixed_precision : str or None, optional
Compute dtype for mixed precision training. 'float16' or 'bfloat16'.
Master weights stay in float32; forward/backward pass runs in lower
precision for ~2x speed and ~half memory on GPU. Float16 uses static
loss scaling to prevent gradient underflow. Default: None (disabled).
"""
def __init__(self, sigma=1.0, latent_dim=None, rejection_confidence=None,
n_prototypes_per_class=1, max_iter=100, lr=0.01,
epsilon=1e-6, random_seed=42, distance_fn=None,
optimizer='adam', transfer_fn=None, margin=0.0,
callbacks=None, use_scan=True, batch_size=None,
lr_scheduler=None, lr_scheduler_kwargs=None,
prototypes_initializer=None, patience=None,
restore_best=False, class_weight=None,
gradient_accumulation_steps=None, ema_decay=None,
freeze_params=None, lookahead=None, mixed_precision=None):
super().__init__(
n_prototypes_per_class=n_prototypes_per_class,
max_iter=max_iter, lr=lr, epsilon=epsilon,
random_seed=random_seed, distance_fn=distance_fn,
optimizer=optimizer, transfer_fn=transfer_fn, margin=margin,
callbacks=callbacks, use_scan=use_scan, batch_size=batch_size,
lr_scheduler=lr_scheduler,
lr_scheduler_kwargs=lr_scheduler_kwargs,
prototypes_initializer=prototypes_initializer,
patience=patience, restore_best=restore_best,
class_weight=class_weight,
gradient_accumulation_steps=gradient_accumulation_steps,
ema_decay=ema_decay, freeze_params=freeze_params,
lookahead=lookahead, mixed_precision=mixed_precision,
)
self.sigma = sigma
self.latent_dim = latent_dim
self.rejection_confidence = rejection_confidence
self.omega_ = None
def _get_resume_params(self, params):
params['omega'] = self.omega_
return params
def _init_state(self, X, y, key):
n_features = X.shape[1]
latent_dim = self.latent_dim or n_features
key1, key2 = jax.random.split(key)
prototypes, proto_labels = self._init_prototypes(
X, y, self.n_prototypes_per_class, key1
)
omega = identity_omega_init(n_features, latent_dim)
params = {'prototypes': prototypes, 'omega': omega}
opt_state = self._optimizer.init(params)
from prosemble.models.prototype_base import SupervisedState
state = SupervisedState(
prototypes=prototypes,
opt_state=opt_state,
loss=jnp.array(float('inf')),
iteration=0,
converged=False,
)
return state, params, proto_labels
def _compute_loss(self, params, X, y, proto_labels):
prototypes = params['prototypes']
omega = params['omega']
diff = X[:, None, :] - prototypes[None, :, :] # (n, p, d)
projected = jnp.einsum('npd,dl->npl', diff, omega) # (n, p, l)
distances = jnp.sum(projected ** 2, axis=2) # (n, p)
return rslvq_loss(distances, y, proto_labels, sigma=self.sigma)
def _extract_results(self, params, proto_labels, loss_history, n_iter,
**kwargs):
super()._extract_results(
params, proto_labels, loss_history, n_iter, **kwargs
)
self.omega_ = params['omega']
@property
def omega_matrix(self):
"""Return the learned :math:`\\Omega` matrix."""
if self.omega_ is None:
raise ValueError("Model not fitted.")
return self.omega_
@property
def lambda_matrix(self):
"""Return :math:`\\Lambda = \\Omega^T \\Omega` (relevance matrix)."""
if self.omega_ is None:
raise ValueError("Model not fitted.")
return self.omega_.T @ self.omega_
[docs]
def predict(self, X):
"""Predict using learned :math:`\\Omega` distance."""
self._check_fitted()
X = jnp.asarray(X, dtype=jnp.float32)
return _predict_mrslvq_jit(
X, self.prototypes_, self.omega_, self.prototype_labels_
)
[docs]
def predict_proba(self, X):
"""Predict class probabilities using :math:`\\Omega`-projected distances."""
self._check_fitted()
X = jnp.asarray(X, dtype=jnp.float32)
return _predict_proba_mrslvq_jit(
X, self.prototypes_, self.omega_, self.prototype_labels_,
self.n_classes_
)
def predict_with_rejection(self, X, confidence=None):
"""Predict with rejection option.
Samples whose maximum class probability is below the confidence
threshold are assigned label -1 (rejected / "I don't know").
Parameters
----------
X : array-like of shape (n_samples, n_features)
confidence : float, optional
Override the model's rejection_confidence for this call.
Returns
-------
labels : array of shape (n_samples,)
Predicted labels, or -1 for rejected samples.
"""
self._check_fitted()
threshold = (confidence if confidence is not None
else self.rejection_confidence)
if threshold is None:
return self.predict(X)
X = jnp.asarray(X, dtype=jnp.float32)
proba = self.predict_proba(X)
max_proba = jnp.max(proba, axis=1)
preds = jnp.argmax(proba, axis=1)
return jnp.where(max_proba >= threshold, preds, -1)
def _get_quantizable_attrs(self):
attrs = super()._get_quantizable_attrs()
if self.omega_ is not None:
attrs.append('omega_')
return attrs
def _get_fitted_arrays(self):
arrays = super()._get_fitted_arrays()
if self.omega_ is not None:
arrays['omega_'] = np.asarray(self.omega_)
return arrays
def _set_fitted_arrays(self, arrays):
super()._set_fitted_arrays(arrays)
if 'omega_' in arrays:
self.omega_ = jnp.asarray(arrays['omega_'])
def _get_hyperparams(self):
hp = super()._get_hyperparams()
hp['sigma'] = self.sigma
hp['rejection_confidence'] = self.rejection_confidence
if self.latent_dim is not None:
hp['latent_dim'] = self.latent_dim
return hp
[docs]
class LMRSLVQ(SupervisedPrototypeModel):
"""Localized Matrix Robust Soft Learning Vector Quantization.
Each prototype :math:`k` has its own :math:`\\Omega_k` matrix. The distance from
sample :math:`x` to prototype :math:`w_k` is:
.. math::
d(x, w_k) = (x - w_k)^T \\Omega_k^T \\Omega_k (x - w_k)
Combined with the RSLVQ probabilistic loss for metric-adaptive
soft classification with local relevance learning.
Parameters
----------
sigma : float
Bandwidth of Gaussian mixture.
latent_dim : int, optional
Latent space dimensionality per prototype. If None, uses input dim.
rejection_confidence : float, optional
Minimum class probability for a confident prediction (0 to 1).
Samples below this threshold are rejected (labeled -1) when using
``predict_with_rejection()``. Default is None (no rejection).
n_prototypes_per_class : int
Number of prototypes per class.
max_iter : int
Maximum training iterations.
lr : float
Learning rate.
epsilon : float
Convergence threshold on loss change.
random_seed : int
Random seed for reproducibility.
distance_fn : callable, optional
Distance function (default: squared Euclidean).
optimizer : str or optax optimizer, optional
Optimizer name ('adam', 'sgd') or optax GradientTransformation.
Default: 'adam'.
transfer_fn : callable, optional
Transfer function for loss shaping (default: identity).
margin : float
Margin for loss computation.
callbacks : list, optional
List of Callback objects.
use_scan : bool
If True (default), use jax.lax.scan for training (faster, JIT-compiled,
but runs all max_iter iterations even after convergence).
If False, use a Python for-loop with true early stopping (no wasted
compute after convergence, but slower per iteration).
batch_size : int, optional
Mini-batch size. If None (default), use full-batch training.
When set, each epoch iterates over shuffled mini-batches of this size.
lr_scheduler : str or optax.Schedule, optional
Learning rate schedule. Supported strings: 'exponential_decay',
'cosine_decay', 'warmup_cosine_decay', 'warmup_exponential_decay',
'warmup_constant', 'polynomial', 'linear', 'piecewise_constant',
'sgdr'. Or pass a custom optax.Schedule. Default: None.
lr_scheduler_kwargs : dict, optional
Keyword arguments passed to the learning rate scheduler
(e.g. ``decay_rate``, ``transition_steps``). Default: None.
prototypes_initializer : str or callable, optional
How to initialize prototypes. Supported strings: 'stratified_random'
(default), 'class_mean', 'class_conditional_mean', 'stratified_noise',
'random_normal', 'uniform', 'zeros', 'ones', 'fill_value'.
Or pass a callable ``(X, y, n_per_class, key) -> (protos, labels)``.
patience : int, optional
Number of consecutive epochs with no improvement before stopping.
If None (default), stops after a single non-improving step (epsilon
check). Requires use_scan=False for true early stopping.
restore_best : bool
If True, restore the parameters that achieved the lowest loss
(or validation loss if validation data is provided). Default: False.
class_weight : dict or 'balanced', optional
Weights for each class. Dict maps class label to weight, e.g.
{0: 1.0, 1: 2.0, 2: 1.5}. 'balanced' auto-computes weights
inversely proportional to class frequencies. Default: None (uniform).
gradient_accumulation_steps : int, optional
Accumulate gradients over this many steps before applying an update.
Effective batch size = batch_size * gradient_accumulation_steps.
Default: None (no accumulation).
ema_decay : float, optional
Exponential moving average decay for parameters (0 < ema_decay < 1).
After training, model parameters are replaced with EMA-smoothed values.
Typical values: 0.999, 0.9999. Default: None (no EMA).
freeze_params : list of str, optional
List of parameter group names to freeze (zero gradients).
E.g. ['backbone'] to freeze the backbone and only train prototypes.
Default: None (all parameters trainable).
lookahead : dict, optional
Enable lookahead optimizer wrapper. Dict with keys:
- 'sync_period': int (default 6) — sync every k steps
- 'slow_step_size': float (default 0.5) — interpolation factor
Default: None (no lookahead).
mixed_precision : str or None, optional
Compute dtype for mixed precision training. 'float16' or 'bfloat16'.
Master weights stay in float32; forward/backward pass runs in lower
precision for ~2x speed and ~half memory on GPU. Float16 uses static
loss scaling to prevent gradient underflow. Default: None (disabled).
"""
def __init__(self, sigma=1.0, latent_dim=None, rejection_confidence=None,
n_prototypes_per_class=1, max_iter=100, lr=0.01,
epsilon=1e-6, random_seed=42, distance_fn=None,
optimizer='adam', transfer_fn=None, margin=0.0,
callbacks=None, use_scan=True, batch_size=None,
lr_scheduler=None, lr_scheduler_kwargs=None,
prototypes_initializer=None, patience=None,
restore_best=False, class_weight=None,
gradient_accumulation_steps=None, ema_decay=None,
freeze_params=None, lookahead=None, mixed_precision=None):
super().__init__(
n_prototypes_per_class=n_prototypes_per_class,
max_iter=max_iter, lr=lr, epsilon=epsilon,
random_seed=random_seed, distance_fn=distance_fn,
optimizer=optimizer, transfer_fn=transfer_fn, margin=margin,
callbacks=callbacks, use_scan=use_scan, batch_size=batch_size,
lr_scheduler=lr_scheduler,
lr_scheduler_kwargs=lr_scheduler_kwargs,
prototypes_initializer=prototypes_initializer,
patience=patience, restore_best=restore_best,
class_weight=class_weight,
gradient_accumulation_steps=gradient_accumulation_steps,
ema_decay=ema_decay, freeze_params=freeze_params,
lookahead=lookahead, mixed_precision=mixed_precision,
)
self.sigma = sigma
self.latent_dim = latent_dim
self.rejection_confidence = rejection_confidence
self.omegas_ = None
def _get_resume_params(self, params):
params['omegas'] = self.omegas_
return params
def _init_state(self, X, y, key):
n_features = X.shape[1]
latent_dim = self.latent_dim or n_features
key1, key2 = jax.random.split(key)
prototypes, proto_labels = self._init_prototypes(
X, y, self.n_prototypes_per_class, key1
)
n_protos = prototypes.shape[0]
omega_single = identity_omega_init(n_features, latent_dim)
omegas = jnp.tile(omega_single[None, :, :], (n_protos, 1, 1))
params = {'prototypes': prototypes, 'omegas': omegas}
opt_state = self._optimizer.init(params)
from prosemble.models.prototype_base import SupervisedState
state = SupervisedState(
prototypes=prototypes,
opt_state=opt_state,
loss=jnp.array(float('inf')),
iteration=0,
converged=False,
)
return state, params, proto_labels
def _compute_loss(self, params, X, y, proto_labels):
prototypes = params['prototypes']
omegas = params['omegas'] # (p, d, l)
diff = X[:, None, :] - prototypes[None, :, :] # (n, p, d)
projected = jnp.einsum('npd,pdl->npl', diff, omegas) # (n, p, l)
distances = jnp.sum(projected ** 2, axis=2) # (n, p)
return rslvq_loss(distances, y, proto_labels, sigma=self.sigma)
def _extract_results(self, params, proto_labels, loss_history, n_iter,
**kwargs):
super()._extract_results(
params, proto_labels, loss_history, n_iter, **kwargs
)
self.omegas_ = params['omegas']
[docs]
def predict(self, X):
"""Predict using local :math:`\\Omega` distances."""
self._check_fitted()
X = jnp.asarray(X, dtype=jnp.float32)
return _predict_lmrslvq_jit(
X, self.prototypes_, self.omegas_, self.prototype_labels_
)
[docs]
def predict_proba(self, X):
"""Predict class probabilities using local :math:`\\Omega`-projected distances."""
self._check_fitted()
X = jnp.asarray(X, dtype=jnp.float32)
return _predict_proba_lmrslvq_jit(
X, self.prototypes_, self.omegas_, self.prototype_labels_,
self.n_classes_
)
def predict_with_rejection(self, X, confidence=None):
"""Predict with rejection option.
Samples whose maximum class probability is below the confidence
threshold are assigned label -1 (rejected / "I don't know").
Parameters
----------
X : array-like of shape (n_samples, n_features)
confidence : float, optional
Override the model's rejection_confidence for this call.
Returns
-------
labels : array of shape (n_samples,)
Predicted labels, or -1 for rejected samples.
"""
self._check_fitted()
threshold = (confidence if confidence is not None
else self.rejection_confidence)
if threshold is None:
return self.predict(X)
X = jnp.asarray(X, dtype=jnp.float32)
proba = self.predict_proba(X)
max_proba = jnp.max(proba, axis=1)
preds = jnp.argmax(proba, axis=1)
return jnp.where(max_proba >= threshold, preds, -1)
def _get_quantizable_attrs(self):
attrs = super()._get_quantizable_attrs()
if self.omegas_ is not None:
attrs.append('omegas_')
return attrs
def _get_fitted_arrays(self):
arrays = super()._get_fitted_arrays()
if self.omegas_ is not None:
arrays['omegas_'] = np.asarray(self.omegas_)
return arrays
def _set_fitted_arrays(self, arrays):
super()._set_fitted_arrays(arrays)
if 'omegas_' in arrays:
self.omegas_ = jnp.asarray(arrays['omegas_'])
def _get_hyperparams(self):
hp = super()._get_hyperparams()
hp['sigma'] = self.sigma
hp['rejection_confidence'] = self.rejection_confidence
if self.latent_dim is not None:
hp['latent_dim'] = self.latent_dim
return hp