#!/usr/bin/env python3
"""
Audit Messages Script
Queries the Claude CLI and Gemini (via an MCP server), compares their responses, and generates audit reports
"""
import asyncio
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
import aiohttp
import difflib
import argparse
# Regular-expression fragments that mark a response as hedged or uncertain.
# They are searched for in lower-cased response text, so each pattern is
# written in lower case.
UNCERTAINTY_PATTERNS = [
    # Explicit admissions of doubt
    r"i'm not sure",
    r"i am not sure",
    # Single-word hedges
    r"maybe",
    r"perhaps",
    r"possibly",
    # Tentative verb phrases
    r"might be",
    r"could be",
    r"uncertain",
    # Softened assertions
    r"i think",
    r"it seems",
    r"appears to be",
    r"my understanding is",
    r"to the best of my knowledge",
]
class AuditSystem:
    """Audit a query by asking Claude and Gemini and comparing the answers.

    Claude is queried through the local ``claude`` CLI (the prompt is sent on
    stdin); Gemini is queried through the MCP server's ``/consult`` HTTP
    endpoint. Every audit produces a plain-text report that is appended to
    ``audit_log_file`` and printed to stdout.

    Intended to be used as an async context manager so that the HTTP session
    is closed on exit::

        async with AuditSystem(url) as audit:
            await audit.audit_query("...")
    """

    def __init__(self, mcp_url: str = "http://localhost:8080"):
        """Store configuration; no network resources are created here.

        Args:
            mcp_url: Base URL of the MCP server used to reach Gemini.
        """
        self.mcp_url = mcp_url
        self.audit_log_file = "audit_log.txt"
        # Created in __aenter__, or lazily by query_gemini.
        self.session = None

    async def __aenter__(self):
        """Open the HTTP session and return this instance."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session, if one was opened."""
        if self.session:
            await self.session.close()
            # Drop the closed session so a later query_gemini call opens a
            # fresh one instead of failing on a closed session.
            self.session = None

    def detect_uncertainty(self, text: str) -> bool:
        """Return True if ``text`` contains any of UNCERTAINTY_PATTERNS.

        Matching is case-insensitive (the text is lower-cased first). Each
        pattern must start at a word boundary so that, for example,
        "gemini thinks" is not mistaken for "i think". No trailing boundary
        is required, so "uncertain" still matches "uncertainty".

        Args:
            text: Response text to inspect; empty or None yields False.
        """
        if not text:
            return False
        text_lower = text.lower()
        return any(
            re.search(rf"\b(?:{pattern})", text_lower)
            for pattern in UNCERTAINTY_PATTERNS
        )

    async def query_claude(self, query: str, context: str = '') -> Dict[str, Any]:
        """Run the ``claude`` CLI with the query on stdin and return its answer.

        Args:
            query: The question to ask.
            context: Optional context; when given it is prepended to the query.

        Returns:
            On success: ``{'error': False, 'response': str, 'uncertain': bool,
            'timestamp': str}``. On failure: ``{'error': True, 'message': str,
            'response': None, 'uncertain': False}``. This method does not
            raise for CLI failures; a missing ``claude`` executable is
            delegated to ``query_claude_alternative``.
        """
        # Combine context and query into a single prompt.
        full_query = query
        if context:
            full_query = f"Context: {context}\n\nQuery: {query}"

        pipe = asyncio.subprocess.PIPE
        try:
            process = await asyncio.create_subprocess_exec(
                'claude',
                stdin=pipe,
                stdout=pipe,
                stderr=pipe,
            )
            stdout, stderr = await process.communicate(full_query.encode())
        except FileNotFoundError:
            # The claude executable is not on PATH.
            return await self.query_claude_alternative(query, context)
        except Exception as e:
            return {
                'error': True,
                'message': str(e),
                'response': None,
                'uncertain': False
            }

        if process.returncode != 0:
            # errors='replace' keeps a stray non-UTF-8 byte from hiding the
            # real failure behind a UnicodeDecodeError.
            error_msg = stderr.decode('utf-8', errors='replace').strip()
            if not error_msg:
                error_msg = f"claude exited with code {process.returncode}"
            return {
                'error': True,
                'message': error_msg,
                'response': None,
                'uncertain': False
            }

        response_text = stdout.decode('utf-8', errors='replace').strip()
        return {
            'error': False,
            'response': response_text,
            'uncertain': self.detect_uncertainty(response_text),
            'timestamp': datetime.utcnow().isoformat()
        }

    async def query_claude_alternative(self, query: str, context: str = '') -> Dict[str, Any]:
        """Fallback used when the ``claude`` CLI is not installed.

        Currently a placeholder: it performs no query and always returns an
        error result asking the user to install the CLI.
        """
        return {
            'error': True,
            'message': 'Claude CLI not found. Please ensure it is installed.',
            'response': None,
            'uncertain': False
        }

    async def query_gemini(self, query: str, context: str = '') -> Dict[str, Any]:
        """POST the query to the MCP server's ``/consult`` endpoint.

        Args:
            query: The question to ask.
            context: Optional context forwarded alongside the query.

        Returns:
            On success: ``{'error': False, 'response': str, 'timestamp': ...}``.
            On failure: ``{'error': True, 'message': str, 'response': None}``,
            plus ``'retry_after'`` when the server answers HTTP 429.
            This method does not raise; all failures become error results.
        """
        try:
            if not self.session:
                self.session = aiohttp.ClientSession()

            payload = {
                'query': query,
                'context': context
            }

            async with self.session.post(
                f"{self.mcp_url}/consult",
                json=payload
            ) as response:
                status = response.status
                try:
                    # content_type=None: error pages are often not served as
                    # application/json; parse whatever body we got.
                    data = await response.json(content_type=None)
                except ValueError:
                    data = None

            body_ok = isinstance(data, dict)
            if not body_ok:
                data = {}

            if status == 429:
                # Rate limit exceeded
                return {
                    'error': True,
                    'message': data.get('message', 'Rate limit exceeded'),
                    'retry_after': data.get('retry_after', 60),
                    'response': None
                }
            if status != 200:
                return {
                    'error': True,
                    'message': data.get('message', 'Unknown error'),
                    'response': None
                }
            if not body_ok:
                return {
                    'error': True,
                    'message': 'MCP server returned an invalid JSON response',
                    'response': None
                }

            gemini_response = data.get('response', {})
            if not isinstance(gemini_response, dict):
                return {
                    'error': True,
                    'message': 'Unexpected response format from MCP server',
                    'response': None
                }
            if gemini_response.get('error'):
                return {
                    'error': True,
                    'message': gemini_response.get('message', 'Gemini error'),
                    'response': None
                }

            return {
                'error': False,
                # A null text would crash the later comparison; use ''.
                'response': gemini_response.get('text') or '',
                'timestamp': data.get('timestamp')
            }
        except aiohttp.ClientError as e:
            return {
                'error': True,
                'message': f"Failed to connect to MCP server: {str(e)}",
                'response': None
            }
        except Exception as e:
            return {
                'error': True,
                'message': str(e),
                'response': None
            }

    def compare_responses(self, claude_response: str, gemini_response: str) -> Dict[str, Any]:
        """Compare the two response texts.

        Args:
            claude_response: Claude's answer.
            gemini_response: Gemini's answer.

        Returns:
            A dict with ``similarity_ratio`` (0..1, character-level
            ``SequenceMatcher`` ratio), ``similarity_percentage`` (rounded to
            two decimals), ``claude_length`` / ``gemini_length`` (characters),
            ``diff`` (line-level unified diff text, or ``'No differences'``)
            and ``key_differences`` (at most the first five changed lines).
        """
        claude_lines = claude_response.splitlines()
        gemini_lines = gemini_response.splitlines()

        matcher = difflib.SequenceMatcher(None, claude_response, gemini_response)
        similarity_ratio = matcher.ratio()

        diff = list(difflib.unified_diff(
            claude_lines,
            gemini_lines,
            fromfile='Claude',
            tofile='Gemini',
            lineterm=''
        ))

        # unified_diff emits the '---' / '+++' file headers as its first two
        # lines. Skip them by position rather than by prefix: a removed
        # content line such as "--verbose" appears in the diff as
        # "---verbose" and must not be mistaken for a header.
        key_differences = []
        for line in diff[2:]:
            if line.startswith('+'):
                key_differences.append(f"Gemini adds: {line[1:].strip()}")
            elif line.startswith('-'):
                key_differences.append(f"Claude only: {line[1:].strip()}")

        return {
            'similarity_ratio': similarity_ratio,
            'similarity_percentage': round(similarity_ratio * 100, 2),
            'claude_length': len(claude_response),
            'gemini_length': len(gemini_response),
            'diff': '\n'.join(diff) if diff else 'No differences',
            'key_differences': key_differences[:5]  # First 5 differences
        }

    def generate_audit_report(
        self,
        query: str,
        context: str,
        claude_result: Dict[str, Any],
        gemini_result: Dict[str, Any],
        comparison: Optional[Dict[str, Any]] = None
    ) -> str:
        """Render the audit as a plain-text report.

        Args:
            query: The audited query.
            context: Optional context; omitted from the report when empty.
            claude_result: Result dict from ``query_claude``.
            gemini_result: Result dict from ``query_gemini``.
            comparison: Result of ``compare_responses``; the comparison
                section is written only when this is given and neither
                result is an error.

        Returns:
            The report text, framed by 80-character ``=`` rules.
        """
        rule = "=" * 80
        report = [
            rule,
            f"AUDIT REPORT - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            rule,
            "",
            "QUERY INFORMATION:",
            f"Query: {query}",
        ]
        if context:
            report.append(f"Context: {context}")
        report.append("")

        report.append("CLAUDE RESPONSE:")
        if claude_result['error']:
            report.append(f"ERROR: {claude_result['message']}")
        else:
            report.append(f"Response: {claude_result['response']}")
            report.append(f"Uncertainty detected: {'Yes' if claude_result.get('uncertain') else 'No'}")
        report.append("")

        report.append("GEMINI RESPONSE:")
        if gemini_result['error']:
            report.append(f"ERROR: {gemini_result['message']}")
        else:
            report.append(f"Response: {gemini_result['response']}")
        report.append("")

        if comparison and not (claude_result['error'] or gemini_result['error']):
            report.append("COMPARISON ANALYSIS:")
            report.append(f"Similarity: {comparison['similarity_percentage']}%")
            report.append(f"Claude response length: {comparison['claude_length']} characters")
            report.append(f"Gemini response length: {comparison['gemini_length']} characters")
            if comparison['key_differences']:
                report.append("\nKey Differences:")
                for diff in comparison['key_differences']:
                    report.append(f" - {diff}")
            else:
                report.append("No significant differences found.")
            if comparison['similarity_percentage'] < 70:
                report.append("\nWARNING: Responses show significant differences!")
            report.append("")

        report.append(rule)
        report.append("")
        return '\n'.join(report)

    def save_audit_log(self, report: str):
        """Append ``report`` plus a trailing newline to ``audit_log_file``."""
        with open(self.audit_log_file, 'a', encoding='utf-8') as f:
            f.write(report)
            f.write('\n')

    async def audit_query(self, query: str, context: str = '') -> Dict[str, Any]:
        """Run the full audit for one query: ask both, compare, report, log.

        Args:
            query: The question to audit.
            context: Optional context passed to both models.

        Returns:
            ``{'claude': ..., 'gemini': ..., 'comparison': ..., 'report': ...}``
            where ``comparison`` is None unless both queries succeeded.
        """
        print(f"\n🔍 Auditing query: {query}")

        print("📝 Querying Claude...")
        claude_result = await self.query_claude(query, context)
        if claude_result.get('uncertain'):
            print("⚠️ Uncertainty detected in Claude's response. Consulting Gemini...")

        # Gemini is consulted on every audit; detected uncertainty only adds
        # the notice above.
        print("🤖 Querying Gemini...")
        gemini_result = await self.query_gemini(query, context)

        comparison = None
        if not claude_result['error'] and not gemini_result['error']:
            print("📊 Comparing responses...")
            comparison = self.compare_responses(
                claude_result['response'],
                gemini_result['response']
            )

        report = self.generate_audit_report(
            query, context, claude_result, gemini_result, comparison
        )
        self.save_audit_log(report)
        print("\n" + report)

        return {
            'claude': claude_result,
            'gemini': gemini_result,
            'comparison': comparison,
            'report': report
        }
async def check_mcp_server(url: str = "http://localhost:8080") -> bool:
    """Return True if the MCP server answers ``GET {url}/health`` with 200.

    Args:
        url: Base URL of the MCP server.

    Returns:
        True on an HTTP 200 response; False on any other status or if the
        request fails (connection refused, invalid URL, timeout, ...).
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{url}/health") as response:
                return response.status == 200
    except Exception:
        # Best-effort probe: any request failure means "not running".
        # Deliberately not a bare ``except`` so task cancellation,
        # KeyboardInterrupt and SystemExit still propagate.
        return False
async def main():
    """Command-line entry point.

    Parses the arguments, verifies that the MCP server is reachable, then
    either audits the single ``--query`` or, when no query is given, runs an
    interactive prompt loop until the user quits.
    """
    parser = argparse.ArgumentParser(description='Audit responses from Claude and Gemini')
    parser.add_argument('--query', type=str, help='Query to audit')
    parser.add_argument('--context', type=str, default='', help='Optional context')
    parser.add_argument('--interactive', action='store_true', help='Run in interactive mode')
    parser.add_argument('--mcp-url', type=str, default='http://localhost:8080', help='MCP server URL')
    args = parser.parse_args()

    # Refuse to start without the MCP server.
    print("🔍 Checking MCP server connection...")
    if not await check_mcp_server(args.mcp_url):
        print("❌ MCP server is not running!")
        print("Please start it with: python tools/mcp/mcp-server.py")
        return
    print("✅ MCP server is running")

    async with AuditSystem(args.mcp_url) as audit_system:
        if args.query:
            # Single query mode
            await audit_system.audit_query(args.query, args.context)
        else:
            # Interactive mode (the default when no --query is given)
            print("\n🎯 Claude-Gemini Audit System")
            print("Type 'quit' to exit\n")
            while True:
                try:
                    query = input("Enter query: ").strip()
                    if query.lower() in ['quit', 'exit', 'q']:
                        break
                    if not query:
                        continue
                    context = input("Enter context (optional): ").strip()
                    await audit_system.audit_query(query, context)
                except (KeyboardInterrupt, EOFError):
                    # EOFError: stdin was closed (Ctrl-D or piped input
                    # exhausted). Without this the generic handler below
                    # would catch it and the loop would spin forever.
                    print("\n\nExiting...")
                    break
                except Exception as e:
                    # Keep the session alive after a failed audit.
                    print(f"Error: {e}")
        print(f"\n📄 Audit log saved to: {audit_system.audit_log_file}")
# Run the async entry point only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())