""" |
|
|
Advanced Ensemble Methods for TIPM |
|
|
================================== |
|
|
|
|
|
Sophisticated ensemble techniques including voting, stacking, and dynamic |
|
|
ensembles for robust, accurate predictions. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Dict, List, Optional, Any, Union, Tuple |
|
|
from datetime import datetime |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
try:
    from sklearn.ensemble import VotingClassifier, VotingRegressor
    from sklearn.linear_model import LogisticRegression, LinearRegression
    from sklearn.model_selection import cross_val_score, StratifiedKFold
    from sklearn.metrics import accuracy_score, r2_score, mean_squared_error

    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False

from .base import BaseMLModel, ModelType, PredictionResult, ModelStatus, TrainingResult

logger = logging.getLogger(__name__)


class TIPMEnsemble(BaseMLModel):
    """
    Advanced ensemble system for TIPM predictions

    Combines multiple models using sophisticated ensemble techniques
    for maximum accuracy and robustness.
    """

    def __init__(self, model_id: str = "tipm_ensemble"):
        super().__init__(
            model_id=model_id,
            name="TIPM Ensemble System",
            description="Advanced ensemble combining multiple ML models for robust predictions",
            model_type=ModelType.ENSEMBLE,
        )

        self.base_models = []
        self.meta_model = None
        self.ensemble_type = "voting"

        self.model_weights = {}
        self.performance_history = {}

        self.hyperparameters = {
            "ensemble_method": "weighted_voting",
            "weight_optimization": True,
            "cross_validation_folds": 5,
            "meta_learning": True,
        }

        logger.info(f"Initialized TIPMEnsemble: {model_id}")

    def add_model(self, model: BaseMLModel, weight: float = 1.0):
        """Add a model to the ensemble"""
        if not model._is_trained:
            raise ValueError(
                f"Model {model.model_id} must be trained before adding to ensemble"
            )

        self.base_models.append(model)
        self.model_weights[model.model_id] = weight

        logger.info(f"Added model {model.model_id} to ensemble with weight {weight}")

    def remove_model(self, model_id: str):
        """Remove a model from the ensemble"""
        self.base_models = [m for m in self.base_models if m.model_id != model_id]
        if model_id in self.model_weights:
            del self.model_weights[model_id]

        logger.info(f"Removed model {model_id} from ensemble")

    def _create_model(self):
        """Create the ensemble model"""
        if not self.base_models:
            raise RuntimeError("No models added to ensemble")

        if self.ensemble_type == "voting":
            return self._create_voting_ensemble()
        elif self.ensemble_type == "stacking":
            return self._create_stacking_ensemble()
        else:
            raise ValueError(f"Unsupported ensemble type: {self.ensemble_type}")

    def _create_voting_ensemble(self):
        """Create voting ensemble"""
        if not SKLEARN_AVAILABLE:
            raise RuntimeError("Scikit-learn required for voting ensembles")

        # Choose classifier or regressor voting based on the first base model's task.
        first_model = self.base_models[0]
        if first_model.model_type in [
            ModelType.CLASSIFICATION,
            ModelType.MULTI_CLASS,
            ModelType.BINARY,
        ]:
            ensemble = VotingClassifier(
                estimators=[(m.model_id, m._model) for m in self.base_models],
                voting=(
                    "soft"
                    if self.hyperparameters["ensemble_method"] == "weighted_voting"
                    else "hard"
                ),
            )
        else:
            ensemble = VotingRegressor(
                estimators=[(m.model_id, m._model) for m in self.base_models]
            )

        return ensemble

    def _create_stacking_ensemble(self):
        """Create stacking ensemble with meta-learner"""
        if not SKLEARN_AVAILABLE:
            raise RuntimeError("Scikit-learn required for stacking ensembles")

        # The meta-learner is chosen to match the base models' task type.
        first_model = self.base_models[0]
        if first_model.model_type in [
            ModelType.CLASSIFICATION,
            ModelType.MULTI_CLASS,
            ModelType.BINARY,
        ]:
            self.meta_model = LogisticRegression(random_state=42)
        else:
            self.meta_model = LinearRegression()

        return self.meta_model

    def _prepare_features(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Prepare features for ensemble prediction"""
        if not self.base_models:
            raise RuntimeError("No models in ensemble")

        # The ensemble's feature matrix is the column-stacked predictions
        # of the base models on X.
        base_predictions = []
        for model in self.base_models:
            pred_result = model.predict(X)
            base_predictions.append(pred_result.predictions)

        if isinstance(base_predictions[0], np.ndarray):
            stacked_predictions = np.column_stack(base_predictions)
        else:
            stacked_predictions = np.array(base_predictions).T

        return stacked_predictions

    def _prepare_targets(self, y: Union[pd.Series, np.ndarray]) -> np.ndarray:
        """Prepare targets for ensemble training"""
        if isinstance(y, pd.Series):
            return y.values
        return y

    def fit(
        self,
        X: Union[pd.DataFrame, np.ndarray],
        y: Union[pd.Series, np.ndarray],
        **kwargs,
    ) -> "TIPMEnsemble":
        """Train the ensemble model"""
        if not self.base_models:
            raise RuntimeError("No models added to ensemble")

        training_start = datetime.now()

        try:
            logger.info(f"Starting ensemble training for {self.model_id}")
            self.metadata.status = ModelStatus.TRAINING

            self._model = self._create_model()

            if self.ensemble_type == "stacking":
                # Train the meta-learner on the stacked base-model predictions.
                base_predictions = self._get_base_predictions(X)
                self._model.fit(base_predictions, y)
            else:
                # Voting ensembles reuse the already-trained base models;
                # there is nothing additional to fit here.
                pass

            if self.hyperparameters["weight_optimization"]:
                self._optimize_weights(X, y)

            self._is_trained = True
            self.metadata.status = ModelStatus.TRAINED
            self.metadata.last_trained = datetime.now()
            self.metadata.feature_count = len(self.base_models)
            self.metadata.sample_count = X.shape[0] if hasattr(X, "shape") else len(X)

            training_score = self._calculate_ensemble_score(X, y)
            self.metadata.training_score = training_score

            training_result = TrainingResult(
                model_id=self.model_id,
                training_start=training_start,
                training_end=datetime.now(),
                training_score=training_score,
                validation_score=0.0,
            )
            self.training_history.append(training_result)

            logger.info(f"Ensemble training completed for {self.model_id}")
            return self

        except Exception as e:
            self.metadata.status = ModelStatus.FAILED
            logger.error(f"Ensemble training failed for {self.model_id}: {e}")
            raise

    def _get_base_predictions(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Get predictions from all base models"""
        base_predictions = []

        for model in self.base_models:
            pred_result = model.predict(X)
            predictions = pred_result.predictions

            # Reduce each model's output to a single column so the results
            # can be stacked into one (n_samples, n_models) matrix.
            if isinstance(predictions, np.ndarray):
                if predictions.ndim == 1:
                    base_predictions.append(predictions)
                else:
                    base_predictions.append(predictions[:, 0])
            else:
                base_predictions.append(np.array(predictions))

        return np.column_stack(base_predictions)

    def _optimize_weights(
        self, X: Union[pd.DataFrame, np.ndarray], y: Union[pd.Series, np.ndarray]
    ):
        """Optimize model weights via a small grid search over candidate weightings"""
        if not SKLEARN_AVAILABLE:
            return

        logger.info("Optimizing ensemble weights...")

        base_predictions = self._get_base_predictions(X)

        best_score = -1
        best_weights = None

        # Candidate weight vectors; the grid assumes exactly three base models,
        # and combinations of any other length are skipped below.
        weight_combinations = [
            [1.0, 1.0, 1.0],
            [2.0, 1.0, 1.0],
            [1.0, 2.0, 1.0],
            [1.0, 1.0, 2.0],
            [3.0, 2.0, 1.0],
        ]

        for weights in weight_combinations:
            if len(weights) != len(self.base_models):
                continue

            # Normalize so the weights sum to one.
            weights = np.array(weights) / np.sum(weights)

            weighted_pred = np.zeros_like(base_predictions[:, 0], dtype=np.float64)
            for i, weight in enumerate(weights):
                weighted_pred += weight * base_predictions[:, i]

            if self.base_models[0].model_type in [
                ModelType.CLASSIFICATION,
                ModelType.MULTI_CLASS,
                ModelType.BINARY,
            ]:
                score = accuracy_score(y, np.round(weighted_pred))
            else:
                score = r2_score(y, weighted_pred)

            if score > best_score:
                best_score = score
                best_weights = weights

        if best_weights is not None:
            for i, model in enumerate(self.base_models):
                self.model_weights[model.model_id] = float(best_weights[i])

            logger.info(
                f"Optimized weights: {dict(zip([m.model_id for m in self.base_models], best_weights))}"
            )

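    # The grid above covers only the three-model case. A hedged sketch of how a
    # comparable grid could be generated for an arbitrary number of base models
    # (hypothetical helper, not part of the current API):
    #
    #     from itertools import product
    #
    #     def _candidate_weights(n_models, levels=(1.0, 2.0, 3.0)):
    #         for combo in product(levels, repeat=n_models):
    #             arr = np.array(combo)
    #             yield arr / arr.sum()
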
    def _calculate_ensemble_score(
        self, X: Union[pd.DataFrame, np.ndarray], y: Union[pd.Series, np.ndarray]
    ) -> float:
        """Calculate ensemble performance score"""
        try:
            predictions = self.predict(X).predictions

            if self.base_models[0].model_type in [
                ModelType.CLASSIFICATION,
                ModelType.MULTI_CLASS,
                ModelType.BINARY,
            ]:
                # Weighted voting produces continuous scores; round to class
                # labels before computing accuracy (as in _optimize_weights).
                return accuracy_score(y, np.round(predictions))
            else:
                return r2_score(y, predictions)
        except Exception as e:
            logger.warning(f"Could not calculate ensemble score: {e}")
            return 0.0

    def predict(self, X: Union[pd.DataFrame, np.ndarray]) -> PredictionResult:
        """Make ensemble predictions"""
        if not self._is_trained:
            raise RuntimeError(
                f"Ensemble {self.model_id} must be trained before making predictions"
            )

        prediction_start = datetime.now()

        try:
            if self.ensemble_type == "stacking":
                base_predictions = self._get_base_predictions(X)
                predictions = self._model.predict(base_predictions)
            else:
                predictions = self._weighted_voting_predict(X)

            processing_time = (datetime.now() - prediction_start).total_seconds()

            result = PredictionResult(
                prediction_id=f"{self.model_id}_{prediction_start.strftime('%Y%m%d_%H%M%S')}",
                model_id=self.model_id,
                timestamp=prediction_start,
                predictions=predictions,
                input_features=X.to_dict() if hasattr(X, "to_dict") else None,
                feature_names=[m.model_id for m in self.base_models],
                prediction_type=self.model_type.value,
                processing_time=processing_time,
            )

            return result

        except Exception as e:
            logger.error(f"Ensemble prediction failed for {self.model_id}: {e}")
            raise

    def _weighted_voting_predict(
        self, X: Union[pd.DataFrame, np.ndarray]
    ) -> np.ndarray:
        """Make weighted voting predictions"""
        base_predictions = []
        for model in self.base_models:
            pred_result = model.predict(X)
            predictions = pred_result.predictions

            # Cast to float and reduce multi-output predictions to their first
            # column so the outputs can be combined arithmetically.
            if isinstance(predictions, np.ndarray):
                if predictions.ndim == 1:
                    base_predictions.append(predictions.astype(np.float64))
                else:
                    base_predictions.append(predictions[:, 0].astype(np.float64))
            else:
                base_predictions.append(np.array(predictions, dtype=np.float64))

        # Weighted average of the base predictions, normalized by total weight.
        weighted_pred = np.zeros_like(base_predictions[0], dtype=np.float64)
        total_weight = 0.0

        for i, model in enumerate(self.base_models):
            weight = float(self.model_weights.get(model.model_id, 1.0))
            weighted_pred += weight * base_predictions[i]
            total_weight += weight

        if total_weight > 0:
            weighted_pred /= total_weight

        return weighted_pred

    def get_ensemble_analysis(self) -> Dict[str, Any]:
        """Get comprehensive ensemble analysis"""
        if not self.base_models:
            return {"error": "No models in ensemble"}

        analysis = {
            "ensemble_type": self.ensemble_type,
            "model_count": len(self.base_models),
            "models": [],
            "weights": self.model_weights.copy(),
            "performance_summary": {},
        }

        for model in self.base_models:
            model_info = {
                "model_id": model.model_id,
                "name": model.name,
                "type": model.model_type.value,
                "training_score": model.metadata.training_score,
                "validation_score": model.metadata.validation_score,
                "last_trained": (
                    model.metadata.last_trained.isoformat()
                    if model.metadata.last_trained
                    else None
                ),
            }
            analysis["models"].append(model_info)

        if self._is_trained:
            analysis["performance_summary"] = {
                "ensemble_score": self.metadata.training_score,
                "best_model": max(
                    self.base_models, key=lambda m: m.metadata.training_score or 0
                ).model_id,
                "weight_distribution": self._analyze_weight_distribution(),
            }

        return analysis

    def _analyze_weight_distribution(self) -> Dict[str, Any]:
        """Analyze the distribution of model weights"""
        weights = list(self.model_weights.values())

        return {
            "total_weight": sum(weights),
            "average_weight": np.mean(weights),
            "weight_variance": np.var(weights),
            "dominant_model": max(self.model_weights.items(), key=lambda x: x[1])[0],
            "weight_balance": "Balanced" if np.var(weights) < 0.1 else "Unbalanced",
        }
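
# Example usage (illustrative sketch only; assumes `model_a` and `model_b` are
# already-trained BaseMLModel instances from elsewhere in TIPM, and that X and y
# are a feature matrix and matching target vector):
#
#     ensemble = TIPMEnsemble(model_id="tariff_ensemble")
#     ensemble.add_model(model_a, weight=2.0)
#     ensemble.add_model(model_b, weight=1.0)
#     ensemble.fit(X, y)                       # optimizes weights if configured
#     result = ensemble.predict(X)             # PredictionResult with combined output
#     print(ensemble.get_ensemble_analysis())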


class ModelVoting(BaseMLModel):
    """Simple voting ensemble for model combination"""

    def __init__(self, model_id: str = "model_voting"):
        super().__init__(
            model_id=model_id,
            name="Model Voting Ensemble",
            description="Simple voting ensemble combining multiple models",
            model_type=ModelType.VOTING,
        )

        self.models = []
        self.voting_method = "hard"

        logger.info(f"Initialized ModelVoting: {model_id}")

    def add_model(self, model: BaseMLModel):
        """Add a model to the voting ensemble"""
        if not model._is_trained:
            raise ValueError(
                f"Model {model.model_id} must be trained before adding to voting ensemble"
            )

        self.models.append(model)
        logger.info(f"Added model {model.model_id} to voting ensemble")

    def _create_model(self):
        """Voting ensemble doesn't need a separate model"""
        return None

    def _prepare_features(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Features are handled by individual models"""
        return X

    def _prepare_targets(self, y: Union[pd.Series, np.ndarray]) -> np.ndarray:
        """Targets are handled by individual models"""
        if isinstance(y, pd.Series):
            return y.values
        return y

    def predict(self, X: Union[pd.DataFrame, np.ndarray]) -> PredictionResult:
        """Make voting predictions"""
        if not self.models:
            raise RuntimeError("No models in voting ensemble")

        prediction_start = datetime.now()

        try:
            all_predictions = []
            for model in self.models:
                pred_result = model.predict(X)
                all_predictions.append(pred_result.predictions)

            if self.voting_method == "hard":
                predictions = self._hard_voting(all_predictions)
            else:
                predictions = self._soft_voting(all_predictions)

            processing_time = (datetime.now() - prediction_start).total_seconds()

            result = PredictionResult(
                prediction_id=f"{self.model_id}_{prediction_start.strftime('%Y%m%d_%H%M%S')}",
                model_id=self.model_id,
                timestamp=prediction_start,
                predictions=predictions,
                input_features=X.to_dict() if hasattr(X, "to_dict") else None,
                feature_names=[m.model_id for m in self.models],
                prediction_type=self.model_type.value,
                processing_time=processing_time,
            )

            return result

        except Exception as e:
            logger.error(f"Voting prediction failed for {self.model_id}: {e}")
            raise

    def _hard_voting(self, all_predictions: List[np.ndarray]) -> np.ndarray:
        """Hard voting: majority class prediction"""
        # scipy is required only for this majority-vote computation.
        from scipy.stats import mode

        predictions_array = np.array(all_predictions)

        if predictions_array.ndim == 2:
            predictions, _ = mode(predictions_array, axis=0)
            return predictions.flatten()
        else:
            predictions, _ = mode(predictions_array, axis=0)
            return predictions

    def _soft_voting(self, all_predictions: List[np.ndarray]) -> np.ndarray:
        """Soft voting: element-wise average of the models' predicted scores"""
        predictions_array = np.array(all_predictions)

        return np.mean(predictions_array, axis=0)
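
# A small worked illustration of the two voting rules (invented numbers, three
# models, four samples):
#
#     labels = [np.array([1, 0, 1, 1]),
#               np.array([1, 1, 0, 1]),
#               np.array([0, 0, 1, 1])]
#     # hard voting -> element-wise majority: [1, 0, 1, 1]
#
#     scores = [np.array([0.9, 0.2, 0.7, 0.8]),
#               np.array([0.6, 0.4, 0.3, 0.9]),
#               np.array([0.3, 0.1, 0.8, 0.7])]
#     # soft voting -> element-wise mean: approx. [0.6, 0.23, 0.6, 0.8]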


class StackingEnsemble(BaseMLModel):
    """Stacking ensemble with meta-learner"""

    def __init__(self, model_id: str = "stacking_ensemble"):
        super().__init__(
            model_id=model_id,
            name="Stacking Ensemble",
            description="Stacking ensemble with meta-learner for optimal combination",
            model_type=ModelType.STACKING,
        )

        self.base_models = []
        self.meta_model = None

        logger.info(f"Initialized StackingEnsemble: {model_id}")

    def add_base_model(self, model: BaseMLModel):
        """Add a base model to the stacking ensemble"""
        if not model._is_trained:
            raise ValueError(
                f"Model {model.model_id} must be trained before adding to stacking ensemble"
            )

        self.base_models.append(model)
        logger.info(f"Added base model {model.model_id} to stacking ensemble")

    def set_meta_model(self, model: BaseMLModel):
        """Set the meta-learner model (expected to be trained on stacked base-model predictions)"""
        if not model._is_trained:
            raise ValueError(
                f"Meta model {model.model_id} must be trained before setting"
            )

        self.meta_model = model
        logger.info(f"Set meta model {model.model_id} for stacking ensemble")

    def _create_model(self):
        """Stacking ensemble uses base models and meta model"""
        return None

    def _prepare_features(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Features are handled by base models"""
        return X

    def _prepare_targets(self, y: Union[pd.Series, np.ndarray]) -> np.ndarray:
        """Targets are handled by base models"""
        if isinstance(y, pd.Series):
            return y.values
        return y

    def predict(self, X: Union[pd.DataFrame, np.ndarray]) -> PredictionResult:
        """Make stacking ensemble predictions"""
        if not self.base_models or not self.meta_model:
            raise RuntimeError("Stacking ensemble requires base models and meta model")

        prediction_start = datetime.now()

        try:
            base_predictions = []
            for model in self.base_models:
                pred_result = model.predict(X)
                predictions = pred_result.predictions

                if isinstance(predictions, np.ndarray):
                    if predictions.ndim == 1:
                        base_predictions.append(predictions)
                    else:
                        base_predictions.append(predictions[:, 0])
                else:
                    base_predictions.append(np.array(predictions))

            stacked_features = np.column_stack(base_predictions)

            meta_predictions = self.meta_model.predict(stacked_features).predictions

            processing_time = (datetime.now() - prediction_start).total_seconds()

            result = PredictionResult(
                prediction_id=f"{self.model_id}_{prediction_start.strftime('%Y%m%d_%H%M%S')}",
                model_id=self.model_id,
                timestamp=prediction_start,
                predictions=meta_predictions,
                input_features=X.to_dict() if hasattr(X, "to_dict") else None,
                feature_names=[m.model_id for m in self.base_models],
                prediction_type=self.model_type.value,
                processing_time=processing_time,
            )

            return result

        except Exception as e:
            logger.error(f"Stacking prediction failed for {self.model_id}: {e}")
            raise
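
# Example usage (illustrative sketch only; assumes `model_a`, `model_b`, and
# `meta_model` are trained BaseMLModel instances, with `meta_model` trained on
# the stacked base-model predictions as set_meta_model expects):
#
#     stack = StackingEnsemble(model_id="tariff_stack")
#     stack.add_base_model(model_a)
#     stack.add_base_model(model_b)
#     stack.set_meta_model(meta_model)
#     result = stack.predict(X)   # the meta-learner combines the base predictions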


class DynamicEnsemble(BaseMLModel):
    """Dynamic ensemble that adapts based on performance"""

    def __init__(self, model_id: str = "dynamic_ensemble"):
        super().__init__(
            model_id=model_id,
            name="Dynamic Ensemble",
            description="Dynamic ensemble that adapts model weights based on recent performance",
            model_type=ModelType.ENSEMBLE,
        )

        self.models = []
        self.performance_window = 10
        self.performance_history = {}
        self.adaptive_weights = {}

        logger.info(f"Initialized DynamicEnsemble: {model_id}")

    def add_model(self, model: BaseMLModel, initial_weight: float = 1.0):
        """Add a model to the dynamic ensemble"""
        if not model._is_trained:
            raise ValueError(
                f"Model {model.model_id} must be trained before adding to dynamic ensemble"
            )

        self.models.append(model)
        self.adaptive_weights[model.model_id] = initial_weight
        self.performance_history[model.model_id] = []

        logger.info(
            f"Added model {model.model_id} to dynamic ensemble with initial weight {initial_weight}"
        )

    def _create_model(self):
        """Dynamic ensemble doesn't need a separate model"""
        return None

    def _prepare_features(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Features are handled by individual models"""
        return X

    def _prepare_targets(self, y: Union[pd.Series, np.ndarray]) -> np.ndarray:
        """Targets are handled by individual models"""
        if isinstance(y, pd.Series):
            return y.values
        return y

    def update_performance(
        self, model_id: str, actual: np.ndarray, predicted: np.ndarray
    ):
        """Update performance history for a model"""
        if model_id not in self.performance_history:
            return
        if not SKLEARN_AVAILABLE:
            # accuracy_score / r2_score come from the optional sklearn import.
            return

        if len(actual) == len(predicted):
            # Discrete labels are scored with accuracy, continuous outputs with R^2.
            if isinstance(predicted[0], (int, np.integer, str)):
                performance = accuracy_score(actual, predicted)
            else:
                performance = r2_score(actual, predicted)

            self.performance_history[model_id].append(performance)

            # Keep only the most recent scores within the sliding window.
            if len(self.performance_history[model_id]) > self.performance_window:
                self.performance_history[model_id].pop(0)

            self._update_weights()

    def _update_weights(self):
        """Update model weights based on recent performance"""
        if not self.models:
            return

        # Average each model's recent scores; models with no history get a
        # neutral 0.5 so they are neither favored nor excluded.
        avg_performance = {}
        for model_id in self.performance_history:
            if self.performance_history[model_id]:
                avg_performance[model_id] = np.mean(self.performance_history[model_id])
            else:
                avg_performance[model_id] = 0.5

        # Weights are proportional to average performance and sum to one.
        total_performance = sum(avg_performance.values())
        if total_performance > 0:
            for model_id in avg_performance:
                self.adaptive_weights[model_id] = (
                    avg_performance[model_id] / total_performance
                )

        logger.info(f"Updated weights: {self.adaptive_weights}")

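    # Worked example of the rule above (invented numbers): recent average scores
    # of 0.8, 0.6, and 0.6 sum to 2.0, so the adaptive weights become 0.40, 0.30,
    # and 0.30. A caller would typically invoke the update after each batch of
    # ground truth arrives, along the lines of (hypothetical `dynamic`, `y_true`,
    # and `preds`):
    #
    #     for model in dynamic.models:
    #         dynamic.update_performance(model.model_id, y_true, preds[model.model_id])
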
    def predict(self, X: Union[pd.DataFrame, np.ndarray]) -> PredictionResult:
        """Make dynamic ensemble predictions"""
        if not self.models:
            raise RuntimeError("No models in dynamic ensemble")

        prediction_start = datetime.now()

        try:
            all_predictions = []
            for model in self.models:
                pred_result = model.predict(X)
                predictions = pred_result.predictions

                if isinstance(predictions, np.ndarray):
                    if predictions.ndim == 1:
                        all_predictions.append(predictions)
                    else:
                        all_predictions.append(predictions[:, 0])
                else:
                    all_predictions.append(np.array(predictions))

            # Accumulate in float so integer predictions combine and divide cleanly.
            weighted_pred = np.zeros_like(all_predictions[0], dtype=np.float64)
            total_weight = 0

            for i, model in enumerate(self.models):
                weight = self.adaptive_weights.get(model.model_id, 1.0)
                weighted_pred += weight * all_predictions[i]
                total_weight += weight

            if total_weight > 0:
                weighted_pred /= total_weight

            processing_time = (datetime.now() - prediction_start).total_seconds()

            result = PredictionResult(
                prediction_id=f"{self.model_id}_{prediction_start.strftime('%Y%m%d_%H%M%S')}",
                model_id=self.model_id,
                timestamp=prediction_start,
                predictions=weighted_pred,
                input_features=X.to_dict() if hasattr(X, "to_dict") else None,
                feature_names=[m.model_id for m in self.models],
                prediction_type=self.model_type.value,
                processing_time=processing_time,
            )

            return result

        except Exception as e:
            logger.error(f"Dynamic ensemble prediction failed for {self.model_id}: {e}")
            raise

    def get_dynamic_analysis(self) -> Dict[str, Any]:
        """Get comprehensive dynamic ensemble analysis"""
        analysis = {
            "model_count": len(self.models),
            "models": [],
            "current_weights": self.adaptive_weights.copy(),
            "performance_summary": {},
            "adaptation_history": {},
        }

        for model in self.models:
            model_info = {
                "model_id": model.model_id,
                "name": model.name,
                "type": model.model_type.value,
                "current_weight": self.adaptive_weights.get(model.model_id, 0.0),
                "recent_performance": self.performance_history.get(model.model_id, []),
            }
            analysis["models"].append(model_info)

        if self.models:
            analysis["performance_summary"] = {
                "best_performing_model": max(
                    self.adaptive_weights.items(), key=lambda x: x[1]
                )[0],
                "weight_variance": np.var(list(self.adaptive_weights.values())),
                "adaptation_active": any(
                    len(hist) > 0 for hist in self.performance_history.values()
                ),
            }

        return analysis
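

# ---------------------------------------------------------------------------
# Illustrative self-check (not part of the TIPM API): a minimal sketch of the
# weighted-combination arithmetic used by TIPMEnsemble._weighted_voting_predict
# and DynamicEnsemble._update_weights, on plain NumPy arrays. All numbers are
# invented for demonstration only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    # Three hypothetical base models, five samples each (regression-style outputs).
    base_preds = rng.normal(size=(3, 5))

    # Static weighted voting: normalize the weights, then average the predictions.
    weights = np.array([3.0, 2.0, 1.0])
    weights = weights / weights.sum()
    combined = weights @ base_preds  # shape (5,)
    print("weighted voting:", combined)

    # Dynamic reweighting: weights proportional to recent average performance.
    recent_scores = {"model_a": 0.80, "model_b": 0.60, "model_c": 0.60}
    total = sum(recent_scores.values())
    adaptive = {name: score / total for name, score in recent_scores.items()}
    print("adaptive weights:", adaptive)  # model_a -> 0.4, the others -> 0.3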