Source code for prosemble.models.riemannian_stng

"""
Supervised Riemannian Tangent Neural Gas (RiemannianSTNG).

Extends RiemannianSRNG with per-prototype tangent subspace projection.
Each prototype defines an orthonormal basis for an invariant subspace
in its tangent space; distance is measured orthogonal to this subspace.
"""

import jax
import jax.numpy as jnp
import numpy as np

from prosemble.models.riemannian_smng import RiemannianSMNG
from prosemble.models.prototype_base import SupervisedState
from prosemble.core.activations import sigmoid_beta
from prosemble.core.utils import orthogonalize


[docs] class RiemannianSTNG(RiemannianSMNG): """Supervised Riemannian Tangent Neural Gas. Extends RiemannianSRNG with per-prototype tangent subspace projection. Each prototype :math:`w_k` has an orthonormal basis :math:`\\Omega_k` defining an invariant subspace; the distance measures how far the tangent vector lies *outside* this subspace: .. math:: d(x, w_k) = \\|(I - \\Omega_k \\Omega_k^T) \\cdot \\text{Log}_{w_k}(x)_{\\text{flat}}\\|^2 The subspace bases are re-orthogonalized after each gradient step. Parameters ---------- manifold : SO, SPD, or Grassmannian Riemannian manifold instance. subspace_dim : int, optional Dimensionality of each prototype's tangent subspace. Default: n_features // 2. beta : float Transfer function steepness. gamma_init : float, optional Initial neighborhood range. Default: max prototypes per class / 2. gamma_final : float Final neighborhood range. Default: 0.01. gamma_decay : float, optional Per-step decay factor for gamma. tau : float Injectivity radius safety factor. Default: 0.95. n_prototypes_per_class : int Number of prototypes per class. max_iter : int Maximum training iterations. lr : float Learning rate. epsilon : float Convergence threshold on loss change. random_seed : int Random seed for reproducibility. optimizer : str or optax optimizer, optional Default: 'adam'. transfer_fn : callable, optional Transfer function for loss shaping. margin : float Margin for loss computation. callbacks : list, optional List of Callback objects. use_scan : bool If True, use jax.lax.scan. Default: False. batch_size : int, optional Mini-batch size. lr_scheduler : str or optax.Schedule, optional Learning rate schedule. lr_scheduler_kwargs : dict, optional Keyword arguments for the learning rate scheduler. prototypes_initializer : str or callable, optional How to initialize prototypes. patience : int, optional Epochs with no improvement before stopping. restore_best : bool Restore best parameters. Default: False. class_weight : dict or 'balanced', optional Class weights. gradient_accumulation_steps : int, optional Gradient accumulation steps. ema_decay : float, optional EMA decay for parameters. freeze_params : list of str, optional Parameter groups to freeze. lookahead : dict, optional Lookahead optimizer config. mixed_precision : str or None, optional Mixed precision dtype. """ def __init__(self, manifold, subspace_dim=None, beta=10.0, gamma_init=None, gamma_final=0.01, gamma_decay=None, lr_ratio=0.5, tau=0.95, n_prototypes_per_class=1, max_iter=100, lr=0.01, epsilon=1e-6, random_seed=42, optimizer='adam', transfer_fn=None, margin=0.0, callbacks=None, use_scan=False, batch_size=None, lr_scheduler=None, lr_scheduler_kwargs=None, prototypes_initializer=None, patience=None, restore_best=False, class_weight=None, gradient_accumulation_steps=None, ema_decay=None, freeze_params=None, lookahead=None, mixed_precision=None): # Pass latent_dim=None since STNG uses subspace_dim instead super().__init__( manifold=manifold, latent_dim=None, beta=beta, gamma_init=gamma_init, gamma_final=gamma_final, gamma_decay=gamma_decay, lr_ratio=lr_ratio, tau=tau, n_prototypes_per_class=n_prototypes_per_class, max_iter=max_iter, lr=lr, epsilon=epsilon, random_seed=random_seed, optimizer=optimizer, transfer_fn=transfer_fn, margin=margin, callbacks=callbacks, use_scan=use_scan, batch_size=batch_size, lr_scheduler=lr_scheduler, lr_scheduler_kwargs=lr_scheduler_kwargs, prototypes_initializer=prototypes_initializer, patience=patience, restore_best=restore_best, class_weight=class_weight, gradient_accumulation_steps=gradient_accumulation_steps, ema_decay=ema_decay, freeze_params=freeze_params, lookahead=lookahead, mixed_precision=mixed_precision, ) self.subspace_dim = subspace_dim self.omegas_ = None def _get_resume_params(self, params): gamma = params.get('gamma', jnp.array(self.gamma_final)) omegas = params.get('omegas', self.omegas_) if self.omegas_ is not None else params.get('omegas') return { 'prototypes': params['prototypes'], 'omegas': omegas, 'gamma': gamma, } def _init_state(self, X, y, key): # Call RiemannianSRNG._init_state (skip RiemannianSMNG) state, params, proto_labels = RiemannianSMNG.__bases__[0]._init_state(self, X, y, key) d_flat = X.shape[1] n_protos = params['prototypes'].shape[0] subspace_dim = self.subspace_dim if self.subspace_dim is not None else d_flat // 2 key_omega = jax.random.PRNGKey(self.random_seed + 1) omegas = jax.random.normal(key_omega, (n_protos, d_flat, subspace_dim)) omegas = jax.vmap(orthogonalize)(omegas) params = {**params, 'omegas': omegas} opt_state = self._optimizer.init(params) state = SupervisedState( prototypes=params['prototypes'], opt_state=opt_state, loss=jnp.array(float('inf')), iteration=0, converged=False, ) return state, params, proto_labels def _compute_loss(self, params, X, y, proto_labels): prototypes = params['prototypes'] omegas = params['omegas'] # (p, d_flat, s) gamma = params['gamma'] n = X.shape[0] p = prototypes.shape[0] X_m = self._reshape_to_manifold(X, n) W_m = self._reshape_to_manifold(prototypes, p) # 1. Tangent subspace distance tangent_flat = self._compute_tangent_vectors(X_m, W_m) # (n, p, d_flat) proj_onto_subspace = jnp.einsum('npd,pds->nps', tangent_flat, omegas) reconstruction = jnp.einsum('nps,pds->npd', proj_onto_subspace, omegas) residual = tangent_flat - reconstruction distances = jnp.sum(residual ** 2, axis=2) # (n, p) # 2-8: NG+GLVQ same_class = (y[:, None] == proto_labels[None, :]) INF = jnp.finfo(distances.dtype).max d_same = jnp.where(same_class, distances, INF) order = jnp.argsort(d_same, axis=1) ranks = jnp.argsort(order, axis=1).astype(jnp.float32) h = jnp.exp(-ranks / (gamma + 1e-10)) h = jnp.where(same_class, h, 0.0) C = jnp.sum(h, axis=1, keepdims=True) h_normalized = h / (C + 1e-10) d_diff = jnp.where(~same_class, distances, INF) dm = jnp.min(d_diff, axis=1) # Separate learning rates (Hammer et al. 2003: epsilon^- = lr_ratio * epsilon^+) # Scale gradient through dm by lr_ratio; forward pass unchanged. dm = jax.lax.stop_gradient(dm) + self.lr_ratio * ( dm - jax.lax.stop_gradient(dm)) mu = (distances - dm[:, None]) / (distances + dm[:, None] + 1e-10) transfer = self.transfer_fn or sigmoid_beta cost = transfer(mu + self.margin, self.beta) weighted_cost = jnp.sum(h_normalized * cost, axis=1) return jnp.mean(weighted_cost) def _post_update(self, params): # Project prototypes and decay gamma via parent params = RiemannianSMNG.__bases__[0]._post_update(self, params) # Re-orthogonalize tangent subspace bases omegas = jax.vmap(orthogonalize)(params['omegas']) return {**params, 'omegas': omegas} def _extract_results(self, params, proto_labels, loss_history, n_iter, **kwargs): RiemannianSMNG.__bases__[0]._extract_results( self, params, proto_labels, loss_history, n_iter, **kwargs ) self.omegas_ = params['omegas']
[docs] def predict(self, X): """Predict using tangent subspace distance. Parameters ---------- X : array-like of shape (n_samples, n_features_flat) Returns ------- labels : array of shape (n_samples,) """ self._check_fitted() X = jnp.asarray(X, dtype=jnp.float32) n = X.shape[0] p = self.prototypes_.shape[0] X_m = self._reshape_to_manifold(X, n) W_m = self._reshape_to_manifold(self.prototypes_, p) tangent_flat = self._compute_tangent_vectors(X_m, W_m) proj = jnp.einsum('npd,pds->nps', tangent_flat, self.omegas_) recon = jnp.einsum('nps,pds->npd', proj, self.omegas_) residual = tangent_flat - recon distances = jnp.sum(residual ** 2, axis=2) from prosemble.core.competitions import wtac return wtac(distances, self.prototype_labels_)
def _get_quantizable_attrs(self): return {'prototypes_': self.prototypes_, 'omegas_': self.omegas_} def _get_fitted_arrays(self): arrays = RiemannianSMNG.__bases__[0]._get_fitted_arrays(self) if self.omegas_ is not None: arrays['omegas_'] = np.asarray(self.omegas_) return arrays def _set_fitted_arrays(self, arrays): RiemannianSMNG.__bases__[0]._set_fitted_arrays(self, arrays) if 'omegas_' in arrays: self.omegas_ = jnp.asarray(arrays['omegas_']) def _get_hyperparams(self): # Call RiemannianSRNG._get_hyperparams (skip SMNG's latent_dim) hp = RiemannianSMNG.__bases__[0]._get_hyperparams(self) hp['subspace_dim'] = self.subspace_dim hp['lr_ratio'] = self.lr_ratio return hp