Skip to main content
Glama
eficode
by eficode
agileday_server.py13.2 kB
import os import sys import argparse import requests import uvicorn from datetime import date, timedelta from starlette.applications import Starlette from starlette.routing import Route from starlette.responses import Response from mcp.server import Server from mcp.server.sse import SseServerTransport from mcp.types import Tool, TextContent, EmbeddedResource, ImageContent # --- 1. Startup Validation --- TENANT_ID = os.getenv("AGILEDAY_TENANT_ID") API_TOKEN = os.getenv("AGILEDAY_API_TOKEN") if __name__ == "__main__": if not TENANT_ID: print("❌ ERROR: AGILEDAY_TENANT_ID is missing.", file=sys.stderr) sys.exit(1) if not API_TOKEN: print("❌ ERROR: AGILEDAY_API_TOKEN is missing.", file=sys.stderr) sys.exit(1) # --- 2. Server Logic --- server = Server("Agileday Competence Service") BASE_URL = f"https://{TENANT_ID}.agileday.io/api" if TENANT_ID else "" def get_headers(): return { "Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json", "Accept": "application/json" } @server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="find_experts_with_skill", description="Finds employees who have a specific skill or competence.", inputSchema={ "type": "object", "properties": { "skill_name": {"type": "string", "description": "Skill name (e.g. 'Python')"} }, "required": ["skill_name"] } ), Tool( name="get_employee_competence_profile", description="Fetches the full skill profile for a specific employee.", inputSchema={ "type": "object", "properties": { "name_query": {"type": "string", "description": "Partial name of the employee"} }, "required": ["name_query"] } ), Tool( name="list_organization_skills", description="Retrieves a full list of all available skills/competences defined in the organization.", inputSchema={ "type": "object", "properties": {}, } ), # NEW TOOL: Availability Checker Tool( name="check_staff_availability", description="Checks who is free and views current allocations for a date range. Can filter by skill.", inputSchema={ "type": "object", "properties": { "start_date": {"type": "string", "description": "Start date in YYYY-MM-DD format"}, "end_date": {"type": "string", "description": "End date in YYYY-MM-DD format"}, "skill_filter": {"type": "string", "description": "Optional: Filter for employees with this skill (e.g. 'React')"}, "min_availability": {"type": "integer", "description": "Optional: Minimum free percentage to show (0-100). Default 0."} }, "required": ["start_date", "end_date"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]: if name == "find_experts_with_skill": return [TextContent(type="text", text=find_experts_logic(arguments["skill_name"]))] if name == "get_employee_competence_profile": return [TextContent(type="text", text=profile_logic(arguments["name_query"]))] if name == "list_organization_skills": return [TextContent(type="text", text=list_skills_logic())] # NEW HANDLER if name == "check_staff_availability": return [TextContent(type="text", text=check_availability_logic( arguments["start_date"], arguments["end_date"], arguments.get("skill_filter"), arguments.get("min_availability", 0) ))] raise ValueError(f"Unknown tool: {name}") # --- Helper Functions --- def find_experts_logic(skill_name: str) -> str: try: url = f"{BASE_URL}/v1/employee" response = requests.get(url, headers=get_headers(), timeout=15) response.raise_for_status() employees = response.json() except Exception as e: return f"Error: {str(e)}" matches = [] search_term = skill_name.lower() for emp in employees: skills = emp.get('skills', []) if not skills: continue relevant = [s for s in skills if s.get('name') and search_term in s.get('name', '').lower()] if relevant: name = f"{emp.get('firstName', '')} {emp.get('lastName', '')}".strip() title = emp.get('title', 'No Title') details_list = [] for s in relevant: prof = s.get('proficiency', '?') # The API returns ISO duration (e.g. P1Y) or string. We pass it raw to the LLM. exp = s.get('experience') exp_str = f", Exp: {exp}" if exp else "" details_list.append(f"{s['name']} (Lvl {prof}/5{exp_str})") details = ", ".join(details_list) matches.append(f"👤 **{name}** - {title}\n └─ {details}") return "\n\n".join(matches) if matches else f"No experts found for '{skill_name}'." def profile_logic(name_query: str) -> str: try: url = f"{BASE_URL}/v1/employee" response = requests.get(url, headers=get_headers(), timeout=15) response.raise_for_status() employees = response.json() except Exception as e: return f"Error: {str(e)}" results = [] search_term = name_query.lower() for emp in employees: first = emp.get('firstName', '').lower() last = emp.get('lastName', '').lower() if search_term in first or search_term in last: name = f"{emp.get('firstName', '')} {emp.get('lastName', '')}" skills = sorted(emp.get('skills', []), key=lambda x: x.get('proficiency', 0) or 0, reverse=True) formatted_skills = [] for s in skills: motivation = s.get('motivation', 0) or 0 icon = "🔥" if motivation > 8 else "" formatted_skills.append(f"- {s.get('name')}: Level {s.get('proficiency')} {icon}") skill_block = "\n".join(formatted_skills) or "No skills" results.append(f"### {name}\n{skill_block}") return "\n\n".join(results) if results else f"No employee found for '{name_query}'." def list_skills_logic() -> str: try: url = f"{BASE_URL}/v1/skill" response = requests.get(url, headers=get_headers(), timeout=15) response.raise_for_status() skills = response.json() except Exception as e: return f"Error fetching skills: {str(e)}" if not skills: return "No skills defined in the library." categories = {} for s in skills: cat = s.get('category') or "Uncategorized" if cat not in categories: categories[cat] = [] categories[cat].append(s.get('name', 'Unnamed Skill')) output = ["**Organization Skill Library**\n"] for cat in sorted(categories.keys()): skill_names = sorted(categories[cat]) output.append(f"### {cat}") output.append(", ".join(skill_names)) output.append("") return "\n".join(output) def check_availability_logic(start_date: str, end_date: str, skill_filter: str = None, min_availability: int = 0) -> str: # 1. Fetch all employees first try: emp_url = f"{BASE_URL}/v1/employee" emp_response = requests.get(emp_url, headers=get_headers(), timeout=15) emp_response.raise_for_status() all_employees = emp_response.json() except Exception as e: return f"Error fetching employees: {str(e)}" # 2. Filter employees if skill_filter is active relevant_employee_ids = set() relevant_employees = {} for emp in all_employees: emp_id = emp.get('id') if not emp_id: continue # If skill filter applies, check skills if skill_filter: skills = emp.get('skills', []) has_skill = any(skill_filter.lower() in s.get('name', '').lower() for s in skills) if not has_skill: continue relevant_employee_ids.add(emp_id) relevant_employees[emp_id] = emp if not relevant_employee_ids: return f"No employees found matching skill: {skill_filter}" if skill_filter else "No employees found." # 3. Fetch allocations for the period try: alloc_url = f"{BASE_URL}/v1/allocation_reporting" params = {"startDate": start_date, "endDate": end_date} alloc_response = requests.get(alloc_url, headers=get_headers(), params=params, timeout=15) alloc_response.raise_for_status() allocations = alloc_response.json() except Exception as e: return f"Error fetching allocations: {str(e)}" # 4. Aggregate allocations by employee # Map: employee_id -> list of project allocations emp_workload = {eid: [] for eid in relevant_employee_ids} emp_total_alloc = {eid: 0.0 for eid in relevant_employee_ids} for alloc in allocations: eid = alloc.get('employeeId') # Only process if this employee is in our filtered list if eid in relevant_employee_ids: # According to schema, 'allocation' is the percentage percentage = alloc.get('allocation', 0) project_name = alloc.get('projectName', 'Unknown Project') emp_total_alloc[eid] += percentage emp_workload[eid].append(f"{project_name} ({percentage}%)") # 5. Format Output output = [] output.append(f"**Availability Report ({start_date} to {end_date})**") if skill_filter: output.append(f"Filter: Skill matching '{skill_filter}'\n") # Sort by availability (most free first) sorted_eids = sorted(relevant_employee_ids, key=lambda x: emp_total_alloc[x]) for eid in sorted_eids: total_load = emp_total_alloc[eid] availability = 100 - total_load # Skip if below requested availability if availability < min_availability: continue emp = relevant_employees[eid] full_name = f"{emp.get('firstName', '')} {emp.get('lastName', '')}".strip() title = emp.get('title', 'No Title') # Status Icon if availability >= 80: icon = "🟢" # Mostly free elif availability >= 20: icon = "🟡" # Partially booked else: icon = "🔴" # Busy # Work details work_desc = ", ".join(emp_workload[eid]) if emp_workload[eid] else "No allocations" output.append(f"{icon} **{full_name}** ({title})") output.append(f" Availability: **{availability:.0f}%** (Allocated: {total_load:.0f}%)") output.append(f" Current Work: {work_desc}") output.append("") return "\n".join(output) # --- 3. Transport Layer (Raw ASGI) --- sse_transport = SseServerTransport("/mcp") class MCPHandler: async def __call__(self, scope, receive, send): if scope["type"] != "http": return if scope["method"] == "POST": await sse_transport.handle_post_message(scope, receive, send) elif scope["method"] == "GET": async with sse_transport.connect_sse(scope, receive, send) as streams: await server.run(streams[0], streams[1], server.create_initialization_options()) else: response = Response("Method Not Allowed", status_code=405) await response(scope, receive, send) mcp_asgi_app = MCPHandler() routes = [ Route("/mcp", endpoint=mcp_asgi_app, methods=["GET", "POST"]), Route("/sse", endpoint=mcp_asgi_app, methods=["GET"]), Route("/messages", endpoint=mcp_asgi_app, methods=["POST"]), Route("/health", endpoint=lambda r: Response("OK")) ] starlette_app = Starlette(routes=routes) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--transport", default="stdio", choices=["stdio", "http"]) parser.add_argument("--host", default="0.0.0.0") parser.add_argument("--port", type=int, default=8000) args = parser.parse_args() print(f"✅ Agileday Server initialized for tenant: {TENANT_ID}", file=sys.stderr) if args.transport == "http": print(f"📡 Starting Streamable HTTP Server", file=sys.stderr) print(f" Endpoint: http://{args.host}:{args.port}/mcp", file=sys.stderr) uvicorn.run(starlette_app, host=args.host, port=args.port) else: import asyncio from mcp.server.stdio import stdio_server async def run_stdio(): async with stdio_server() as streams: await server.run(streams[0], streams[1], server.create_initialization_options()) print("🔌 Starting Stdio Server", file=sys.stderr) asyncio.run(run_stdio())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eficode/mcp-agileday'

If you have feedback or need assistance with the MCP directory API, please join our Discord server