LumiFAI MCP Technical Analysis Server

by Lumif-ai
Verified
from typing import Any import os import httpx from mcp.server.fastmcp import FastMCP import pandas as pd import datetime from ta.utils import dropna from ta.trend import EMAIndicator from dotenv import load_dotenv import logging import sys logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) load_dotenv() # Initialize FastMCP server mcp = FastMCP("weather") # DB Class from pymongo import MongoClient class MongoDB: client = None db = None @classmethod def init_client(cls): if not cls.client: cls.client = MongoClient(os.getenv("MONGODB_URI")) cls.db = cls.client.lumifai @classmethod def get_db(cls): if not cls.client: cls.init_client() return cls.db @classmethod def close_client(cls): if cls.client: cls.client.close() cls.client = None cls.db = None def fetch_agents_like_name(partial_name): db = MongoDB.get_db() collection = db.ai_agents # type: ignore pipeline = [ { '$search': { 'index': 'default_text_index', 'phrase': { 'query': partial_name, 'path': ['agent_name', 'base_token_symbol'] } } } ] agents = list(collection.aggregate(pipeline)) return agents # Helper functions def fetch_binance_ohlcv_data(agent: dict, time_ago: str, interval: int, interval_frequency: str) -> pd.DataFrame: """Fetch OHLCV data from MongoDB for Binance pairs""" try: db = MongoDB.get_db() collection = db.ohlcv_data # type: ignore # Convert time_ago to datetime since_time = pd.to_datetime(time_ago) # Construct the symbol from base and quote tokens symbol = f"{agent['base_token_symbol']}{agent['quote_token_symbol']}" # Map interval and frequency to Binance format (e.g., '1m', '1h', '1d') interval_map = { 'minutes': 'm', 'hours': 'h', 'days': 'd', 'weeks': 'w', 'months': 'M' } binance_interval = f"{interval}{interval_map.get(interval_frequency, 'm')}" # Query MongoDB pipeline = [ { "$match": { "metadata.symbol": symbol, "metadata.interval": binance_interval, "t": {"$gte": since_time} } }, { "$project": { "time": {"$toLong": "$t"}, "open": "$o", "high": "$h", "low": "$l", "close": "$c", "volume": "$v" } }, { "$sort": {"time": 1} } ] results = list(collection.aggregate(pipeline)) if not results: raise Exception(f"No data found for symbol {symbol} with interval {binance_interval}") # Convert to DataFrame df = pd.DataFrame(results) # Convert timestamp to seconds df['time'] = df['time'] // 1000 # Convert milliseconds to seconds return df except Exception as e: logger.error(f"Error fetching Binance OHLCV data: {str(e)}") raise finally: logger.info(f"Completed Binance OHLCV data fetch attempt for symbol {agent['base_token_symbol']}") @mcp.tool() def get_emas(agent_name: str, time_ago: str, interval: int, interval_frequency: str) -> pd.DataFrame: """ Calculate the Exponential Moving Average (EMA) of the given data. Returns the dataframe with two new columns added trend_ema_fast calculated with 12 periods and trend_ema_slow calculated with 26 periods. Args: agent_name (str): Name of the agent token for which the OHCLV data is to be fetched time_ago (datetime): Time since you want the historic data from as an ISO Date string (eg. 2019-08-31T15:47:06Z) interval (int): Interval of the OHCLV data to query interval_frequency (str): Frequency of the interval (eg. minutes, hours, days, weeks, months, years) """ import time start_time = time.time() logger.info(f"Starting EMA calculation for {agent_name}") # First, get the details from the sql table agents_fetched = fetch_agents_like_name(agent_name) # If there are no agents with that name, return an error if len(agents_fetched) == 0: raise Exception(f"No agents found with the name {agent_name}") # Use the first agent retrieved agent = agents_fetched[0] df = fetch_binance_ohlcv_data(agent, time_ago, interval, interval_frequency) if 'Exception' in df.columns: execution_time = time.time() - start_time error_msg = df['Exception'].iloc[0] logger.error(f"Error in EMA calculation after {execution_time:.2f} seconds: {error_msg}") return error_msg df["volume"] = df["volume"].map('{:.10f}'.format) try: df["trend_ema_fast"] = EMAIndicator( close=df["close"], window=12, fillna=True ).ema_indicator() df["trend_ema_slow"] = EMAIndicator( close=df["close"], window=26, fillna=True ).ema_indicator() df = dropna(df) except Exception as e: execution_time = time.time() - start_time logger.error(f"Error in EMA calculation after {execution_time:.2f} seconds: {str(e)}") return str(e) execution_time = time.time() - start_time logger.info(f"EMA calculation completed in {execution_time:.2f} seconds") return df @mcp.tool() def get_date_time() -> datetime.datetime: """Returns the current date and time""" return datetime.datetime.now() if __name__ == "__main__": # Initialize and run the server try: mcp.run(transport='sse') except Exception as e: print(f"An error occurred: {e}", file=sys.stderr)