__init__.py•8.59 kB
"""
MCP Code Package - Agentic Architecture with LangGraph
Provides both server and client capabilities for Snowflake data operations.
"""
import argparse
import asyncio
import os
import sys
from typing import Dict, Any, Optional
try:
import dotenv
except ImportError:
dotenv = None
try:
import snowflake.connector
except ImportError:
snowflake = None
# Import the client system
from .client import (
SnowflakeAgenticSystem,
AgentArchetype,
FewShotTrainingStore,
FewShotExample,
interactive_session,
batch_process
)
# Import server if available
try:
from . import server
SERVER_AVAILABLE = True
except ImportError:
SERVER_AVAILABLE = False
def parse_server_args():
"""Parse arguments for server mode"""
parser = argparse.ArgumentParser()
# Add server-specific arguments
parser.add_argument(
"--allow_write",
required=False,
default=False,
action="store_true",
help="Allow write operations on the database"
)
parser.add_argument("--log_dir", required=False, default=None, help="Directory to log to")
parser.add_argument("--log_level", required=False, default="INFO", help="Logging level")
parser.add_argument(
"--prefetch",
action="store_true",
dest="prefetch",
default=False,
help="Prefetch table descriptions (when enabled, list_tables and describe_table are disabled)",
)
parser.add_argument(
"--no-prefetch",
action="store_false",
dest="prefetch",
help="Don't prefetch table descriptions",
)
parser.add_argument(
"--exclude_tools",
required=False,
default=[],
nargs="+",
help="List of tools to exclude",
)
# First, get all the arguments we don't know about
args, unknown = parser.parse_known_args()
# Create a dictionary to store our key-value pairs
connection_args = {}
# Iterate through unknown args in pairs
for i in range(0, len(unknown), 2):
if i + 1 >= len(unknown):
break
key = unknown[i]
value = unknown[i + 1]
# Make sure it's a keyword argument (starts with --)
if key.startswith("--"):
key = key[2:] # Remove the '--'
connection_args[key] = value
# Now we can add the known args to kwargs
server_args = {
"allow_write": args.allow_write,
"log_dir": args.log_dir,
"log_level": args.log_level,
"prefetch": args.prefetch,
"exclude_tools": args.exclude_tools,
}
return server_args, connection_args
def parse_client_args():
"""Parse arguments for client mode"""
parser = argparse.ArgumentParser(description="Snowflake MCP Agent with LangGraph")
parser.add_argument(
"--mode",
choices=["interactive", "batch", "query", "train"],
default="interactive",
help="Execution mode"
)
parser.add_argument(
"--query",
type=str,
help="Direct query to process (for 'query' mode)"
)
parser.add_argument(
"--file",
type=str,
help="File containing queries (for 'batch' mode)"
)
parser.add_argument(
"--training-file",
type=str,
default="./agent_training_data.json",
help="Path to training data file"
)
parser.add_argument(
"--jwt-token",
type=str,
help="JWT token for authentication (overrides environment variable)"
)
parser.add_argument(
"--session-id",
type=str,
help="Session ID for tracking (auto-generated if not provided)"
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose output"
)
return parser.parse_args()
async def run_server():
"""Run the MCP server"""
if not SERVER_AVAILABLE:
print("Server module not available. Please ensure the server module is properly configured.")
return
print("Starting MCP Snowflake Server...")
if dotenv:
dotenv.load_dotenv()
# Start server with default configuration
try:
await server.main()
except Exception as e:
print(f"Error starting server: {e}")
raise
async def run_client():
"""Run the agentic client system"""
args = parse_client_args()
print("Initializing Snowflake Agentic System...")
# Load environment variables
if dotenv:
dotenv.load_dotenv()
# Initialize the system with JWT token
jwt_token = args.jwt_token or os.getenv("JWT_TOKEN")
if not jwt_token:
print("Error: JWT token not provided. Set JWT_TOKEN environment variable or use --jwt-token")
return
system = SnowflakeAgenticSystem(jwt_token=jwt_token)
# Handle different modes
if args.mode == "interactive":
print("\n" + "="*60)
print("Snowflake MCP Agent - LangGraph Architecture")
print("Type 'exit' to quit, 'feedback' to provide feedback")
print("="*60 + "\n")
await interactive_session()
elif args.mode == "query":
if not args.query:
print("Error: --query required for query mode")
return
session_id = args.session_id or None
response = await system.process_query(args.query, session_id)
print("\n" + "="*60)
print("Response:")
print("="*60)
print(response)
elif args.mode == "batch":
if not args.file:
print("Error: --file required for batch mode")
return
await batch_process(args.file)
elif args.mode == "train":
print("Training mode: Collecting examples for agent improvement")
print("All queries in this session will be saved as training examples")
# Enable training mode in the system
system.training_mode = True
await interactive_session()
print("\nTraining session completed. Examples saved to:", args.training_file)
def main():
"""Main entry point for the package"""
# Check first argument to determine mode
if len(sys.argv) > 1 and sys.argv[1] == "server":
# Remove 'server' from argv for argument parsing
sys.argv.pop(1)
asyncio.run(run_server())
else:
# Default to client mode
asyncio.run(run_client())
def run():
"""Legacy run function for compatibility"""
main()
# Create convenience functions for direct imports
def create_agent_system(jwt_token: Optional[str] = None) -> SnowflakeAgenticSystem:
"""
Create and return a configured SnowflakeAgenticSystem instance.
Args:
jwt_token: Optional JWT token for authentication. If not provided,
will attempt to load from JWT_TOKEN environment variable.
Returns:
SnowflakeAgenticSystem: Configured agent system ready for use.
Example:
>>> from mcp_code import create_agent_system
>>> system = create_agent_system()
>>> response = await system.process_query("What are the top tables?")
"""
if dotenv:
dotenv.load_dotenv()
token = jwt_token or os.getenv("JWT_TOKEN")
if not token:
raise ValueError("JWT token required. Set JWT_TOKEN environment variable or pass jwt_token parameter.")
return SnowflakeAgenticSystem(jwt_token=token)
def create_training_store(storage_path: str = "./agent_training_data.json") -> FewShotTrainingStore:
"""
Create a training store for managing few-shot examples.
Args:
storage_path: Path to the JSON file for storing training examples.
Returns:
FewShotTrainingStore: Training store instance.
Example:
>>> from mcp_code import create_training_store, FewShotExample, AgentArchetype
>>> store = create_training_store()
>>> example = FewShotExample(
... query="What tables are most used?",
... context={"dataset": "usage"},
... response="The top tables are...",
... agent_type=AgentArchetype.ANALYST
... )
>>> store.add_example(example)
"""
return FewShotTrainingStore(storage_path)
# Package-level exports
__all__ = [
# Main functions
"main",
"run",
"create_agent_system",
"create_training_store",
# Core classes from refactored client
"SnowflakeAgenticSystem",
"AgentArchetype",
"FewShotTrainingStore",
"FewShotExample",
# Utility functions
"interactive_session",
"batch_process",
]
if __name__ == "__main__":
main()