#!/usr/bin/env python3
import os
import json
import sys
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from dotenv import load_dotenv
import autogen
from autogen import (
Agent, AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager,
ConversableAgent
)
try:
from autogen import TeachableAgent, RetrieveUserProxyAgent
except ImportError:
# Fallback for older versions
TeachableAgent = None
RetrieveUserProxyAgent = None
import mcp.types as types
from mcp import server
from .agents import AgentManager
from .config import ServerConfig, AgentConfig
from .workflows import WorkflowManager
load_dotenv()
class EnhancedAutoGenServer:
def __init__(self):
"""Initialize the enhanced AutoGen MCP server."""
self.config_path = os.getenv("AUTOGEN_MCP_CONFIG")
if not self.config_path:
# Create a default config if not specified
self.config = {
"llm_config": {
"config_list": [
{
"model": "gpt-4o",
"api_key": os.getenv("OPENAI_API_KEY")
}
],
"temperature": 0.7
},
"code_execution_config": {
"work_dir": "coding",
"use_docker": False
}
}
else:
with open(self.config_path) as f:
self.config = json.load(f)
self.server_config = ServerConfig(
default_llm_config=self.config.get("llm_config"),
default_code_execution_config=self.config.get("code_execution_config")
)
self.agent_manager = AgentManager()
self.workflow_manager = WorkflowManager()
self.chat_history = []
self.resource_cache = {}
# Enhanced capabilities
self.capabilities = {
"tools": True,
"prompts": True,
"resources": True,
"sampling": False # Can be enabled if needed
}
async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tool calls with enhanced AutoGen features."""
try:
if tool_name == "create_agent":
return await self._create_agent(arguments)
elif tool_name == "create_workflow":
return await self._create_workflow(arguments)
elif tool_name == "execute_chat":
return await self._execute_chat(arguments)
elif tool_name == "execute_group_chat":
return await self._execute_group_chat(arguments)
elif tool_name == "execute_nested_chat":
return await self._execute_nested_chat(arguments)
elif tool_name == "execute_swarm":
return await self._execute_swarm(arguments)
elif tool_name == "execute_workflow":
return await self._execute_workflow(arguments)
elif tool_name == "manage_agent_memory":
return await self._manage_agent_memory(arguments)
elif tool_name == "configure_teachability":
return await self._configure_teachability(arguments)
elif tool_name == "get_agent_status":
return await self._get_agent_status(arguments)
elif tool_name == "get_resource":
return await self._get_resource(arguments)
else:
return {"error": f"Unknown tool: {tool_name}"}
except Exception as e:
return {"error": str(e)}
async def _create_agent(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create an enhanced AutoGen agent."""
name = args["name"]
agent_type = args["type"]
system_message = args.get("system_message", "You are a helpful AI assistant.")
llm_config = args.get("llm_config", self.server_config.default_llm_config)
code_execution_config = args.get("code_execution_config", self.server_config.default_code_execution_config)
human_input_mode = args.get("human_input_mode", "NEVER")
tools = args.get("tools", [])
teachability_config = args.get("teachability", {})
try:
if agent_type == "assistant":
agent = AssistantAgent(
name=name,
system_message=system_message,
llm_config=llm_config,
human_input_mode=human_input_mode,
)
elif agent_type == "user_proxy":
agent = UserProxyAgent(
name=name,
system_message=system_message,
code_execution_config=code_execution_config,
human_input_mode=human_input_mode,
)
elif agent_type == "conversable":
agent = ConversableAgent(
name=name,
system_message=system_message,
llm_config=llm_config,
human_input_mode=human_input_mode,
)
elif agent_type == "teachable" and TeachableAgent:
agent = TeachableAgent(
name=name,
system_message=system_message,
llm_config=llm_config,
teach_config=teachability_config,
)
elif agent_type == "retrievable" and RetrieveUserProxyAgent:
agent = RetrieveUserProxyAgent(
name=name,
system_message=system_message,
code_execution_config=code_execution_config,
retrieve_config=args.get("retrieve_config", {}),
)
else:
return {"error": f"Unknown or unsupported agent type: {agent_type}"}
# Register tools if provided
if tools:
for tool in tools:
if hasattr(agent, 'register_for_execution'):
agent.register_for_execution(name=tool["name"])(tool["function"])
self.agent_manager.add_agent(name, agent)
return {
"success": True,
"message": f"Agent '{name}' created successfully",
"agent_info": {
"name": name,
"type": agent_type,
"capabilities": {
"llm_enabled": hasattr(agent, "llm_config") and agent.llm_config is not None,
"code_execution": hasattr(agent, "code_execution_config") and agent.code_execution_config is not False,
"teachable": agent_type == "teachable",
"tools_count": len(tools)
}
}
}
except Exception as e:
return {"error": f"Failed to create agent: {str(e)}"}
async def _create_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a complete multi-agent workflow."""
workflow_name = args["workflow_name"]
workflow_type = args["workflow_type"]
agents_config = args["agents"]
task_description = args["task_description"]
max_rounds = args.get("max_rounds", 10)
termination_conditions = args.get("termination_conditions", ["TERMINATE"])
try:
# Create agents for the workflow
created_agents = []
for agent_config in agents_config:
agent_result = await self._create_agent(agent_config)
if agent_result.get("success"):
created_agents.append(agent_config["name"])
# Store workflow configuration
workflow_config = {
"name": workflow_name,
"type": workflow_type,
"agents": created_agents,
"task": task_description,
"max_rounds": max_rounds,
"termination_conditions": termination_conditions,
"created_at": datetime.now().isoformat()
}
self.workflow_manager.add_workflow(workflow_name, workflow_config)
return {
"success": True,
"message": f"Workflow '{workflow_name}' created with {len(created_agents)} agents",
"workflow_info": workflow_config
}
except Exception as e:
return {"error": f"Failed to create workflow: {str(e)}"}
async def _execute_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute an enhanced chat between two agents."""
initiator_name = args["initiator"]
responder_name = args["responder"]
message = args["message"]
max_turns = args.get("max_turns", 10)
clear_history = args.get("clear_history", False)
summary_method = args.get("summary_method", "last_msg")
try:
initiator = self.agent_manager.get_agent(initiator_name)
responder = self.agent_manager.get_agent(responder_name)
if not initiator or not responder:
return {"error": "One or both agents not found"}
if clear_history:
if hasattr(initiator, 'clear_history'):
initiator.clear_history()
if hasattr(responder, 'clear_history'):
responder.clear_history()
# Execute the chat
chat_result = initiator.initiate_chat(
responder,
message=message,
max_turns=max_turns,
summary_method=summary_method
)
# Store chat history
chat_record = {
"timestamp": datetime.now().isoformat(),
"initiator": initiator_name,
"responder": responder_name,
"initial_message": message,
"result": str(chat_result),
"turns": max_turns
}
self.chat_history.append(chat_record)
return {
"success": True,
"chat_result": str(chat_result),
"summary": getattr(chat_result, 'summary', "Chat completed"),
"cost": getattr(chat_result, 'cost', None)
}
except Exception as e:
return {"error": f"Chat execution failed: {str(e)}"}
async def _execute_group_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute an enhanced group chat."""
agent_names = args["agent_names"]
initiator_name = args["initiator"]
message = args["message"]
max_round = args.get("max_round", 10)
speaker_selection_method = args.get("speaker_selection_method", "auto")
allow_repeat_speaker = args.get("allow_repeat_speaker", True)
admin_name = args.get("admin_name", "Admin")
try:
agents = []
for name in agent_names:
agent = self.agent_manager.get_agent(name)
if agent:
agents.append(agent)
else:
return {"error": f"Agent '{name}' not found"}
initiator = self.agent_manager.get_agent(initiator_name)
if not initiator:
return {"error": f"Initiator agent '{initiator_name}' not found"}
# Create group chat with enhanced features
group_chat = GroupChat(
agents=agents,
messages=[],
max_round=max_round,
speaker_selection_method=speaker_selection_method,
allow_repeat_speaker=allow_repeat_speaker,
admin_name=admin_name
)
manager = GroupChatManager(
groupchat=group_chat,
llm_config=self.server_config.default_llm_config
)
# Execute group chat
chat_result = initiator.initiate_chat(manager, message=message)
# Store chat history
chat_record = {
"timestamp": datetime.now().isoformat(),
"type": "group_chat",
"agents": agent_names,
"initiator": initiator_name,
"initial_message": message,
"result": str(chat_result),
"rounds": max_round
}
self.chat_history.append(chat_record)
return {
"success": True,
"chat_result": str(chat_result),
"participants": agent_names,
"total_messages": len(group_chat.messages)
}
except Exception as e:
return {"error": f"Group chat execution failed: {str(e)}"}
async def _execute_nested_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute nested conversations with hierarchical structures."""
primary_agent_name = args["primary_agent"]
secondary_agent_names = args["secondary_agents"]
task = args["task"]
max_turns = args.get("max_turns", 5)
nesting_depth = args.get("nesting_depth", 2)
try:
primary_agent = self.agent_manager.get_agent(primary_agent_name)
if not primary_agent:
return {"error": f"Primary agent '{primary_agent_name}' not found"}
secondary_agents = []
for name in secondary_agent_names:
agent = self.agent_manager.get_agent(name)
if agent:
secondary_agents.append(agent)
else:
return {"error": f"Secondary agent '{name}' not found"}
# Implementation for nested chat
nested_results = []
for depth in range(nesting_depth):
for secondary_agent in secondary_agents:
nested_task = f"Depth {depth + 1}: {task}"
result = primary_agent.initiate_chat(
secondary_agent,
message=nested_task,
max_turns=max_turns
)
nested_results.append({
"depth": depth + 1,
"secondary_agent": secondary_agent.name,
"result": str(result)
})
return {
"success": True,
"nested_results": nested_results,
"total_conversations": len(nested_results)
}
except Exception as e:
return {"error": f"Nested chat execution failed: {str(e)}"}
async def _execute_swarm(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a swarm-based multi-agent conversation."""
# This would be implemented based on specific swarm requirements
# For now, return a placeholder implementation
return {
"success": True,
"message": "Swarm execution not yet fully implemented",
"placeholder": True
}
async def _execute_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a predefined workflow."""
workflow_name = args["workflow_name"]
input_data = args["input_data"]
output_format = args.get("output_format", "json")
quality_checks = args.get("quality_checks", False)
try:
result = await self.workflow_manager.execute_workflow(
workflow_name, input_data, output_format, quality_checks
)
return {
"success": True,
"workflow": workflow_name,
"result": result,
"format": output_format
}
except Exception as e:
return {"error": f"Workflow execution failed: {str(e)}"}
async def _manage_agent_memory(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Manage agent memory and knowledge persistence."""
agent_name = args["agent_name"]
action = args["action"]
memory_type = args.get("memory_type", "conversation")
try:
agent = self.agent_manager.get_agent(agent_name)
if not agent:
return {"error": f"Agent '{agent_name}' not found"}
if action == "save":
# Save memory data
data = args.get("data", {})
# Implementation depends on agent type and memory system
return {"success": True, "message": f"Memory saved for {agent_name}"}
elif action == "load":
# Load memory data
return {"success": True, "memory_data": {}, "message": f"Memory loaded for {agent_name}"}
elif action == "clear":
# Clear memory
if hasattr(agent, 'clear_history'):
agent.clear_history()
return {"success": True, "message": f"Memory cleared for {agent_name}"}
elif action == "query":
# Query memory
query = args.get("query", "")
return {"success": True, "query_result": [], "message": f"Memory queried for {agent_name}"}
else:
return {"error": f"Unknown memory action: {action}"}
except Exception as e:
return {"error": f"Memory management failed: {str(e)}"}
async def _configure_teachability(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Configure teachability features for agents."""
agent_name = args["agent_name"]
enable_teachability = args["enable_teachability"]
try:
agent = self.agent_manager.get_agent(agent_name)
if not agent:
return {"error": f"Agent '{agent_name}' not found"}
# Implementation would depend on the specific teachability requirements
return {
"success": True,
"message": f"Teachability {'enabled' if enable_teachability else 'disabled'} for {agent_name}",
"teachability_enabled": enable_teachability
}
except Exception as e:
return {"error": f"Teachability configuration failed: {str(e)}"}
async def _get_agent_status(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Get detailed status and metrics for agents."""
agent_name = args.get("agent_name")
include_metrics = args.get("include_metrics", False)
include_memory = args.get("include_memory", False)
try:
if agent_name:
agent = self.agent_manager.get_agent(agent_name)
if not agent:
return {"error": f"Agent '{agent_name}' not found"}
agents = {agent_name: agent}
else:
agents = self.agent_manager.get_all_agents()
status_data = {}
for name, agent in agents.items():
status = {
"name": name,
"type": type(agent).__name__,
"active": True,
"capabilities": {
"llm_enabled": hasattr(agent, "llm_config") and agent.llm_config is not None,
"code_execution": hasattr(agent, "code_execution_config") and agent.code_execution_config is not False,
}
}
if include_metrics:
status["metrics"] = {
"total_messages": len(getattr(agent, "chat_messages", {})),
"conversations": 0 # Would need to track this
}
if include_memory:
status["memory"] = {
"history_length": len(getattr(agent, "chat_messages", {})),
"memory_size": 0 # Would need to calculate this
}
status_data[name] = status
return {
"success": True,
"agents": status_data,
"total_agents": len(status_data)
}
except Exception as e:
return {"error": f"Status retrieval failed: {str(e)}"}
async def _get_resource(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Get resource data for MCP resources."""
uri = args["uri"]
try:
if uri == "autogen://agents/list":
agents = self.agent_manager.get_all_agents()
agent_list = []
for name, agent in agents.items():
agent_list.append({
"name": name,
"type": type(agent).__name__,
"status": "active"
})
return {"agents": agent_list}
elif uri == "autogen://workflows/templates":
templates = {
"sequential": {
"description": "Sequential agent workflow",
"agents": ["initiator", "processor", "reviewer"],
"flow": "linear",
},
"group_chat": {
"description": "Group chat with multiple agents",
"agents": ["facilitator", "expert1", "expert2", "critic"],
"flow": "collaborative",
},
"hierarchical": {
"description": "Hierarchical workflow with manager",
"agents": ["manager", "worker1", "worker2", "validator"],
"flow": "top-down",
},
"available_workflows": list(self.workflow_manager._workflow_templates.keys())
}
return templates
elif uri == "autogen://chat/history":
# Return recent chat history
recent_history = self.chat_history[-10:] # Last 10 chats
history_text = ""
for chat in recent_history:
history_text += f"[{chat['timestamp']}] {chat['initiator']} -> {chat.get('responder', 'Group')}: {chat['initial_message'][:100]}...\n"
return {"content": history_text}
elif uri == "autogen://config/current":
return {
"llm_config": self.server_config.default_llm_config,
"code_execution_config": self.server_config.default_code_execution_config,
"capabilities": self.capabilities
}
else:
return {"error": f"Unknown resource URI: {uri}"}
except Exception as e:
return {"error": f"Resource retrieval failed: {str(e)}"}
# MCP-style tool handlers for compatibility
async def handle_create_agent(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle create_agent tool call."""
return await self._create_agent(arguments)
async def handle_delete_agent(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle delete_agent tool call."""
agent_name = arguments.get("name")
if not agent_name:
return {"content": [{"type": "text", "text": "Agent name is required"}]}
try:
self.agent_manager.agents.pop(agent_name, None)
return {"content": [{"type": "text", "text": f"Agent {agent_name} deleted successfully"}]}
except Exception as e:
return {"content": [{"type": "text", "text": f"Error deleting agent: {str(e)}"}]}
async def handle_list_agents(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle list_agents tool call."""
agents = self.agent_manager.list_agents()
return {"content": [{"type": "text", "text": f"Available agents: {', '.join(agents)}"}]}
async def handle_start_chat(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle start_chat tool call."""
return await self._execute_chat(arguments)
async def handle_send_message(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle send_message tool call."""
agent_name = arguments.get("agent_name")
message = arguments.get("message")
if not agent_name or not message:
return {"content": [{"type": "text", "text": "Agent name and message are required"}]}
try:
agent = self.agent_manager.get_agent(agent_name)
if not agent:
return {"content": [{"type": "text", "text": f"Agent {agent_name} not found"}]}
# Simulate message sending
response = f"Message sent to {agent_name}: {message}"
return {"content": [{"type": "text", "text": response}]}
except Exception as e:
return {"content": [{"type": "text", "text": f"Error sending message: {str(e)}"}]}
async def handle_get_chat_history(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get_chat_history tool call."""
history = self.chat_history[-10:] # Last 10 chats
history_text = "\n".join([f"[{chat['timestamp']}] {chat['initial_message'][:100]}..." for chat in history])
return {"content": [{"type": "text", "text": history_text or "No chat history available"}]}
async def handle_create_group_chat(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle create_group_chat tool call."""
return await self._execute_group_chat(arguments)
async def handle_execute_workflow(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle execute_workflow tool call."""
return await self._execute_workflow(arguments)
async def handle_teach_agent(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle teach_agent tool call."""
return await self._configure_teachability(arguments)
async def handle_save_conversation(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle save_conversation tool call."""
conversation_id = arguments.get("conversation_id", "default")
conversation_data = arguments.get("conversation_data", {})
try:
# Save conversation logic
self.resource_cache[f"conversation_{conversation_id}"] = {
"id": conversation_id,
"data": conversation_data,
"timestamp": datetime.now().isoformat()
}
return {"content": [{"type": "text", "text": f"Conversation {conversation_id} saved successfully"}]}
except Exception as e:
return {"content": [{"type": "text", "text": f"Error saving conversation: {str(e)}"}]}
def main():
"""Main function for command line execution."""
if len(sys.argv) != 3:
print(json.dumps({"error": "Usage: python server.py <tool_name> <arguments_json>"}))
sys.exit(1)
tool_name = sys.argv[1]
try:
arguments = json.loads(sys.argv[2])
except json.JSONDecodeError:
print(json.dumps({"error": "Invalid JSON arguments"}))
sys.exit(1)
# Create server instance
server = EnhancedAutoGenServer()
# Run the tool call
try:
result = asyncio.run(server.handle_tool_call(tool_name, arguments))
print(json.dumps(result))
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
if __name__ == "__main__":
main()