Source code for moosefs.feature_selection_pipeline

import random
from itertools import combinations
from typing import Any, Optional
import os

import numpy as np
import pandas as pd
from joblib import Parallel, delayed, parallel_backend
from sklearn.model_selection import train_test_split
# tqdm is not used; keep imports minimal

from .core import Feature, ParetoAnalysis
from .metrics.stability_metrics import compute_stability_metrics, diversity_agreement
from .utils import extract_params, get_class_info

# for test purposes: toggles the optional agreement metric
agreement_flag = False
        
class FeatureSelectionPipeline:
    """End-to-end pipeline for ensemble feature selection.

    Orchestrates feature scoring, merging, metric evaluation, and Pareto-based
    selection across repeated runs and method subgroups.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        fs_methods: list,
        merging_strategy: Any,
        num_repeats: int,
        num_features_to_select: Optional[int],
        metrics: list = ["logloss", "f1_score", "accuracy"],
        task: str = "classification",
        min_group_size: int = 2,
        fill: bool = False,
        random_state: Optional[int] = None,
        n_jobs: Optional[int] = None,
    ) -> None:
        """Initialize the pipeline.

        Args:
            data: DataFrame including a 'target' column.
            fs_methods: Feature selectors (identifiers or instances).
            merging_strategy: Merging strategy (identifier or instance).
            num_repeats: Number of repeats for the pipeline.
            num_features_to_select: Desired number of features to select.
            metrics: Metric functions (identifiers or instances).
            task: 'classification' or 'regression'.
            min_group_size: Minimum number of methods in each subgroup.
            fill: If True, enforce exact size after merging.
            random_state: Seed for reproducibility.
            n_jobs: Parallel jobs (use num_repeats when -1 or None).

        Raises:
            ValueError: If task is invalid or required parameters are missing.
        """
        # Parameter validation
        self._validate_task(task)
        self.data = data
        self.task = task
        self.num_repeats = num_repeats
        self.num_features_to_select = num_features_to_select
        self.random_state = (
            random_state if random_state is not None else random.randint(0, 1000)
        )
        self.n_jobs = n_jobs
        self.min_group_size = min_group_size
        self.fill = fill

        # Set seed for reproducibility
        self._set_seed(self.random_state)

        # Keep the original specs and also instantiate now for introspection
        self._fs_method_specs = list(fs_methods)
        self._metric_specs = list(metrics)
        self._merging_spec = merging_strategy

        # Dynamically load classes or instantiate them (initial instances)
        self.fs_methods = [
            self._load_class(m, instantiate=True) for m in self._fs_method_specs
        ]
        self.metrics = [
            self._load_class(m, instantiate=True) for m in self._metric_specs
        ]
        self.merging_strategy = self._load_class(self._merging_spec, instantiate=True)

        # Validation and preparation
        if self.num_features_to_select is None:
            raise ValueError("num_features_to_select must be provided")

        # Subgroup names are generated in run() after instantiation
        self.subgroup_names: list = []

    @staticmethod
    def _validate_task(task: str) -> None:
        """Validate the task string.

        Args:
            task: Expected 'classification' or 'regression'.

        Raises:
            ValueError: If the task is not supported.
        """
        if task not in ["classification", "regression"]:
            raise ValueError("Task must be either 'classification' or 'regression'.")

    @staticmethod
    def _set_seed(seed: int, idx: Optional[int] = None) -> None:
        """Seed numpy/python RNGs for reproducibility."""
        np.random.seed(seed)
        random.seed(seed)
        os.environ["PYTHONHASHSEED"] = str(seed)

    def _per_repeat_seed(self, idx: int) -> int:
        """Derive a per-repeat seed from the top-level seed."""
        return int(self.random_state) + int(idx)

    def _effective_n_jobs(self) -> int:
        """Return the parallel job count, capped by the number of repeats."""
        n = (
            self.n_jobs
            if self.n_jobs is not None and self.n_jobs != -1
            else self.num_repeats
        )
        return min(int(n), int(self.num_repeats))

    def _generate_subgroup_names(self, min_group_size: int) -> list:
        """Generate all selector-name combinations with a minimum size.

        Args:
            min_group_size: Minimum subgroup size.

        Returns:
            List of tuples of selector names.

        Raises:
            ValueError: If min_group_size exceeds the number of available methods.
        """
        fs_method_names = [fs_method.name for fs_method in self.fs_methods]
        if min_group_size > len(fs_method_names):
            raise ValueError(
                f"Minimum group size of {min_group_size} exceeds available "
                f"methods ({len(fs_method_names)})."
            )
        return [
            combo
            for r in range(min_group_size, len(fs_method_names) + 1)
            for combo in combinations(fs_method_names, r)
        ]

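    # Example (sketch): with three selectors named 'anova', 'mrmr', and
    # 'relief' (hypothetical names; the real ones come from the configured
    # fs_methods) and min_group_size=2, the generated subgroups would be:
    #   [('anova', 'mrmr'), ('anova', 'relief'), ('mrmr', 'relief'),
    #    ('anova', 'mrmr', 'relief')]
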
    # Public method to run the feature selection pipeline
    def run(self, verbose: bool = True) -> tuple:
        """Execute the pipeline and return the best merged features.

        Returns:
            (merged_features, best_repeat_idx, best_group_names).
        """
        self._set_seed(self.random_state)

        # Fresh objects for each run to avoid hidden state
        self.fs_methods = [
            self._load_class(m, instantiate=True) for m in self._fs_method_specs
        ]
        self.metrics = [
            self._load_class(m, instantiate=True) for m in self._metric_specs
        ]
        self.merging_strategy = self._load_class(self._merging_spec, instantiate=True)

        # Regenerate subgroup names from the fresh fs_methods
        self.subgroup_names = self._generate_subgroup_names(self.min_group_size)

        # Reset internal state so that run() always starts fresh
        self.fs_subsets: dict = {}
        self.merged_features: dict = {}

        num_metrics = self._num_metrics_total()
        result_dicts: list = [{} for _ in range(num_metrics)]

        # Ensure we don't allocate more jobs than repeats
        n_jobs = self._effective_n_jobs()
        with parallel_backend("loky", inner_max_num_threads=1):  # Prevents oversubscription
            parallel_results = Parallel(n_jobs=n_jobs)(
                delayed(self._pipeline_run_for_repeat)(i, verbose)
                for i in range(self.num_repeats)
            )

        # Sort results by repeat index (x[0] is the repeat index)
        parallel_results.sort(key=lambda x: x[0])

        # Merge results in a fixed order
        self.fs_subsets = {}
        self.merged_features = {}
        for (
            _,
            partial_fs_subsets,
            partial_merged_features,
            partial_result_dicts,
        ) in parallel_results:
            self.fs_subsets.update(partial_fs_subsets)
            self.merged_features.update(partial_merged_features)
            for dict_idx in range(num_metrics):
                for key in sorted(partial_result_dicts[dict_idx].keys()):
                    result_dicts[dict_idx][key] = partial_result_dicts[dict_idx][key]

        # Compute the Pareto analysis as usual
        means_list = self._calculate_means(result_dicts, self.subgroup_names)
        means_list = self._replace_none(means_list)
        # pairs = sorted(zip(self.subgroup_names, means_list), key=lambda p: tuple(p[0]))
        # self.subgroup_names, means_list = map(list, zip(*pairs))
        best_group = self._compute_pareto(means_list, self.subgroup_names)

        best_group_metrics = self._extract_repeat_metrics(best_group, *result_dicts)
        best_group_metrics = self._replace_none(best_group_metrics)
        best_repeat = self._compute_pareto(
            best_group_metrics, [str(i) for i in range(self.num_repeats)]
        )

        return (
            self.merged_features[(int(best_repeat), best_group)],
            int(best_repeat),
            best_group,
        )

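    # Consuming the result (sketch; the group tuple contains whatever selector
    # names were configured, the ones shown here are hypothetical):
    #   merged, best_repeat, best_group = pipeline.run()
    #   best_group   -> e.g. ('anova', 'mrmr')
    #   best_repeat  -> index of the winning repeat, e.g. 3
    #   merged       -> merged features stored under (best_repeat, best_group)
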
    def _pipeline_run_for_repeat(self, i: int, verbose: bool) -> Any:
        """Execute one repeat and return a partial-results tuple."""
        self._set_seed(self._per_repeat_seed(i))
        train_data, test_data = self._split_data(
            test_size=0.20, random_state=self._per_repeat_seed(i)
        )
        fs_subsets_local = self._compute_subset(train_data, i)
        merged_features_local = self._compute_merging(fs_subsets_local, i, verbose)
        local_result_dicts = self._compute_metrics(
            fs_subsets_local, merged_features_local, train_data, test_data, i
        )
        # Return the repeat index as the first element
        return i, fs_subsets_local, merged_features_local, local_result_dicts

    def _replace_none(self, metrics: list) -> list:
        """Replace any group containing None with a list of -inf.

        Args:
            metrics: Per-group metric lists.

        Returns:
            The same shape, with None rows replaced by -inf rows.
        """
        return [
            (
                group_metrics
                if all(metric is not None for metric in group_metrics)
                else [-float("inf")] * len(group_metrics)
            )
            for group_metrics in metrics
        ]

    def _split_data(self, test_size: float, random_state: int) -> tuple:
        """Split the data into train/test, stratifying for classification tasks."""
        stratify = self.data["target"] if self.task == "classification" else None
        return train_test_split(
            self.data, test_size=test_size, random_state=random_state, stratify=stratify
        )

    def _compute_subset(self, train_data: pd.DataFrame, idx: int) -> dict:
        """Compute selected Feature objects per method for this repeat."""
        self._set_seed(self._per_repeat_seed(idx))
        X_train = train_data.drop("target", axis=1)
        y_train = train_data["target"]
        feature_names = X_train.columns.tolist()

        fs_subsets_local = {}
        for fs_method in self.fs_methods:
            method_name = fs_method.name
            scores, indices = fs_method.select_features(X_train, y_train)
            fs_subsets_local[(idx, method_name)] = [
                Feature(
                    name,
                    score=scores[i] if scores is not None else None,
                    selected=(i in indices),
                )
                for i, name in enumerate(feature_names)
            ]
        return fs_subsets_local

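    # Selectors are duck-typed: each exposes a ``name`` attribute and a
    # ``select_features(X, y)`` method returning (scores, indices). A minimal
    # sketch of such a selector (not a moosefs class, shown for illustration):
    #
    #     class VarianceSelector:
    #         name = "variance"
    #
    #         def __init__(self, num_features_to_select=10):
    #             self.k = num_features_to_select
    #
    #         def select_features(self, X, y):
    #             scores = X.var().to_numpy()
    #             indices = np.argsort(scores)[::-1][: self.k].tolist()
    #             return scores, indices
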
    def _compute_merging(
        self,
        fs_subsets_local: dict,
        idx: int,
        verbose: bool = True,
    ) -> dict:
        """Merge per-group features and return the mapping for this repeat."""
        self._set_seed(self._per_repeat_seed(idx))
        merged_features_local = {}
        for group in self.subgroup_names:
            merged = self._merge_group_features(fs_subsets_local, idx, group)
            if merged:
                merged_features_local[(idx, group)] = merged
            elif verbose:
                print(f"Warning: {group} produced no merged features.")
        return merged_features_local

    def _merge_group_features(
        self,
        fs_subsets_local: dict,
        idx: int,
        group: tuple,
    ) -> list:
        """Merge features for a specific group of methods.

        Args:
            fs_subsets_local: Selected Feature lists keyed by (repeat, method).
            idx: Repeat index.
            group: Tuple of selector names.

        Returns:
            Merged features (the type depends on the strategy).
        """
        group_features = [
            [f for f in fs_subsets_local[(idx, method)] if f.selected]
            for method in group
        ]

        # Determine set-based vs rank-based via a method call when available
        is_set_based_attr = getattr(self.merging_strategy, "is_set_based", None)
        if callable(is_set_based_attr):
            is_set_based = bool(is_set_based_attr())
        elif isinstance(is_set_based_attr, bool):
            is_set_based = is_set_based_attr
        else:
            is_set_based = True  # Default behavior, as before

        if is_set_based:
            return self.merging_strategy.merge(
                group_features, self.num_features_to_select, fill=self.fill
            )
        else:
            return self.merging_strategy.merge(
                group_features, self.num_features_to_select
            )

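    # Merging strategies are likewise duck-typed: they expose ``merge`` and,
    # optionally, ``is_set_based``. A minimal set-based strategy could look
    # like this (a sketch, not a moosefs class):
    #
    #     class UnionMerger:
    #         def is_set_based(self):
    #             return True
    #
    #         def merge(self, group_features, num_features_to_select, fill=False):
    #             names = {f.name for feats in group_features for f in feats}
    #             return sorted(names)[:num_features_to_select]
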
    def _compute_performance_metrics(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        X_test: pd.DataFrame,
        y_test: pd.Series,
    ) -> list:
        """Compute performance metrics using the configured metric methods.

        Returns:
            Averaged metric values, one per configured metric.
        """
        self._set_seed(self.random_state)
        return [
            metric.compute(X_train, y_train, X_test, y_test) for metric in self.metrics
        ]

    def _compute_metrics(
        self,
        fs_subsets_local: dict,
        merged_features_local: dict,
        train_data: pd.DataFrame,
        test_data: pd.DataFrame,
        idx: int,
    ) -> list:
        """Compute and collect performance and stability metrics for subgroups.

        Args:
            fs_subsets_local: Local selected Feature lists per (repeat, method).
            merged_features_local: Merged features per (repeat, group).
            train_data: Training dataframe.
            test_data: Test dataframe.
            idx: Repeat index.

        Returns:
            List of per-metric dicts keyed by (repeat, group).
        """
        self._set_seed(self._per_repeat_seed(idx))
        num_metrics = self._num_metrics_total()
        local_result_dicts = [{} for _ in range(num_metrics)]

        for group in self.subgroup_names:
            key = (idx, group)
            if key not in merged_features_local:
                continue

            # Order the selected features by column order so the downstream
            # estimator (e.g. a decision tree) sees a consistent layout.
            selected_feats = [
                c for c in train_data.columns if c in merged_features_local[key]
            ]
            # selected_feats = list(merged_features_local[key])

            X_train_subset = train_data[selected_feats]
            y_train = train_data["target"]
            X_test_subset = test_data[selected_feats]
            y_test = test_data["target"]

            metric_vals = self._compute_performance_metrics(
                X_train_subset, y_train, X_test_subset, y_test
            )
            for m_idx, val in enumerate(metric_vals):
                local_result_dicts[m_idx][key] = val

            fs_lists = [
                [f.name for f in fs_subsets_local[(idx, method)] if f.selected]
                for method in group
            ]
            stability = compute_stability_metrics(fs_lists) if fs_lists else 0
            if agreement_flag:
                agreement = (
                    diversity_agreement(fs_lists, selected_feats, alpha=0.5)
                    if fs_lists
                    else 0
                )
                local_result_dicts[len(metric_vals)][key] = agreement
                local_result_dicts[len(metric_vals) + 1][key] = stability
            else:
                local_result_dicts[len(metric_vals)][key] = stability

        return local_result_dicts

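    # Layout of the returned list with the default metrics
    # ["logloss", "f1_score", "accuracy"] and agreement_flag == False:
    #   [ {(idx, group): logloss_value, ...},
    #     {(idx, group): f1_value, ...},
    #     {(idx, group): accuracy_value, ...},
    #     {(idx, group): stability_value, ...} ]
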
    @staticmethod
    def _calculate_means(
        result_dicts: list,
        group_names: list,
    ) -> list:
        """Calculate mean metrics per subgroup across repeats.

        Args:
            result_dicts: Per-metric dicts keyed by (repeat, group).
            group_names: Subgroup names to summarize.

        Returns:
            List of [means per metric] for each subgroup.
        """
        means_list = []
        for group in group_names:
            group_means = []
            for d in result_dicts:
                vals = [value for (_, name), value in d.items() if name == group]
                m = np.mean(vals) if len(vals) else np.nan
                group_means.append(None if np.isnan(m) else float(m))
            means_list.append(group_means)
        return means_list

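    # Shape sketch: with two subgroups and four tracked metrics, the result is
    # one row per subgroup and one mean per metric, e.g.
    #   [[0.35, 0.82, 0.80, 0.65],
    #    [0.40, 0.79, 0.77, 0.70]]
    # (hypothetical numbers).
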
    @staticmethod
    def _compute_pareto(groups: list, names: list) -> Any:
        """Return the name of the winner using Pareto analysis."""
        pareto = ParetoAnalysis(groups, names)
        pareto_results = pareto.get_results()
        return pareto_results[0][0]

    def _extract_repeat_metrics(
        self,
        group: Any,
        *result_dicts: dict,
    ) -> list:
        """Return a row per repeat for the given group.

        Missing values remain as None and are later replaced by -inf.
        """
        result_array: list = []
        for idx in range(self.num_repeats):  # <- full range
            row = [d.get((idx, group)) for d in result_dicts]
            result_array.append(row)
        return result_array

    def _load_class(self, input: Any, instantiate: bool = False) -> Any:
        """Resolve identifiers to classes/instances and optionally instantiate.

        Args:
            input: Identifier or instance of a selector/merger/metric.
            instantiate: If True, instantiate using extracted parameters.

        Returns:
            Class or instance.

        Raises:
            ValueError: If ``input`` is invalid.
        """
        if isinstance(input, str):
            cls, params = get_class_info(input)
            if instantiate:
                init_params = extract_params(cls, self, params)
                return cls(**init_params)
            return cls
        elif (
            hasattr(input, "select_features")
            or hasattr(input, "merge")
            or hasattr(input, "compute")
        ):
            # Assume a valid instance if it exposes a 'select_features',
            # 'merge', or 'compute' method.
            if instantiate:
                # Best effort: re-instantiate using the class and pipeline params
                cls = input.__class__
                init_params = extract_params(cls, self, [])
                try:
                    return cls(**init_params)
                except Exception:
                    # Fall back to the same instance if re-instantiation fails
                    return input
            return input
        else:
            raise ValueError(
                "Input must be a string identifier or a valid instance of a "
                "feature selector, merging strategy, or metric."
            )

    def _num_metrics_total(self) -> int:
        """Return the total number of metrics tracked per group.

        Includes the performance metrics plus stability and optional agreement.
        """
        return len(self.metrics) + (2 if agreement_flag else 1)

    def __str__(self) -> str:
        return (
            f"Feature selection pipeline with: merging strategy: {self.merging_strategy}, "
            f"feature selection methods: {self.fs_methods}, "
            f"number of repeats: {self.num_repeats}"
        )

    def __call__(self) -> Any:
        return self.run()
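

if __name__ == "__main__":
    # Minimal usage sketch on synthetic data. The selector and merging-strategy
    # identifiers below are placeholders, not registered moosefs names: replace
    # them with identifiers (or instances) that get_class_info can resolve.
    rng = np.random.default_rng(0)
    demo = pd.DataFrame(
        rng.normal(size=(100, 8)), columns=[f"f{i}" for i in range(8)]
    )
    demo["target"] = (demo["f0"] + demo["f1"] > 0).astype(int)

    pipeline = FeatureSelectionPipeline(
        data=demo,
        fs_methods=["selector_a", "selector_b", "selector_c"],  # placeholder names
        merging_strategy="merger_name",  # placeholder name
        num_repeats=3,
        num_features_to_select=4,
        random_state=42,
    )
    merged, best_repeat, best_group = pipeline.run()
    print(f"Best repeat: {best_repeat}, best group: {best_group}")
    print(f"Merged features: {merged}")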