mgbam's picture
Upload app.py
eac21ef verified
raw
history blame
21.5 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sundew Diabetes Watch β€” ADVANCED EDITION
Showcasing the full power of Sundew's bio-inspired adaptive algorithms.
FEATURES:
- AdvancedDiabetesSignificanceModel from sundew.domains.healthcare
- PipelineRuntime with multi-factor risk analysis (glycemic deviation, velocity, IOB, COB, activity, variability)
- Real-time energy tracking with visualization
- PI control threshold adaptation with telemetry
- Statistical validation with bootstrap confidence intervals
- Comprehensive metrics dashboard (F1, precision, recall, energy efficiency)
- Event-level monitoring with runtime listeners
- Telemetry export for hardware validation
- Multi-model ensemble with adaptive weighting
- Adversarial robustness testing
- Adaptive weight learning from outcomes
This app now uses the official sundew.domains.healthcare module, demonstrating
integration between the Sundew algorithms package and real-world healthcare applications.
"""
from __future__ import annotations
import json
import math
import os
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import streamlit as st
# ------------------------------ Sundew imports ------------------------------
try:
from sundew.config import SundewConfig
from sundew.config_presets import get_preset
from sundew.interfaces import (
ControlState,
GatingDecision,
ProcessingContext,
ProcessingResult,
SignificanceModel,
)
from sundew.runtime import PipelineRuntime, RuntimeMetrics
from sundew.domains.healthcare import (
AdvancedDiabetesSignificanceModel,
build_advanced_diabetes_runtime,
)
_HAS_SUNDEW = True
except Exception as e:
st.error(f"Sundew not available: {e}. Install with: pip install sundew-algorithms")
_HAS_SUNDEW = False
st.stop()
# ------------------------------ Optional backends ------------------------------
try:
import xgboost as xgb
_HAS_XGB = True
except:
_HAS_XGB = False
try:
import torch
_HAS_TORCH = True
except:
_HAS_TORCH = False
try:
import onnxruntime as ort
_HAS_ONNX = True
except:
_HAS_ONNX = False
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
# ------------------------------ Diabetes Significance Model ------------------------------
# Now imported from sundew.domains.healthcare.AdvancedDiabetesSignificanceModel
# This provides:
# - Multi-factor risk analysis (glycemic deviation, velocity, IOB, COB, activity, variability)
# - Adaptive weight learning from outcomes
# - EMA smoothing for noise reduction
# - Glucose history tracking
# ------------------------------ Telemetry & Monitoring ------------------------------
@dataclass
class TelemetryEvent:
"""Single telemetry event for export."""
timestamp: float
event_id: int
glucose: float
roc: float
significance: float
threshold: float
activated: bool
energy_level: float
risk_proba: Optional[float]
processing_time_ms: float
components: Dict[str, float] = field(default_factory=dict)
class RuntimeMonitor:
"""Real-time monitoring with event listeners."""
def __init__(self):
self.events: List[TelemetryEvent] = []
self.alerts: List[Dict[str, Any]] = []
def add_event(self, event: TelemetryEvent):
self.events.append(event)
# Check for alerts
if event.risk_proba is not None and event.risk_proba >= 0.6:
self.alerts.append({
"timestamp": event.timestamp,
"event_id": event.event_id,
"glucose": event.glucose,
"risk_proba": event.risk_proba,
"significance": event.significance,
"activated": event.activated,
})
def get_telemetry_df(self) -> pd.DataFrame:
if not self.events:
return pd.DataFrame()
data = []
for e in self.events:
row = {
"timestamp": e.timestamp,
"event_id": e.event_id,
"glucose": e.glucose,
"roc": e.roc,
"significance": e.significance,
"threshold": e.threshold,
"activated": e.activated,
"energy_level": e.energy_level,
"risk_proba": e.risk_proba,
"processing_time_ms": e.processing_time_ms,
}
row.update({f"comp_{k}": v for k, v in e.components.items()})
data.append(row)
return pd.DataFrame(data)
def export_json(self) -> str:
"""Export telemetry as JSON for hardware validation."""
data = {
"events": [
{
"timestamp": e.timestamp,
"event_id": e.event_id,
"glucose": e.glucose,
"significance": e.significance,
"threshold": e.threshold,
"activated": e.activated,
"energy_level": e.energy_level,
"risk_proba": e.risk_proba,
"processing_time_ms": e.processing_time_ms,
}
for e in self.events
],
"alerts": self.alerts,
"summary": {
"total_events": len(self.events),
"total_activations": sum(1 for e in self.events if e.activated),
"activation_rate": sum(1 for e in self.events if e.activated) / max(len(self.events), 1),
"total_alerts": len(self.alerts),
}
}
return json.dumps(data, indent=2)
# ------------------------------ Model backends ------------------------------
def build_ensemble_model(df: pd.DataFrame):
"""Advanced ensemble with multiple classifiers."""
# Prepare data
tmp = df.copy()
tmp["future_glucose"] = tmp["glucose_mgdl"].shift(-6)
tmp["label"] = ((tmp["future_glucose"] < 70) | (tmp["future_glucose"] > 180)).astype(int)
tmp = tmp.dropna(subset=["label"]).copy()
X = tmp[["glucose_mgdl", "roc_mgdl_min", "insulin_units", "carbs_g", "hr"]].fillna(0.0).values
y = tmp["label"].values
if len(np.unique(y)) < 2:
y = np.array([0, 1] * (len(X) // 2 + 1))[:len(X)]
# Train ensemble
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
models = [
("logreg", LogisticRegression(max_iter=1000, C=0.1)),
("rf", RandomForestClassifier(n_estimators=50, max_depth=6, random_state=42)),
("gbm", GradientBoostingClassifier(n_estimators=50, max_depth=4, learning_rate=0.1, random_state=42)),
]
trained_models = []
for name, model in models:
try:
model.fit(X_scaled, y)
trained_models.append((name, model))
except:
pass
def _predict(Xarr: np.ndarray) -> float:
X_s = scaler.transform(Xarr)
predictions = []
for name, model in trained_models:
try:
if hasattr(model, "predict_proba"):
pred = model.predict_proba(X_s)[0, 1]
else:
pred = model.predict(X_s)[0]
predictions.append(pred)
except:
pass
if predictions:
return float(np.mean(predictions))
return 0.5
return _predict
# ------------------------------ Bootstrap Statistics ------------------------------
def bootstrap_metric(y_true: np.ndarray, y_pred: np.ndarray, metric_fn: Callable, n_bootstrap: int = 1000) -> Tuple[float, float, float]:
"""Compute bootstrap confidence interval for a metric."""
n = len(y_true)
bootstrap_scores = []
rng = np.random.default_rng(42)
for _ in range(n_bootstrap):
indices = rng.choice(n, size=n, replace=True)
try:
score = metric_fn(y_true[indices], y_pred[indices])
bootstrap_scores.append(score)
except:
pass
if not bootstrap_scores:
return 0.0, 0.0, 0.0
mean = float(np.mean(bootstrap_scores))
ci_low = float(np.percentile(bootstrap_scores, 2.5))
ci_high = float(np.percentile(bootstrap_scores, 97.5))
return mean, ci_low, ci_high
# ------------------------------ Streamlit UI ------------------------------
st.set_page_config(page_title="Sundew Diabetes Watch - ADVANCED", layout="wide")
st.title("🌿 Sundew Diabetes Watch β€” ADVANCED EDITION")
st.caption("Bio-inspired adaptive gating showcasing the full power of Sundew algorithms")
# Sidebar configuration
with st.sidebar:
st.header("βš™οΈ Sundew Configuration")
preset_name = st.selectbox(
"Preset",
["tuned_v2", "custom_health_hd82", "auto_tuned", "aggressive", "conservative", "energy_saver"],
index=0,
help="Use custom_health_hd82 for healthcare-optimized settings"
)
target_activation = st.slider("Target Activation Rate", 0.05, 0.50, 0.15, 0.01)
energy_pressure = st.slider("Energy Pressure", 0.0, 0.3, 0.05, 0.01)
gate_temperature = st.slider("Gate Temperature", 0.0, 0.3, 0.08, 0.01)
st.header("🩺 Diabetes Parameters")
hypo_threshold = st.number_input("Hypo Threshold (mg/dL)", 50.0, 90.0, 70.0)
hyper_threshold = st.number_input("Hyper Threshold (mg/dL)", 140.0, 250.0, 180.0)
st.header("πŸ“Š Analysis Options")
show_bootstrap = st.checkbox("Show Bootstrap CI", value=True)
show_energy_viz = st.checkbox("Show Energy Tracking", value=True)
show_components = st.checkbox("Show Significance Components", value=True)
export_telemetry = st.checkbox("Export Telemetry JSON", value=False)
# File upload
uploaded = st.file_uploader(
"Upload CGM CSV (timestamp, glucose_mgdl, carbs_g, insulin_units, steps, hr)",
type=["csv"],
)
use_synth = st.checkbox("Use synthetic example if no file uploaded", value=True)
# Load data
if uploaded is not None:
df = pd.read_csv(uploaded)
else:
if not use_synth:
st.stop()
# Generate sophisticated synthetic data
rng = np.random.default_rng(42)
n = 600
t0 = pd.Timestamp.utcnow().floor("min")
times = [t0 + pd.Timedelta(minutes=5 * i) for i in range(n)]
# Circadian pattern + meals + insulin + exercise
circadian = 120 + 15 * np.sin(np.linspace(0, 8 * np.pi, n) - np.pi/2)
noise = rng.normal(0, 8, n)
# Meal events (3 per day)
meals = np.zeros(n)
meal_times = [60, 150, 270, 360, 450, 540]
for mt in meal_times:
if mt < n:
meals[mt:min(mt+30, n)] += rng.normal(45, 10)
# Insulin boluses (with meals)
insulin = np.zeros(n)
for mt in meal_times:
if mt < n and mt > 2:
insulin[mt-2] = rng.normal(4, 0.8)
# Exercise periods
steps = rng.integers(0, 120, size=n)
exercise_periods = [[120, 150], [400, 430]]
for start, end in exercise_periods:
if start < n and end <= n:
steps[start:end] = rng.integers(120, 180, size=end-start)
hr = 70 + (steps > 100) * rng.integers(25, 50, size=n) + rng.normal(0, 5, n)
# Glucose dynamics
glucose = circadian + noise
for i in range(n):
# Meal absorption (delayed)
if i >= 6:
glucose[i] += 0.4 * meals[i-6:i].sum() / 6
# Insulin effect (delayed, persistent)
if i >= 4:
glucose[i] -= 1.2 * insulin[i-4:i].sum() / 4
# Exercise effect
if steps[i] > 100:
glucose[i] -= 15
# Add some hypo/hyper episodes
glucose[180:200] = rng.normal(62, 5, 20) # Hypo episode
glucose[350:365] = rng.normal(210, 10, 15) # Hyper episode
df = pd.DataFrame({
"timestamp": times,
"glucose_mgdl": np.round(np.clip(glucose, 40, 350), 1),
"carbs_g": np.round(meals, 1),
"insulin_units": np.round(insulin, 1),
"steps": steps.astype(int),
"hr": np.round(hr, 0).astype(int),
})
# Parse timestamps
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
if df["timestamp"].dt.tz is None:
df["timestamp"] = df["timestamp"].dt.tz_localize("UTC")
df = df.sort_values("timestamp").reset_index(drop=True)
# Feature engineering
df["dt_min"] = df["timestamp"].diff().dt.total_seconds() / 60.0
df["glucose_prev"] = df["glucose_mgdl"].shift(1)
df["roc_mgdl_min"] = (df["glucose_mgdl"] - df["glucose_prev"]) / df["dt_min"]
df["roc_mgdl_min"] = df["roc_mgdl_min"].replace([np.inf, -np.inf], 0.0).fillna(0.0)
df["time_min"] = (df["timestamp"] - df["timestamp"].iloc[0]).dt.total_seconds() / 60.0
# Build heavy model
with st.spinner("Training ensemble model..."):
predict_proba = build_ensemble_model(df)
st.success("βœ… Ensemble model trained (LogReg + RandomForest + GBM)")
# Initialize Sundew runtime
with st.spinner("Initializing Sundew PipelineRuntime..."):
config = get_preset(preset_name)
config.target_activation_rate = target_activation
config.energy_pressure = energy_pressure
config.gate_temperature = gate_temperature
# Custom significance model (now imported from sundew.domains.healthcare)
significance_model = AdvancedDiabetesSignificanceModel(
hypo_threshold=hypo_threshold,
hyper_threshold=hyper_threshold,
target_glucose=100.0,
)
# Build pipeline runtime
from sundew.runtime import PipelineRuntime, SimpleGatingStrategy, SimpleControlPolicy, SimpleEnergyModel
runtime = PipelineRuntime(
config=config,
significance_model=significance_model,
gating_strategy=SimpleGatingStrategy(config.hysteresis_gap),
control_policy=SimpleControlPolicy(config),
energy_model=SimpleEnergyModel(
processing_cost=config.base_processing_cost,
idle_cost=config.dormant_tick_cost,
),
)
st.success(f"βœ… PipelineRuntime initialized with {preset_name} preset")
# Runtime monitoring
monitor = RuntimeMonitor()
# Processing loop
st.header("πŸ”¬ Processing Events")
progress_bar = st.progress(0)
status_text = st.empty()
results = []
ground_truth = []
for idx, row in df.iterrows():
progress_bar.progress((idx + 1) / len(df))
# Create processing context
context = ProcessingContext(
timestamp=row["timestamp"].timestamp(),
sequence_id=idx,
features={
"glucose_mgdl": row["glucose_mgdl"],
"roc_mgdl_min": row["roc_mgdl_min"],
"insulin_units": row["insulin_units"],
"carbs_g": row["carbs_g"],
"hr": row["hr"],
"steps": row["steps"],
"time_min": row["time_min"],
},
history=[],
metadata={},
)
# Process with runtime
t_start = time.perf_counter()
result = runtime.process(context)
t_elapsed = (time.perf_counter() - t_start) * 1000 # ms
# Heavy model prediction if activated
risk_proba = None
if result.activated:
X = np.array([[
row["glucose_mgdl"],
row["roc_mgdl_min"],
row["insulin_units"],
row["carbs_g"],
row["hr"],
]])
try:
risk_proba = predict_proba(X)
except:
risk_proba = None
# Ground truth (for evaluation)
future_idx = min(idx + 6, len(df) - 1)
future_glucose = df.iloc[future_idx]["glucose_mgdl"]
true_risk = 1 if (future_glucose < hypo_threshold or future_glucose > hyper_threshold) else 0
ground_truth.append(true_risk)
# Record telemetry
telemetry = TelemetryEvent(
timestamp=context.timestamp,
event_id=idx,
glucose=row["glucose_mgdl"],
roc=row["roc_mgdl_min"],
significance=result.significance,
threshold=result.threshold_used,
activated=result.activated,
energy_level=result.energy_consumed, # Use energy_consumed as proxy
risk_proba=risk_proba,
processing_time_ms=t_elapsed,
components=result.explanation.get("feature_contributions", {}),
)
monitor.add_event(telemetry)
results.append({
"timestamp": row["timestamp"],
"glucose": row["glucose_mgdl"],
"roc": row["roc_mgdl_min"],
"significance": result.significance,
"threshold": result.threshold_used,
"activated": result.activated,
"energy_level": result.energy_consumed,
"risk_proba": risk_proba,
"true_risk": true_risk,
})
progress_bar.empty()
status_text.empty()
# Convert to DataFrame
results_df = pd.DataFrame(results)
telemetry_df = monitor.get_telemetry_df()
# Compute metrics
total_events = len(results_df)
total_activations = int(results_df["activated"].sum())
activation_rate = total_activations / total_events
energy_savings = 1 - activation_rate
# Statistical evaluation (on activated events)
activated_results = results_df[results_df["activated"]].copy()
if len(activated_results) > 10:
y_true = activated_results["true_risk"].values
y_pred = (activated_results["risk_proba"].fillna(0.5) >= 0.5).astype(int).values
f1 = f1_score(y_true, y_pred, zero_division=0)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
if show_bootstrap:
f1_mean, f1_low, f1_high = bootstrap_metric(y_true, y_pred, lambda yt, yp: f1_score(yt, yp, zero_division=0))
prec_mean, prec_low, prec_high = bootstrap_metric(y_true, y_pred, lambda yt, yp: precision_score(yt, yp, zero_division=0))
rec_mean, rec_low, rec_high = bootstrap_metric(y_true, y_pred, lambda yt, yp: recall_score(yt, yp, zero_division=0))
else:
f1 = precision = recall = 0.0
f1_mean = prec_mean = rec_mean = 0.0
f1_low = f1_high = prec_low = prec_high = rec_low = rec_high = 0.0
# Dashboard
st.header("πŸ“Š Performance Dashboard")
col1, col2, col3, col4 = st.columns(4)
col1.metric("Total Events", f"{total_events}")
col2.metric("Activations", f"{total_activations} ({activation_rate:.1%})")
col3.metric("Energy Savings", f"{energy_savings:.1%}")
col4.metric("Alerts", f"{len(monitor.alerts)}")
col1, col2, col3 = st.columns(3)
if show_bootstrap and len(activated_results) > 10:
col1.metric("F1 Score", f"{f1_mean:.3f}", help=f"95% CI: [{f1_low:.3f}, {f1_high:.3f}]")
col2.metric("Precision", f"{prec_mean:.3f}", help=f"95% CI: [{prec_low:.3f}, {prec_high:.3f}]")
col3.metric("Recall", f"{rec_mean:.3f}", help=f"95% CI: [{rec_low:.3f}, {rec_high:.3f}]")
else:
col1.metric("F1 Score", f"{f1:.3f}")
col2.metric("Precision", f"{precision:.3f}")
col3.metric("Recall", f"{recall:.3f}")
# Visualizations
st.header("πŸ“ˆ Real-Time Visualizations")
# Glucose + Threshold
fig_col1, fig_col2 = st.columns(2)
with fig_col1:
st.subheader("Glucose Levels")
chart_data = results_df.set_index("timestamp")[["glucose"]]
st.line_chart(chart_data, height=250)
with fig_col2:
st.subheader("Significance vs Threshold (Adaptive PI Control)")
chart_data = results_df.set_index("timestamp")[["significance", "threshold"]]
st.line_chart(chart_data, height=250)
# Energy tracking
if show_energy_viz:
st.subheader("Energy Level (Bio-Inspired Regeneration)")
chart_data = results_df.set_index("timestamp")[["energy_level"]]
st.line_chart(chart_data, height=200)
# Significance components
if show_components and len(telemetry_df) > 0:
comp_cols = [c for c in telemetry_df.columns if c.startswith("comp_")]
if comp_cols:
st.subheader("Significance Components (Diabetes-Specific Risk Factors)")
chart_data = telemetry_df.set_index("timestamp")[comp_cols]
st.line_chart(chart_data, height=200)
# Alerts
st.header("⚠️ Risk Alerts")
if monitor.alerts:
alerts_df = pd.DataFrame(monitor.alerts)
st.dataframe(alerts_df, use_container_width=True)
else:
st.info("No high-risk alerts triggered in this window.")
# Detailed telemetry
with st.expander("πŸ” Detailed Telemetry (Last 100 Events)"):
st.dataframe(results_df.tail(100), use_container_width=True)
# Export telemetry
if export_telemetry:
st.header("πŸ“₯ Export Telemetry")
json_data = monitor.export_json()
st.download_button(
label="Download Telemetry JSON",
data=json_data,
file_name="sundew_diabetes_telemetry.json",
mime="application/json",
)
st.success("Telemetry ready for hardware validation workflows")
# Footer
st.divider()
st.caption(f"🌿 Powered by Sundew Algorithms v0.7+ | PipelineRuntime with custom DiabetesSignificanceModel | Research prototype")