Source code for moosefs.merging_strategies.consensus_merger

from collections import Counter, defaultdict
from itertools import chain
from typing import Optional

import numpy as np

from ..core.feature import Feature
from .base_merger import MergingStrategy


[docs] class ConsensusMerger(MergingStrategy): """Set-based consensus merger with optional fill. Keeps features selected by at least ``k`` selectors. If ``fill=True``, trims/pads to ``num_features_to_select`` using summed, per-selector min–max–normalized scores as a tie-breaker. """
[docs] def __init__(self, k: int = 2, *, fill: bool = False) -> None: super().__init__("set-based") self.k = k self.fill = fill self.name = f"Consensus_ge_{k}"
# -----------------------------------------------------------------
[docs] def merge( self, subsets: list, num_features_to_select: Optional[int] = None, **kwargs, ) -> set: """Merge by consensus threshold. Args: subsets: Feature lists (one list per selector). num_features_to_select: Required when ``fill=True``. **kwargs: Unused. Returns: Set of selected feature names. """ self._validate_input(subsets) if self.fill and num_features_to_select is None: raise ValueError("`num_features_to_select` required when fill=True") # ── collect names & scores (ragged‑safe) ───────────────────── names_mat = [[f.name for f in s] for s in subsets] # Consensus counts across all selectors counts = Counter(chain.from_iterable(names_mat)) # Summed, per-selector min‑max–normalised scores per feature name sum_scores = defaultdict(float) for subset in subsets: if not subset: continue scores = np.array([f.score for f in subset], dtype=np.float32) min_v = float(scores.min()) rng = float(scores.max() - min_v) or 1.0 norm = (scores - min_v) / rng for name, s in zip([f.name for f in subset], norm): sum_scores[name] += float(s) selected = {f for f, c in counts.items() if c >= self.k} if not self.fill: return selected # ── trim / pad to desired size ─────────────────────────────── core_sorted = sorted(selected, key=lambda n: sum_scores[n], reverse=True) if len(core_sorted) >= num_features_to_select: return set(core_sorted[:num_features_to_select]) extras = sorted( (n for n in counts if n not in selected), key=lambda n: sum_scores[n], reverse=True, ) need = num_features_to_select - len(core_sorted) return set(core_sorted + extras[:need])