CrewAI Enterprise MCP Server

Official
import dotenv import os import httpx from mcp.server.fastmcp import FastMCP from typing import Any dotenv.load_dotenv() mcp = FastMCP("crewai_enterprise_server") CREWAI_ENTERPRISE_SERVER_URL = os.getenv("MCP_CREWAI_ENTERPRISE_SERVER_URL") CREWAI_ENTERPRISE_BEARER_TOKEN = os.getenv("MCP_CREWAI_ENTERPRISE_BEARER_TOKEN") @mcp.tool() async def kickoff_crew(inputs: dict[str, Any]) -> dict[str, Any]: """Start a new crew task Args: inputs: Dictionary containing the query and other input parameters Returns: Dictionary containing the crew task response. The response will contain the crew id which needs to be returned to check the status of the crew. """ async with httpx.AsyncClient() as client: response = await client.post( f"{CREWAI_ENTERPRISE_SERVER_URL}/kickoff", headers={ "Authorization": f"Bearer {CREWAI_ENTERPRISE_BEARER_TOKEN}", "Content-Type": "application/json", }, json={"inputs": inputs}, ) response_json = response.json() return response_json @mcp.tool() async def get_crew_status(crew_id: str) -> dict[str, Any]: """Get the status of a crew task Args: crew_id: The ID of the crew task to check Returns: Dictionary containing the crew task status """ async with httpx.AsyncClient() as client: response = await client.get( f"{CREWAI_ENTERPRISE_SERVER_URL}/status/{crew_id}", headers={ "Authorization": f"Bearer {CREWAI_ENTERPRISE_BEARER_TOKEN}", "Content-Type": "application/json", }, ) return response.json() if __name__ == "__main__": import mcp mcp.run()