Yahoo Finance MCP for LLaMA 3.2 3B
by Shak2000
Verified
#!/usr/bin/env python3
"""
Test suite for Yahoo Finance MCP functionality.
This script runs 12 specific financial queries to test the MCP's ability to retrieve
various types of financial data from Yahoo Finance, displaying responses in a
conversational format similar to interacting with an LLM.
"""
import sys
import os
import io
import logging
import warnings
# Create a custom logger class that does nothing
class SilentLogger(logging.Logger):
    """Logger subclass whose emitting methods all discard their input.

    Calling debug/info/warning/error/critical/log/exception on an instance
    accepts any arguments, does nothing, and returns None.
    """

    def __init__(self, name):
        logging.Logger.__init__(self, name)

    def _discard(self, *args, **kwargs):
        """Accept any logging call and drop it."""
        return None

    # Every public emitting method shares the same no-op implementation.
    debug = info = warning = error = critical = log = exception = _discard
# Register our custom logger class so that loggers created from here on
# through logging.getLogger(name) are SilentLogger instances
logging.setLoggerClass(SilentLogger)
# Completely disable logging (every record at level CRITICAL and below)
logging.disable(logging.CRITICAL)
# Suppress all warnings
warnings.filterwarnings("ignore")
# Configure logging - MUST be done before any imports that might configure logging
logging.basicConfig(
    level=logging.NOTSET, # Setting to NOTSET but we've disabled logging anyway
    format='', # Empty format to minimize any accidental output
    handlers=[] # No handlers
)
logging.getLogger().setLevel(logging.NOTSET)
# Reset the logger registry to an empty mapping.
# NOTE(review): this only clears the name -> logger table; logger objects that
# other modules already hold a reference to are not changed by it.
logging.Logger.manager.loggerDict = {}
# Monkey patch the critical logging function to do nothing.
# The original method is kept in original_critical; nothing in this file
# restores it afterwards.
original_critical = logging.Logger.critical
def silent_critical(self, *args, **kwargs):
    """No-op replacement installed as logging.Logger.critical."""
    pass
logging.Logger.critical = silent_critical
# Redirect stdout during imports to suppress any print statements.
# The try/finally guarantees the real stdout is put back even when one of the
# imports raises, so the resulting traceback is not followed by lost output.
original_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
    # Now import modules that might produce output
    import json
    from yahoo_finance_mcp import YahooFinanceMCP
    # After importing, replace the module's logger with our silent version
    import yahoo_finance_mcp
    yahoo_finance_mcp.logger = SilentLogger("yahoo_finance_mcp")
    # Also replace any existing loggers. Iterate over a snapshot of the names
    # so the registry is never mutated while it is being walked.
    for name in list(logging.Logger.manager.loggerDict):
        logging.Logger.manager.loggerDict[name] = SilentLogger(name)
finally:
    # Restore stdout
    sys.stdout = original_stdout
# Define a NullHandler that does nothing with log records
class NullHandler(logging.NullHandler):
    """Handler that discards every record it is given.

    Thin subclass of the standard library's ``logging.NullHandler`` so this
    module keeps exposing its own ``NullHandler`` name while reusing the
    stdlib implementation instead of a hand-written ``emit``.
    """


# Add null handler to the root logger to prevent "No handlers found" warnings
logging.getLogger().addHandler(NullHandler())
def silence_function(mcp_function):
    """Wrap *mcp_function* so that calling it produces no console output.

    While the wrapped call runs, ``sys.stdout`` and ``sys.stderr`` point at
    throw-away in-memory buffers, a reject-everything filter is attached to
    the root logger, and logging is disabled up to CRITICAL. All of these are
    put back to their previous state when the call returns or raises.

    Args:
        mcp_function: The callable to silence.

    Returns:
        A wrapper with the same call signature that returns whatever
        *mcp_function* returns and lets its exceptions propagate.
    """
    import functools  # local import keeps this block self-contained

    class BlockAllFilter(logging.Filter):
        """Filter that rejects every record."""

        def filter(self, record):
            return False

    @functools.wraps(mcp_function)
    def wrapper(*args, **kwargs):
        # Save original configuration, including the current global
        # logging.disable() threshold so it can be reinstated afterwards.
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_disable_level = logging.Logger.manager.disable
        # The filter sits on the root logger only; a logger-level filter is
        # consulted just for records logged through that logger itself, so
        # the logging.disable() call below is what silences other loggers.
        block_filter = BlockAllFilter()
        root_logger = logging.getLogger()

        # Redirect stdout/stderr to capture any print statements
        sys.stdout = io.StringIO()
        sys.stderr = io.StringIO()
        root_logger.addFilter(block_filter)
        logging.disable(logging.CRITICAL)
        try:
            return mcp_function(*args, **kwargs)
        finally:
            # Restore settings no matter what
            logging.disable(old_disable_level)
            root_logger.removeFilter(block_filter)
            sys.stdout = old_stdout
            sys.stderr = old_stderr

    return wrapper
def print_conversational_result(query, result):
    """Print *query* and *result* as a short User/Assistant exchange.

    Args:
        query: The user's question, echoed after "User: ".
        result: The value returned by an MCP call. Expected to be a dict; an
            "error" key marks a failed lookup. A value that is not a dict is
            reported as a failed lookup as well instead of raising.

    Returns:
        None. Everything is written to stdout with print().
    """
    print(f"\nUser: {query}")

    if not isinstance(result, dict) or "error" in result:
        # The underlying error text is intentionally not echoed: every failure
        # is reported with the same generic sentence.
        error_msg = "Could not retrieve the requested information."
        print(f"Assistant: I'm sorry, I couldn't retrieve that information. {error_msg}")
        return

    # Prefer the ready-made sentence when the MCP supplies one.
    template = result.get("llm_response_template")
    if template:
        print(f"Assistant: {template}")
        return

    # Fallback formatting if template is not available
    symbol = result.get("symbol")
    company = result.get("company_name", symbol)

    if "price" in result:
        # Stock price query. A price that cannot take a numeric format
        # (None, a string, ...) is shown as N/A rather than raising.
        try:
            price_text = f"${result['price']:.2f}"
        except (TypeError, ValueError):
            price_text = "N/A"
        print(f"Assistant: The current stock price for {company} ({symbol}) is {price_text} per share.")
    elif "value" in result:
        # Single field query
        label = result.get("field_description", result.get("field", "Value"))
        print(f"Assistant: {label} for {company} ({symbol}): {result.get('value', 'N/A')}")
    elif isinstance(result.get("fields"), list):
        # Multiple fields query
        print(f"Assistant: For {company} ({symbol}):")
        for field in result["fields"]:
            label = field.get("field_description", field.get("field", "Value"))
            print(f"- {label}: {field.get('value', 'N/A')}")
    else:
        # Generic fallback without raw data
        print(f"Assistant: Here's the information for {result.get('symbol', 'the requested stock')}.")
def _first_successful_field(mcp, symbol, field_names, failure_label):
    """Return the first get_stock_field() result that has no "error" key.

    The field names are tried in order. If every attempt reports an error the
    last error result is returned; if a call raises, an error dict built from
    *failure_label* and the exception text is returned instead.
    """
    result = None
    try:
        for field_name in field_names:
            result = mcp.get_stock_field(symbol, field_name)
            if "error" not in result:
                break
    except Exception as exc:
        result = {"error": f"Failed to get {failure_label}: {exc}"}
    if result is None:
        result = {"error": f"Failed to get {failure_label}: no field names given"}
    return result


def _price_and_beta(mcp, symbol, fallback_name):
    """Fetch price and beta for *symbol* and merge them into one result dict.

    Returns a dict carrying a ready-made "llm_response_template" sentence, or
    an error dict when a lookup raises or no numeric price comes back.
    """
    try:
        price_result = mcp.get_stock_price(symbol)
        beta_result = mcp.get_stock_field(symbol, "beta")
        company_name = price_result.get("company_name", fallback_name)
        price = price_result.get("price")
        beta = beta_result.get("value", "N/A")
    except Exception as exc:
        return {"error": f"Failed to get {fallback_name} data: {exc}"}

    # A missing or non-numeric price is treated as a failed lookup, checked
    # explicitly rather than by letting the format spec raise.
    try:
        price_text = f"{price:.2f}"
    except (TypeError, ValueError):
        return {"error": f"Failed to get {fallback_name} data: no usable price in response"}

    return {
        "symbol": symbol,
        "company_name": company_name,
        "llm_response_template": (
            f"The current stock price for {company_name} ({symbol}) is ${price_text} per share. "
            f"The beta value, which measures volatility relative to the market, is {beta}."
        ),
    }


def run_test_suite():
    """Run the twelve sample queries and print each one conversationally.

    Re-applies the silent loggers, builds a YahooFinanceMCP instance whose
    data-fetching methods are wrapped with silence_function(), then runs the
    queries in a fixed order, printing every result through
    print_conversational_result().
    """
    print("Yahoo Finance MCP Test Suite - Conversational Responses\n")

    # Ensure all loggers are using our silent logger
    for name in list(logging.Logger.manager.loggerDict):
        logging.Logger.manager.loggerDict[name] = SilentLogger(name)
    # Replace the root logger
    logging.root = SilentLogger("root")
    # Ensure yahoo_finance_mcp logger is silent
    yahoo_finance_mcp.logger = SilentLogger("yahoo_finance_mcp")

    # Initialize the MCP and silence its methods
    mcp = YahooFinanceMCP()
    for method_name in (
        "get_stock_price",
        "get_stock_field",
        "get_multiple_stock_fields",
        "get_stock_info",
        "get_stock_history",
    ):
        setattr(mcp, method_name, silence_function(getattr(mcp, method_name)))

    # (question, zero-argument fetcher) pairs, executed strictly in order.
    test_cases = [
        ("What is Intel's current stock price?",
         lambda: mcp.get_stock_price("INTC")),
        ("How much does one Spotify share cost?",
         lambda: mcp.get_stock_price("SPOT")),
        ("What is the beta of Meta?",
         lambda: mcp.get_stock_field("META", "beta")),
        ("What is Walmart's P/E ratio?",
         lambda: mcp.get_stock_field("WMT", "trailingPE")),
        ("What is Ford's market capitalization?",
         lambda: mcp.get_stock_field("F", "marketCap")),
        ("What are Amazon's P/E ratio and market capitalization?",
         lambda: mcp.get_multiple_stock_fields("AMZN", ["trailingPE", "marketCap"])),
        ("What is Nike's 52-week low stock price?",
         lambda: mcp.get_stock_field("NKE", "fiftyTwoWeekLow")),
        ("What is Google's 52-week high stock price?",
         lambda: mcp.get_stock_field("GOOGL", "fiftyTwoWeekHigh")),
        ("What are Disney's 52-week low stock price and 52-week high stock price?",
         lambda: mcp.get_multiple_stock_fields("DIS", ["fiftyTwoWeekLow", "fiftyTwoWeekHigh"])),
        ("What is the Coca Cola dividend yield?",
         lambda: mcp.get_stock_field("KO", "dividendYield")),
        # EPS is looked up under several candidate field names in turn.
        ("What is Apple's EPS?",
         lambda: _first_successful_field(
             mcp, "AAPL", ("trailingEps", "eps", "epsTrailingTwelveMonths"), "Apple's EPS")),
        # Price and beta come from two separate calls that are then combined.
        ("What are the stock price and the beta of General Motors?",
         lambda: _price_and_beta(mcp, "GM", "General Motors")),
    ]

    for query, fetch in test_cases:
        print_conversational_result(query, fetch())
if __name__ == "__main__":
    try:
        # Silence all logging before we begin
        logging.disable(logging.CRITICAL)

        class BlockAllFilter(logging.Filter):
            """Filter that rejects every record."""

            def filter(self, record):
                return False

        # Stream handler that writes into an in-memory buffer and, because of
        # the filter, never emits anything.
        stderr_capture = io.StringIO()
        stderr_handler = logging.StreamHandler(stderr_capture)
        stderr_handler.setLevel(logging.NOTSET)
        stderr_handler.addFilter(BlockAllFilter())

        # Make that handler the only one on the root logger.
        root_logger = logging.getLogger()
        for existing in root_logger.handlers[:]:
            root_logger.removeHandler(existing)
        root_logger.addHandler(stderr_handler)

        # Run with all output suppressed except our explicit prints
        run_test_suite()
    except Exception as e:
        # Keep only the text in front of any ": CRITICAL :" log-style marker;
        # partition() returns the whole message when the marker is absent.
        error_msg = str(e).partition(": CRITICAL :")[0]
        print(f"Error: {error_msg}")
        sys.exit(1)