import sys
import os
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
# Add project root to sys.path to allow importing from src
PROJECT_ROOT = Path(__file__).parents[2]
sys.path.append(str(PROJECT_ROOT))
from src.config_manager import ConfigManager
from src.graphiti_client import GraphitiClient
# ACE Manager 为可选依赖,避免未安装 ace-framework 时整个 Dashboard Backend 无法启动
try:
from src.ace_manager import ACEManager as AceManager # type: ignore
except ImportError:
AceManager = None # type: ignore
# Global instances
# Explicitly point to the user config file
CONFIG_PATH = Path.home() / ".graphitiace" / "config.json"
config_manager = ConfigManager(config_path=CONFIG_PATH)
graphiti_client: Optional[GraphitiClient] = None
ace_manager: Optional[AceManager] = None
init_error: Optional[str] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
global graphiti_client, ace_manager, init_error
print("🚀 Starting Dashboard Backend...")
print(f"📂 Loading config from: {CONFIG_PATH}")
# Load config
config = config_manager.load_config()
print(f"🔑 Current Group ID: {config.group_id}")
# Initialize Graphiti Client
try:
graphiti_client = GraphitiClient(config_manager)
if graphiti_client.connect():
print("✅ Graphiti Client Connected")
# PROBE DATA
try:
with graphiti_client.driver.session() as session:
count = session.run("MATCH (n) RETURN count(n) as c").single()["c"]
print(f"🕵️ DATA PROBE: Found {count} nodes in the database at startup.")
except Exception as e:
print(f"⚠️ Data Probe Failed: {e}")
else:
error_msg = "Failed to connect to Neo4j. Check logs."
print(f"❌ {error_msg}")
init_error = f"Graphiti: {error_msg}"
except Exception as e:
print(f"⚠️ Graphiti Client Initialization Warning: {e}")
init_error = f"Graphiti: {str(e)}"
# Initialize ACE Manager
if graphiti_client and graphiti_client.is_connected() and AceManager is not None:
try:
ace_manager = AceManager(config_manager, graphiti_client)
print("✅ ACE Manager Initialized")
except Exception as e:
print(f"⚠️ ACE Manager Initialization Warning: {e}")
init_error = f"ACE: {str(e)}"
else:
print("⚠️ Skipping ACE Manager initialization because Graphiti Client is not connected")
yield
# Shutdown
if graphiti_client:
graphiti_client.disconnect()
print("👋 Dashboard Backend Stopped")
app = FastAPI(title="Graphiti Dashboard API", lifespan=lifespan)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # For dev, allow all. In prod, be specific.
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Models ---
class ConfigUpdate(BaseModel):
neo4j: Optional[Dict[str, Any]] = None
api: Optional[Dict[str, Any]] = None
group_id: Optional[str] = None
class EpisodeCreate(BaseModel):
content: str
metadata: Optional[Dict[str, Any]] = None
# --- Routes ---
@app.get("/api/health")
async def health_check():
"""
简单健康检查,用于 Dashboard 顶部心跳检测。
- Graphiti Core: 是否成功初始化 GraphitiClient
- Neo4j: 尝试使用 graphiti_client.is_connected 判断真实连接状态
- ACE: 检查是否初始化,且(如果可用)is_enabled()
"""
graphiti_ok = False
neo4j_ok = False
ace_ok = False
# 检查 Graphiti Client 是否就绪(配置/实例本身是否正常)
if graphiti_client is not None:
graphiti_ok = True
# 检查 Neo4j 连接状态
try:
if graphiti_client is not None and graphiti_client.is_connected():
neo4j_ok = True
except Exception as e:
print(f"⚠️ Neo4j health check failed: {e}")
neo4j_ok = False
# 检查 ACE 状态(如果已初始化)
try:
if ace_manager is not None:
# ACEManager 提供 is_enabled() 用于判断是否启用
if hasattr(ace_manager, "is_enabled"):
ace_ok = bool(ace_manager.is_enabled())
else:
ace_ok = True
except Exception as e:
print(f"⚠️ ACE health check failed: {e}")
ace_ok = False
status = "healthy"
# 三个组件全部不可用时为 unhealthy,只要有一个不健康则为 degraded
if not (graphiti_ok or neo4j_ok or ace_ok):
status = "unhealthy"
elif not (graphiti_ok and neo4j_ok and ace_ok):
status = "degraded"
return {
"status": status,
"components": {
"graphiti": graphiti_ok,
"neo4j": neo4j_ok,
"ace": ace_ok
}
}
# 1. Configuration Management
@app.get("/api/config")
async def get_config():
conf = config_manager.load_config().model_dump()
# Mask sensitive data
if conf.get('neo4j') and conf['neo4j'].get('password'):
conf['neo4j']['password'] = "******"
if conf.get('api') and conf['api'].get('api_key'):
conf['api']['api_key'] = "******"
return conf
async def reload_backend():
global graphiti_client, ace_manager
print("🔄 Reloading backend services...")
# Close existing
if graphiti_client:
graphiti_client.disconnect()
# Re-init Graphiti
try:
graphiti_client = GraphitiClient(config_manager)
if graphiti_client.connect():
print("✅ Graphiti Client Reconnected")
else:
print("❌ Graphiti Client Reconnection Failed")
graphiti_client = None
except Exception as e:
print(f"⚠️ Graphiti Re-init Warning: {e}")
graphiti_client = None
# Re-init ACE
if graphiti_client and graphiti_client.is_connected() and AceManager is not None:
try:
ace_manager = AceManager(config_manager, graphiti_client)
print("✅ ACE Manager Reinitialized")
except Exception as e:
print(f"⚠️ ACE Re-init Warning: {e}")
ace_manager = None
else:
ace_manager = None
@app.post("/api/config")
async def update_config(data: ConfigUpdate):
# Load FRESH config from disk to ensure we have the latest full data (including real passwords)
current_config_obj = config_manager.load_config()
current = current_config_obj.model_dump()
need_reload = False
# Neo4j Update
if data.neo4j:
if not current.get('neo4j'): current['neo4j'] = {}
# Check fields
for key, val in data.neo4j.items():
# Skip if value is mask or None/Empty (unless user explicitly cleared it, but here we assume empty means 'no change' for simplicity in this UI context)
if key == 'password' and (val == "******" or val == ""):
continue
# If value changed, update
if current['neo4j'].get(key) != val:
current['neo4j'][key] = val
if key in ['uri', 'username', 'password', 'database']:
need_reload = True
# API Update
if data.api:
if not current.get('api'): current['api'] = {}
for key, val in data.api.items():
if key == 'api_key' and (val == "******" or val == ""):
continue
if current['api'].get(key) != val:
current['api'][key] = val
# API change also needs reload usually to re-init graphiti core if key changed
if key in ['provider', 'api_key', 'model', 'base_url']:
need_reload = True
# Group ID Update
if data.group_id and data.group_id != current.get('group_id'):
current['group_id'] = data.group_id
# Group ID change doesn't technically require re-connection, but affects queries.
# Since we bypass group_id in global stats, it mostly affects ACE.
need_reload = True
# Save
from src.config_manager import ServerConfig
try:
new_config = ServerConfig(**current)
config_manager._config = new_config
config_manager.save_config()
if need_reload:
await reload_backend()
# Return masked config
response_conf = current.copy()
if response_conf.get('neo4j') and response_conf['neo4j'].get('password'):
response_conf['neo4j']['password'] = "******"
if response_conf.get('api') and response_conf['api'].get('api_key'):
response_conf['api']['api_key'] = "******"
return {"status": "updated", "config": response_conf}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 2. Graph Management
@app.get("/api/stats")
async def get_stats():
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
# Custom global stats query bypassing client's group_id filter
try:
print(f"🔍 Connecting to Neo4j at: {config_manager.load_config().neo4j.uri}")
with graphiti_client.driver.session() as session:
query = """
MATCH (n)
OPTIONAL MATCH ()-[r]->()
WITH count(n) as total_nodes, count(r) as total_rels
CALL {
MATCH (e:Episode)
RETURN count(e) as total_episodes
}
CALL {
MATCH (n)
WHERE labels(n)[0] <> 'Episode'
RETURN labels(n)[0] as type, count(*) as count
ORDER BY count DESC
LIMIT 10
}
RETURN
total_nodes,
total_rels,
total_episodes,
collect({type: type, count: count}) as node_types
"""
result = session.run(query)
record = result.single()
# Stats query successful
# Neo4j 在空库时,MATCH (n) 不会返回任何记录,record 可能为 None
if not record:
return {
"nodes": {
"total": 0,
"by_type": {}
},
"relationships": {
"total": 0
},
"episodes": {
"total": 0
},
"group_id": "Global (All Groups)"
}
node_types: Dict[str, int] = {}
if record.get("node_types"):
for item in record["node_types"]:
t = item.get("type")
c = item.get("count", 0)
if t:
node_types[t] = c
return {
"nodes": {
"total": record.get("total_nodes", 0),
"by_type": node_types
},
"relationships": {
"total": record.get("total_rels", 0)
},
"episodes": {
"total": record.get("total_episodes", 0)
},
"group_id": "Global (All Groups)"
}
except Exception as e:
print(f"Error fetching global stats: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/episodes")
async def add_episode(episode: EpisodeCreate):
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
# Reuse the logic similar to mcp tools
# For dashboard, we might assume API key is set or handle it
try:
# This is a simplified direct add.
# In a full implementation, we'd want to reuse the 'add_episode' logic from tools.py
# which handles entity extraction if API key is present.
# For now, just simple add to graph:
result = await graphiti_client.add_episode(
episode.content,
group_id=config_manager.config.group_id,
metadata=episode.metadata
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/search")
async def search_entities(query: str):
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
try:
with graphiti_client.driver.session() as session:
# Simple case-insensitive contains search on name or content
cypher = """
MATCH (n)
WHERE (toLower(n.name) CONTAINS toLower($q) OR toLower(n.content) CONTAINS toLower($q))
RETURN n, labels(n) as labels
LIMIT 20
"""
result = session.run(cypher, q=query)
entities = []
for record in result:
node = record["n"]
labels = record["labels"]
entity = dict(node)
entity["type"] = labels[0] if labels else "Unknown"
# Ensure UUID is present
if "uuid" not in entity:
entity["uuid"] = str(node.element_id) # Fallback if no uuid prop
entities.append(entity)
return entities
except Exception as e:
print(f"Error searching entities: {e}")
raise HTTPException(status_code=500, detail=str(e))
# 3. ACE Strategies
@app.get("/api/strategies")
async def get_strategies():
if not ace_manager:
detail = f"ACE manager not initialized. Error: {init_error}" if init_error else "ACE manager not initialized"
raise HTTPException(status_code=503, detail=detail)
return ace_manager.query_strategies()
@app.get("/api/strategies/stats")
async def get_strategy_stats():
if not ace_manager:
raise HTTPException(status_code=503, detail="ACE manager not initialized")
return ace_manager.get_strategy_stats()
@app.get("/api/strategies/trend")
async def get_strategy_trend(tool_name: str, days: int = 30):
if not graphiti_client:
return []
try:
with graphiti_client.driver.session() as session:
query = """
MATCH (t:StrategyTrend {tool_name: $tool_name})
WHERE t.date >= date() - duration({days: $days})
RETURN
toString(t.date) as date,
sum(t.usage_count) as usage_count,
sum(t.success_count) as success_count
ORDER BY date ASC
"""
result = session.run(query, tool_name=tool_name, days=days)
return [dict(record) for record in result]
except Exception as e:
print(f"Error fetching strategy trend: {e}")
return []
@app.get("/api/trends")
async def get_trends(days: int = 30):
if not graphiti_client:
return []
try:
with graphiti_client.driver.session() as session:
query = """
MATCH (e:Episode)
WHERE e.timestamp IS NOT NULL OR e.created_at IS NOT NULL
WITH coalesce(e.timestamp, e.created_at) as ts
WHERE date(ts) >= date() - duration({days: $days})
WITH date(ts) as date, count(*) as count
RETURN toString(date) as date, count as usage_count, 0 as success_count
ORDER BY date ASC
"""
# Note: Success count is hard to track globally without Strategy logs,
# 所以这里仅展示调用量趋势。
result = session.run(query, days=days)
return [dict(record) for record in result]
except Exception as e:
print(f"Error fetching global trends: {e}")
return []
@app.get("/api/neighbors")
def get_neighbors(uuid: str):
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
try:
with graphiti_client.driver.session() as session:
query = """
MATCH (n)
WHERE n.uuid = $uuid OR elementId(n) = $uuid
MATCH (n)-[r]-(m)
RETURN
type(r) as type,
startNode(r) = n as is_outgoing,
coalesce(m.uuid, elementId(m)) as neighbor_uuid,
m.name as neighbor_name,
labels(m) as neighbor_types
LIMIT 50
"""
result = session.run(query, uuid=uuid)
neighbors = []
for record in result:
neighbors.append({
"type": record["type"],
"direction": "OUT" if record["is_outgoing"] else "IN",
"neighbor": {
"uuid": record["neighbor_uuid"],
"name": record["neighbor_name"],
"types": record["neighbor_types"]
}
})
return neighbors
except Exception as e:
print(f"Error fetching neighbors: {e}")
return []
# 4. Episode Time Travel
@app.get("/api/episodes/list")
async def get_episodes_list(limit: int = 50):
if not graphiti_client:
return []
try:
with graphiti_client.driver.session() as session:
query = """
MATCH (e:Episode)
RETURN e, coalesce(e.timestamp, e.created_at) as ts
ORDER BY ts DESC
LIMIT $limit
"""
result = session.run(query, limit=limit)
episodes = []
for record in result:
node = record["e"]
episodes.append({
"uuid": node.get("uuid") or node.element_id,
"content": node.get("content", ""),
"timestamp": str(record["ts"] or ""),
"group_id": node.get("group_id")
})
return episodes
except Exception as e:
print(f"Error fetching episodes: {e}")
return []
@app.get("/api/episodes/{uuid}/graph")
async def get_episode_graph(uuid: str):
if not graphiti_client:
return {"nodes": [], "links": []}
try:
with graphiti_client.driver.session() as session:
# Query the episode and all entities it mentions or is connected to
# Use OPTIONAL MATCH so we get the episode even if it has no connections
query = """
MATCH (e)
WHERE e.uuid = $uuid OR elementId(e) = $uuid
OPTIONAL MATCH (e)-[r]-(n)
RETURN e, r, n
LIMIT 100
"""
result = session.run(query, uuid=uuid)
nodes = {}
links = []
for record in result:
source = record["e"]
target = record["n"]
rel = record["r"]
s_id = source.get("uuid") or source.element_id
if s_id not in nodes:
nodes[s_id] = {
"id": s_id,
"name": source.get("name") or "Episode",
"type": "Episode",
"content": source.get("content"),
"group": "center"
}
if target:
t_id = target.get("uuid") or target.element_id
if t_id not in nodes:
lbls = list(target.labels)
nodes[t_id] = {
"id": t_id,
"name": target.get("name") or target.get("content", "")[:20],
"type": lbls[0] if lbls else "Entity",
"group": "neighbor"
}
if rel:
links.append({
"source": s_id,
"target": t_id,
"label": type(rel).__name__
})
# Episode graph loaded successfully
return {
"nodes": list(nodes.values()),
"links": links
}
except Exception as e:
print(f"Error fetching episode graph: {e}")
return {"nodes": [], "links": []}
@app.get("/api/graph/by-label")
async def get_graph_by_label(label: str, limit: int = 50):
"""
根据节点 Label 抽样构建一个小图,用于 Dashboard 中「数据库」视图的标签点击预览。
"""
if not graphiti_client:
return {"nodes": [], "links": []}
try:
with graphiti_client.driver.session() as session:
query = """
MATCH (n)
WHERE $label IN labels(n)
WITH n LIMIT $limit
OPTIONAL MATCH (n)-[r]-(m)
RETURN n, r, m
"""
result = session.run(query, label=label, limit=limit)
nodes = {}
links = []
for record in result:
center = record["n"]
neighbor = record["m"]
rel = record["r"]
c_id = center.get("uuid") or center.element_id
if c_id not in nodes:
lbls = list(center.labels)
nodes[c_id] = {
"id": c_id,
"name": center.get("name") or center.get("content", "")[:20] or label,
"type": lbls[0] if lbls else label,
"group": "center",
"entity": dict(center)
}
if neighbor:
n_id = neighbor.get("uuid") or neighbor.element_id
if n_id not in nodes:
n_lbls = list(neighbor.labels)
nodes[n_id] = {
"id": n_id,
"name": neighbor.get("name") or neighbor.get("content", "")[:20] or "Neighbor",
"type": n_lbls[0] if n_lbls else "Entity",
"group": "neighbor",
"entity": dict(neighbor)
}
if rel:
links.append({
"source": c_id,
"target": n_id,
"label": type(rel).__name__
})
return {
"nodes": list(nodes.values()),
"links": links
}
except Exception as e:
print(f"Error fetching graph by label: {e}")
return {"nodes": [], "links": []}
@app.get("/api/graph/path")
async def get_shortest_path(start_uuid: str, end_uuid: str):
if not graphiti_client:
return {"nodes": [], "links": []}
# 检查起点和终点是否相同,避免 Neo4j shortestPath 报错
if start_uuid == end_uuid:
return {"nodes": [], "links": []}
try:
with graphiti_client.driver.session() as session:
# Find shortest path between two nodes
# Supports both uuid property and elementId
query = """
MATCH (start), (end)
WHERE (start.uuid = $start_uuid OR elementId(start) = $start_uuid)
AND (end.uuid = $end_uuid OR elementId(end) = $end_uuid)
AND start <> end
MATCH p = shortestPath((start)-[*]-(end))
RETURN p
"""
result = session.run(query, start_uuid=start_uuid, end_uuid=end_uuid)
record = result.single()
if not record:
return {"nodes": [], "links": []}
path = record["p"]
nodes = {}
links = []
for node in path.nodes:
n_id = node.get("uuid") or node.element_id
lbls = list(node.labels)
nodes[n_id] = {
"id": n_id,
"name": node.get("name") or node.get("content", "")[:20] or "Unknown",
"type": lbls[0] if lbls else "Node",
"group": "path",
"entity": dict(node)
}
# Ensure entity has uuid
if "uuid" not in nodes[n_id]["entity"]:
nodes[n_id]["entity"]["uuid"] = n_id
for rel in path.relationships:
s_id = rel.start_node.get("uuid") or rel.start_node.element_id
t_id = rel.end_node.get("uuid") or rel.end_node.element_id
links.append({
"source": s_id,
"target": t_id,
"label": type(rel).__name__,
"color": "#eab308" # Yellow for path
})
return {
"nodes": list(nodes.values()),
"links": links
}
except Exception as e:
print(f"Error fetching shortest path: {e}")
return {"nodes": [], "links": []}
# 5. Maintenance & Health
@app.get("/api/maintenance/health")
async def health_check_full():
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
issues = []
stats = {}
try:
with graphiti_client.driver.session() as session:
# 1. Check Orphaned Nodes (excluding Episodes)
r_orphans = session.run("""
MATCH (n)
WHERE NOT (n)--() AND labels(n)[0] <> 'Episode'
RETURN count(n) as count
""").single()
stats["orphaned_nodes"] = r_orphans["count"]
if stats["orphaned_nodes"] > 0:
issues.append({"type": "warning", "message": f"Found {stats['orphaned_nodes']} orphaned nodes (no relationships)."})
# 2. Check Duplicate Nodes (by Name & Label, simple check)
r_dupes = session.run("""
MATCH (n)
WHERE n.name IS NOT NULL
WITH n.name as name, labels(n) as labels, count(*) as c
WHERE c > 1
RETURN sum(c) - count(*) as dupes
""").single()
stats["duplicate_nodes"] = r_dupes["dupes"]
if stats["duplicate_nodes"] > 0:
issues.append({"type": "warning", "message": f"Found {stats['duplicate_nodes']} potentially duplicate nodes."})
# 3. Check Connectivity
stats["status"] = "healthy" if not issues else "warning"
return {"status": stats["status"], "stats": stats, "issues": issues}
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/api/maintenance/cleanup/orphans")
async def cleanup_orphans():
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
try:
with graphiti_client.driver.session() as session:
result = session.run("""
MATCH (n)
WHERE NOT (n)--() AND labels(n)[0] <> 'Episode'
WITH collect(n) AS ns, size(ns) AS count
FOREACH (x IN ns | DELETE x)
RETURN count
""")
count = result.single()["count"]
return {"success": True, "message": f"Cleaned up {count} orphaned nodes."}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/maintenance/reset")
async def reset_database(confirm: bool = Body(embed=True)):
if not confirm:
raise HTTPException(status_code=400, detail="Confirmation required")
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
try:
with graphiti_client.driver.session() as session:
session.run("MATCH (n) DETACH DELETE n")
return {"success": True, "message": "Database has been completely reset."}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 6. Meta Information (Schema)
@app.get("/api/meta")
async def get_database_meta():
if not graphiti_client:
raise HTTPException(status_code=503, detail="Graphiti client not connected")
try:
with graphiti_client.driver.session() as session:
# 1. Labels
labels_res = session.run("""
CALL db.labels() YIELD label
CALL {
WITH label
MATCH (n) WHERE label in labels(n)
RETURN count(n) as count
}
RETURN label, count
ORDER BY count DESC
""")
labels = [{"name": r["label"], "count": r["count"]} for r in labels_res]
# 2. Relationship Types
rels_res = session.run("""
CALL db.relationshipTypes() YIELD relationshipType
CALL {
WITH relationshipType
MATCH ()-[r]->() WHERE type(r) = relationshipType
RETURN count(r) as count
}
RETURN relationshipType, count
ORDER BY count DESC
""")
types = [{"name": r["relationshipType"], "count": r["count"]} for r in rels_res]
# 3. DBMS Info
dbms_res = session.run("""
CALL dbms.components() YIELD name, versions, edition
RETURN name, versions[0] as version, edition
""").single()
user_res = session.run("CALL dbms.showCurrentUser() YIELD username, roles RETURN username, roles").single()
return {
"labels": labels,
"relationshipTypes": types,
"database": {
"name": dbms_res["name"] if dbms_res else "Neo4j",
"version": dbms_res["version"] if dbms_res else "Unknown",
"edition": dbms_res["edition"] if dbms_res else "Unknown",
"user": user_res["username"] if user_res else "Unknown"
}
}
except Exception as e:
print(f"Error fetching meta: {e}")
return {"labels": [], "relationshipTypes": [], "database": {}}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)