Project Content Server

from typing import Sequence from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import TextContent, ImageContent, EmbeddedResource from mcpagentai.core.logging import get_logger from mcpagentai.core.multi_tool_agent import MultiToolAgent # Sub-agents from mcpagentai.tools.calculator_agent import CalculatorAgent from mcpagentai.tools.currency_agent import CurrencyAgent from mcpagentai.tools.dictionary_agent import DictionaryAgent from mcpagentai.tools.eliza.agent import ElizaAgent from mcpagentai.tools.eliza.mcp_agent import ElizaMCPAgent from mcpagentai.tools.stock_agent import StockAgent from mcpagentai.tools.time_agent import TimeAgent #from mcpagentai.tools.twitter.api_agent import TwitterAgent # from mcpagentai.tools.twitter.client_agent import TwitterAgent from mcpagentai.tools.twitter.agent import TwitterAgent from mcpagentai.tools.weather_agent import WeatherAgent async def start_server(local_timezone: str | None = None) -> None: logger = get_logger("mcpagentai.server") logger.info("Starting MCPAgentAI server...") time_agent = TimeAgent(local_timezone=local_timezone) weather_agent = WeatherAgent() dictionary_agent = DictionaryAgent() calculator_agent = CalculatorAgent() currency_agent = CurrencyAgent() eliza_agent = ElizaAgent() eliza_mcp_agent = ElizaMCPAgent() stock_agent = StockAgent() twitter_agent = TwitterAgent() # Combine them into one aggregator multi_tool_agent = MultiToolAgent([ # time_agent, # weather_agent, # dictionary_agent, # calculator_agent, # currency_agent, # eliza_agent, # eliza_mcp_agent, # stock_agent, twitter_agent, ]) server = Server("mcpagentai") @server.list_tools() async def list_tools(): """ List all available tools. """ logger.debug("server.list_tools called") return multi_tool_agent.list_tools() @server.call_tool() async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """ Dispatch calls to the aggregator agent, which routes to the correct sub-agent. """ try: return multi_tool_agent.call_tool(name, arguments) except Exception as e: logger.exception("Error in call_tool") # Avoid using e.message. Use str(e) instead raise ValueError(f"Error processing request: {str(e)}") from e options = server.create_initialization_options() async with stdio_server() as (read_stream, write_stream): logger.info("Running server on stdio_server...") await server.run(read_stream, write_stream, options)