# utils/orchestrator.py
# Coordinates the emotion, memory, and reflection servers together.
import asyncio
import json
import time

from utils.mcp_client import MCPMux
from utils.needs_lexicon import infer_needs
from utils.intervention_lexicon import (
    get_interventions,
    should_show_interventions,
    format_interventions,
)
class GhostMaloneMux:
    """Orchestrator for Ghost Malone's three servers: emotion, memory, reflection."""

    def __init__(self):
        self.mux = MCPMux()
        self.initialized = False

    async def initialize(self):
        """Connect to all three MCP servers."""
        if self.initialized:
            return

        # Each server script is launched over stdio; the relative paths assume
        # this runs from the repo root.
        print("[Orchestrator] Connecting to emotion server...")
        await self.mux.connect_stdio(
            "emotion", "python", args=["servers/emotion_server.py"]
        )
        print("[Orchestrator] Connected to emotion server.")

        print("[Orchestrator] Connecting to memory server...")
        await self.mux.connect_stdio(
            "memory", "python", args=["servers/memory_server.py"]
        )
        print("[Orchestrator] Connected to memory server.")

        print("[Orchestrator] Connecting to reflection server...")
        await self.mux.connect_stdio(
            "reflection", "python", args=["servers/reflection_server.py"]
        )
        print("[Orchestrator] Connected to reflection server.")

        self.initialized = True
    async def process_message(
        self,
        user_text: str,
        conversation_context: list[dict] | None = None,
        user_id: str = "user_001",
        intervention_thresholds: dict | None = None,
    ) -> dict:
        """
        Process a user message through the full pipeline:
        1. Emotion analysis
        2. Needs inference
        3. Memory operations (parallel)
        4. Reflection generation
        5. Intervention generation (if thresholds met)

        Args:
            user_text: The user's message
            conversation_context: Previous messages
            user_id: User identifier
            intervention_thresholds: Optional dict with min_messages,
                min_confidence, and min_arousal keys

        Returns a dict with:
            {
                "user_text": str,
                "emotion": dict (emotion analysis),
                "inferred_needs": list (psychological needs detected),
                "emotion_arc": dict (trajectory),
                "response": str (assistant reply, plus interventions if any),
                "tone": str,
                "has_interventions": bool,
                "toolbox_log": str (formatted activity log),
            }
        """
        if not self.initialized:
            await self.initialize()

        # Extract intervention thresholds (simplified defaults for the demo)
        thresholds = intervention_thresholds or {}
        min_messages = thresholds.get("min_messages", 2)  # just one exchange needed
        min_confidence = thresholds.get("min_confidence", 0.70)  # lower bar
        min_arousal = thresholds.get("min_arousal", 0.40)  # catch more cases

        start_time = time.time()

        # Initialize the toolbox log (returned as "toolbox_log" in the result)
        toolbox_log = []
        # Step 1: Analyze emotion
        t1 = time.time()
        toolbox_log.append(
            "🔧 **emotion_server.analyze()** - Detecting emotions from text"
        )
        emotion_data = await self.mux.call(
            "analyze", {"text": user_text, "user_id": user_id}
        )
        elapsed = (time.time() - t1) * 1000
        print(f"⏱️ Emotion analysis: {elapsed:.0f}ms")

        # Parse the emotion response (it arrives as a JSON string from MCP)
        if isinstance(emotion_data, str):
            emotion_dict = json.loads(emotion_data)
        else:
            emotion_dict = emotion_data
        toolbox_log.append(
            f" ✅ Found: {emotion_dict.get('labels', [])} ({elapsed:.0f}ms)"
        )
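        # Shape of `emotion_dict` as consumed below, inferred from the fields
        # this module reads (the emotion server may return more; the example
        # values are hypothetical):
        #   {"labels": ["sadness"], "valence": -0.6, "arousal": 0.5,
        #    "tone": "somber"}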
        # Step 2: Infer psychological needs from emotion + context (fast, <100ms)
        t2 = time.time()
        toolbox_log.append(
            "\n📖 **needs_lexicon.infer_needs()** - Detecting psychological needs"
        )
        emotion_labels = emotion_dict.get("labels", [])
        valence = emotion_dict.get("valence", 0.0)
        arousal = emotion_dict.get("arousal", 0.0)
        inferred_needs = infer_needs(emotion_labels, user_text, valence, arousal)
        elapsed = (time.time() - t2) * 1000
        print(f"⏱️ Needs inference: {elapsed:.0f}ms")

        if inferred_needs:
            need_summary = ", ".join(
                f"{n['label']} ({n['confidence']:.0%})" for n in inferred_needs
            )
            toolbox_log.append(f" ✅ Detected: {need_summary} ({elapsed:.0f}ms)")
        else:
            toolbox_log.append(f" ℹ️ No strong needs detected ({elapsed:.0f}ms)")

        # Debug: print the detected needs
        print(f"🎯 Inferred needs: {inferred_needs}")
        if inferred_needs:
            for need in inferred_needs:
                print(f" - {need['icon']} {need['label']} ({need['confidence']:.0%})")
        # Step 3: Parallelize memory operations (they don't depend on each other)
        t3 = time.time()
        toolbox_log.append("\n🧠 **memory_server** - Parallel operations:")
        toolbox_log.append(" 📥 get_emotion_arc() - Retrieving emotion history")
        toolbox_log.append(" 💾 remember_event() - Storing current message")

        emotion_arc_task = self.mux.call("get_emotion_arc", {"k": 10})
        event = {
            "text": user_text,
            "emotion": emotion_dict,
            "role": "user",
            "user_id": user_id,
        }
        remember_task = self.mux.call("remember_event", {"event": event})

        # Await both memory operations concurrently
        emotion_arc, _ = await asyncio.gather(emotion_arc_task, remember_task)
        elapsed = (time.time() - t3) * 1000
        print(f"⏱️ Memory operations (parallel): {elapsed:.0f}ms")
        toolbox_log.append(f" ✅ Completed ({elapsed:.0f}ms)")

        if isinstance(emotion_arc, str):
            emotion_arc = json.loads(emotion_arc)
        # Step 4: Generate a reflection using the emotion + arc
        t4 = time.time()
        toolbox_log.append(
            "\n🤖 **reflection_server.generate()** - Claude response generation"
        )
        tone = emotion_dict.get("tone", "neutral")
        reflection_response = await self.mux.call(
            "generate",
            {
                "text": user_text,
                "context": conversation_context,
                "tone": tone,
                "emotion_arc": emotion_arc,
            },
        )
        elapsed = (time.time() - t4) * 1000
        print(f"⏱️ Reflection generation (Claude): {elapsed:.0f}ms")
        toolbox_log.append(f" ✅ Response generated ({elapsed:.0f}ms)")

        if isinstance(reflection_response, str):
            # The server may return either a JSON payload with a "reply" key
            # or a plain string; fall back to the raw text on parse failure.
            try:
                response_dict = json.loads(reflection_response)
                reply = response_dict.get("reply", reflection_response)
            except Exception:
                reply = reflection_response
        else:
            reply = str(reflection_response)
        # Step 5: Generate interventions (simplified for the demo)
        toolbox_log.append(
            "\n💡 **intervention_lexicon** - Checking intervention criteria"
        )
        message_count = len(conversation_context) + 1 if conversation_context else 1
        arousal = emotion_dict.get("arousal", 0.5)
        intervention_text = ""

        # Simple rule: if we detected a need, check whether the thresholds
        # allow showing interventions
        if inferred_needs and should_show_interventions(
            confidence=inferred_needs[0]["confidence"],
            message_count=message_count,
            emotional_intensity=arousal,
            min_messages=min_messages,
            min_confidence=min_confidence,
            min_arousal=min_arousal,
        ):
            toolbox_log.append(
                f" ✅ Thresholds met (msg≥{min_messages}, "
                f"conf≥{min_confidence:.0%}, arousal≥{min_arousal:.1f})"
            )
            need_type = inferred_needs[0]["need"]
            contexts = inferred_needs[0].get("matched_contexts", [])
            interventions = get_interventions(need_type, contexts, limit=3)
            if interventions:
                toolbox_log.append(
                    f" 📋 get_interventions() - Retrieved {len(interventions)} "
                    f"strategies for {need_type}"
                )
                intervention_text = format_interventions(
                    need_type,
                    interventions,
                    inferred_needs[0]["confidence"],
                    emotion_arc=emotion_arc,
                    user_text=user_text,
                )
                print(f"💡 Showing {len(interventions)} interventions for {need_type}")
        else:
            # Log why interventions were skipped
            reasons = []
            if not inferred_needs:
                reasons.append("no needs detected")
            else:
                if message_count < min_messages:
                    reasons.append(f"msg count {message_count}<{min_messages}")
                if inferred_needs[0]["confidence"] < min_confidence:
                    reasons.append(
                        f"confidence {inferred_needs[0]['confidence']:.0%}"
                        f"<{min_confidence:.0%}"
                    )
                if arousal < min_arousal:
                    reasons.append(f"arousal {arousal:.1f}<{min_arousal:.1f}")
            toolbox_log.append(f" ℹ️ No interventions ({', '.join(reasons)})")
        # Combine the empathetic response with any interventions
        full_response = reply + intervention_text

        total_time = time.time() - start_time
        print(f"⏱️ TOTAL pipeline time: {total_time*1000:.0f}ms ({total_time:.1f}s)")
        toolbox_log.append(f"\n⏱️ **Total pipeline:** {total_time*1000:.0f}ms")

        # Format the toolbox log
        toolbox_log_str = "🧰 **Toolbox Activity:**\n\n" + "\n".join(toolbox_log)

        return {
            "user_text": user_text,
            "emotion": emotion_dict,
            "inferred_needs": inferred_needs,
            "emotion_arc": emotion_arc,
            "response": full_response,
            "tone": tone,
            "has_interventions": bool(intervention_text),
            "toolbox_log": toolbox_log_str,
        }
    async def close(self):
        """Close all server connections."""
        await self.mux.close()


# Global singleton instance
_orchestrator = None


async def get_orchestrator() -> GhostMaloneMux:
    """Get or create the global orchestrator."""
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = GhostMaloneMux()
        await _orchestrator.initialize()
    return _orchestrator
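

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the pipeline): how a caller
# might drive the orchestrator end to end. Assumes the script is run from the
# repo root so the relative server paths in initialize() resolve; the sample
# message is hypothetical.
# ---------------------------------------------------------------------------
async def _demo() -> None:
    orchestrator = await get_orchestrator()
    try:
        result = await orchestrator.process_message(
            "I've been feeling stretched thin at work lately.",
            conversation_context=[],
        )
        print(result["response"])
        print(result["toolbox_log"])
    finally:
        await orchestrator.close()


if __name__ == "__main__":
    asyncio.run(_demo())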