Fledge MCP Server
by Krupalp525
Verified
- fledge-mcp
- fledge_mcp
- tests
#!/usr/bin/env python3
"""
Test script for the Secure Fledge MCP Server.
This script demonstrates how to call the MCP server tools programmatically with API key auth.
"""
import requests
import json
import time
import os
# Authentication settings: the key is read from API_KEY_FILE and sent to the
# server in the API_KEY_HEADER request header.
API_KEY_HEADER = "X-API-Key"
API_KEY_FILE = "api_key.txt"

# Endpoint that tool invocations are POSTed to.
MCP_URL = "http://localhost:8082/tools"
def get_api_key():
    """Read the API key from API_KEY_FILE.

    Returns:
        The file's contents with surrounding whitespace stripped, or None
        (after printing an explanation) when the file is missing, cannot be
        read, or contains only whitespace.
    """
    # Open directly instead of checking os.path.exists() first: the file could
    # disappear between the check and the open, and exists() says nothing
    # about whether the file is actually readable.
    try:
        with open(API_KEY_FILE, "r") as key_file:
            api_key = key_file.read().strip()
    except FileNotFoundError:
        print(f"Error: API key file {API_KEY_FILE} not found.")
        print("Please start the secure_mcp_server.py first to generate an API key.")
        return None
    except OSError as exc:
        # Permission denied, path is a directory, etc.
        print(f"Error: could not read API key file {API_KEY_FILE}: {exc}")
        return None

    if not api_key:
        # Callers treat a falsy key as "no key"; say why instead of failing silently.
        print(f"Error: API key file {API_KEY_FILE} is empty.")
        return None
    return api_key
def call_tool(name, parameters=None, timeout=30):
    """Call an MCP tool with API key authentication and return the result.

    Args:
        name: Name of the MCP tool to invoke.
        parameters: Optional dict of tool parameters; an empty dict is sent
            when omitted.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of an HTTP 200 response, or None when no API
        key is available, the request fails, the server answers with a
        non-200 status, or the body is not valid JSON.
    """
    api_key = get_api_key()
    if not api_key:
        return None

    if parameters is None:
        parameters = {}

    payload = {
        "name": name,
        "parameters": parameters,
    }
    headers = {
        "Content-Type": "application/json",
        API_KEY_HEADER: api_key,
    }

    print(f"\n--- Calling {name} ---")
    print(f"Parameters: {json.dumps(parameters, indent=2)}")

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    try:
        response = requests.post(
            MCP_URL, json=payload, headers=headers, timeout=timeout
        )
    except requests.exceptions.RequestException as exc:
        print(f"Error: request failed: {exc}")
        return None

    print(f"Status Code: {response.status_code}")
    if response.status_code != 200:
        print(f"Error: {response.text}")
        return None

    # A 200 response is not guaranteed to carry JSON; json() raises a
    # ValueError subclass when the body cannot be decoded.
    try:
        result = response.json()
    except ValueError:
        print(f"Error: response was not valid JSON: {response.text}")
        return None

    print(f"Response: {json.dumps(result, indent=2)}")
    return result
def main():
    """Run a series of tool calls to demonstrate the secure MCP server.

    Checks the server's health endpoint first and returns early with a hint
    if it is unreachable or unhealthy. Otherwise each demo tool is called in
    order, and a summary reports whether every call returned a result.
    """
    not_running_msg = (
        "MCP Server is not running. Please start it with 'python secure_mcp_server.py'"
    )

    # First check if the server is running. The timeout keeps the script from
    # hanging if something accepts the connection but never answers.
    try:
        health = requests.get("http://localhost:8082/health", timeout=5)
    except requests.exceptions.RequestException:
        print(not_running_msg)
        return
    if health.status_code != 200:
        print(not_running_msg)
        return

    print("Secure Fledge MCP Server Test Script")
    print("===================================\n")

    # Each step: (tool name, parameters, seconds to pause before the call).
    steps = [
        # Test basic API validation
        ("validate_api_connection", None, 0),
        # Test service status
        ("get_service_status", None, 0),
        # Test mock data generation
        ("generate_mock_data", {"sensor_id": "secure_sensor", "count": 5}, 0),
        # Test listing sensors
        ("list_sensors", None, 0),
        # Test ingesting data
        (
            "ingest_test_data",
            {"sensor_id": "secure_sensor", "value": 23.7, "count": 3},
            0,
        ),
        # Test subscription (after waiting for data processing)
        ("subscribe_to_sensor", {"sensor_id": "secure_sensor", "interval": 10}, 1),
        # Test UI component generation
        (
            "generate_ui_component",
            {
                "component_type": "chart",
                "sensor_id": "secure_sensor",
                "framework": "react",
            },
            0,
        ),
        # Test API schema
        ("get_api_schema", None, 0),
        # Test UI code suggestions
        (
            "suggest_ui_improvements",
            {"code": "function fetchData() { axios.get('/api'); }"},
            0,
        ),
    ]

    failed = []
    for tool_name, params, pause in steps:
        if pause:
            time.sleep(pause)
        # call_tool returns None when the call did not produce a result.
        if call_tool(tool_name, params) is None:
            failed.append(tool_name)

    # Only claim success when every call actually returned a result.
    if failed:
        print(
            f"\nTest complete with {len(failed)} of {len(steps)} tool calls "
            f"failing: {', '.join(failed)}"
        )
    else:
        print("\nTest complete! All tools called successfully.")
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()