import os
import sys
import argparse
import requests
import uvicorn
from datetime import date, timedelta
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import Response
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import Tool, TextContent, EmbeddedResource, ImageContent
# --- 1. Startup Validation ---
TENANT_ID = os.getenv("AGILEDAY_TENANT_ID")
API_TOKEN = os.getenv("AGILEDAY_API_TOKEN")
if __name__ == "__main__":
if not TENANT_ID:
print("❌ ERROR: AGILEDAY_TENANT_ID is missing.", file=sys.stderr)
sys.exit(1)
if not API_TOKEN:
print("❌ ERROR: AGILEDAY_API_TOKEN is missing.", file=sys.stderr)
sys.exit(1)
# --- 2. Server Logic ---
server = Server("Agileday Competence Service")
BASE_URL = f"https://{TENANT_ID}.agileday.io/api" if TENANT_ID else ""
def get_headers():
return {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json"
}
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="find_experts_with_skill",
description="Finds employees who have a specific skill or competence.",
inputSchema={
"type": "object",
"properties": {
"skill_name": {"type": "string", "description": "Skill name (e.g. 'Python')"}
},
"required": ["skill_name"]
}
),
Tool(
name="get_employee_competence_profile",
description="Fetches the full skill profile for a specific employee.",
inputSchema={
"type": "object",
"properties": {
"name_query": {"type": "string", "description": "Partial name of the employee"}
},
"required": ["name_query"]
}
),
Tool(
name="list_organization_skills",
description="Retrieves a full list of all available skills/competences defined in the organization.",
inputSchema={
"type": "object",
"properties": {},
}
),
# NEW TOOL: Availability Checker
Tool(
name="check_staff_availability",
description="Checks who is free and views current allocations for a date range. Can filter by skill.",
inputSchema={
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "Start date in YYYY-MM-DD format"},
"end_date": {"type": "string", "description": "End date in YYYY-MM-DD format"},
"skill_filter": {"type": "string", "description": "Optional: Filter for employees with this skill (e.g. 'React')"},
"min_availability": {"type": "integer", "description": "Optional: Minimum free percentage to show (0-100). Default 0."}
},
"required": ["start_date", "end_date"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]:
if name == "find_experts_with_skill":
return [TextContent(type="text", text=find_experts_logic(arguments["skill_name"]))]
if name == "get_employee_competence_profile":
return [TextContent(type="text", text=profile_logic(arguments["name_query"]))]
if name == "list_organization_skills":
return [TextContent(type="text", text=list_skills_logic())]
# NEW HANDLER
if name == "check_staff_availability":
return [TextContent(type="text", text=check_availability_logic(
arguments["start_date"],
arguments["end_date"],
arguments.get("skill_filter"),
arguments.get("min_availability", 0)
))]
raise ValueError(f"Unknown tool: {name}")
# --- Helper Functions ---
def find_experts_logic(skill_name: str) -> str:
try:
url = f"{BASE_URL}/v1/employee"
response = requests.get(url, headers=get_headers(), timeout=15)
response.raise_for_status()
employees = response.json()
except Exception as e:
return f"Error: {str(e)}"
matches = []
search_term = skill_name.lower()
for emp in employees:
skills = emp.get('skills', [])
if not skills: continue
relevant = [s for s in skills if s.get('name') and search_term in s.get('name', '').lower()]
if relevant:
name = f"{emp.get('firstName', '')} {emp.get('lastName', '')}".strip()
title = emp.get('title', 'No Title')
details_list = []
for s in relevant:
prof = s.get('proficiency', '?')
# The API returns ISO duration (e.g. P1Y) or string. We pass it raw to the LLM.
exp = s.get('experience')
exp_str = f", Exp: {exp}" if exp else ""
details_list.append(f"{s['name']} (Lvl {prof}/5{exp_str})")
details = ", ".join(details_list)
matches.append(f"👤 **{name}** - {title}\n └─ {details}")
return "\n\n".join(matches) if matches else f"No experts found for '{skill_name}'."
def profile_logic(name_query: str) -> str:
try:
url = f"{BASE_URL}/v1/employee"
response = requests.get(url, headers=get_headers(), timeout=15)
response.raise_for_status()
employees = response.json()
except Exception as e:
return f"Error: {str(e)}"
results = []
search_term = name_query.lower()
for emp in employees:
first = emp.get('firstName', '').lower()
last = emp.get('lastName', '').lower()
if search_term in first or search_term in last:
name = f"{emp.get('firstName', '')} {emp.get('lastName', '')}"
skills = sorted(emp.get('skills', []), key=lambda x: x.get('proficiency', 0) or 0, reverse=True)
formatted_skills = []
for s in skills:
motivation = s.get('motivation', 0) or 0
icon = "🔥" if motivation > 8 else ""
formatted_skills.append(f"- {s.get('name')}: Level {s.get('proficiency')} {icon}")
skill_block = "\n".join(formatted_skills) or "No skills"
results.append(f"### {name}\n{skill_block}")
return "\n\n".join(results) if results else f"No employee found for '{name_query}'."
def list_skills_logic() -> str:
try:
url = f"{BASE_URL}/v1/skill"
response = requests.get(url, headers=get_headers(), timeout=15)
response.raise_for_status()
skills = response.json()
except Exception as e:
return f"Error fetching skills: {str(e)}"
if not skills:
return "No skills defined in the library."
categories = {}
for s in skills:
cat = s.get('category') or "Uncategorized"
if cat not in categories: categories[cat] = []
categories[cat].append(s.get('name', 'Unnamed Skill'))
output = ["**Organization Skill Library**\n"]
for cat in sorted(categories.keys()):
skill_names = sorted(categories[cat])
output.append(f"### {cat}")
output.append(", ".join(skill_names))
output.append("")
return "\n".join(output)
def check_availability_logic(start_date: str, end_date: str, skill_filter: str = None, min_availability: int = 0) -> str:
# 1. Fetch all employees first
try:
emp_url = f"{BASE_URL}/v1/employee"
emp_response = requests.get(emp_url, headers=get_headers(), timeout=15)
emp_response.raise_for_status()
all_employees = emp_response.json()
except Exception as e:
return f"Error fetching employees: {str(e)}"
# 2. Filter employees if skill_filter is active
relevant_employee_ids = set()
relevant_employees = {}
for emp in all_employees:
emp_id = emp.get('id')
if not emp_id: continue
# If skill filter applies, check skills
if skill_filter:
skills = emp.get('skills', [])
has_skill = any(skill_filter.lower() in s.get('name', '').lower() for s in skills)
if not has_skill:
continue
relevant_employee_ids.add(emp_id)
relevant_employees[emp_id] = emp
if not relevant_employee_ids:
return f"No employees found matching skill: {skill_filter}" if skill_filter else "No employees found."
# 3. Fetch allocations for the period
try:
alloc_url = f"{BASE_URL}/v1/allocation_reporting"
params = {"startDate": start_date, "endDate": end_date}
alloc_response = requests.get(alloc_url, headers=get_headers(), params=params, timeout=15)
alloc_response.raise_for_status()
allocations = alloc_response.json()
except Exception as e:
return f"Error fetching allocations: {str(e)}"
# 4. Aggregate allocations by employee
# Map: employee_id -> list of project allocations
emp_workload = {eid: [] for eid in relevant_employee_ids}
emp_total_alloc = {eid: 0.0 for eid in relevant_employee_ids}
for alloc in allocations:
eid = alloc.get('employeeId')
# Only process if this employee is in our filtered list
if eid in relevant_employee_ids:
# According to schema, 'allocation' is the percentage
percentage = alloc.get('allocation', 0)
project_name = alloc.get('projectName', 'Unknown Project')
emp_total_alloc[eid] += percentage
emp_workload[eid].append(f"{project_name} ({percentage}%)")
# 5. Format Output
output = []
output.append(f"**Availability Report ({start_date} to {end_date})**")
if skill_filter:
output.append(f"Filter: Skill matching '{skill_filter}'\n")
# Sort by availability (most free first)
sorted_eids = sorted(relevant_employee_ids, key=lambda x: emp_total_alloc[x])
for eid in sorted_eids:
total_load = emp_total_alloc[eid]
availability = 100 - total_load
# Skip if below requested availability
if availability < min_availability:
continue
emp = relevant_employees[eid]
full_name = f"{emp.get('firstName', '')} {emp.get('lastName', '')}".strip()
title = emp.get('title', 'No Title')
# Status Icon
if availability >= 80:
icon = "🟢" # Mostly free
elif availability >= 20:
icon = "🟡" # Partially booked
else:
icon = "🔴" # Busy
# Work details
work_desc = ", ".join(emp_workload[eid]) if emp_workload[eid] else "No allocations"
output.append(f"{icon} **{full_name}** ({title})")
output.append(f" Availability: **{availability:.0f}%** (Allocated: {total_load:.0f}%)")
output.append(f" Current Work: {work_desc}")
output.append("")
return "\n".join(output)
# --- 3. Transport Layer (Raw ASGI) ---
sse_transport = SseServerTransport("/mcp")
class MCPHandler:
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return
if scope["method"] == "POST":
await sse_transport.handle_post_message(scope, receive, send)
elif scope["method"] == "GET":
async with sse_transport.connect_sse(scope, receive, send) as streams:
await server.run(streams[0], streams[1], server.create_initialization_options())
else:
response = Response("Method Not Allowed", status_code=405)
await response(scope, receive, send)
mcp_asgi_app = MCPHandler()
routes = [
Route("/mcp", endpoint=mcp_asgi_app, methods=["GET", "POST"]),
Route("/sse", endpoint=mcp_asgi_app, methods=["GET"]),
Route("/messages", endpoint=mcp_asgi_app, methods=["POST"]),
Route("/health", endpoint=lambda r: Response("OK"))
]
starlette_app = Starlette(routes=routes)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--transport", default="stdio", choices=["stdio", "http"])
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8000)
args = parser.parse_args()
print(f"✅ Agileday Server initialized for tenant: {TENANT_ID}", file=sys.stderr)
if args.transport == "http":
print(f"📡 Starting Streamable HTTP Server", file=sys.stderr)
print(f" Endpoint: http://{args.host}:{args.port}/mcp", file=sys.stderr)
uvicorn.run(starlette_app, host=args.host, port=args.port)
else:
import asyncio
from mcp.server.stdio import stdio_server
async def run_stdio():
async with stdio_server() as streams:
await server.run(streams[0], streams[1], server.create_initialization_options())
print("🔌 Starting Stdio Server", file=sys.stderr)
asyncio.run(run_stdio())