Skip to main content
Glama
chat.py6.22 kB
"""AI chat routes with SSE streaming and tool execution""" import json import uuid from typing import Optional, Dict, List, Any from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from ..database import get_db from ..schemas import ChatMessage from ..dependencies import get_optional_active_user from ..models import User from ..services.claude_service import ClaudeService, KNOWLEDGE_BASE_TOOLS from ..services.tool_executor import ToolExecutor router = APIRouter(prefix="/chat", tags=["chat"]) # In-memory conversation storage conversations: Dict[str, List[Dict[str, Any]]] = {} SYSTEM_PROMPT = """You are a helpful assistant with access to the user's knowledge base. Use the tools to search and retrieve notes to answer questions. When the user asks about their notes or knowledge base, use the available tools: - search_notes: Search for notes by query - get_note: Get full content of a specific note - create_note: Create a new note - update_note: Update an existing note - list_categories: List all categories with note counts Always be helpful and provide accurate information based on the user's knowledge base.""" @router.post("/") async def chat( message: ChatMessage, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_optional_active_user) ): """ Send a message to the AI assistant with SSE streaming The AI assistant can: - Search your knowledge base for relevant information - Answer questions about your notes - Help create, update, or organize notes - Provide summaries and insights Returns Server-Sent Events stream with chat chunks. """ # Generate or use existing conversation ID conversation_id = message.conversation_id or str(uuid.uuid4()) # Get or create conversation history if conversation_id not in conversations: conversations[conversation_id] = [] # Add user message to history conversations[conversation_id].append({ "role": "user", "content": message.message }) async def generate_stream(): try: # Initialize services claude_service = ClaudeService() tool_executor = ToolExecutor(db) # Send conversation_id first yield f"data: {json.dumps({'type': 'conversation_id', 'conversation_id': conversation_id})}\n\n" # Get current messages messages = conversations[conversation_id].copy() # Tool loop while True: # Call Claude API response = await claude_service.send_message( messages=messages, tools=KNOWLEDGE_BASE_TOOLS, system=SYSTEM_PROMPT, max_tokens=4096 ) # Process response content assistant_content = response["content"] text_response = "" tool_uses = [] for block in assistant_content: if hasattr(block, 'type'): if block.type == "text": text_response += block.text # Stream text chunks yield f"data: {json.dumps({'type': 'text', 'content': block.text})}\n\n" elif block.type == "tool_use": tool_uses.append({ "id": block.id, "name": block.name, "input": block.input }) # Add assistant message to history conversations[conversation_id].append({ "role": "assistant", "content": assistant_content }) messages = conversations[conversation_id].copy() # If no tool use, we're done if response["stop_reason"] != "tool_use" or not tool_uses: break # Execute tools and add results tool_results = [] for tool_use in tool_uses: # Notify about tool execution yield f"data: {json.dumps({'type': 'tool_call', 'tool': tool_use['name'], 'input': tool_use['input']})}\n\n" # Execute tool result = tool_executor.execute(tool_use["name"], tool_use["input"]) # Notify about tool result yield f"data: {json.dumps({'type': 'tool_result', 'tool': tool_use['name'], 'result': result})}\n\n" tool_results.append({ "type": "tool_result", "tool_use_id": tool_use["id"], "content": json.dumps(result) }) # Add tool results to conversation conversations[conversation_id].append({ "role": "user", "content": tool_results }) messages = conversations[conversation_id].copy() # Send done event yield f"data: {json.dumps({'type': 'done', 'conversation_id': conversation_id})}\n\n" except ValueError as e: yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n" except Exception as e: yield f"data: {json.dumps({'type': 'error', 'error': f'Internal error: {str(e)}'})}\n\n" return StreamingResponse( generate_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", } ) @router.delete("/{conversation_id}") async def clear_conversation( conversation_id: str, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_optional_active_user) ): """ Clear a conversation history This removes the conversation from memory. Use this to start fresh. """ if conversation_id in conversations: del conversations[conversation_id] return {"message": "Conversation cleared"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cwente25/KnowledgeBaseMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server