Skip to main content
Glama
projects.py (29.1 kB)
"""Project-related tools"""
import asyncio
import json
import sys
from typing import List, Optional, Union

from fastmcp import Context

from ..server import mcp, api_client, init_api_client, detect_profile_from_context
from ._base import build_params

# Timeout for elicitation calls (in seconds)
# If elicitation times out, we'll return an error asking for the parameter
ELICITATION_TIMEOUT = 30.0

# Extra entry appended to the application picker in create_project_from_code.
_NEW_APPLICATION_OPTION = "Create a new application..."

# NOTE(review): the docstrings of the @mcp.tool() functions below are presumably
# surfaced to MCP clients as tool descriptions, so their wording is kept as-is
# and implementation notes live in `#` comments instead. Confirm before editing.


def _ensure_api_client():
    """Return the module-level API client, creating it on first use."""
    global api_client
    if api_client is None:
        api_client = init_api_client()
    return api_client


async def _elicit(ctx: Context, message: str, response_type):
    """Prompt the user through ``ctx.elicit``, bounded by ELICITATION_TIMEOUT.

    Raises:
        asyncio.TimeoutError: if no answer arrives within the timeout.
    """
    return await asyncio.wait_for(
        ctx.elicit(message, response_type=response_type),
        timeout=ELICITATION_TIMEOUT,
    )


def _resolve_business_unit_id(client, business_unit_id, business_unit_name):
    """Choose the business unit for a new application.

    Tried in order: the explicit ID, a case-insensitive name match, the
    current user's business unit, then the first business unit listed.
    Returns None when nothing could be resolved.
    """
    resolved = business_unit_id
    if resolved:
        return resolved

    if business_unit_name:
        wanted = business_unit_name.strip().lower()
        bus_response = client.list_business_units({"page_size": 1000})
        for bu in bus_response.get("results", []):
            if (bu.get("name") or "").strip().lower() == wanted:
                resolved = bu.get("id")
                break

    if not resolved:
        # Best effort only: a failure here must not abort the whole flow,
        # because the first-listed fallback below can still succeed.
        try:
            current_user = client.get_current_user()
            user_business_unit = current_user.get("business_unit")
            if user_business_unit:
                if isinstance(user_business_unit, dict):
                    resolved = user_business_unit.get("id")
                else:
                    resolved = user_business_unit
        except Exception as user_error:
            print(
                f"Warning: Could not determine current user's business unit: {user_error}",
                file=sys.stderr,
            )

    if not resolved:
        bus_response = client.list_business_units({"page_size": 1000})
        business_units = bus_response.get("results", [])
        if business_units:
            resolved = business_units[0].get("id")

    return resolved


@mcp.tool()
async def list_projects(
    ctx: Context,
    page_size: Optional[int] = None,
    include: Optional[str] = None,
    expand: Optional[str] = None,
) -> str:
    """List all projects in SD Elements"""
    client = _ensure_api_client()
    params = build_params({"page_size": page_size, "include": include, "expand": expand})
    result = client.list_projects(params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def get_project(
    ctx: Context,
    project_id: int,
    page_size: Optional[int] = None,
    include: Optional[str] = None,
    expand: Optional[str] = None,
) -> str:
    """Get details of a specific project.
    Use list_countermeasures to see countermeasures for a project, not this tool."""
    client = _ensure_api_client()
    params = build_params({"page_size": page_size, "include": include, "expand": expand})
    result = client.get_project(project_id, params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def list_profiles(ctx: Context, page_size: Optional[int] = None) -> str:
    """List all available profiles in SD Elements"""
    client = _ensure_api_client()
    params = {"page_size": page_size} if page_size is not None else {}
    result = client.list_profiles(params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def list_risk_policies(ctx: Context, page_size: Optional[int] = None) -> str:
    """List all available risk policies in SD Elements"""
    client = _ensure_api_client()
    params = {"page_size": page_size} if page_size is not None else {}
    result = client.list_risk_policies(params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def get_risk_policy(ctx: Context, risk_policy_id: int, page_size: Optional[int] = None) -> str:
    """Get details of a specific risk policy"""
    client = _ensure_api_client()
    params = {"page_size": page_size} if page_size is not None else {}
    result = client.get_risk_policy(risk_policy_id, params)
    return json.dumps(result, indent=2)


@mcp.tool()
async def create_project(
    ctx: Context,
    application_id: int,
    name: Optional[str] = None,
    description: Optional[str] = None,
    phase_id: Optional[int] = None,
    profile_id: Optional[str] = None,
) -> str:
    """Create a new project in SD Elements. If name is not specified, prompts user to provide it. If profile is not specified, attempts to detect it from project name/description (e.g., 'mobile project' → Mobile profile).
    If detection fails, prompts user to select from available profiles."""
    # Returns a JSON string: the API result on success, or {"error": ...} when
    # a required value could not be obtained from the caller or the user.
    client = _ensure_api_client()

    # Elicitation for name if not provided
    if not name:
        try:
            name_result = await _elicit(
                ctx, "What is the name of the project you want to create?", str
            )
        except asyncio.TimeoutError:
            return json.dumps({"error": "Elicitation timeout: project name is required. Please provide the 'name' parameter."})
        if name_result.action != "accept":
            return json.dumps({"error": "Project creation cancelled: project name is required"})
        name = name_result.data

    # Elicitation for profile_id if not provided
    if not profile_id:
        profiles_response = client.list_profiles({"page_size": 1000})
        profiles = profiles_response.get("results", [])

        if not profiles:
            return json.dumps({"error": "No profiles available. Cannot create project without a profile."})

        # Try to detect profile from project name/description
        detected_profile_id = detect_profile_from_context(name, description or "", profiles)

        if detected_profile_id:
            # Profile detected automatically - use it
            profile_id = detected_profile_id
        else:
            # No profile detected - prompt user to select from available profiles
            profile_options = []
            profile_id_map = {}
            default_profile_id = None

            for profile in profiles:
                profile_name_val = profile.get("name", "Unnamed Profile")
                profile_id_val = profile.get("id")
                profile_options.append(profile_name_val)
                profile_id_map[profile_name_val] = profile_id_val
                # Track default profile to highlight it in the prompt
                if profile.get("default", False):
                    default_profile_id = profile_id_val

            # Always prompt user to select a profile when detection fails
            prompt_message = "Select a profile:"
            if default_profile_id:
                # Find default profile name for the prompt
                default_profile_name = next(
                    (
                        option_name
                        for option_name, option_id in profile_id_map.items()
                        if option_id == default_profile_id
                    ),
                    None,
                )
                if default_profile_name:
                    prompt_message = f"Select a profile (default: {default_profile_name}):"

            try:
                profile_result = await _elicit(ctx, prompt_message, profile_options)
            except asyncio.TimeoutError:
                return json.dumps({"error": "Elicitation timeout: profile selection is required. Please provide the 'profile_id' parameter."})
            if profile_result.action != "accept":
                return json.dumps({"error": "Project creation cancelled: profile selection is required"})

            profile_id = profile_id_map.get(profile_result.data)
            if not profile_id:
                return json.dumps({"error": f"Could not find profile ID for selection: {profile_result.data}"})

    # Ensure profile_id is set - API requires it
    if not profile_id:
        return json.dumps({"error": "Profile is required but could not be determined. Please specify a profile_id."})

    data = {"name": name, "application": application_id}
    if description:
        data["description"] = description
    if phase_id:
        data["phase_id"] = phase_id
    # Profile is required, so always include it
    data["profile"] = profile_id

    result = client.create_project(data)
    return json.dumps(result, indent=2)


@mcp.tool()
async def update_project(
    ctx: Context,
    project_id: int,
    name: Optional[str] = None,
    description: Optional[str] = None,
    status: Optional[str] = None,
    risk_policy: Optional[Union[int, str]] = None,
) -> str:
    """Update an existing project (name, description, status, or risk_policy). Use when user says 'update', 'change', 'modify', or 'rename'. Do NOT use for 'archive', 'delete', or 'remove' - use delete_project instead. IMPORTANT: risk_policy must be the numeric ID of the risk policy (e.g., 1, 2, 3), not the name. Use list_risk_policies to find the correct ID.
    According to the API documentation (https://docs.sdelements.com/master/api/docs/projects/), risk_policy is an optional field that accepts the ID of the Risk Policy that applies to this project."""
    client = _ensure_api_client()

    # Validate risk_policy is an integer if provided
    if risk_policy is not None:
        # Handle string-to-int conversion (MCP framework may pass as string)
        if isinstance(risk_policy, str):
            try:
                risk_policy = int(risk_policy)
            except ValueError:
                return json.dumps({
                    "error": f"risk_policy must be an integer ID, got string that cannot be converted: {risk_policy}",
                    "suggestion": "Use list_risk_policies to find the correct risk policy ID (numeric value)"
                }, indent=2)
        elif not isinstance(risk_policy, int):
            return json.dumps({
                "error": f"risk_policy must be an integer ID, got {type(risk_policy).__name__}: {risk_policy}",
                "suggestion": "Use list_risk_policies to find the correct risk policy ID (numeric value)"
            }, indent=2)

    # Only send the fields the caller actually supplied.
    data = {}
    if name is not None:
        data["name"] = name
    if description is not None:
        data["description"] = description
    if status is not None:
        data["status"] = status
    if risk_policy is not None:
        data["risk_policy"] = risk_policy

    if not data:
        return json.dumps({"error": "No update data provided. Specify at least one field to update (name, description, status, or risk_policy)."}, indent=2)

    try:
        result = client.update_project(project_id, data)
        return json.dumps(result, indent=2)
    except Exception as e:
        error_msg = str(e)
        # Check if it's a risk_policy related error
        if "risk_policy" in error_msg.lower() or "risk policy" in error_msg.lower():
            return json.dumps({
                "error": f"Failed to update risk_policy: {error_msg}",
                "suggestion": "Verify the risk_policy ID exists using list_risk_policies. Risk policy must be a valid numeric ID."
            }, indent=2)
        return json.dumps({"error": f"Failed to update project: {error_msg}"}, indent=2)


@mcp.tool()
async def delete_project(ctx: Context, project_id: int) -> str:
    """Delete a project. Use when user says 'delete', 'remove', 'archive', or wants to permanently remove a project. Do NOT use update_project for archiving."""
    client = _ensure_api_client()
    result = client.delete_project(project_id)
    return json.dumps(result, indent=2)


@mcp.tool()
async def create_project_from_code(
    ctx: Context,
    project_name: Optional[str] = None,
    application_id: Optional[int] = None,
    application_name: Optional[str] = None,
    business_unit_id: Optional[int] = None,
    business_unit_name: Optional[str] = None,
    project_description: Optional[str] = None,
    profile_id: Optional[str] = None,
    reuse_existing_project: bool = False,
    application_description: Optional[str] = None,
) -> str:
    """
    Create application and project in SD Elements. Returns the project survey structure with all available questions and answers.

    IMPORTANT: Before determining survey answers, the AI client MUST search the workspace codebase for evidence of survey answers and security requirements. Analyze code files, configuration files, documentation, and comments to identify implementations that correspond to survey answers. Look for patterns such as:
    - Security controls and countermeasures mentioned in survey responses
    - Configuration settings related to survey answers (authentication, encryption, etc.)
    - Code comments referencing security requirements
    - Test files that validate survey-related implementations
    - Documentation that maps to survey answer topics
    - Dependency files (package.json, requirements.txt, etc.) indicating technologies
    - Framework and library usage that informs survey answers

    Search across all file types including source code, config files, README files, and test files.
    When evidence is found, extract relevant code snippets, file paths, and line numbers to inform survey answer selection. After gathering evidence from the codebase, review the survey structure, determine appropriate answers based on both the project context AND the evidence found in the codebase, set them using add_survey_answers_by_text or set_project_survey_by_text, and then commit the survey draft using commit_survey_draft to publish the survey and generate countermeasures.
    """
    # Returns a JSON string. Any unexpected exception is reported as
    # {"error": ..., "error_type": ...} instead of propagating.
    client = _ensure_api_client()

    try:
        # Elicitation 1: Project name
        if not project_name:
            try:
                name_result = await _elicit(
                    ctx, "What is the name of the project you want to create?", str
                )
            except asyncio.TimeoutError:
                return json.dumps({"error": "Elicitation timeout: project name is required. Please provide the 'project_name' parameter."})
            if name_result.action != "accept":
                return json.dumps({"error": "Project creation cancelled: project name is required"})
            project_name = name_result.data

        # Elicitation 2: Application selection
        application_id_resolved = application_id
        app_result = None
        application_was_existing = False

        if not application_id_resolved and not application_name:
            # Get available applications for elicitation
            apps_response = client.list_applications({"page_size": 100})
            apps = apps_response.get("results", [])

            if not apps:
                # No apps available, must create new
                try:
                    app_name_result = await _elicit(
                        ctx,
                        "No existing applications found. What name would you like for the new application?",
                        str,
                    )
                except asyncio.TimeoutError:
                    return json.dumps({"error": "Elicitation timeout: application name is required. Please provide the 'application_name' parameter."})
                if app_name_result.action != "accept":
                    return json.dumps({"error": "Project creation cancelled: application name is required"})
                application_name = app_name_result.data
            else:
                # Create a list of application options for selection
                app_options = []
                app_id_map = {}
                for app in apps[:50]:  # Limit to 50 for selection list
                    app_display_name = app.get("name", "Unnamed Application")
                    app_business_unit = app.get("business_unit")
                    if isinstance(app_business_unit, dict):
                        bu_name = app_business_unit.get("name", "Unknown")
                    else:
                        bu_name = "Unknown"
                    display_text = f"{app_display_name} ({bu_name})"
                    app_options.append(display_text)
                    app_id_map[display_text] = app.get("id")

                # Add option to create new application
                app_options.append(_NEW_APPLICATION_OPTION)

                try:
                    app_choice_result = await _elicit(
                        ctx,
                        "Please select an existing application or choose to create a new one:",
                        app_options,  # List of strings for selection
                    )
                except asyncio.TimeoutError:
                    return json.dumps({"error": "Elicitation timeout: application selection is required. Please provide the 'application_id' or 'application_name' parameter."})
                if app_choice_result.action != "accept":
                    return json.dumps({"error": "Project creation cancelled: application selection is required"})

                selected_option = app_choice_result.data

                # This must be an if/else on the selection. Previously the
                # ID lookup sat in a try/else, so it also ran for the
                # "create new" option and always failed with
                # "Could not find application ID".
                if selected_option == _NEW_APPLICATION_OPTION:
                    # User wants to create new - ask for name
                    try:
                        app_name_result = await _elicit(
                            ctx, "What name would you like for the new application?", str
                        )
                    except asyncio.TimeoutError:
                        return json.dumps({"error": "Elicitation timeout: application name is required. Please provide the 'application_name' parameter."})
                    if app_name_result.action != "accept":
                        return json.dumps({"error": "Project creation cancelled: application name is required"})
                    application_name = app_name_result.data
                else:
                    # User selected an existing application
                    application_id_resolved = app_id_map.get(selected_option)
                    if not application_id_resolved:
                        return json.dumps({"error": f"Could not find application ID for selection: {selected_option}"})

        # Handle application creation/resolution
        if not application_id_resolved:
            if not application_name:
                return json.dumps({"error": "Either application_id or application_name must be provided"})

            # Check if app exists (case-insensitive, whitespace-trimmed match)
            apps_response = client.list_applications({"page_size": 1000})
            wanted_app_name = application_name.strip().lower()
            existing_app = next(
                (
                    app
                    for app in apps_response.get("results", [])
                    if (app.get("name") or "").strip().lower() == wanted_app_name
                ),
                None,
            )

            if existing_app:
                application_id_resolved = existing_app.get("id")
                app_result = existing_app
                application_was_existing = True
            else:
                # Create new application
                business_unit_id_resolved = _resolve_business_unit_id(
                    client, business_unit_id, business_unit_name
                )
                if not business_unit_id_resolved:
                    return json.dumps({"error": "Cannot create application: No business unit found"})

                app_data = {"name": application_name, "business_unit": business_unit_id_resolved}
                if application_description:
                    app_data["description"] = application_description

                app_result = client.create_application(app_data)
                application_id_resolved = app_result.get("id")
        else:
            application_was_existing = True

        # Elicitation 3: Profile selection
        profile_id_resolved = profile_id
        profile_detected = False
        profile_name = None

        if not profile_id_resolved:
            # Try to detect profile
            profiles_response = client.list_profiles({"page_size": 1000})
            profiles = profiles_response.get("results", [])

            if not profiles:
                return json.dumps({"error": "No profiles available. Cannot create project without a profile."})

            detected_profile_id = detect_profile_from_context(
                project_name,
                project_description or "",
                profiles
            )

            if detected_profile_id:
                profile_id_resolved = detected_profile_id
                profile_detected = True
                for profile in profiles:
                    if profile.get("id") == profile_id_resolved:
                        profile_name = profile.get("name", "Unknown")
                        break
            else:
                # Elicitation: Ask user to select profile from a list
                profile_options = []
                profile_id_map = {}
                for profile in profiles:
                    profile_name_val = profile.get("name", "Unnamed Profile")
                    profile_id_val = profile.get("id")
                    profile_options.append(profile_name_val)
                    profile_id_map[profile_name_val] = profile_id_val

                try:
                    profile_result = await _elicit(ctx, "Select a profile:", profile_options)
                except asyncio.TimeoutError:
                    return json.dumps({"error": "Elicitation timeout: profile selection is required. Please provide the 'profile_id' parameter."})
                if profile_result.action != "accept":
                    return json.dumps({"error": "Project creation cancelled: profile selection is required"})

                selected_profile = profile_result.data
                profile_id_resolved = profile_id_map.get(selected_profile)
                if not profile_id_resolved:
                    return json.dumps({"error": f"Could not find profile ID for selection: {selected_profile}"})

        # Check for existing project
        existing_project = None
        try:
            projects_response = client.list_projects({"page_size": 1000})
            projects = projects_response.get("results", [])
            wanted_project_name = project_name.strip().lower()
            for proj in projects:
                proj_app = proj.get("application")
                proj_app_id = proj_app.get("id") if isinstance(proj_app, dict) else proj_app
                if (proj_app_id == application_id_resolved and
                        (proj.get("name") or "").strip().lower() == wanted_project_name):
                    existing_project = proj
                    break
        except Exception as list_error:
            # Best effort: if listing fails we fall through and try to create.
            print(f"Warning: Could not list existing projects: {list_error}", file=sys.stderr)

        if existing_project:
            if reuse_existing_project:
                project_result = existing_project
                project_id = project_result.get("id")
                project_was_existing = True
            else:
                return json.dumps({
                    "error": f"A project with the name '{project_name}' already exists in this application (ID: {existing_project.get('id')}).",
                    "existing_project_id": existing_project.get("id"),
                    "suggestion": "Either provide a different project_name, or set reuse_existing_project=true to reuse the existing project."
                })
        else:
            # Create project
            project_data = {
                "name": project_name,
                "application": application_id_resolved
            }
            if project_description:
                project_data["description"] = project_description
            if profile_id_resolved:
                project_data["profile"] = profile_id_resolved

            project_result = client.create_project(project_data)
            project_id = project_result.get("id")
            project_was_existing = False

        # Get survey structure
        survey_structure = client.get_project_survey(project_id)

        if client._library_answers_cache is None:
            client.load_library_answers()

        available_answers_summary = []
        if client._library_answers_cache:
            available_answers_summary = [
                {
                    'id': answer.get('id'),
                    'text': answer.get('text', ''),
                    'question': answer.get('display_text', ''),
                    'section': answer.get('section', '')
                }
                for answer in client._library_answers_cache
            ]

        draft_state = None
        selected_answers_count = 0
        try:
            draft_state = client.get(f'projects/{project_id}/survey/draft/')
            selected_answers = [a for a in draft_state.get('answers', []) if a.get('selected', False)]
            selected_answers_count = len(selected_answers)
        except Exception as draft_error:
            # Record the failure so "draft_available" below reports False.
            draft_state = {"error": str(draft_error)}

        result = {
            "success": True,
            "application": {
                "id": application_id_resolved,
                "name": app_result.get("name") if app_result else (application_name or "unknown"),
                "was_existing": application_was_existing
            },
            "project": {
                "id": project_id,
                "name": project_result.get("name"),
                "url": project_result.get("url"),
                "was_existing": project_was_existing
            },
            "profile": {
                "id": profile_id_resolved,
                "name": profile_name,
                "detected": profile_detected
            } if profile_id_resolved else None,
            "survey_structure": {
                "note": "This contains all available survey questions and answers. Review these options and use your AI knowledge to determine which answers are appropriate for this project.",
                "total_questions": len([q for s in survey_structure.get('sections', []) for q in s.get('questions', [])]),
                "total_answers": len(available_answers_summary),
                "available_answers": available_answers_summary[:100],
                "full_survey": survey_structure
            },
            "survey_draft_state": {
                "selected_answers_count": selected_answers_count,
                "has_answers": selected_answers_count > 0,
                "draft_available": draft_state is not None and "error" not in draft_state
            },
            "next_steps": {
                "step_1": "Review the survey_structure to see all available questions and answers",
                "step_2": "Use your AI knowledge to determine appropriate answers based on the project context",
                "step_3": "Call add_survey_answers_by_text or set_project_survey_by_text to set the answers",
                "step_4": "Call commit_survey_draft to publish the survey and generate countermeasures",
                "important": "The survey draft is NOT committed automatically. You must commit it after setting answers."
            }
        }

        return json.dumps(result, indent=2)

    except Exception as e:
        return json.dumps({
            "error": str(e),
            "error_type": type(e).__name__
        })

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/geoffwhittington/sde-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.