Skip to main content
Glama

Claude Slack

api_server.pyโ€ข19.3 kB
#!/usr/bin/env python3 """ FastAPI Server for Claude-Slack Provides REST API and SSE streaming for the unified claude-slack system. This server: 1. Wraps the existing Python API (no rewrite needed) 2. Provides REST endpoints for Next.js 3. Streams events via SSE 4. Acts as single writer to SQLite (no concurrency issues) 5. Bridges MCP tools via HTTP """ import os import sys import asyncio import time from contextlib import asynccontextmanager from typing import Dict, List, Optional, Any from datetime import datetime from fastapi import FastAPI, HTTPException, Query, Header, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from api.unified_api import ClaudeSlackAPI from api.models import DMPolicy, Discoverability # ============================================================================== # Pydantic Models for Request/Response # ============================================================================== class MessageCreate(BaseModel): """Request model for creating a message""" channel_id: str content: str sender_id: str sender_project_id: Optional[str] = None metadata: Optional[Dict[str, Any]] = None thread_id: Optional[str] = None class MessageUpdate(BaseModel): """Request model for updating a message""" content: Optional[str] = None metadata: Optional[Dict[str, Any]] = None is_edited: bool = True class ChannelCreate(BaseModel): """Request model for creating a channel""" name: str description: Optional[str] = None scope: str = "global" # "global" or "project" project_id: Optional[str] = None created_by: str created_by_project_id: Optional[str] = None is_default: bool = False class AgentRegister(BaseModel): """Request model for registering an agent""" name: str project_id: Optional[str] = None description: Optional[str] = None status: str = "active" dm_policy: str = "open" # open, restricted, closed discoverable: str = "public" # public, project, private class ChannelJoin(BaseModel): """Request model for joining a channel""" agent_name: str agent_project_id: Optional[str] = None class NoteCreate(BaseModel): """Request model for creating a note""" content: str agent_name: str agent_project_id: Optional[str] = None session_context: Optional[str] = None tags: Optional[List[str]] = None class SearchRequest(BaseModel): """Request model for search operations""" query: Optional[str] = None channel_ids: Optional[List[str]] = None project_ids: Optional[List[str]] = None limit: int = 50 ranking_profile: str = "balanced" # recent, quality, balanced, similarity metadata_filters: Optional[Dict[str, Any]] = None class MCPToolCall(BaseModel): """Request model for MCP tool invocations via HTTP""" tool_name: str params: Dict[str, Any] agent_name: str agent_project_id: Optional[str] = None # ============================================================================== # Application Lifecycle # ============================================================================== @asynccontextmanager async def lifespan(app: FastAPI): """ Manage application lifecycle - startup and shutdown. Ensures proper initialization and cleanup of resources. """ # Startup print("๐Ÿš€ Starting Claude-Slack API Server...") # Initialize API from environment or defaults app.state.api = ClaudeSlackAPI.from_env() await app.state.api.db.initialize() await app.state.api.events.start() print(f"โœ… API Server ready on http://localhost:8000") print(f"๐Ÿ“Š Database: {app.state.api.db.sqlite.db_path}") yield # Shutdown print("๐Ÿ›‘ Shutting down Claude-Slack API Server...") await app.state.api.events.stop() await app.state.api.db.close() print("โœ… Server shutdown complete") # ============================================================================== # FastAPI Application # ============================================================================== app = FastAPI( title="Claude-Slack API", description="Unified API for claude-slack messaging system with semantic search", version="5.0.0", lifespan=lifespan ) # Configure CORS for Next.js app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000", "http://localhost:3001"], # Next.js dev ports allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["X-Total-Count", "X-Event-Stream-Version"] ) # ============================================================================== # Health & Status Endpoints # ============================================================================== @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "timestamp": time.time(), "version": "5.0.0" } @app.get("/api/stats") async def get_stats(): """Get system statistics""" api: ClaudeSlackAPI = app.state.api # Get various stats event_stats = api.get_event_stats() # Get database stats via direct query db_stats = await api.db.sqlite.get_stats() return { "events": event_stats, "database": db_stats, "timestamp": time.time() } # ============================================================================== # Message Endpoints # ============================================================================== @app.get("/api/messages") async def get_messages( channel_id: Optional[str] = None, limit: int = Query(50, ge=1, le=500), offset: int = Query(0, ge=0), since: Optional[str] = None, before: Optional[str] = None ): """Get messages with optional filtering""" api: ClaudeSlackAPI = app.state.api messages = await api.get_messages( channel_id=channel_id, limit=limit, offset=offset, since=since, before=before ) return messages @app.post("/api/messages", status_code=201) async def create_message(message: MessageCreate): """Create a new message""" api: ClaudeSlackAPI = app.state.api msg_id = await api.send_message( channel_id=message.channel_id, sender_id=message.sender_id, sender_project_id=message.sender_project_id, content=message.content, metadata=message.metadata, thread_id=message.thread_id ) # Fetch and return the created message messages = await api.get_messages(message_ids=[msg_id]) return messages[0] if messages else {"id": msg_id} @app.put("/api/messages/{message_id}") async def update_message(message_id: int, update: MessageUpdate): """Update an existing message""" api: ClaudeSlackAPI = app.state.api success = await api.update_message( message_id=message_id, content=update.content, metadata=update.metadata, is_edited=update.is_edited ) if not success: raise HTTPException(status_code=404, detail="Message not found") return {"success": True, "message_id": message_id} @app.delete("/api/messages/{message_id}") async def delete_message(message_id: int): """Delete a message""" api: ClaudeSlackAPI = app.state.api success = await api.delete_message(message_id) if not success: raise HTTPException(status_code=404, detail="Message not found") return {"success": True, "message_id": message_id} # ============================================================================== # Search Endpoints # ============================================================================== @app.post("/api/search") async def search_messages(request: SearchRequest): """ Search messages with semantic search and filtering. Uses Qdrant for vector search when available. """ api: ClaudeSlackAPI = app.state.api results = await api.search_messages( query=request.query, channel_ids=request.channel_ids, project_ids=request.project_ids, limit=request.limit, ranking_profile=request.ranking_profile, metadata_filters=request.metadata_filters ) return results # ============================================================================== # Channel Endpoints # ============================================================================== @app.get("/api/channels") async def list_channels( agent_name: Optional[str] = None, project_id: Optional[str] = None, include_archived: bool = False, is_default: Optional[bool] = None ): """List channels with filtering""" api: ClaudeSlackAPI = app.state.api channels = await api.list_channels( agent_name=agent_name, project_id=project_id, include_archived=include_archived, is_default=is_default ) return channels @app.post("/api/channels", status_code=201) async def create_channel(channel: ChannelCreate): """Create a new channel""" api: ClaudeSlackAPI = app.state.api # Determine full channel ID based on scope if channel.scope == "project" and not channel.project_id: raise HTTPException( status_code=400, detail="Project ID required for project-scoped channels" ) channel_id = await api.create_channel( name=channel.name, description=channel.description, scope=channel.scope, project_id=channel.project_id, created_by=channel.created_by, created_by_project_id=channel.created_by_project_id, is_default=channel.is_default ) return {"channel_id": channel_id, "name": channel.name} @app.post("/api/channels/{channel_id}/join") async def join_channel(channel_id: str, request: ChannelJoin): """Join a channel""" api: ClaudeSlackAPI = app.state.api success = await api.join_channel( agent_name=request.agent_name, agent_project_id=request.agent_project_id, channel_id=channel_id ) if not success: raise HTTPException(status_code=400, detail="Failed to join channel") return {"success": True, "channel_id": channel_id} @app.post("/api/channels/{channel_id}/leave") async def leave_channel(channel_id: str, request: ChannelJoin): """Leave a channel""" api: ClaudeSlackAPI = app.state.api success = await api.leave_channel( agent_name=request.agent_name, agent_project_id=request.agent_project_id, channel_id=channel_id ) if not success: raise HTTPException(status_code=400, detail="Failed to leave channel") return {"success": True, "channel_id": channel_id} @app.get("/api/channels/{channel_id}/members") async def get_channel_members(channel_id: str): """Get members of a channel""" api: ClaudeSlackAPI = app.state.api members = await api.get_channel_members(channel_id) return members # ============================================================================== # Agent Endpoints # ============================================================================== @app.get("/api/agents") async def list_agents( project_id: Optional[str] = None, include_descriptions: bool = True ): """List all agents""" api: ClaudeSlackAPI = app.state.api agents = await api.list_agents( project_id=project_id, include_descriptions=include_descriptions ) return agents @app.post("/api/agents", status_code=201) async def register_agent(agent: AgentRegister): """Register a new agent""" api: ClaudeSlackAPI = app.state.api success = await api.register_agent( name=agent.name, project_id=agent.project_id, description=agent.description, status=agent.status, dm_policy=agent.dm_policy, discoverable=agent.discoverable ) if not success: raise HTTPException(status_code=400, detail="Failed to register agent") return {"success": True, "agent": agent.name} @app.get("/api/agents/{agent_name}") async def get_agent(agent_name: str, project_id: Optional[str] = None): """Get agent details""" api: ClaudeSlackAPI = app.state.api agent = await api.get_agent(agent_name, project_id) if not agent: raise HTTPException(status_code=404, detail="Agent not found") return agent # ============================================================================== # Notes Endpoints # ============================================================================== @app.post("/api/notes", status_code=201) async def create_note(note: NoteCreate): """Create a note""" api: ClaudeSlackAPI = app.state.api note_id = await api.notes.write_note( agent_name=note.agent_name, agent_project_id=note.agent_project_id, content=note.content, session_context=note.session_context, tags=note.tags ) return {"note_id": note_id} @app.get("/api/notes") async def get_notes( agent_name: str, agent_project_id: Optional[str] = None, limit: int = Query(20, ge=1, le=100), query: Optional[str] = None, tags: Optional[List[str]] = Query(None) ): """Get notes for an agent""" api: ClaudeSlackAPI = app.state.api if query or tags: notes = await api.notes.search_notes( agent_name=agent_name, agent_project_id=agent_project_id, query=query, tags=tags, limit=limit ) else: notes = await api.notes.get_recent_notes( agent_name=agent_name, agent_project_id=agent_project_id, limit=limit ) return notes # ============================================================================== # Event Streaming (SSE) # ============================================================================== @app.get("/api/events") async def stream_events( client_id: Optional[str] = Query(None), topics: Optional[str] = Query(None), # Comma-separated list last_event_id: Optional[str] = Header(None, alias="Last-Event-ID") ): """ Server-Sent Events endpoint for real-time updates. Topics can be: messages, channels, members, agents, notes, system, or * for all """ api: ClaudeSlackAPI = app.state.api # Generate client ID if not provided if not client_id: client_id = f"web_{datetime.now().timestamp()}" # Parse topics topic_list = None if topics: topic_list = [t.strip() for t in topics.split(",")] async def event_generator(): """Generate SSE formatted events""" try: async for sse_data in api.subscribe_sse( client_id=client_id, topics=topic_list, last_event_id=last_event_id ): yield sse_data except asyncio.CancelledError: # Client disconnected await api.unsubscribe_events(client_id) except Exception as e: # Send error event yield f"event: error\ndata: {{'error': '{str(e)}'}}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", # Disable Nginx buffering "X-Event-Stream-Version": "2.0" } ) # ============================================================================== # MCP Bridge Endpoints # ============================================================================== @app.post("/api/mcp/tool") async def execute_mcp_tool(request: MCPToolCall): """ Bridge endpoint for MCP tools to call via HTTP. This allows MCP server to delegate to the FastAPI server, ensuring single writer to SQLite. """ api: ClaudeSlackAPI = app.state.api # Map tool names to API methods tool_map = { "send_message": api.send_message, "send_channel_message": api.send_message, "get_messages": api.get_messages, "search_messages": api.search_messages, "list_channels": api.list_channels, "create_channel": api.create_channel, "join_channel": api.join_channel, "leave_channel": api.leave_channel, "list_agents": api.list_agents, "register_agent": api.register_agent, "write_note": api.notes.write_note, "search_notes": api.notes.search_notes, "get_recent_notes": api.notes.get_recent_notes, "create_dm": api.create_or_get_dm_channel, "send_direct_message": api.send_direct_message, } if request.tool_name not in tool_map: raise HTTPException( status_code=400, detail=f"Unknown tool: {request.tool_name}" ) try: # Execute the tool with provided parameters result = await tool_map[request.tool_name](**request.params) return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} # ============================================================================== # Project & Session Management # ============================================================================== @app.get("/api/projects") async def list_projects(): """List all projects""" api: ClaudeSlackAPI = app.state.api projects = await api.list_projects() return projects @app.get("/api/projects/{project_id}") async def get_project(project_id: str): """Get project details""" api: ClaudeSlackAPI = app.state.api project = await api.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") return project @app.get("/api/projects/{project_id}/links") async def get_project_links(project_id: str): """Get linked projects""" api: ClaudeSlackAPI = app.state.api links = await api.get_linked_projects(project_id) return links @app.post("/api/sessions") async def register_session( agent_name: str, agent_project_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ): """Register a new session""" api: ClaudeSlackAPI = app.state.api session_id = await api.register_session( agent_name=agent_name, agent_project_id=agent_project_id, metadata=metadata ) return {"session_id": session_id} # ============================================================================== # Development Helpers # ============================================================================== if __name__ == "__main__": import uvicorn # Development server with auto-reload uvicorn.run( "api_server:app", host="0.0.0.0", port=8000, reload=True, log_level="info" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/theo-nash/claude-slack'

If you have feedback or need assistance with the MCP directory API, please join our Discord server