MCP Authentication Example

by Bin4yi
agent.py (17.6 kB)
import os
import asyncio
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
import operator
from datetime import datetime
import httpx
import secrets
import hashlib
import base64
import webbrowser
from urllib.parse import urlencode, parse_qs
from aiohttp import web
import threading

# MCP Integration imports
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.sse import sse_client
from langchain_mcp_adapters.tools import load_mcp_tools
from dotenv import load_dotenv

load_dotenv()

# Initialize OpenAI LLM
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.7,
    api_key=os.getenv("OPENAI_API_KEY")
)

# Simple in-memory conversation storage
conversation_memory = {}


class ChatbotState(TypedDict):
    messages: Annotated[list, operator.add]
    user_intent: str
    extracted_info: dict
    needs_human: bool
    conversation_history: list
    mcp_tools: list
    mcp_results: dict
    session_id: str


# MCP Server Configuration
MCP_SERVER_URL = "http://127.0.0.1:8000/mcp"

# Asgardeo OAuth2 Configuration - FIXED ENDPOINTS
AUTH_ISSUER = os.getenv("AUTH_ISSUER", "https://api.asgardeo.io/t/pasansanjiiwa")
CLIENT_ID = os.getenv("CLIENT_ID", "")
REDIRECT_URI = "http://localhost:8080/callback"

# Correct endpoint construction
AUTHORIZATION_ENDPOINT = f"{AUTH_ISSUER}/oauth2/authorize"
TOKEN_ENDPOINT = f"{AUTH_ISSUER}/oauth2/token"

# auth_code_future = asyncio.Future()

KNOWLEDGE_BASE = """
=== PAWSOME PET CARE CENTER ===

SERVICES & PRICING:

1. Grooming:
   - Small pets (under 25 lbs): $45
   - Medium pets (25-50 lbs): $65
   - Large pets (50-80 lbs): $85
   - Extra-large pets (80+ lbs): $110
   - Duration: 1.5-3 hours
   - Includes: Bath, haircut, nail trimming, ear cleaning, teeth brushing

2. Boarding (per night):
   - Small: $35/night
   - Medium: $45/night
   - Large: $55/night
   - Extra-large: $65/night
   - 24/7 supervision, comfortable accommodations, playtime, meals included

3. Daycare:
   - Full day (7am-7pm): $30
   - Half day: $20
   - Includes: Socialization, supervised playtime, snacks

4. Training:
   - Single session (1 hour): $75
   - 5-session package: $350
   - 10-session package: $650
   - Services: Obedience training, behavior modification, puppy training

5. Veterinary Services:
   - General checkup: $65
   - Vaccinations: $25 each
   - Dental cleaning: $150
   - Minor treatments available

REQUIREMENTS:
- All pets must be up-to-date on rabies, distemper, and bordetella vaccines
- Puppies/kittens must be at least 4 months old for daycare and boarding
- Flea and tick prevention required
- Temperament evaluation required for group activities

HOURS:
- Monday-Friday: 7:00 AM - 7:00 PM
- Saturday: 8:00 AM - 6:00 PM
- Sunday: 9:00 AM - 5:00 PM

CONTACT:
- Phone: (555) 123-4567
- Email: info@pawsomepetcare.com
- Address: 123 Pet Care Lane, Anytown, USA
- Website: www.pawsomepetcare.com

POLICIES:
- Cancellation: 24-hour notice required
- Payment: Cash, credit cards, pet insurance accepted
- Emergency: 24/7 emergency line available for boarding guests
"""


def generate_pkce():
    """Generate PKCE code verifier and challenge."""
    code_verifier = secrets.token_urlsafe(64)
    code_challenge = hashlib.sha256(code_verifier.encode('ascii')).digest()
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode('ascii').rstrip('=')
    return code_verifier, code_challenge


async def start_callback_server(auth_future):
    """Starts a temporary local server to catch the OAuth callback."""
    async def callback_handler(request):
        query = request.rel_url.query
        if 'code' in query:
            # Check if future is already done to avoid errors
            if not auth_future.done():
                auth_future.set_result(query['code'])
            return web.Response(text="Authentication successful! You can close this window and return to the terminal.")
        return web.Response(text="No code found in callback.", status=400)

    app = web.Application()
    app.router.add_get('/callback', callback_handler)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    return runner


async def authenticate_with_asgardeo():
    """Performs the full OAuth2 Authorization Code flow."""
    print("\n🔐 Initiating Asgardeo Authentication...")

    # 1. Create the Future INSIDE the async function (the event loop is running now)
    auth_code_future = asyncio.Future()
    code_verifier, code_challenge = generate_pkce()

    # 2. Start the local server, passing the future to it
    runner = await start_callback_server(auth_code_future)

    # 3. Construct the authorization URL
    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": "openid profile internal_login",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256"
    }
    auth_url = f"{AUTHORIZATION_ENDPOINT}?{urlencode(params)}"

    # 4. Open the browser
    print(f"👉 Opening browser: {auth_url}")
    webbrowser.open(auth_url)

    # 5. Wait for the code
    try:
        print("⏳ Waiting for callback...")
        # Wait for the future to be set by the web server handler
        code = await asyncio.wait_for(auth_code_future, timeout=120)
    except asyncio.TimeoutError:
        raise Exception("Authentication timed out.")
    finally:
        await runner.cleanup()

    # 6. Exchange the code for a token
    print("🔄 Exchanging code for token...")
    async with httpx.AsyncClient() as client:
        response = await client.post(
            TOKEN_ENDPOINT,
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": REDIRECT_URI,
                "client_id": CLIENT_ID,
                "code_verifier": code_verifier
            }
        )
        response.raise_for_status()
        token_data = response.json()
        print("✅ Authentication successful!")
        return token_data["access_token"]


def classify_node(state: ChatbotState):
    """Node: Classify user intent"""
    last_message = state["messages"][-1].content

    classification_prompt = ChatPromptTemplate.from_messages([
        ("system", """Classify the user's message into ONE category:
- greeting
- services
- pricing
- mcp_action (booking, cancellation, vaccination info, pet names)
- general

Respond ONLY with the category name."""),
        ("user", "{message}")
    ])

    result = (classification_prompt | llm).invoke({"message": last_message})
    intent = result.content.strip().lower()
    print(f"🎯 Node Classified: {intent}")
    return {"user_intent": intent}


def extract_node(state: ChatbotState):
    """Node: Extract entities"""
    last_message = state["messages"][-1].content

    extraction_prompt = ChatPromptTemplate.from_messages([
        ("system", """Extract JSON info (pet_name, pet_type, date, service_interest, etc).
Return JSON only. If nothing found, return {{}}."""),
        ("user", "{message}")
    ])

    # Simplified for brevity - add your regex cleaning logic here if needed
    result = (extraction_prompt | llm).invoke({"message": last_message})
    try:
        import json
        # Basic cleanup to ensure valid JSON
        content = result.content.replace('```json', '').replace('```', '').strip()
        extracted = json.loads(content)
    except Exception:
        extracted = {}

    print(f"📋 Node Extracted: {extracted}")
    return {"extracted_info": extracted}


def greeting_node(state: ChatbotState):
    """Node: Handle greetings"""
    msg = "🐾 Welcome to Pawsome Pet Care! I can help with services, booking, and vet info. How can I help?"
    return {"messages": [AIMessage(content=msg)]}


def service_node(state: ChatbotState):
    """Node: Handle services"""
    interest = state["extracted_info"].get("service_interest", "general services")

    # Include conversation context
    context_messages = state["messages"][:-1]  # All messages except the current one
    context_str = "\n".join([f"{m.type}: {m.content}" for m in context_messages[-5:]])

    prompt = f"Previous conversation:\n{context_str}\n\nUser is interested in {interest}. Summarize available services from: {KNOWLEDGE_BASE}"
    response = llm.invoke(prompt)
    return {"messages": [response]}


def pricing_node(state: ChatbotState):
    """Node: Handle pricing"""
    context_messages = state["messages"][:-1]
    context_str = "\n".join([f"{m.type}: {m.content}" for m in context_messages[-5:]])

    prompt = f"Previous conversation:\n{context_str}\n\nSummarize pricing from: {KNOWLEDGE_BASE}"
    response = llm.invoke(prompt)
    return {"messages": [response]}


def general_node(state: ChatbotState):
    """Node: General Q&A"""
    # Include full conversation history for context
    all_messages = state["messages"]
    response = llm.invoke(all_messages)
    return {"messages": [response]}


async def mcp_agent_node(state: ChatbotState):
    """Node: Execute MCP tools"""
    print("🔧 Entering MCP Node")
    tools = state["mcp_tools"]

    # Bind tools to the LLM
    llm_with_tools = llm.bind_tools(tools)

    # Ask the LLM what to do; we pass the conversation history to give it context
    response = await llm_with_tools.ainvoke(state["messages"])

    # If the LLM wants to call a tool
    if response.tool_calls:
        tool_outputs = []
        for tool_call in response.tool_calls:
            tool_name = tool_call['name']
            args = tool_call['args']
            print(f"🛠️ Invoking {tool_name} with {args}")

            selected_tool = next((t for t in tools if t.name == tool_name), None)
            if selected_tool:
                try:
                    # Execute the tool
                    res = await selected_tool.ainvoke(args)
                    # Create a ToolMessage (required for the OpenAI tool flow)
                    tool_outputs.append(ToolMessage(
                        content=str(res),
                        tool_call_id=tool_call['id'],
                        name=tool_name
                    ))
                except Exception as e:
                    tool_outputs.append(ToolMessage(
                        content=f"Error: {str(e)}",
                        tool_call_id=tool_call['id'],
                        name=tool_name
                    ))

        # Append the assistant's tool call request AND the tool outputs to history,
        # then ask the LLM to generate the final natural-language answer
        new_messages = [response] + tool_outputs

        # Get the final response based on the tool outputs.
        # The full chain: History -> ToolCall -> ToolResult -> FinalAnswer
        final_chain = state["messages"] + new_messages
        final_response = await llm_with_tools.ainvoke(final_chain)

        # Return all new messages to update state
        return {"messages": new_messages + [final_response]}
    else:
        # The LLM decided not to call a tool
        return {"messages": [response]}


# --- GRAPH BUILDER ---

def route_intent(state: ChatbotState):
    """Conditional logic to determine the next node"""
    intent = state["user_intent"]
    if intent == "greeting":
        return "greeting"
    if intent == "services":
        return "services"
    if intent == "pricing":
        return "pricing"
    if intent == "mcp_action":
        return "mcp_agent"
    return "general"


def build_pet_care_graph(mcp_tools):
    """Constructs the LangGraph"""
    workflow = StateGraph(ChatbotState)

    # 1. Add nodes
    workflow.add_node("classify", classify_node)
    workflow.add_node("extract", extract_node)
    workflow.add_node("greeting", greeting_node)
    workflow.add_node("services", service_node)
    workflow.add_node("pricing", pricing_node)
    workflow.add_node("general", general_node)
    workflow.add_node("mcp_agent", mcp_agent_node)

    # 2. Define the entry point
    workflow.set_entry_point("classify")

    # 3. Add edges: Classify -> Extract -> Router
    workflow.add_edge("classify", "extract")

    # Conditional edges from Extract to the specific handlers
    workflow.add_conditional_edges(
        "extract",
        route_intent,
        {
            "greeting": "greeting",
            "services": "services",
            "pricing": "pricing",
            "mcp_agent": "mcp_agent",
            "general": "general"
        }
    )

    # 4. All handlers go to END
    workflow.add_edge("greeting", END)
    workflow.add_edge("services", END)
    workflow.add_edge("pricing", END)
    workflow.add_edge("general", END)
    workflow.add_edge("mcp_agent", END)

    return workflow.compile()


# --- WEB SERVER ---

async def handle_chat_request(request):
    """API: Runs the compiled graph"""
    try:
        data = await request.json()
        user_input = data.get("message", "")
        session_id = data.get("session_id", "default")

        if not user_input:
            return web.json_response({"error": "No message"}, status=400)

        # Get the compiled graph and tools from the app context
        app_graph = request.app['graph']
        mcp_tools = request.app['mcp_tools']

        # Retrieve conversation history from memory
        if session_id not in conversation_memory:
            conversation_memory[session_id] = []
        history = conversation_memory[session_id].copy()

        # Prepare the initial state with the full history
        initial_state = {
            "messages": history + [HumanMessage(content=user_input)],
            "user_intent": "",
            "extracted_info": {},
            "mcp_tools": mcp_tools,
            "session_id": session_id
        }

        # Run the graph!
        print("⚡ Invoking Graph...")
        final_state = await app_graph.ainvoke(initial_state)

        # Get all new messages (everything after the history)
        new_messages = final_state["messages"][len(history):]

        # Extract the last AI message
        last_ai_message = None
        for msg in reversed(new_messages):
            if isinstance(msg, AIMessage):
                last_ai_message = msg.content
                break

        if not last_ai_message:
            last_ai_message = "I'm sorry, I couldn't process that request."

        # Update conversation memory with the new user message and AI response
        conversation_memory[session_id].append(HumanMessage(content=user_input))
        conversation_memory[session_id].append(AIMessage(content=last_ai_message))

        # Keep only the last 20 messages to prevent memory overflow
        if len(conversation_memory[session_id]) > 20:
            conversation_memory[session_id] = conversation_memory[session_id][-20:]

        return web.json_response({"response": last_ai_message, "session_id": session_id})

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        return web.json_response({"error": str(e)}, status=500)


async def run_interactive_chat():
    print("=" * 60)
    print("🐾 PAWSOME PET CARE - GRAPH SERVER 🐾")
    print("=" * 60)

    # 1. Auth
    try:
        access_token = await authenticate_with_asgardeo()
    except Exception as e:
        print(f"Auth failed (continuing for testing): {e}")
        access_token = "test"  # Remove for prod

    headers = {"Authorization": f"Bearer {access_token}"}

    # 2. Connect to MCP & build the graph
    async with streamablehttp_client(MCP_SERVER_URL, headers=headers) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            print("✅ MCP Connected")

            # Load tools
            tools = await load_mcp_tools(session)
            print(f"🛠️ Tools: {[t.name for t in tools]}")

            # Build the graph
            compiled_graph = build_pet_care_graph(tools)

            # 3. Start the web server
            app = web.Application()
            app['graph'] = compiled_graph
            app['mcp_tools'] = tools
            app.router.add_post('/chat', handle_chat_request)

            # aiohttp requires handlers to be coroutines, so a plain lambda won't do
            async def index(request):
                return web.FileResponse('index.html')

            app.router.add_get('/', index)

            # CORS setup
            import aiohttp_cors
            cors = aiohttp_cors.setup(app, defaults={
                "*": aiohttp_cors.ResourceOptions(
                    allow_credentials=True,
                    expose_headers="*",
                    allow_headers="*",
                    allow_methods="*"
                )
            })
            for route in list(app.router.routes()):
                cors.add(route)

            runner = web.AppRunner(app)
            await runner.setup()
            site = web.TCPSite(runner, 'localhost', 8080)
            print("🚀 Server running at http://localhost:8080")
            await site.start()
            await asyncio.Event().wait()


if __name__ == "__main__":
    if not CLIENT_ID:
        print("⚠️ Please set CLIENT_ID in .env")
    else:
        asyncio.run(run_interactive_chat())
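The agent expects an MCP server listening at http://127.0.0.1:8000/mcp (MCP_SERVER_URL above). The real server for this example lives elsewhere in the repository; for quick local testing, a minimal stand-in sketch using the MCP Python SDK's FastMCP might look like the following. The tool name and its return value are hypothetical placeholders, and unlike a production server this stub does not validate the bearer token that agent.py sends.

# server_stub.py - hypothetical stand-in MCP server for local testing.
# FastMCP's streamable-http transport serves on port 8000 at the /mcp
# path by default, matching MCP_SERVER_URL in agent.py. No auth checks.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("PawsomePetCare")

@mcp.tool()
def get_vaccination_info(pet_name: str) -> str:
    """Return vaccination status for a pet (placeholder data)."""
    return f"{pet_name} is up to date on rabies, distemper, and bordetella."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")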
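Once authentication completes and the aiohttp server is listening on http://localhost:8080, the graph can be exercised through the POST /chat endpoint, which accepts a JSON body with message and session_id keys and returns the assistant's reply under response. A minimal client sketch (the question and session id are arbitrary examples; httpx is already a dependency of agent.py):

# chat_client.py - minimal sketch for exercising the /chat endpoint.
import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:8080/chat",
            json={"message": "How much is grooming for a small dog?",
                  "session_id": "demo"},
        )
        resp.raise_for_status()
        # The server returns {"response": ..., "session_id": ...}
        print(resp.json()["response"])

asyncio.run(main())

Note that agent.py itself reads OPENAI_API_KEY, CLIENT_ID, and (optionally) AUTH_ISSUER from a .env file via load_dotenv().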
