ghostMalone / utils /orchestrator.py
francischung222's picture
Initial commit
b11a5e9
raw
history blame
10.7 kB
# utils/orchestrator.py
# Coordinates emotion, memory, and reflection servers together
from typing import Dict, Any, Optional
import asyncio
import time
from utils.mcp_client import MCPMux
from utils.needs_lexicon import infer_needs
from utils.intervention_lexicon import (
get_interventions,
should_show_interventions,
format_interventions,
)
class GhostMaloneMux:
"""Orchestrator for Ghost Malone's three servers: emotion, memory, reflection"""
def __init__(self):
self.mux = MCPMux()
self.initialized = False
async def initialize(self):
"""Connect to all three MCP servers"""
if self.initialized:
return
print("[Orchestrator] Connecting to emotion server...")
await self.mux.connect_stdio(
"emotion", "python", args=["servers/emotion_server.py"]
)
print("[Orchestrator] Connected to emotion server.")
print("[Orchestrator] Connecting to memory server...")
await self.mux.connect_stdio(
"memory", "python", args=["servers/memory_server.py"]
)
print("[Orchestrator] Connected to memory server.")
print("[Orchestrator] Connecting to reflection server...")
await self.mux.connect_stdio(
"reflection", "python", args=["servers/reflection_server.py"]
)
print("[Orchestrator] Connected to reflection server.")
self.initialized = True
async def process_message(
self,
user_text: str,
conversation_context: list[dict] | None = None,
user_id: str = "user_001",
intervention_thresholds: dict | None = None,
) -> dict:
"""
Process a user message through the full pipeline:
1. Emotion analysis
2. Needs inference
3. Memory operations (parallel)
4. Reflection generation
5. Intervention generation (if thresholds met)
Args:
user_text: The user's message
conversation_context: Previous messages
user_id: User identifier
intervention_thresholds: Optional dict with min_messages, min_confidence, min_arousal
Returns dict with:
"user_text": str,
"emotion": dict (emotion analysis),
"inferred_needs": list (psychological needs detected),
"emotion_arc": dict (trajectory),
"response": str (assistant reply),
"tone": str
}
"""
if not self.initialized:
await self.initialize()
# Extract intervention thresholds (SIMPLIFIED defaults for demo)
thresholds = intervention_thresholds or {}
min_messages = thresholds.get("min_messages", 2) # Just need one exchange
min_confidence = thresholds.get("min_confidence", 0.70) # Lower bar
min_arousal = thresholds.get("min_arousal", 0.40) # Catch more cases
start_time = time.time()
# Initialize toolbox log
toolbox_log = []
# Step 1: Analyze emotion
t1 = time.time()
toolbox_log.append(
"🔧 **emotion_server.analyze()** - Detecting emotions from text"
)
emotion_data = await self.mux.call(
"analyze", {"text": user_text, "user_id": user_id}
)
elapsed = (time.time() - t1) * 1000
print(f"⏱️ Emotion analysis: {elapsed:.0f}ms")
# Parse emotion response (it comes as JSON string from MCP)
if isinstance(emotion_data, str):
import json
emotion_dict = json.loads(emotion_data)
else:
emotion_dict = emotion_data
toolbox_log.append(
f" ✅ Found: {emotion_dict.get('labels', [])} ({elapsed:.0f}ms)"
)
# Step 2: Infer psychological needs from emotion + context (fast, <100ms)
t2 = time.time()
toolbox_log.append(
"\n📖 **needs_lexicon.infer_needs()** - Detecting psychological needs"
)
emotion_labels = emotion_dict.get("labels", [])
valence = emotion_dict.get("valence", 0.0)
arousal = emotion_dict.get("arousal", 0.0)
inferred_needs = infer_needs(emotion_labels, user_text, valence, arousal)
elapsed = (time.time() - t2) * 1000
print(f"⏱️ Needs inference: {elapsed:.0f}ms")
if inferred_needs:
need_summary = ", ".join(
[f"{n['label']} ({n['confidence']:.0%})" for n in inferred_needs]
)
toolbox_log.append(f" ✅ Detected: {need_summary} ({elapsed:.0f}ms)")
else:
toolbox_log.append(f" ℹ️ No strong needs detected ({elapsed:.0f}ms)")
# Debug: Print detected needs
print(f"🎯 Inferred needs: {inferred_needs}")
if inferred_needs:
for need in inferred_needs:
print(f" - {need['icon']} {need['label']} ({need['confidence']:.0%})")
# Step 3 & 4: Parallelize memory operations (they don't depend on each other)
t3 = time.time()
toolbox_log.append("\n🧠 **memory_server** - Parallel operations:")
toolbox_log.append(" 📥 get_emotion_arc() - Retrieving emotion history")
toolbox_log.append(" 💾 remember_event() - Storing current message")
emotion_arc_task = self.mux.call("get_emotion_arc", {"k": 10})
event = {
"text": user_text,
"emotion": emotion_dict,
"role": "user",
"user_id": user_id,
}
remember_task = self.mux.call("remember_event", {"event": event})
# Run both memory operations in parallel
emotion_arc, _ = await asyncio.gather(emotion_arc_task, remember_task)
elapsed = (time.time() - t3) * 1000
print(f"⏱️ Memory operations (parallel): {elapsed:.0f}ms")
toolbox_log.append(f" ✅ Completed ({elapsed:.0f}ms)")
if isinstance(emotion_arc, str):
import json
emotion_arc = json.loads(emotion_arc)
# Step 5: Generate reflection using emotion + arc
t4 = time.time()
toolbox_log.append(
"\n🤖 **reflection_server.generate()** - Claude response generation"
)
tone = emotion_dict.get("tone", "neutral")
reflection_response = await self.mux.call(
"generate",
{
"text": user_text,
"context": conversation_context,
"tone": tone,
"emotion_arc": emotion_arc,
},
)
elapsed = (time.time() - t4) * 1000
print(f"⏱️ Reflection generation (Claude): {elapsed:.0f}ms")
toolbox_log.append(f" ✅ Response generated ({elapsed:.0f}ms)")
if isinstance(reflection_response, str):
import json
try:
response_dict = json.loads(reflection_response)
reply = response_dict.get("reply", reflection_response)
except Exception:
reply = reflection_response
else:
reply = str(reflection_response)
# Step 6: Generate interventions (SIMPLIFIED FOR DEMO)
toolbox_log.append(
"\n💡 **intervention_lexicon** - Checking intervention criteria"
)
message_count = len(conversation_context) + 1 if conversation_context else 1
arousal = emotion_dict.get("arousal", 0.5)
intervention_text = ""
# Simple rule: If we detected a need, check if we should show interventions
if inferred_needs and should_show_interventions(
confidence=inferred_needs[0]["confidence"],
message_count=message_count,
emotional_intensity=arousal,
min_messages=min_messages,
min_confidence=min_confidence,
min_arousal=min_arousal,
):
toolbox_log.append(
f" ✅ Thresholds met (msg≥{min_messages}, conf≥{min_confidence:.0%}, arousal≥{min_arousal:.1f})"
)
need_type = inferred_needs[0]["need"]
contexts = inferred_needs[0].get("matched_contexts", [])
interventions = get_interventions(need_type, contexts, limit=3)
if interventions:
toolbox_log.append(
f" 📋 get_interventions() - Retrieved {len(interventions)} strategies for {need_type}"
)
intervention_text = format_interventions(
need_type,
interventions,
inferred_needs[0]["confidence"],
emotion_arc=emotion_arc,
user_text=user_text,
)
print(f"💡 Showing {len(interventions)} interventions for {need_type}")
else:
reasons = []
if not inferred_needs:
reasons.append("no needs detected")
else:
if message_count < min_messages:
reasons.append(f"msg count {message_count}<{min_messages}")
if inferred_needs[0]["confidence"] < min_confidence:
reasons.append(
f"confidence {inferred_needs[0]['confidence']:.0%}<{min_confidence:.0%}"
)
if arousal < min_arousal:
reasons.append(f"arousal {arousal:.1f}<{min_arousal:.1f}")
toolbox_log.append(f" ℹ️ No interventions ({', '.join(reasons)})")
# Combine empathetic response + interventions
full_response = reply + intervention_text
total_time = time.time() - start_time
print(f"⏱️ TOTAL pipeline time: {total_time*1000:.0f}ms ({total_time:.1f}s)")
toolbox_log.append(f"\n⏱️ **Total pipeline:** {total_time*1000:.0f}ms")
# Format toolbox log
toolbox_log_str = "🧰 **Toolbox Activity:**\n\n" + "\n".join(toolbox_log)
return {
"user_text": user_text,
"emotion": emotion_dict,
"inferred_needs": inferred_needs,
"emotion_arc": emotion_arc,
"response": full_response,
"tone": tone,
"has_interventions": bool(intervention_text),
"toolbox_log": toolbox_log_str,
}
async def close(self):
"""Close all server connections"""
await self.mux.close()
# Global instance
_orchestrator = None
async def get_orchestrator() -> GhostMaloneMux:
"""Get or create the global orchestrator"""
global _orchestrator
if _orchestrator is None:
_orchestrator = GhostMaloneMux()
await _orchestrator.initialize()
return _orchestrator