MCP Variance Log
by truaxki
- src
- mcp_variance_log
import sqlite3
import logging
from contextlib import closing
from pathlib import Path
import asyncio
from datetime import datetime
from typing import Optional
from pydantic import AnyUrl
from typing import Any
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from .db_utils import LogDatabase
from . import DEFAULT_DB_PATH
# Initialize database connection
db = LogDatabase(DEFAULT_DB_PATH)
server = Server("mcp-variance-log")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools.
Each tool specifies its arguments using JSON Schema validation.
"""
return [
types.Tool(
name="log-query",
description="""
Conversation Variation analysis
Continuously monitor our conversation and automatically log unusual or noteworthy interactions based on the following criteria:
1. Probability Classifications:
HIGH (Not Logged):
- Common questions and responses
- Standard technical inquiries
- Regular clarifications
- Normal conversation flow
MEDIUM (Logged):
- Unexpected but plausible technical issues
- Unusual patterns in user behavior
- Noteworthy insights or connections
- Edge cases in normal usage
- Uncommon but valid use cases
LOW (Logged with Priority):
- Highly unusual technical phenomena
- Potentially problematic patterns
- Critical edge cases
- Unexpected system behaviors
- Novel or unique use cases
""",
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": """Unique identifier for the chat session.
Format: <date>_<user>_<sequence>
Example: 20240124_u1_001
Components:
- date: YYYYMMDD
- user: 'u' + user number
- sequence: 3-digit sequential number
Valid examples:
- 20240124_u1_001
- 20240124_u1_002
- 20240125_u2_001""",
"pattern": "^\\d{8}_u\\d+_\\d{3}$" # Regex pattern to validate format
},
"user_id": {
"type": "string",
"description": "Identifier for the user"
},
"interaction_type": {
"type": "string",
"description": "Type of interaction being monitored"
},
"probability_class": {
"type": "string",
"enum": ["HIGH", "MEDIUM", "LOW"],
"description": "Classification of interaction probability"
},
"message_content": {
"type": "string",
"description": "The user's message content"
},
"response_content": {
"type": "string",
"description": "The system's response content"
},
"context_summary": {
"type": "string",
"description": "Summary of interaction context"
},
"reasoning": {
"type": "string",
"description": "Explanation for the probability classification"
}
},
"required": [
"session_id",
"user_id",
"interaction_type",
"probability_class",
"message_content",
"response_content",
"context_summary",
"reasoning"
]
},
),
types.Tool(
name="read-logs",
description="Retrieve logged conversation variations from the database.",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum number of logs to retrieve",
"default": 10,
"minimum": 1,
"maximum": 100
},
"start_date": {
"type": "string",
"description": "Filter logs after this date (ISO format YYYY-MM-DDTHH:MM:SS)"
},
"end_date": {
"type": "string",
"description": "Filter logs before this date (ISO format YYYY-MM-DDTHH:MM:SS)"
},
"full_details": {
"type": "boolean",
"description": "If true, show all fields; if false, show only context summaries",
"default": False
}
},
"required": ["limit"]
}
),
types.Tool(
name="read_query",
description="""Execute a SELECT query on the SQLite database
Schema Reference:
Table: chat_monitoring
Fields:
- log_id (INTEGER PRIMARY KEY)
- timestamp (DATETIME)
- session_id (TEXT)
- user_id (TEXT)
- interaction_type (TEXT)
- probability_class (TEXT: HIGH, MEDIUM, LOW)
- message_content (TEXT)
- response_content (TEXT)
- context_summary (TEXT)
- reasoning (TEXT)
Example:
SELECT timestamp, probability_class, context_summary
FROM chat_monitoring
WHERE probability_class = 'LOW'
LIMIT 5;
""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SELECT SQL query to execute"
}
},
"required": ["query"]
}
),
types.Tool(
name="write_query",
description="Execute an INSERT, UPDATE, or DELETE query",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Non-SELECT SQL query to execute"
}
},
"required": ["query"]
}
),
types.Tool(
name="create_table",
description="Create a new table in the SQLite database",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "CREATE TABLE SQL statement"},
},
"required": ["query"],
},
),
types.Tool(
name="list_tables",
description="List all tables in the database",
inputSchema={
"type": "object",
"properties": {}
}
),
types.Tool(
name="describe_table",
description="Show structure of a specific table",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe"
}
},
"required": ["table_name"]
}
),
types.Tool(
name="append_insight",
description="Add a business insight to the memo",
inputSchema={
"type": "object",
"properties": {
"insight": {"type": "string", "description": "Business insight discovered from data analysis"},
},
"required": ["insight"],
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str,
arguments: dict | None
) -> list[types.TextContent]:
"""Handle tool calls."""
try:
if name == "list_tables":
results = db._execute_query(
"SELECT name FROM sqlite_master WHERE type='table'"
)
return [types.TextContent(type="text", text=str(results))]
elif name == "describe_table":
if not arguments or "table_name" not in arguments:
raise ValueError("Missing table_name argument")
table_name = arguments["table_name"]
# Get table creation SQL instead of using PRAGMA
results = db._execute_query(
f"SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
return [types.TextContent(type="text", text=str(results))]
elif name == "read_query":
if not arguments or "query" not in arguments:
raise ValueError("Missing query argument")
query = arguments["query"].strip()
if not query.upper().startswith("SELECT"):
raise ValueError("Only SELECT queries are allowed")
results = db._execute_query(query)
return [types.TextContent(type="text", text=str(results))]
elif name == "read-logs":
if not arguments:
return [types.TextContent(type="text", text="No arguments provided")]
limit = min(max(arguments.get("limit", 10), 1), 100)
full_details = arguments.get("full_details", False)
try:
logs = db.get_logs(limit=limit, full_details=full_details)
if not logs:
return [types.TextContent(type="text", text="No logs found")]
# Create compact table header with adjusted widths
header = ["ID", "Time", "Prob", "Type", "Context"]
separator = "-" * 90 # Increased overall width
table = [separator]
table.append(" | ".join([
f"{h:<4}" if h == "ID" else
f"{h:<12}" if h == "Time" else
f"{h:<6}" if h == "Prob" or h == "Type" else
f"{h:<45}" # Increased context width
for h in header
]))
table.append(separator)
# Create compact rows with adjusted widths
for log in logs:
time_str = str(log[1])[5:16] # Extract MM-DD HH:MM
context = str(log[8])[:42] + "..." if len(str(log[8])) > 42 else str(log[8]) # Increased context length
row = [
str(log[0])[:4], # ID
time_str, # Time
str(log[5])[:6], # Prob
str(log[4])[:6], # Type
context # Truncated context
]
table.append(" | ".join([
f"{str(cell):<4}" if i == 0 else # ID
f"{str(cell):<12}" if i == 1 else # Time
f"{str(cell):<6}" if i in [2, 3] else # Prob and Type
f"{str(cell):<45}" # Context
for i, cell in enumerate(row)
]))
return [types.TextContent(type="text", text="\n".join(table))]
except sqlite3.Error as e:
return [types.TextContent(type="text", text=f"Database error: {str(e)}")]
except Exception as e:
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
elif name == "log-query":
# Existing log-query logic
session_id = arguments.get("session_id", "")
user_id = arguments.get("user_id", "")
interaction_type = arguments.get("interaction_type", "")
probability_class = arguments.get("probability_class", "")
message_content = arguments.get("message_content", "")
response_content = arguments.get("response_content", "")
context_summary = arguments.get("context_summary", "")
reasoning = arguments.get("reasoning", "")
success = db.add_log(
session_id=session_id,
user_id=user_id,
interaction_type=interaction_type,
probability_class=probability_class,
message_content=message_content,
response_content=response_content,
context_summary=context_summary,
reasoning=reasoning
)
return [types.TextContent(
type="text",
text="Log entry added successfully" if success else "Failed to add log entry"
)],
elif name == "append_insight":
if not arguments or "insight" not in arguments:
raise ValueError("Missing insight argument")
db.insights.append(arguments["insight"])
_ = db._synthesize_memo()
# Notify clients that the memo resource has changed
await server.request_context.session.send_resource_updated(AnyUrl("memo://insights"))
return [types.TextContent(type="text", text="Insight added to memo")]
if not arguments:
raise ValueError("Missing arguments")
if name == "write_query":
if arguments["query"].strip().upper().startswith("SELECT"):
raise ValueError("SELECT queries are not allowed for write_query")
results = db._execute_query(arguments["query"])
return [types.TextContent(type="text", text=str(results))]
elif name == "create_table":
if not arguments["query"].strip().upper().startswith("CREATE TABLE"):
raise ValueError("Only CREATE TABLE statements are allowed")
db._execute_query(arguments["query"])
return [types.TextContent(type="text", text="Table created successfully")]
else:
raise ValueError(f"Unknown tool: {name}")
except sqlite3.Error as e:
return [types.TextContent(type="text", text=f"Database error: {str(e)}")]
except Exception as e:
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
async def main():
# Run the server using stdin/stdout streams
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="mcp-variance-log",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)