#!/usr/bin/env python3
"""ADK Agent for querying Cloud SQL and BigQuery databases."""
import os
import asyncio
import json
from typing import Any, Dict, List
from anthropic import Anthropic
from dotenv import load_dotenv
# Load environment variables from a local .env file (e.g. ANTHROPIC_API_KEY)
# so the agent can authenticate without exporting variables manually.
load_dotenv()
class SalesAnalyticsAgent:
    """
    Agent that intelligently routes queries between Cloud SQL and BigQuery.

    Cloud SQL: Contains recent transactional data (customers, orders, vendors)
    BigQuery: Contains comprehensive e-commerce analytics (thelook_ecommerce dataset)
    """

    # Safety cap on model/tool round-trips per user message so a model that
    # keeps requesting tools cannot spin the agentic loop forever.
    MAX_TOOL_ROUNDS = 25

    def __init__(self, mcp_tools: List[Dict[str, Any]]):
        """
        Initialize the agent.

        Args:
            mcp_tools: List of MCP tool schemas from the database server.
        """
        self.client = Anthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
        self.mcp_tools = mcp_tools
        # Alternating user/assistant messages (including tool results),
        # carried across calls to process_message().
        self.conversation_history: List[Dict[str, Any]] = []
        # System prompt that guides data source selection
        self.system_prompt = """You are a sales analytics assistant with access to two data sources:
1. **Cloud SQL Database** (query_cloudsql, get_cloudsql_schema, list_cloudsql_tables):
- Contains recent transactional data
- Tables: customers, orders, vendors
- Use for: specific customer queries, recent order details, vendor information, current sales data
2. **BigQuery thelook_ecommerce Dataset** (query_bigquery, get_bigquery_schema, list_bigquery_tables):
- Public e-commerce analytics dataset
- Tables: products, users, events, inventory_items, order_items, orders, distribution_centers
- Use for: product analytics, user behavior, inventory analysis, historical trends, large-scale analytics
**Decision Guidelines:**
- For questions about "customers", "vendors", or recent "orders" → use Cloud SQL
- For questions about "products", "inventory", "users", "events", or broad analytics → use BigQuery
- When uncertain, first list tables from both sources to understand the data
- Always explain which data source you're using and why
- Combine data from both sources if needed to answer comprehensively
**Important:**
- Use descriptive SQL queries with clear column selections
- Always limit results to reasonable amounts (use LIMIT clause)
- Format numeric results clearly (e.g., currency with $ symbol)
- Provide context and insights, not just raw data"""

    async def process_message(self, user_message: str) -> str:
        """
        Process a user message and return the response.

        Args:
            user_message: The user's question or request

        Returns:
            The agent's accumulated text response (all text blocks emitted
            across the tool-use rounds, newline-joined).
        """
        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        messages = self.conversation_history.copy()
        response_parts: List[str] = []

        # Agentic loop: keep calling the model until it stops requesting
        # tools (bounded by MAX_TOOL_ROUNDS as a safety net).
        for _ in range(self.MAX_TOOL_ROUNDS):
            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=4096,
                system=self.system_prompt,
                tools=self.mcp_tools,
                messages=messages
            )

            # Split the response into its text and tool-use blocks.
            text_content = [b.text for b in response.content if b.type == "text"]
            tool_uses = [b for b in response.content if b.type == "tool_use"]

            if text_content:
                response_parts.append("\n".join(text_content))

            # Always record the assistant turn. Previously this append was
            # skipped when there were no tool calls, which silently dropped
            # the final assistant reply from the conversation history and
            # broke multi-turn context.
            messages.append({
                "role": "assistant",
                "content": response.content
            })

            # If no tool calls, we're done
            if not tool_uses:
                break

            # Execute tools and collect results
            tool_results = []
            for tool_use in tool_uses:
                print(f"\n🔧 Using tool: {tool_use.name}")
                print(f" Arguments: {json.dumps(tool_use.input, indent=2)}")
                try:
                    result = await self._execute_tool(tool_use.name, tool_use.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tool_use.id,
                        "content": result
                    })
                    print(f" ✓ Tool executed successfully")
                except Exception as e:
                    error_msg = f"Error: {str(e)}"
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tool_use.id,
                        "content": error_msg,
                        "is_error": True
                    })
                    print(f" ✗ Tool error: {error_msg}")

            # Per the Messages API, tool results go back as a user-role turn.
            messages.append({
                "role": "user",
                "content": tool_results
            })

        # Persist the full exchange (including the final assistant reply).
        self.conversation_history = messages
        return "\n".join(response_parts)

    async def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """
        Execute an MCP tool.

        This is a placeholder - in a real implementation, this would call
        the actual MCP server. For this POC, we'll simulate tool execution.
        """
        # In production, this would communicate with the MCP server
        # For now, return a placeholder
        return json.dumps({
            "note": "Tool execution placeholder",
            "tool": tool_name,
            "arguments": arguments,
            "message": "In production, this would call the MCP server"
        })

    def reset_conversation(self):
        """Reset the conversation history."""
        self.conversation_history = []
# Define MCP tools schema
# Tool definitions advertised to the model (Anthropic Messages API "tools"
# format): each entry pairs a name/description with a JSON-Schema input_schema.
MCP_TOOLS = [
    {
        "name": "query_cloudsql",
        # NOTE: table name fixed to "customers" (was "customer") to match the
        # actual Cloud SQL tables listed in the system prompt — a wrong table
        # name in a tool description misleads the model's generated SQL.
        "description": (
            "Execute a SQL query against the Cloud SQL PostgreSQL database. "
            "This database contains customers, orders, and vendors tables with "
            "recent transactional data. Use this for queries about specific "
            "customer orders, vendor information, or recent sales transactions."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The SQL query to execute (SELECT statements only)"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "query_bigquery",
        "description": (
            "Execute a SQL query against BigQuery's thelook_ecommerce public dataset. "
            "This dataset contains comprehensive e-commerce analytics data including "
            "products, users, events, inventory, and distribution centers. "
            "Use this for broader analytical queries, historical trends, and "
            "large-scale data analysis."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The BigQuery SQL query to execute"
                }
            },
            "required": ["query"]
        }
    },
    # Discovery tools: no arguments, so their schemas have empty properties.
    {
        "name": "list_cloudsql_tables",
        "description": "List all tables in the Cloud SQL database with their schemas.",
        "input_schema": {
            "type": "object",
            "properties": {}
        }
    },
    {
        "name": "list_bigquery_tables",
        "description": "List all tables in the BigQuery thelook_ecommerce dataset with their schemas.",
        "input_schema": {
            "type": "object",
            "properties": {}
        }
    },
    {
        "name": "get_cloudsql_schema",
        "description": "Get the detailed schema for a specific Cloud SQL table.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_name": {
                    "type": "string",
                    "description": "Name of the table"
                }
            },
            "required": ["table_name"]
        }
    },
    {
        "name": "get_bigquery_schema",
        "description": "Get the detailed schema for a specific BigQuery table.",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_name": {
                    "type": "string",
                    "description": "Name of the table in thelook_ecommerce dataset"
                }
            },
            "required": ["table_name"]
        }
    }
]
async def main():
    """Run the interactive REPL demo for the sales analytics agent."""
    banner = [
        "=" * 70,
        "Sales Analytics Agent - POC",
        "=" * 70,
        "\nThis agent can answer questions about sales data from:",
        " • Cloud SQL: customers, orders, vendors (transactional data)",
        " • BigQuery: thelook_ecommerce (analytics data)",
        "\nExample questions:",
        " - What are the total sales from Cloud SQL?",
        " - Show me the top 5 products from BigQuery",
        " - How many customers do we have?",
        " - What's the average order value?",
        "\nType 'quit' to exit, 'reset' to start a new conversation\n",
    ]
    print("\n".join(banner))

    agent = SalesAnalyticsAgent(MCP_TOOLS)
    running = True
    while running:
        try:
            question = input("\n📊 You: ").strip()
            if not question:
                continue
            command = question.lower()
            if command == 'quit':
                print("\nGoodbye!")
                running = False
            elif command == 'reset':
                agent.reset_conversation()
                print("\n🔄 Conversation reset")
            else:
                print("\n🤖 Agent:", end=" ")
                answer = await agent.process_message(question)
                print(answer)
        except KeyboardInterrupt:
            # Ctrl-C exits the REPL the same way 'quit' does.
            print("\n\nGoodbye!")
            running = False
        except Exception as exc:
            # Surface the error and keep the REPL alive.
            print(f"\n❌ Error: {exc}")
# Start the interactive demo only when executed as a script (not on import).
if __name__ == "__main__":
    asyncio.run(main())