Source code for prosemble.models.cbc

"""
Classification-By-Components (CBC).

A reasoning-based classification model where components (prototypes
without class labels) learn both detection (similarity) and reasoning
(positive/negative evidence per class).

References
----------
.. [1] Saralajew, S., Holdijk, L., & Villmann, T. (2020).
       Classification-by-Components: Probabilistic Modeling of
       Reasoning over a Set of Components. NeurIPS.
"""

import jax
import jax.numpy as jnp
import numpy as np
from jax import jit

from prosemble.models.prototype_base import SupervisedPrototypeModel, NotFittedError
from prosemble.core.competitions import cbcc
from prosemble.core.losses import margin_loss
from prosemble.core.similarities import gaussian_similarity


@jit
def _predict_proba_cbc_jit(X, components, reasonings, sigma_sq):
    """Return CBC class probabilities for ``X`` (JIT-compiled).

    Squared Euclidean distances between every sample and every component
    are converted into detections by ``gaussian_similarity`` (called with
    ``variance=sigma_sq``); ``cbcc`` then combines those detections with
    ``reasonings``.
    """
    # Broadcast samples against components, then reduce the feature axis.
    pairwise_delta = X[:, None, :] - components[None, :, :]
    squared_distances = jnp.sum(pairwise_delta ** 2, axis=2)
    return cbcc(
        gaussian_similarity(squared_distances, variance=sigma_sq),
        reasonings,
    )


@jit
def _predict_cbc_jit(X, components, reasonings, sigma_sq):
    """Return, per sample, the index of the most probable class (JIT-compiled).

    Probabilities come from ``_predict_proba_cbc_jit``; the arg-max is taken
    along axis 1.
    """
    return jnp.argmax(
        _predict_proba_cbc_jit(X, components, reasonings, sigma_sq),
        axis=1,
    )


class CBC(SupervisedPrototypeModel):
    """Classification-By-Components.

    Components detect patterns in the input (via similarity), then
    reasoning matrices determine how each detection contributes
    evidence for/against each class.

    Parameters
    ----------
    n_components : int
        Number of components (analogous to prototypes, but classless).
    n_classes : int
        Number of output classes.
    sigma : float
        Bandwidth for Gaussian similarity in component detection.
    margin : float
        Margin for the margin loss.
    components_initializer : callable, optional
        Initializer for component vectors.
        Signature: ``(X, key, n_components) -> components``.
        Default: None (selects random training samples).
    reasonings_initializer : callable, optional
        Initializer for the reasoning matrix.
        Signature: ``(n_components, n_classes, key) -> reasonings``.
        Default: None (initializes near-uniform with small noise).
    similarity_fn : callable, optional
        Similarity function for component detection. It is invoked as
        ``similarity_fn(dist_sq, variance=sigma ** 2)``, where ``dist_sq``
        holds the squared Euclidean distances between samples and
        components, i.e. with the same calling convention as the default.
        Used for both training and prediction.
        Default: None (uses Gaussian similarity).
    n_prototypes_per_class : int
        Number of prototypes per class.
    max_iter : int
        Maximum training iterations.
    lr : float
        Learning rate.
    epsilon : float
        Convergence threshold on loss change.
    random_seed : int
        Random seed for reproducibility.
    distance_fn : callable, optional
        Distance function (default: squared Euclidean).
    optimizer : str or optax optimizer, optional
        Optimizer name ('adam', 'sgd') or optax GradientTransformation.
        Default: 'adam'.
    transfer_fn : callable, optional
        Transfer function for loss shaping (default: identity).
    callbacks : list, optional
        List of Callback objects.
    use_scan : bool
        If True (default), use jax.lax.scan for training (faster,
        JIT-compiled, but runs all max_iter iterations even after
        convergence). If False, use a Python for-loop with true early
        stopping (no wasted compute after convergence, but slower per
        iteration).
    batch_size : int, optional
        Mini-batch size. If None (default), use full-batch training.
        When set, each epoch iterates over shuffled mini-batches of
        this size.
    lr_scheduler : str or optax.Schedule, optional
        Learning rate schedule. Supported strings: 'exponential_decay',
        'cosine_decay', 'warmup_cosine_decay', 'warmup_exponential_decay',
        'warmup_constant', 'polynomial', 'linear', 'piecewise_constant',
        'sgdr'. Or pass a custom optax.Schedule. Default: None.
    lr_scheduler_kwargs : dict, optional
        Keyword arguments passed to the learning rate scheduler
        (e.g. ``decay_rate``, ``transition_steps``). Default: None.
    prototypes_initializer : str or callable, optional
        How to initialize prototypes. Supported strings:
        'stratified_random' (default), 'class_mean',
        'class_conditional_mean', 'stratified_noise', 'random_normal',
        'uniform', 'zeros', 'ones', 'fill_value'. Or pass a callable
        ``(X, y, n_per_class, key) -> (protos, labels)``.
    patience : int, optional
        Number of consecutive epochs with no improvement before stopping.
        If None (default), stops after a single non-improving step
        (epsilon check). Requires use_scan=False for true early stopping.
    restore_best : bool
        If True, restore the parameters that achieved the lowest loss
        (or validation loss if validation data is provided).
        Default: False.
    class_weight : dict or 'balanced', optional
        Weights for each class. Dict maps class label to weight,
        e.g. {0: 1.0, 1: 2.0, 2: 1.5}. 'balanced' auto-computes weights
        inversely proportional to class frequencies.
        Default: None (uniform).
    gradient_accumulation_steps : int, optional
        Accumulate gradients over this many steps before applying an
        update. Effective batch size = batch_size *
        gradient_accumulation_steps. Default: None (no accumulation).
    ema_decay : float, optional
        Exponential moving average decay for parameters
        (0 < ema_decay < 1). After training, model parameters are
        replaced with EMA-smoothed values. Typical values: 0.999, 0.9999.
        Default: None (no EMA).
    freeze_params : list of str, optional
        List of parameter group names to freeze (zero gradients).
        E.g. ['components'] to freeze the components and only train
        reasonings. Default: None (all parameters trainable).
    lookahead : dict, optional
        Enable lookahead optimizer wrapper. Dict with keys:

        - 'sync_period': int (default 6) — sync every k steps
        - 'slow_step_size': float (default 0.5) — interpolation factor

        Default: None (no lookahead).
    mixed_precision : str or None, optional
        Compute dtype for mixed precision training. 'float16' or
        'bfloat16'. Master weights stay in float32; forward/backward pass
        runs in lower precision for ~2x speed and ~half memory on GPU.
        Float16 uses static loss scaling to prevent gradient underflow.
        Default: None (disabled).
    """

    def __init__(self, n_components=5, n_classes=2, sigma=1.0, margin=0.3,
                 components_initializer=None, reasonings_initializer=None,
                 similarity_fn=None, n_prototypes_per_class=1, max_iter=100,
                 lr=0.01, epsilon=1e-6, random_seed=42, distance_fn=None,
                 optimizer='adam', transfer_fn=None, callbacks=None,
                 use_scan=True, batch_size=None, lr_scheduler=None,
                 lr_scheduler_kwargs=None, prototypes_initializer=None,
                 patience=None, restore_best=False, class_weight=None,
                 gradient_accumulation_steps=None, ema_decay=None,
                 freeze_params=None, lookahead=None, mixed_precision=None):
        super().__init__(
            n_prototypes_per_class=n_prototypes_per_class,
            max_iter=max_iter,
            lr=lr,
            epsilon=epsilon,
            random_seed=random_seed,
            distance_fn=distance_fn,
            optimizer=optimizer,
            transfer_fn=transfer_fn,
            margin=margin,
            callbacks=callbacks,
            use_scan=use_scan,
            batch_size=batch_size,
            lr_scheduler=lr_scheduler,
            lr_scheduler_kwargs=lr_scheduler_kwargs,
            prototypes_initializer=prototypes_initializer,
            patience=patience,
            restore_best=restore_best,
            class_weight=class_weight,
            gradient_accumulation_steps=gradient_accumulation_steps,
            ema_decay=ema_decay,
            freeze_params=freeze_params,
            lookahead=lookahead,
            mixed_precision=mixed_precision,
        )
        self.n_components = n_components
        self._n_classes = n_classes
        self.sigma = sigma
        self.components_initializer = components_initializer
        self.reasonings_initializer = reasonings_initializer
        if similarity_fn is not None:
            self._similarity_fn = similarity_fn
        else:
            self._similarity_fn = gaussian_similarity
        # Fitted attributes; populated by _extract_results / _set_fitted_arrays.
        self.components_ = None
        self.reasonings_ = None

    def _get_resume_params(self, params):
        """Return the parameter dict used to resume training.

        The ``params`` argument is ignored; the currently stored fitted
        arrays are returned instead.
        """
        # CBC uses 'components' key instead of 'prototypes'
        return {'components': self.components_, 'reasonings': self.reasonings_}

    def _init_state(self, X, y, key):
        """Build initial components, reasonings and optimizer state.

        Returns
        -------
        tuple
            ``(state, params, proto_labels)`` where ``params`` is a dict
            with keys ``'components'`` and ``'reasonings'``.
        """
        n_classes = self._n_classes
        key1, key2 = jax.random.split(key)

        # Initialize components
        if self.components_initializer is not None:
            components = self.components_initializer(X, key1, self.n_components)
        else:
            # Default: pick distinct training samples at random.
            indices = jax.random.choice(
                key1, X.shape[0], (self.n_components,), replace=False
            )
            components = X[indices]

        # Initialize reasoning matrices: (n_components, n_classes, 2)
        if self.reasonings_initializer is not None:
            reasonings = self.reasonings_initializer(
                self.n_components, n_classes, key2
            )
        else:
            # Default: near uniform p=0.5, n=0.5 with small noise
            reasonings = jnp.ones((self.n_components, n_classes, 2)) * 0.5
            reasonings = reasonings + 0.01 * jax.random.normal(
                key2, reasonings.shape
            )

        params = {'components': components, 'reasonings': reasonings}
        opt_state = self._optimizer.init(params)

        # proto_labels not meaningful for CBC, but needed by base class
        proto_labels = jnp.zeros(self.n_components, dtype=jnp.int32)

        from prosemble.models.prototype_base import SupervisedState
        state = SupervisedState(
            prototypes=components,
            opt_state=opt_state,
            loss=jnp.array(float('inf')),
            iteration=0,
            converged=False,
        )
        return state, params, proto_labels

    def _compute_detections(self, X, components):
        """Return component detections for ``X`` using the configured similarity.

        Squared Euclidean distances between samples and components are
        passed to ``self._similarity_fn`` with ``variance=self.sigma ** 2``.
        """
        diff = X[:, None, :] - components[None, :, :]  # (n, c, d)
        dist_sq = jnp.sum(diff ** 2, axis=2)  # (n, c)
        return self._similarity_fn(dist_sq, variance=self.sigma ** 2)

    def _compute_loss(self, params, X, y, proto_labels):
        """Return the margin loss of the CBC class probabilities on ``(X, y)``.

        ``proto_labels`` is accepted for base-class compatibility and is
        not used.
        """
        components = params['components']
        reasonings = params['reasonings']

        # Compute similarities (detections) with the user-configurable
        # similarity so that a custom ``similarity_fn`` is actually honoured.
        detections = self._compute_detections(X, components)

        # Compute class probabilities via CBC reasoning
        class_probs = cbcc(detections, reasonings)  # (n, n_classes)

        # Margin loss
        y_one_hot = jax.nn.one_hot(y, self._n_classes)
        return margin_loss(class_probs, y_one_hot, margin=self.margin)

    def _extract_results(self, params, proto_labels, loss_history, n_iter,
                         **kwargs):
        """Store the trained parameters and training history on the model.

        If ``restore_best`` is enabled and ``best_params`` is supplied in
        ``kwargs``, those parameters replace ``params``.
        """
        best_params = kwargs.get('best_params')
        if getattr(self, 'restore_best', False) and best_params is not None:
            params = best_params
        # prototypes_ mirrors components_ for the base class.
        self.prototypes_ = params['components']
        self.components_ = params['components']
        self.reasonings_ = params['reasonings']
        self.prototype_labels_ = proto_labels
        self.loss_ = float(loss_history[-1]) if len(loss_history) > 0 else None
        self.loss_history_ = jnp.array(loss_history)
        self.n_iter_ = n_iter
        val_loss_history = kwargs.get('val_loss_history')
        if val_loss_history is not None:
            self.val_loss_history_ = jnp.array(val_loss_history)
        best_loss = kwargs.get('best_loss')
        if best_loss is not None:
            self.best_loss_ = best_loss

    def predict(self, X):
        """Predict class labels."""
        self._check_fitted()
        X = jnp.asarray(X, dtype=jnp.float32)
        if self._similarity_fn is gaussian_similarity:
            # Fast path: module-level JIT kernel for the default similarity.
            return _predict_cbc_jit(
                X, self.components_, self.reasonings_, self.sigma ** 2
            )
        # Custom similarity: use the same detection path as training.
        detections = self._compute_detections(X, self.components_)
        return jnp.argmax(cbcc(detections, self.reasonings_), axis=1)

    def predict_proba(self, X):
        """Predict class probabilities via CBC reasoning."""
        self._check_fitted()
        X = jnp.asarray(X, dtype=jnp.float32)
        if self._similarity_fn is gaussian_similarity:
            # Fast path: module-level JIT kernel for the default similarity.
            return _predict_proba_cbc_jit(
                X, self.components_, self.reasonings_, self.sigma ** 2
            )
        # Custom similarity: use the same detection path as training.
        detections = self._compute_detections(X, self.components_)
        return cbcc(detections, self.reasonings_)

    def _get_quantizable_attrs(self):
        """Extend the base list with the CBC arrays that are currently set."""
        attrs = super()._get_quantizable_attrs()
        if self.reasonings_ is not None:
            attrs.append('reasonings_')
        if self.components_ is not None:
            attrs.append('components_')
        return attrs

    def _get_fitted_arrays(self):
        """Extend the base fitted arrays with ``reasonings_`` (as NumPy)."""
        arrays = super()._get_fitted_arrays()
        if self.reasonings_ is not None:
            arrays['reasonings_'] = np.asarray(self.reasonings_)
        return arrays

    def _set_fitted_arrays(self, arrays):
        """Restore fitted arrays; ``components_`` is taken from ``prototypes_``."""
        super()._set_fitted_arrays(arrays)
        if 'reasonings_' in arrays:
            self.reasonings_ = jnp.asarray(arrays['reasonings_'])
        if self.prototypes_ is not None:
            self.components_ = self.prototypes_

    def _get_hyperparams(self):
        """Extend the base hyperparameters with the CBC-specific ones."""
        hp = super()._get_hyperparams()
        hp['n_components'] = self.n_components
        hp['n_classes'] = self._n_classes
        hp['sigma'] = self.sigma
        return hp