#!/usr/bin/env python3
"""
Integrated ADK Agent with MCP Server connection.
This version actually calls the MCP server tools.
"""
import asyncio
import json
import os
import re
import subprocess
from typing import Any, Dict, List

from anthropic import Anthropic
from dotenv import load_dotenv
from google.cloud import bigquery
import psycopg2
from psycopg2.extras import RealDictCursor
# Load environment variables
load_dotenv()
class IntegratedSalesAgent:
    """
    Agent that directly executes queries against Cloud SQL and BigQuery.

    Implements the MCP tool interface internally: the tool schemas are handed
    to Claude via the Messages API, and each tool_use block the model emits is
    dispatched to a local method that runs the query itself (no external MCP
    server process is spawned).
    """

    def __init__(self):
        """Initialize the Anthropic client, DB config, tool schemas, and history."""
        self.client = Anthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
        # Created lazily on first BigQuery call; see _get_bigquery_client().
        self.bq_client = None
        # psycopg2 connection parameters, sourced from the environment.
        self.cloudsql_config = {
            'host': os.getenv('DB_HOST'),
            'database': os.getenv('CLOUDSQL_DATABASE', 'salesdb'),
            'user': os.getenv('CLOUDSQL_USER', 'salesuser'),
            'password': os.getenv('CLOUDSQL_PASSWORD')
        }
        # Full message history (user turns, assistant turns, tool results).
        self.conversation_history = []
        # System prompt
        self.system_prompt = """You are a sales analytics assistant with access to two data sources:
1. **Cloud SQL Database** (query_cloudsql, get_cloudsql_schema, list_cloudsql_tables):
- Contains recent transactional data
- Tables: customers, orders, vendors
- Use for: specific customer queries, recent order details, vendor information, current sales data
2. **BigQuery thelook_ecommerce Dataset** (query_bigquery, get_bigquery_schema, list_bigquery_tables):
- Public e-commerce analytics dataset
- Tables: products, users, events, inventory_items, order_items, orders, distribution_centers
- Use for: product analytics, user behavior, inventory analysis, historical trends, large-scale analytics
**Decision Guidelines:**
- For questions about "customers", "vendors", or recent "orders" → use Cloud SQL
- For questions about "products", "inventory", "users", "events", or broad analytics → use BigQuery
- When uncertain, first list tables from both sources to understand the data
- Always explain which data source you're using and why
- Combine data from both sources if needed to answer comprehensively
**Important:**
- Use descriptive SQL queries with clear column selections
- Always limit results to reasonable amounts (use LIMIT clause)
- Format numeric results clearly (e.g., currency with $ symbol)
- Provide context and insights, not just raw data
- When showing data, format it in a readable way (tables, lists, etc.)"""
        # Tool schemas advertised to the model (Anthropic tool-use format).
        self.tools = [
            {
                "name": "query_cloudsql",
                "description": (
                    "Execute a SQL query against the Cloud SQL PostgreSQL database. "
                    "This database contains customer, orders, and vendors tables with "
                    "recent transactional data. Use this for queries about specific "
                    "customer orders, vendor information, or recent sales transactions."
                ),
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The SQL query to execute (SELECT statements only)"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "query_bigquery",
                "description": (
                    "Execute a SQL query against BigQuery's thelook_ecommerce public dataset. "
                    "This dataset contains comprehensive e-commerce analytics data including "
                    "products, users, events, inventory, and distribution centers. "
                    "Use this for broader analytical queries, historical trends, and "
                    "large-scale data analysis."
                ),
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The BigQuery SQL query to execute"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "list_cloudsql_tables",
                "description": "List all tables in the Cloud SQL database with their schemas.",
                "input_schema": {
                    "type": "object",
                    "properties": {}
                }
            },
            {
                "name": "list_bigquery_tables",
                "description": "List all tables in the BigQuery thelook_ecommerce dataset.",
                "input_schema": {
                    "type": "object",
                    "properties": {}
                }
            },
            {
                "name": "get_cloudsql_schema",
                "description": "Get the detailed schema for a specific Cloud SQL table.",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "Name of the table"
                        }
                    },
                    "required": ["table_name"]
                }
            }
        ]

    def _get_cloudsql_connection(self):
        """Open and return a new psycopg2 connection from the env-derived config."""
        return psycopg2.connect(**self.cloudsql_config)

    def _get_bigquery_client(self):
        """Return the BigQuery client, creating it lazily on first use."""
        if not self.bq_client:
            self.bq_client = bigquery.Client(project=os.getenv('GCP_PROJECT_ID'))
        return self.bq_client

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Dispatch a tool call by name and return its JSON result string.

        Any exception raised by a tool is converted into a JSON error payload
        so the model sees what went wrong instead of the agent loop crashing.
        """
        try:
            if tool_name == "query_cloudsql":
                return await self._query_cloudsql(arguments["query"])
            elif tool_name == "query_bigquery":
                return await self._query_bigquery(arguments["query"])
            elif tool_name == "list_cloudsql_tables":
                return await self._list_cloudsql_tables()
            elif tool_name == "list_bigquery_tables":
                return await self._list_bigquery_tables()
            elif tool_name == "get_cloudsql_schema":
                return await self._get_cloudsql_schema(arguments["table_name"])
            else:
                return json.dumps({"error": f"Unknown tool: {tool_name}"})
        except Exception as e:
            return json.dumps({"error": str(e)})

    async def _query_cloudsql(self, query: str) -> str:
        """Execute a read-only query against Cloud SQL and return JSON results.

        Rejects anything that is not a SELECT before a connection is opened.
        NOTE(review): this is a best-effort guard against the model issuing
        writes, not a security boundary — the DB user's grants are the real one.
        """
        query = query.strip()
        if not query.upper().startswith('SELECT'):
            return json.dumps({"error": "Only SELECT queries are allowed"})
        conn = self._get_cloudsql_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query)
                rows = cur.fetchall()
                # default=str stringifies Decimals, dates, etc. for JSON.
                return json.dumps({
                    "source": "CloudSQL",
                    "row_count": len(rows),
                    "data": [dict(row) for row in rows]
                }, indent=2, default=str)
        finally:
            conn.close()

    @staticmethod
    def _qualify_bigquery_query(query: str) -> str:
        """Fully qualify bare ``thelook_ecommerce.<table>`` references.

        BigQuery requires the ``bigquery-public-data`` project prefix for this
        public dataset. The previous implementation rewrote every ``FROM ``
        occurrence with str.replace, which corrupted queries containing
        subqueries, joins, or already-qualified names. This version rewrites
        only dataset.table references that lack the project, via a regex.

        Returns the query unchanged when it already mentions the project.
        NOTE(review): unusual quoting styles (e.g. a backtick spanning extra
        text) may still need manual qualification by the model.
        """
        if 'bigquery-public-data' in query:
            return query
        return re.sub(
            r'(?<![\w.-])`?thelook_ecommerce\.(\w+)`?',
            r'`bigquery-public-data.thelook_ecommerce.\1`',
            query,
        )

    async def _query_bigquery(self, query: str) -> str:
        """Execute a query against BigQuery and return JSON results."""
        client = self._get_bigquery_client()
        # Ensure the public-dataset project prefix is present (see helper).
        query = self._qualify_bigquery_query(query)
        query_job = client.query(query)
        results = query_job.result()
        rows = [dict(row) for row in results]
        return json.dumps({
            "source": "BigQuery",
            "row_count": len(rows),
            "total_bytes_processed": results.total_bytes_processed,
            "data": rows
        }, indent=2, default=str)

    async def _list_cloudsql_tables(self) -> str:
        """List every public-schema table in Cloud SQL with its columns."""
        query = """
            SELECT table_name, column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = 'public'
            ORDER BY table_name, ordinal_position
        """
        conn = self._get_cloudsql_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query)
                rows = cur.fetchall()
                # Group the flat column listing into {table: [columns]}.
                tables = {}
                for row in rows:
                    table = row['table_name']
                    if table not in tables:
                        tables[table] = []
                    tables[table].append({
                        'column': row['column_name'],
                        'type': row['data_type']
                    })
                return json.dumps({"source": "CloudSQL", "tables": tables}, indent=2)
        finally:
            conn.close()

    async def _list_bigquery_tables(self) -> str:
        """List the tables in the public thelook_ecommerce BigQuery dataset."""
        client = self._get_bigquery_client()
        dataset_ref = client.dataset('thelook_ecommerce', project='bigquery-public-data')
        tables = {}
        for table in client.list_tables(dataset_ref):
            # NOTE(review): list_tables yields TableListItem; num_rows may be
            # unset there, in which case we report "unknown".
            tables[table.table_id] = f"{table.num_rows:,} rows" if table.num_rows else "unknown"
        return json.dumps({
            "source": "BigQuery",
            "dataset": "thelook_ecommerce",
            "tables": tables
        }, indent=2)

    async def _get_cloudsql_schema(self, table_name: str) -> str:
        """Return column-level schema details for one Cloud SQL table.

        The table name is passed as a bound parameter, not interpolated.
        """
        query = """
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema = 'public' AND table_name = %s
            ORDER BY ordinal_position
        """
        conn = self._get_cloudsql_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, (table_name,))
                rows = cur.fetchall()
                return json.dumps({
                    "source": "CloudSQL",
                    "table": table_name,
                    "columns": [dict(row) for row in rows]
                }, indent=2)
        finally:
            conn.close()

    async def process_message(self, user_message: str) -> str:
        """Run one user turn through the agentic loop and return the reply text.

        The loop calls Claude, executes any requested tools, feeds the results
        back, and repeats until the model answers without tool calls or the
        iteration cap is reached (guards against runaway tool loops).
        """
        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        messages = self.conversation_history.copy()
        response_text = ""
        # Agentic loop
        max_iterations = 10
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            # NOTE(review): blocking network call inside an async method —
            # fine for this single-user CLI, but it stalls the event loop
            # under concurrent use.
            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=4096,
                system=self.system_prompt,
                tools=self.tools,
                messages=messages
            )
            # Separate text blocks from tool-use blocks.
            text_content = []
            tool_uses = []
            for block in response.content:
                if block.type == "text":
                    text_content.append(block.text)
                elif block.type == "tool_use":
                    tool_uses.append(block)
            # Accumulate text across loop iterations.
            if text_content:
                current_text = "\n".join(text_content)
                if response_text:
                    response_text += "\n\n" + current_text
                else:
                    response_text = current_text
            # If no tool calls, we're done
            if not tool_uses:
                break
            # Add assistant's response to messages
            messages.append({
                "role": "assistant",
                "content": response.content
            })
            # Execute tools
            tool_results = []
            for tool_use in tool_uses:
                print(f"\n 🔧 {tool_use.name}: {json.dumps(tool_use.input, indent=2)}")
                result = await self._execute_tool(tool_use.name, tool_use.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": result
                })
            # Tool results go back to the model as a user-role message
            # (required shape for the Anthropic Messages API).
            messages.append({
                "role": "user",
                "content": tool_results
            })
        # Persist the full exchange (including tool traffic) to history.
        self.conversation_history = messages
        return response_text

    def reset_conversation(self):
        """Reset the conversation history."""
        self.conversation_history = []
async def main():
    """Run the interactive console loop for the sales analytics agent."""
    # Banner and usage hints.
    print("=" * 70)
    print("Integrated Sales Analytics Agent")
    print("=" * 70)
    print("\nConnected to:")
    print(f" • Cloud SQL: {os.getenv('DB_HOST')}")
    print(f" • BigQuery: bigquery-public-data.thelook_ecommerce")
    print("\nExample questions:")
    print(" - What are the total sales from Cloud SQL?")
    print(" - Show me the top 5 customers by order value")
    print(" - How many products are in BigQuery?")
    print(" - What's the average order amount?")
    print("\nType 'quit' to exit, 'reset' to start a new conversation\n")
    agent = IntegratedSalesAgent()
    running = True
    while running:
        try:
            line = input("\n📊 You: ").strip()
            if not line:
                continue
            command = line.lower()
            if command == 'quit':
                print("\nGoodbye!")
                running = False
            elif command == 'reset':
                agent.reset_conversation()
                print("\n🔄 Conversation reset")
            else:
                print("\n🤖 Agent:")
                print(await agent.process_message(line))
        except KeyboardInterrupt:
            # Ctrl-C anywhere in the turn ends the session cleanly.
            print("\n\nGoodbye!")
            running = False
        except Exception as e:
            # Surface the error and keep the REPL alive for the next question.
            print(f"\n❌ Error: {e}")
            import traceback
            traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())