#!/usr/bin/env python3
"""
Enhanced interactive client that uses real LLM intelligence (Gemini function
calling) for tool selection and parameter extraction, instead of keyword
matching.
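
Run this script directly. It expects a GEMINI_API_KEY entry in the .env file
one directory above, and a sibling src/ directory providing the `tools` and
`utils.gemini_client` modules imported below.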
"""
import asyncio
import json
import sys
import os
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime
# Add the src directory to the Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, '..', 'src')
sys.path.insert(0, src_dir)
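# Assumed layout: the directory above this script contains src/, which holds
# the `tools` package and `utils/gemini_client.py` imported further down.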
# Import required modules
from dotenv import load_dotenv
from google.generativeai import types
# Load environment variables from the project-level .env file, resolved
# relative to this script rather than the current working directory
load_dotenv(os.path.join(current_dir, '..', '.env'))
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Validate API key
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
if not GEMINI_API_KEY:
raise ValueError("GEMINI_API_KEY environment variable not set. Please check your .env file.")
class LLMBrainInteractiveClient:
"""Interactive client with LLM-based tool selection and parameter extraction."""
def __init__(self):
"""Initialize the LLM brain client."""
self.available_tools = []
self.tool_descriptions = {}
self.rate_limited_client = None
self._initialize_llm_client()
def _initialize_llm_client(self):
"""Initialize the rate-limited Gemini client."""
try:
from utils.gemini_client import get_rate_limited_client
self.rate_limited_client = get_rate_limited_client(GEMINI_API_KEY)
logger.info("LLM brain initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize LLM client: {e}")
raise RuntimeError(f"LLM client initialization failed: {str(e)}")
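
    # Interface assumed for the rate-limited client (defined in
    # utils/gemini_client.py, not shown here), inferred from its usage below:
    #   async generate_content(contents, generation_config=None, tools=None)
    #       -> a google.generativeai response (candidates, parts, text)
    #   get_rate_limit_status() -> dict with current/safe/available counters
    #       for RPM, RPD, and TPM (see the 'status' command handler)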
def format_tools_for_gemini(self, tools: List[Any]) -> List[types.Tool]:
"""Format tools for Gemini API with enhanced schema handling."""
gemini_tools = []
self.available_tools = []
self.tool_descriptions = {}
for tool in tools:
try:
                # Clean the input schema: drop JSON Schema metadata keys that
                # Gemini's function-calling schema does not accept
                if hasattr(tool, 'input_schema') and tool.input_schema:
                    parameters = {
                        k: v for k, v in tool.input_schema.items()
                        if k not in ("additionalProperties", "$schema")
                    }
                else:
                    parameters = {"type": "object"}
# Create tool in Gemini format
gemini_tool = types.Tool(
function_declarations=[{
"name": tool.name,
"description": tool.description or "",
"parameters": parameters
}]
)
gemini_tools.append(gemini_tool)
self.available_tools.append(tool.name)
self.tool_descriptions[tool.name] = tool.description or ""
logger.info(f"Formatted tool: {tool.name} - {tool.description}")
except Exception as e:
logger.warning(f"Failed to format tool {tool.name}: {e}")
continue
return gemini_tools
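
    # Illustrative example: a tool registered as
    #   name="calculate",
    #   input_schema={"type": "object",
    #                 "properties": {"expression": {"type": "string"}},
    #                 "required": ["expression"],
    #                 "additionalProperties": False}
    # becomes a single types.Tool whose function declaration keeps the
    # properties/required keys but drops "additionalProperties".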
def _create_tool_selection_prompt(self, query: str) -> str:
"""Create a comprehensive prompt for LLM-based tool selection."""
tool_info = "\n".join([
f"- {name}: {desc}"
for name, desc in self.tool_descriptions.items()
])
prompt = f"""You are an intelligent assistant with access to various tools. Your job is to understand the user's request and select the most appropriate tool to help them.
Available tools:
{tool_info}
User Query: "{query}"
Instructions:
1. Analyze the user's query to understand what they need
2. Select the most appropriate tool from the available options
3. Extract the relevant parameters from the user's query
4. Call the selected tool with the extracted parameters
Guidelines for tool selection:
- Use 'get_knowledge_base' for questions about company policies, benefits, procedures, HR information, employee guidelines, vacation, sick leave, dress code, working hours, training, performance reviews, etc.
- Use 'calculate' for mathematical calculations, arithmetic, formulas, equations, or any numerical computations
- Use 'get_weather' for weather-related queries, temperature, climate, or location-specific weather information
Examples:
- "What is the vacation policy?" → get_knowledge_base with query="vacation policy"
- "What is 15 + 27?" → calculate with expression="15 + 27"
- "What's the weather in New York?" → get_weather with location="New York"
- "How many sick days do employees get?" → get_knowledge_base with query="sick days policy"
- "Calculate the square root of 16" → calculate with expression="sqrt(16)"
- "What's the temperature in Tokyo?" → get_weather with location="Tokyo"
Please select the appropriate tool and extract the necessary parameters from the user's query."""
return prompt
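
    # Illustrative expectation: given the prompt above together with the
    # formatted tool declarations, a query like "What's the weather in Paris?"
    # should come back as a function_call part (name="get_weather",
    # args={"location": "Paris"}) rather than as plain text.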
async def process_query_with_llm_brain(self, query: str, tools: List[Any], model: str = "gemini-2.0-flash-lite") -> str:
"""Process a query using LLM-based tool selection and parameter extraction.
Args:
query: The user query
tools: Available tools
model: Gemini model to use
Returns:
The response from tool execution or LLM reasoning
"""
try:
if not tools:
return "Error: No tools available for processing queries."
logger.info(f"Processing query with LLM brain: {query}")
logger.info(f"Available tools: {self.available_tools}")
# Format tools for Gemini
gemini_tools = self.format_tools_for_gemini(tools)
if not gemini_tools:
return "Error: No tools could be formatted for LLM processing."
# Create comprehensive prompt for LLM tool selection
selection_prompt = self._create_tool_selection_prompt(query)
# Generate response from Gemini with rate limiting
response = await self.rate_limited_client.generate_content(
contents=selection_prompt,
generation_config=types.GenerationConfig(
temperature=0.1, # Low temperature for consistent tool selection
),
tools=gemini_tools,
)
# Process the response
if not response.candidates:
return "Error: No response candidates from Gemini"
candidate = response.candidates[0]
            # Look for a function call (the LLM's tool selection) in any part
            logger.info("Checking for function call in response...")
            function_call = None
if candidate.content and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'function_call') and part.function_call:
function_call = part.function_call
break
if function_call:
logger.info(f"LLM brain selected tool: {function_call.name}")
logger.info(f"LLM brain extracted parameters: {dict(function_call.args)}")
if function_call.name and function_call.name in self.available_tools:
try:
                    # Find the selected tool by name
                    selected_tool = next(
                        (t for t in tools if t.name == function_call.name),
                        None,
                    )
if selected_tool:
# Call the tool with LLM-extracted parameters
result = await selected_tool.handler(**dict(function_call.args))
return self._format_tool_result(result)
else:
return f"Error: Tool '{function_call.name}' not found"
except Exception as e:
logger.error(f"Error executing tool {function_call.name}: {e}")
return f"Error executing tool {function_call.name}: {str(e)}"
else:
logger.warning(f"LLM brain selected unknown tool: {function_call.name}")
return f"Error: Unknown tool '{function_call.name}' selected by LLM brain"
# If no function call, return LLM's reasoning
if candidate.content and candidate.content.parts:
                text_parts = [
                    part.text for part in candidate.content.parts
                    if hasattr(part, 'text') and part.text is not None
                ]
if text_parts:
return '\n'.join(text_parts)
# Fallback to response.text if available
if hasattr(response, 'text') and response.text:
return response.text
return "No meaningful response generated by LLM brain"
except Exception as e:
logger.error(f"Error processing query with LLM brain: {e}")
return f"Error processing query: {str(e)}"
def _format_tool_result(self, result) -> str:
"""Format tool execution result for display."""
try:
if hasattr(result, 'content') and result.content:
                first = result.content[0]
                content_text = first.text if hasattr(first, 'text') else str(first)
# Try to parse and format as JSON
try:
parsed_json = json.loads(content_text)
return json.dumps(parsed_json, indent=2)
except json.JSONDecodeError:
# Return as plain text if not valid JSON
return content_text
return str(result)
except Exception as e:
logger.error(f"Error formatting result: {e}")
return f"Error formatting result: {str(e)}"
async def run_interactive_session(self):
"""Run an interactive session with LLM brain tool selection."""
print("🧠 LLM Brain Interactive Client")
print("=" * 50)
print("Type 'quit' or 'exit' to end the session")
print("Type 'status' to check rate limiting status")
print("Type 'tools' to see available tools")
print("Type 'brain' to see LLM brain capabilities")
print("Connecting to tools...")
try:
# Initialize tool registry
from tools import ToolRegistry
registry = ToolRegistry()
tools = registry.discover_and_register_tools()
if not tools:
print("Error: No tools available. Please check tool registration.")
return
print(f"\n🧠 LLM Brain initialized! Available tools ({len(tools)}):")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
print("\n🧠 Ready for queries! The LLM brain will intelligently select tools and extract parameters.")
print("No keyword matching - pure LLM intelligence!")
while True:
try:
query = input("\n🧠 > ").strip()
if query.lower() in ['quit', 'exit', 'q']:
break
if query.lower() == 'status':
status = self.rate_limited_client.get_rate_limit_status()
print("\n" + "="*50)
print("🧠 Rate Limiting Status:")
print(f"Current RPM: {status['current_rpm']}/{status['safe_rpm']}")
print(f"Current RPD: {status['current_rpd']}/{status['safe_rpd']}")
print(f"Current TPM: {status['current_tpm']}/{status['safe_tpm']}")
print(f"Available RPM: {status['rpm_available']}")
print(f"Available RPD: {status['rpd_available']}")
print(f"Available TPM: {status['tpm_available']}")
print("="*50)
continue
if query.lower() == 'tools':
print("\n" + "="*50)
print("🧠 Available Tools:")
for name, desc in self.tool_descriptions.items():
print(f" - {name}: {desc}")
print("="*50)
continue
if query.lower() == 'brain':
print("\n" + "="*50)
print("🧠 LLM Brain Capabilities:")
print("✅ Real LLM-based tool selection")
print("✅ Intelligent parameter extraction")
print("✅ No keyword matching")
print("✅ Context-aware decision making")
print("✅ Natural language understanding")
print("✅ Rate-limited API calls")
print("✅ Error handling and fallbacks")
print("="*50)
continue
if not query:
continue
print("\n" + "="*50)
print(f"🧠 Processing: {query}")
print("-" * 50)
response = await self.process_query_with_llm_brain(query, tools)
print("🧠 Response:")
print(response)
print("="*50)
except KeyboardInterrupt:
print("\nUse 'quit' to exit gracefully.")
continue
except Exception as e:
print(f"❌ Error: {e}")
continue
except Exception as e:
print(f"Failed to initialize LLM brain: {e}")
print("Please ensure the tools are properly registered and dependencies are installed.")
async def main():
"""Main entry point for the LLM brain interactive client."""
print("🚀 Starting LLM Brain Interactive Client...")
try:
client = LLMBrainInteractiveClient()
await client.run_interactive_session()
except KeyboardInterrupt:
print("\n\nShutting down LLM brain gracefully...")
except Exception as e:
print(f"Unexpected error: {e}")
if __name__ == "__main__":
asyncio.run(main())