MCP Trader Server

  • src
# Standard library imports import argparse import asyncio from enum import Enum import json import logging import sys from pathlib import Path from typing import ( List, Optional, Dict, Any, cast, ) # Third-party imports try: from dotenv import load_dotenv from langchain.chat_models import init_chat_model from langchain.schema import ( AIMessage, BaseMessage, HumanMessage, SystemMessage, ) from langchain_core.runnables.base import Runnable from langchain_core.messages.tool import ToolMessage from langgraph.prebuilt import create_react_agent from langchain_mcp_tools import ( convert_mcp_to_langchain_tools, McpServerCleanupFn, ) except ImportError as e: print(f'\nError: Required package not found: {e}') print('Please ensure all required packages are installed\n') sys.exit(1) # Local application imports from config_loader import load_config # Type definitions ConfigType = Dict[str, Any] # ANSI color escape codes class Colors(str, Enum): YELLOW = '\033[33m' # color to yellow CYAN = '\033[36m' # color to cyan RESET = '\033[0m' # reset color def __str__(self): return self.value def parse_arguments() -> argparse.Namespace: """Parse and return command line args for config path and verbosity.""" parser = argparse.ArgumentParser( description='CLI Chat Application', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument( '-c', '--config', default='llm_mcp_config.json5', help='path to config file', type=Path, metavar='PATH' ) parser.add_argument( '-v', '--verbose', action='store_true', help='run with verbose logging' ) return parser.parse_args() def init_logger(verbose: bool) -> logging.Logger: """Initialize and return a logger with appropriate verbosity level.""" logging.basicConfig( level=logging.DEBUG if verbose else logging.INFO, format='\x1b[90m[%(levelname)s]\x1b[0m %(message)s' ) return logging.getLogger() def print_colored(text: str, color: Colors, end: str = "\n") -> None: """Print text in specified color and reset afterwards.""" print(f"{color}{text}{Colors.RESET}", end=end) def set_color(color: Colors) -> None: """Set terminal color.""" print(color, end='') def clear_line() -> None: """Move up one line and clear it.""" print('\x1b[1A\x1b[2K', end='') async def get_user_query(remaining_queries: List[str]) -> Optional[str]: """Get user input or next example query, handling empty inputs and quit commands.""" set_color(Colors.YELLOW) query = input('Query: ').strip() if len(query) == 0: if len(remaining_queries) > 0: query = remaining_queries.pop(0) clear_line() print_colored(f'Example Query: {query}', Colors.YELLOW) else: set_color(Colors.RESET) print('\nPlease type a query, or "quit" or "q" to exit\n') return await get_user_query(remaining_queries) print(Colors.RESET) # Reset after input if query.lower() in ['quit', 'q']: print_colored('Goodbye!\n', Colors.CYAN) return None return query async def handle_conversation( agent: Runnable, messages: List[BaseMessage], example_queries: List[str], verbose: bool ) -> None: """Manage an interactive conversation loop between the user and AI agent. Args: agent (Runnable): The initialized ReAct agent that processes queries messages (List[BaseMessage]): List to maintain conversation history example_queries (List[str]): list of example queries that can be used when user presses Enter verbose (bool): Flag to control detailed output of tool responses Exception handling: - TypeError: Ensures response is in correct string format - General exceptions: Allows conversation to continue after errors The conversation continues until user types 'quit' or 'q'. """ print('\nConversation started. ' 'Type "quit" or "q" to end the conversation.\n') if len(example_queries) > 0: print('Example Queries (just type Enter to supply them one by one):') for ex_q in example_queries: print(f"- {ex_q}") print() while True: try: query = await get_user_query(example_queries) if not query: break messages.append(HumanMessage(content=query)) result = await agent.ainvoke({ 'messages': messages }) result_messages = cast(List[BaseMessage], result['messages']) # the last message should be an AIMessage response = result_messages[-1].content if not isinstance(response, str): raise TypeError( f"Expected string response, got {type(response)}" ) # check if msg one before is a ToolMessage message_one_before = result_messages[-2] if isinstance(message_one_before, ToolMessage): if verbose: # show tools call response print(message_one_before.content) # new line after tool call output print() print_colored(f"{response}\n", Colors.CYAN) messages.append(AIMessage(content=response)) except Exception as e: print(f'Error getting response: {str(e)}') print('You can continue chatting or type "quit" to exit.') async def init_react_agent( config: ConfigType, logger: logging.Logger ) -> tuple[Runnable, List[BaseMessage], McpServerCleanupFn]: """Initialize and configure a ReAct agent for conversation handling. Args: config (ConfigType): Configuration dictionary containing LLM and MCP server settings logger (logging.Logger): Logger instance for initialization status updates Returns: tuple[Runnable, List[BaseMessage], McpServerCleanupFn]: Returns a tuple containing: - Configured ReAct agent ready for conversation - Initial message list (empty or with system prompt) - Cleanup function for MCP server connections """ llm_config = config['llm'] logger.info(f'Initializing model... {json.dumps(llm_config, indent=2)}\n') llm = init_chat_model( model=llm_config['model'], model_provider=llm_config['model_provider'], temperature=llm_config['temperature'], max_tokens=llm_config['max_tokens'], ) mcp_configs = config['mcp_servers'] logger.info(f'Initializing {len(mcp_configs)} MCP server(s)...\n') tools, mcp_cleanup = await convert_mcp_to_langchain_tools( mcp_configs, logger ) agent = create_react_agent( llm, tools ) messages: List[BaseMessage] = [] system_prompt = llm_config.get('system_prompt') if system_prompt and isinstance(system_prompt, str): messages.append(SystemMessage(content=system_prompt)) return agent, messages, mcp_cleanup async def run() -> None: """Main async function to set up and run the simple chat app.""" mcp_cleanup: Optional[McpServerCleanupFn] = None try: load_dotenv() args = parse_arguments() logger = init_logger(args.verbose) config = load_config(args.config) example_queries = ( config.get('example_queries')[:] if config.get('example_queries') is not None else [] ) agent, messages, mcp_cleanup = await init_react_agent(config, logger) await handle_conversation( agent, messages, example_queries, args.verbose ) finally: if mcp_cleanup is not None: await mcp_cleanup() def main() -> None: """Entry point of the script.""" asyncio.run(run()) if __name__ == '__main__': main()