Skip to main content
Glama
main.py12.3 kB
#!/usr/bin/env python3 """ FastAPI app to import Dexcom Stelo/Clarity CSVs into SQLite with auto-migrations. Now with MCP JSON-RPC protocol support for Simtheory.ai. Run with: uvicorn main:app --host 0.0.0.0 --port 8085 """ import os import csv import sqlite3 import io import math from datetime import datetime, timezone from typing import Optional, List, Tuple, Dict, Any from fastapi import FastAPI, UploadFile, File, HTTPException, Query, Request from fastapi.responses import JSONResponse from dateutil import parser as dtparser DB_PATH = os.environ.get("DB_PATH", os.path.expanduser("~/data/glucose.db")) DB_DIR = os.path.dirname(DB_PATH) LATEST_SCHEMA_VERSION = 2 app = FastAPI(title="Dexcom Stelo/Clarity -> SQLite importer", version="1.0") os.makedirs(DB_DIR, exist_ok=True) # --- SQLite & Migrations --- def get_conn(): conn = sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) conn.row_factory = sqlite3.Row return conn def _apply_migration_v1(conn): cur = conn.cursor() cur.execute("CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value TEXT);") cur.execute(""" CREATE TABLE IF NOT EXISTS glucose ( timestamp_unix INTEGER PRIMARY KEY, glucose_mg_dl INTEGER NOT NULL, source TEXT DEFAULT 'clarity', created_at TEXT DEFAULT (datetime('now')) );""") cur.execute("INSERT OR IGNORE INTO meta(key, value) VALUES ('schema_version', '1');") conn.commit() def _apply_migration_v2(conn): cur = conn.cursor() cur.execute("PRAGMA table_info(glucose);") cols = [r["name"] for r in cur.fetchall()] if "notes" not in cols: cur.execute("ALTER TABLE glucose ADD COLUMN notes TEXT;") cur.execute("UPDATE meta SET value='2' WHERE key='schema_version';") conn.commit() MIGRATIONS = {1: _apply_migration_v1, 2: _apply_migration_v2} def ensure_migrations(): conn = get_conn() try: cur = conn.cursor() cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='meta';") if cur.fetchone() is None: _apply_migration_v1(conn) cur.execute("SELECT value FROM meta WHERE key='schema_version';") row = cur.fetchone() current = int(row["value"]) if row else 0 for v in range(current + 1, LATEST_SCHEMA_VERSION + 1): if fn := MIGRATIONS.get(v): fn(conn) finally: conn.close() ensure_migrations() # --- Parsing --- def parse_timestamp_to_unix(ts_str: str) -> int: dt = dtparser.parse(ts_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return int(dt.astimezone(timezone.utc).timestamp()) def detect_columns(header_row): low = [h.strip().lower() for h in header_row] ts_idx = gl_idx = None for i, h in enumerate(low): if any(x in h for x in ["timestamp", "date", "time", "datetime"]): ts_idx = i if any(x in h for x in ["glucose", "mg/dl", "value", "sgv"]): gl_idx = i if ts_idx is None or gl_idx is None: raise ValueError(f"Could not detect columns from: {header_row}") return ts_idx, gl_idx def parse_clarity_csv_content(content: bytes): text = content.decode("utf-8-sig") reader = csv.reader(io.StringIO(text)) rows = list(reader) if not rows: return [] header_row = None first_data_idx = 1 for i, r in enumerate(rows): if r and any(cell.strip() for cell in r): header_row = r first_data_idx = i + 1 break if not header_row: return [] ts_idx, gl_idx = detect_columns(header_row) parsed = {} for r in rows[first_data_idx:]: if not r or len(r) <= max(ts_idx, gl_idx): continue try: ts_raw, gl_raw = r[ts_idx].strip(), r[gl_idx].strip() if ts_raw and gl_raw: parsed[parse_timestamp_to_unix(ts_raw)] = int(float(gl_raw)) except: continue return sorted(parsed.items()) def insert_glucose_rows(rows, source="clarity"): if not rows: return {"total": 0, "inserted": 0, "skipped_existing": 0} conn = get_conn() total_before = conn.execute("SELECT count(*) FROM glucose").fetchone()[0] now_str = datetime.utcnow().isoformat() conn.executemany( "INSERT OR IGNORE INTO glucose(timestamp_unix, glucose_mg_dl, source, created_at) VALUES (?, ?, ?, ?)", [(ts, gl, source, now_str) for ts, gl in rows] ) conn.commit() total_after = conn.execute("SELECT count(*) FROM glucose").fetchone()[0] conn.close() inserted = total_after - total_before return {"total": len(rows), "inserted": inserted, "skipped_existing": len(rows) - inserted} # --- Stats --- def query_range(start_unix, end_unix): conn = get_conn() rows = conn.execute( "SELECT timestamp_unix, glucose_mg_dl FROM glucose WHERE timestamp_unix BETWEEN ? AND ? ORDER BY timestamp_unix", (start_unix, end_unix) ).fetchall() conn.close() return [(r[0], r[1]) for r in rows] def compute_stats(rows): if not rows: return {} values = [gl for _, gl in rows] mean = sum(values) / len(values) sd = math.sqrt(sum((v - mean)**2 for v in values) / len(values)) if len(values) > 1 else 0 cv = (sd / mean * 100) if mean else 0 gmi = 3.31 + 0.02392 * mean tir = sum(1 for v in values if 70 <= v <= 180) / len(values) * 100 tbr = sum(1 for v in values if v < 70) / len(values) * 100 tar = sum(1 for v in values if v > 180) / len(values) * 100 return { "total_points": len(values), "mean_mg_dl": round(mean, 1), "sd_mg_dl": round(sd, 1), "cv_percent": round(cv, 1), "gmi": round(gmi, 1), "tir_percent": round(tir, 1), "tbr_percent": round(tbr, 1), "tar_percent": round(tar, 1) } # --- MCP JSON-RPC Protocol --- MCP_TOOLS = [ { "name": "get_latest_glucose", "description": "Get the most recent glucose readings within the specified hours", "inputSchema": { "type": "object", "properties": { "hours": {"type": "integer", "description": "Number of hours to look back (default: 24)", "default": 24} } } }, { "name": "get_glucose_range", "description": "Get glucose readings within a specific date/time range", "inputSchema": { "type": "object", "properties": { "start": {"type": "string", "description": "Start timestamp (ISO format or unix)"}, "end": {"type": "string", "description": "End timestamp (ISO format or unix)"} }, "required": ["start", "end"] } }, { "name": "get_glucose_stats", "description": "Get glucose statistics (mean, SD, CV, GMI, TIR, TBR, TAR) for the specified number of days", "inputSchema": { "type": "object", "properties": { "days": {"type": "integer", "description": "Number of days to calculate stats for (default: 14)", "default": 14} } } }, { "name": "get_health_status", "description": "Check the health status of the Stelo MCP service", "inputSchema": {"type": "object", "properties": {}} } ] def handle_tools_list(): return {"tools": MCP_TOOLS} def handle_tool_call(name: str, arguments: dict): if name == "get_latest_glucose": hours = arguments.get("hours", 24) now = int(datetime.now(timezone.utc).timestamp()) rows = query_range(now - hours * 3600, now) result = {"count": len(rows), "data": [{"ts": ts, "glucose": gl} for ts, gl in rows]} return {"content": [{"type": "text", "text": str(result)}]} elif name == "get_glucose_range": start = arguments.get("start", "") end = arguments.get("end", "") start_unix = int(start) if start.isdigit() else parse_timestamp_to_unix(start) end_unix = int(end) if end.isdigit() else parse_timestamp_to_unix(end) rows = query_range(start_unix, end_unix) result = {"count": len(rows), "data": [{"ts": ts, "glucose": gl} for ts, gl in rows]} return {"content": [{"type": "text", "text": str(result)}]} elif name == "get_glucose_stats": days = arguments.get("days", 14) now = int(datetime.now(timezone.utc).timestamp()) rows = query_range(now - days * 86400, now) result = {"days": days, "stats": compute_stats(rows)} return {"content": [{"type": "text", "text": str(result)}]} elif name == "get_health_status": result = {"status": "ok", "db_path": DB_PATH} return {"content": [{"type": "text", "text": str(result)}]} else: raise ValueError(f"Unknown tool: {name}") @app.post("/mcp") async def mcp_jsonrpc(request: Request): """MCP JSON-RPC endpoint for Simtheory.ai integration""" try: body = await request.json() except: return JSONResponse({"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}, "id": None}) jsonrpc = body.get("jsonrpc", "2.0") method = body.get("method", "") params = body.get("params", {}) req_id = body.get("id") try: if method == "initialize": result = { "protocolVersion": "2024-11-05", "capabilities": {"tools": {"listChanged": False}}, "serverInfo": {"name": "stelo-workaround-mcp", "version": "1.0.0"} } elif method == "tools/list": result = handle_tools_list() elif method == "tools/call": tool_name = params.get("name", "") arguments = params.get("arguments", {}) result = handle_tool_call(tool_name, arguments) elif method == "ping": result = {} else: return JSONResponse({ "jsonrpc": "2.0", "error": {"code": -32601, "message": f"Method not found: {method}"}, "id": req_id }) return JSONResponse({"jsonrpc": "2.0", "result": result, "id": req_id}) except Exception as e: return JSONResponse({ "jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}, "id": req_id }) # --- REST Endpoints (kept for backward compatibility) --- @app.post("/import") async def import_csv(file: UploadFile = File(...)): content = await file.read() parsed = parse_clarity_csv_content(content) if not parsed: raise HTTPException(400, "No data parsed from file") result = insert_glucose_rows(parsed) return {"filename": file.filename, "result": result} @app.get("/glucose/latest") def latest(hours: int = 24): now = int(datetime.now(timezone.utc).timestamp()) rows = query_range(now - hours * 3600, now) return {"count": len(rows), "data": [{"ts": ts, "glucose": gl} for ts, gl in rows]} @app.get("/glucose/range") def range_query(start: str, end: str): start_unix = int(start) if start.isdigit() else parse_timestamp_to_unix(start) end_unix = int(end) if end.isdigit() else parse_timestamp_to_unix(end) rows = query_range(start_unix, end_unix) return {"count": len(rows), "data": [{"ts": ts, "glucose": gl} for ts, gl in rows]} @app.get("/glucose/stats") def stats(days: int = 14): now = int(datetime.now(timezone.utc).timestamp()) rows = query_range(now - days * 86400, now) return {"days": days, "stats": compute_stats(rows)} @app.get("/mcp") def mcp_info(): """GET endpoint returns MCP manifest info""" return { "id": "stelo-workaround-mcp", "name": "Dexcom Stelo Workaround MCP", "description": "Imports Dexcom Stelo/Clarity CSV exports into SQLite for glucose tracking", "version": "1.0.0", "protocol": "MCP JSON-RPC 2024-11-05", "endpoints": { "mcp_jsonrpc": "POST /mcp", "rest": ["/import", "/glucose/latest", "/glucose/range", "/glucose/stats"] }, "tools": [t["name"] for t in MCP_TOOLS] } @app.get("/health") def health(): return {"status": "ok", "db_path": DB_PATH}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Orchid1970/stelo-workaround-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server