Skip to main content
Glama

Fledge MCP Server

by Krupalp525
secure_server.py9.55 kB
import aiohttp from aiohttp import web import requests import json import subprocess import logging import secrets import os from datetime import datetime, timedelta import random # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("SecureFledgeMCP") # Fledge API base URL (adjust if different) FLEDGE_API = "http://localhost:8081/fledge" # Mock data store for subscriptions (in-memory for simplicity) subscriptions = {} # Authentication configuration API_KEY_FILE = "api_key.txt" API_KEY_HEADER = "X-API-Key" # Create API key if it doesn't exist def ensure_api_key(): """Ensure an API key exists, generate one if not.""" if os.path.exists(API_KEY_FILE): with open(API_KEY_FILE, "r") as f: key = f.read().strip() logger.info(f"Using existing API key from {API_KEY_FILE}") return key else: # Generate a secure API key key = secrets.token_urlsafe(32) with open(API_KEY_FILE, "w") as f: f.write(key) logger.info(f"Generated new API key and saved to {API_KEY_FILE}") return key API_KEY = ensure_api_key() # Middleware to check API key @web.middleware async def auth_middleware(request, handler): """Middleware to validate API key.""" if request.path == "/health": # Health check doesn't require authentication return await handler(request) api_key = request.headers.get(API_KEY_HEADER) if not api_key or api_key != API_KEY: return web.json_response( {"error": "Invalid or missing API key"}, status=401 ) return await handler(request) async def handle_tool_call(request): """Handle incoming tool calls from Cursor.""" data = await request.json() tool_name = data.get("name") params = data.get("parameters", {}) logger.info(f"Received tool call: {tool_name} with params: {params}") try: # Data Access and Management Tools if tool_name == "get_sensor_data": sensor_id = params.get("sensor_id") time_range = params.get("time_range") limit = params.get("limit", 100) if not sensor_id: return web.json_response({"error": "sensor_id required"}, status=400) url = f"{FLEDGE_API}/asset/{sensor_id}?limit={limit}" if time_range: url += f"&time_range={time_range}" response = requests.get(url) return web.json_response(response.json()) elif tool_name == "list_sensors": response = requests.get(f"{FLEDGE_API}/asset") return web.json_response(response.json()) elif tool_name == "ingest_test_data": sensor_id = params.get("sensor_id") value = params.get("value") count = params.get("count", 1) if not sensor_id or value is None: return web.json_response({"error": "sensor_id and value required"}, status=400) for _ in range(count): payload = {"asset": sensor_id, "timestamp": "now", "readings": {"value": value}} requests.post(f"{FLEDGE_API}/south/ingest", json=payload) return web.json_response({"result": f"Ingested {count} data points"}) # Fledge Service Control Tools elif tool_name == "get_service_status": response = requests.get(f"{FLEDGE_API}/service") return web.json_response(response.json()) elif tool_name == "start_stop_service": service_type = params.get("service_type") action = params.get("action") if not service_type or action not in ["start", "stop"]: return web.json_response({"error": "Invalid service_type or action"}, status=400) subprocess.run(["fledge", action, service_type], check=True) return web.json_response({"result": f"{service_type} {action}ed"}) elif tool_name == "update_config": config_key = params.get("config_key") value = params.get("value") if not config_key or value is None: return web.json_response({"error": "config_key and value required"}, status=400) payload = {config_key: value} response = requests.put(f"{FLEDGE_API}/category/core", json=payload) return web.json_response(response.json()) # Frontend Code Generation Tools elif tool_name == "generate_ui_component": component_type = params.get("component_type") sensor_id = params.get("sensor_id", "example_sensor") framework = params.get("framework", "react") if component_type == "chart": code = f""" import React, {{ useEffect, useState }} from 'react'; import {{ Line }} from 'react-chartjs-2'; import axios from 'axios'; const {sensor_id}Chart = () => {{ const [data, setData] = useState({{ labels: [], datasets: [] }}); useEffect(() => {{ axios.get('http://localhost:8081/fledge/asset/{sensor_id}') .then(res => {{ const readings = res.data; setData({{ labels: readings.map(r => r.timestamp), datasets: [{{ label: '{sensor_id}', data: readings.map(r => r.readings.value) }}] }}); }}); }}, []); return <Line data={{data}} />; }}; export default {sensor_id}Chart; """ return web.json_response({"code": code}) return web.json_response({"error": "Unsupported component_type"}, status=400) elif tool_name == "fetch_sample_frontend": framework = params.get("framework", "react") # Simplified: return a basic template code = f"// Sample {framework} frontend for Fledge\nconsole.log('Hello Fledge');" return web.json_response({"code": code}) # Real-Time Data Streaming Tools elif tool_name == "subscribe_to_sensor": sensor_id = params.get("sensor_id") interval = params.get("interval", 5) if not sensor_id: return web.json_response({"error": "sensor_id required"}, status=400) subscriptions[sensor_id] = interval # Store subscription return web.json_response({"result": f"Subscribed to {sensor_id} every {interval}s"}) elif tool_name == "get_latest_reading": sensor_id = params.get("sensor_id") if not sensor_id: return web.json_response({"error": "sensor_id required"}, status=400) response = requests.get(f"{FLEDGE_API}/asset/{sensor_id}?limit=1") return web.json_response(response.json()[0]) # Debugging and Validation Tools elif tool_name == "validate_api_connection": try: response = requests.get(f"{FLEDGE_API}/ping") return web.json_response({"result": f"API reachable, version {response.json()['version']}"}) except Exception as e: return web.json_response({"error": f"API unreachable: {str(e)}"}, status=503) elif tool_name == "simulate_frontend_request": endpoint = params.get("endpoint") method = params.get("method", "GET") payload = params.get("payload", {}) if not endpoint: return web.json_response({"error": "endpoint required"}, status=400) url = f"{FLEDGE_API}{endpoint}" response = requests.request(method, url, json=payload) return web.json_response(response.json()) # Documentation and Schema Tools elif tool_name == "get_api_schema": # Simplified: return known endpoints schema = {"endpoints": ["/asset", "/service", "/south/ingest"]} return web.json_response(schema) elif tool_name == "list_plugins": response = requests.get(f"{FLEDGE_API}/plugin") return web.json_response(response.json()) # Advanced AI-Assisted Features elif tool_name == "suggest_ui_improvements": code = params.get("code", "") suggestions = ["Add error handling for API calls"] if "try" not in code else ["Looks good!"] return web.json_response({"suggestions": suggestions}) elif tool_name == "generate_mock_data": sensor_id = params.get("sensor_id", "mock_sensor") count = params.get("count", 10) mock_data = [ {"timestamp": (datetime.now() - timedelta(seconds=i)).isoformat(), "readings": {"value": random.uniform(20, 30)}} for i in range(count) ] return web.json_response(mock_data) else: return web.json_response({"error": "Unknown tool"}, status=404) except Exception as e: logger.error(f"Error in {tool_name}: {str(e)}") return web.json_response({"error": str(e)}, status=500) async def health_check(request): """Simple health check endpoint.""" return web.Response(text="Secure Fledge MCP Server is running") # Set up the server with middleware app = web.Application(middlewares=[auth_middleware]) app.router.add_post("/tools", handle_tool_call) app.router.add_get("/health", health_check) if __name__ == "__main__": logger.info(f"Starting Secure Fledge MCP Server on port 8082...") logger.info(f"API Key is required in the '{API_KEY_HEADER}' header for all requests") logger.info(f"API Key stored in {API_KEY_FILE}") web.run_app(app, host="localhost", port=8082)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Krupalp525/fledge-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server