"""Gradle wrapper interface for executing Gradle commands."""
import asyncio
import logging
import re
import select
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fastmcp import Context
# Module-level logger used to stream Gradle output to stderr.
logger = logging.getLogger("gradle_mcp.gradle")
logger.setLevel(logging.DEBUG)

# Attach a stderr handler exactly once so re-imports don't duplicate output.
if not logger.handlers:
    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(logging.DEBUG)
    # Plain message formatting: Gradle's own output is already structured.
    formatter = logging.Formatter("%(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
@dataclass
class GradleProject:
    """Represents a Gradle project discovered by ``list_projects``."""

    # Project name: ":" for the root project, or a path like ":app" parsed
    # from the ``gradlew projects`` output.
    name: str
    # Filesystem path; currently always the wrapper's project root.
    path: str
    # Optional human-readable description (e.g. "Root project").
    description: str | None = None
@dataclass
class GradleTask:
    """Represents a Gradle task."""

    # Task name, e.g. "build".
    name: str
    # Name of the project the task belongs to, e.g. ":app".
    project: str
    # Optional task description as reported by Gradle.
    description: str | None = None
    # Optional task group (e.g. "Build", "Verification").
    group: str | None = None
@dataclass
class TaskWithDescription:
    """Represents a task name paired with its description (used by ``list_tasks``)."""

    # Task name, e.g. "assemble".
    name: str
    # Description text; empty string when Gradle reports none.
    description: str
@dataclass
class GroupedTasks:
    """Represents tasks grouped by their group name.

    When include_descriptions is True, tasks is a list of TaskWithDescription.
    When include_descriptions is False, tasks is a list of task names (strings).
    """

    # Group header as printed by ``gradlew tasks`` (e.g. "Build").
    group: str
    # Either TaskWithDescription objects or bare task-name strings,
    # depending on the ``include_descriptions`` flag of ``list_tasks``.
    tasks: list[TaskWithDescription] | list[str]
class GradleWrapper:
    """Interface for executing Gradle commands using the Gradle wrapper."""

    # Cleaning task patterns that should not be allowed in run_task
    # (matched case-insensitively against the task name).
    CLEANING_TASK_PATTERNS = [
        r"^clean.*",
        r".*clean$",
        r"^cleanBuild.*",
        r"^cleanTest.*",
    ]

    # Allow-list of safe Gradle arguments that can be passed to run_task
    # These are carefully selected to avoid command injection vulnerabilities
    SAFE_GRADLE_ARGS = {
        # Logging options
        "--debug",
        "-d",
        "--info",
        "-i",
        "--warn",
        "-w",
        "--quiet",
        "-q",
        "--stacktrace",
        "-s",
        "--full-stacktrace",
        "-S",
        "--scan",
        "--no-scan",
        # Performance options
        "--build-cache",
        "--no-build-cache",
        "--configure-on-demand",
        "--no-configure-on-demand",
        "--max-workers",
        "--parallel",
        "--no-parallel",
        # Execution options
        "--continue",
        "--dry-run",
        "-m",
        "--refresh-dependencies",
        "--rerun-tasks",
        "--profile",
        # Task exclusion (safe as it only limits what runs)
        "-x",
        "--exclude-task",
        # Daemon options
        "--daemon",
        "--no-daemon",
        "--foreground",
        "--stop",
        "--status",
    }

    # Dangerous arguments that should never be allowed
    # These can lead to arbitrary code execution or file system access
    DANGEROUS_GRADLE_ARGS = {
        "--init-script",
        "-I",  # Can execute arbitrary Groovy/Kotlin code
        "--project-prop",
        "-P",  # Can inject properties
        "--system-prop",
        "-D",  # Can set system properties
        "--settings-file",
        "-c",  # Can load arbitrary settings
        "--build-file",
        "-b",  # Can load arbitrary build files
        "--gradle-user-home",
        "-g",  # Can access arbitrary directories
        "--project-dir",
        "-p",  # Can access arbitrary directories
        "--include-build",  # Can include arbitrary builds
        "--write-verification-metadata",  # Can write files to arbitrary locations
    }

    # Safe flags that consume the FOLLOWING argument as their value
    # (e.g. "-x lint", "--max-workers 4").
    _VALUE_TAKING_ARGS = {"--max-workers", "-x", "--exclude-task"}

    def __init__(self, project_root: str | None = None) -> None:
        """Initialize Gradle wrapper.

        Args:
            project_root: Root directory of the Gradle project. Defaults to current directory.
        """
        self.project_root = Path(project_root or ".")
        self.wrapper_script = self._find_wrapper_script()

    def _find_wrapper_script(self) -> Path:
        """Find the Gradle wrapper script.

        Returns:
            Path to the gradlew script.

        Raises:
            FileNotFoundError: If Gradle wrapper is not found.
        """
        gradle_wrapper = self.project_root / "gradlew"
        if not gradle_wrapper.exists():
            raise FileNotFoundError(
                f"Gradle wrapper not found at {gradle_wrapper}. "
                "Please ensure gradlew script exists in the project root."
            )
        return gradle_wrapper

    def _is_cleaning_task(self, task: str) -> bool:
        """Check if a task is a cleaning task.

        Args:
            task: Task name to check.

        Returns:
            True if the task is a cleaning task, False otherwise.
        """
        task_lower = task.lower()
        # Any pattern match marks the task as a cleaning task.
        return any(re.match(pattern, task_lower) for pattern in self.CLEANING_TASK_PATTERNS)

    def _validate_gradle_args(self, args: list[str], tasks: list[str] | None = None) -> None:
        """Validate that all provided Gradle arguments are safe.

        This method prevents command injection by ensuring that only safe,
        pre-approved arguments can be passed to Gradle. Any dangerous arguments
        that could lead to arbitrary code execution or file system access are blocked.

        Args:
            args: List of arguments to validate.
            tasks: Tasks being run; used to decide whether ``--tests`` is
                allowed (only when at least one task is a test task).

        Raises:
            ValueError: If any dangerous or unknown arguments are detected.
        """
        if not args:
            return

        def _next_is_value(idx: int) -> bool:
            # A following argument counts as this flag's value only if it
            # exists and does not itself look like a flag.
            return idx + 1 < len(args) and not args[idx + 1].startswith("-")

        i = 0
        while i < len(args):
            arg = args[i]

            # Reject exact matches against the deny-list first.
            if arg in self.DANGEROUS_GRADLE_ARGS:
                raise ValueError(
                    f"Argument '{arg}' is not allowed due to security concerns. "
                    f"It could enable arbitrary code execution or unauthorized file access."
                )

            # Also reject dangerous flags with attached values:
            # "--init-script=evil.gradle" or the short-form "-Pkey=value".
            for dangerous in self.DANGEROUS_GRADLE_ARGS:
                if arg.startswith(dangerous + "=") or (
                    len(dangerous) == 2 and arg.startswith(dangerous) and len(arg) > 2
                ):
                    raise ValueError(
                        f"Argument '{arg}' is not allowed due to security concerns. "
                        f"It could enable arbitrary code execution or unauthorized file access."
                    )

            # Exact match against the allow-list.
            if arg in self.SAFE_GRADLE_ARGS:
                if arg in self._VALUE_TAKING_ARGS and _next_is_value(i):
                    i += 1  # consume the flag's value
                i += 1
                continue

            # Allow "--flag=value" syntax for allow-listed flags (e.g. --max-workers=4).
            base_arg = arg.split("=")[0]
            if base_arg in self.SAFE_GRADLE_ARGS:
                i += 1
                continue

            # Special-case: allow --tests (and --tests=...) only when running test tasks.
            if base_arg == "--tests":
                def _contains_test_task(task_name: str) -> bool:
                    # Consider ":module:test" or "test" as test tasks.
                    return "test" in task_name.split(":")

                if tasks and any(_contains_test_task(t) for t in tasks):
                    # Bare "--tests" takes its filter pattern as the next argument.
                    if arg == "--tests" and _next_is_value(i):
                        i += 1
                    i += 1
                    continue
                # Not a test task -> reject.
                raise ValueError(
                    f"Argument '{arg}' is not in the allow-list of safe Gradle arguments. "
                    f"Allowed arguments are: {', '.join(sorted(self.SAFE_GRADLE_ARGS))}"
                )

            # Unknown argument - reject it for safety
            raise ValueError(
                f"Argument '{arg}' is not in the allow-list of safe Gradle arguments. "
                f"Allowed arguments are: {', '.join(sorted(self.SAFE_GRADLE_ARGS))}"
            )

    def _extract_error_message(
        self, stdout: str, stderr: str, default_message: str = "Task failed"
    ) -> str:
        """Extract comprehensive error message from Gradle output.

        Gradle splits failure output between streams: failed tasks and their
        error details go to stdout, while the "FAILURE:" summary and
        "BUILD FAILED" line go to stderr. This method combines both streams
        and searches backwards from the failure markers to the earliest failed
        task, so the returned message carries the full error context.

        Args:
            stdout: Standard output from Gradle.
            stderr: Standard error from Gradle.
            default_message: Default message if no error markers found.

        Returns:
            Extracted error message with full context.
        """
        # Combine stdout and stderr since Gradle splits output between them.
        combined_output = (
            stdout + "\n" + stderr if stdout and stderr else (stdout or stderr or default_message)
        )
        error_lines = combined_output.strip().split("\n")

        first_failure_idx = -1
        failure_marker_idx = -1
        build_failed_idx = -1

        # Single forward pass to locate the key markers.
        for i, line in enumerate(error_lines):
            if "FAILED" in line and "> Task" in line:
                # Track first failed task
                if first_failure_idx == -1:
                    first_failure_idx = i
            if "FAILURE:" in line or "* What went wrong:" in line:
                if failure_marker_idx == -1:
                    failure_marker_idx = i
            if "BUILD FAILED" in line:
                build_failed_idx = i

        # If we found FAILURE: or BUILD FAILED, search backwards for the first
        # task failure, stopping at non-failed tasks or build-start indicators
        # so unrelated successful output is excluded.
        if failure_marker_idx >= 0 or build_failed_idx >= 0:
            marker_idx = failure_marker_idx if failure_marker_idx >= 0 else build_failed_idx
            for i in range(marker_idx - 1, -1, -1):
                line = error_lines[i]
                if "FAILED" in line and "> Task" in line:
                    # Keep updating to capture the earliest failed task.
                    first_failure_idx = i
                elif "> Task" in line and "FAILED" not in line:
                    # Hit a non-failed task (UP-TO-DATE, NO-SOURCE, FROM-CACHE, etc.)
                    break
                elif any(marker in line for marker in ["Configuration cache", "BUILD SUCCESSFUL"]):
                    # Hit build start indicators.
                    break

        if first_failure_idx >= 0:
            # Best case: everything from the first failed task onwards.
            return "\n".join(error_lines[first_failure_idx:])
        elif build_failed_idx >= 0:
            # Fallback: include substantial context before BUILD FAILED.
            start_idx = max(0, build_failed_idx - 100)
            return "\n".join(error_lines[start_idx:])
        else:
            # Last resort: include last 50 lines.
            return "\n".join(error_lines[-50:]) if len(error_lines) > 50 else combined_output

    def list_projects(self) -> list[GradleProject]:
        """List all Gradle projects in the workspace.

        Runs ``gradlew projects -q`` and parses the text report.

        Returns:
            List of GradleProject objects.

        Raises:
            RuntimeError: If the Gradle command fails.
        """
        try:
            result = subprocess.run(
                [str(self.wrapper_script), "projects", "-q"],
                cwd=str(self.project_root),
                capture_output=True,
                text=True,
                check=True,
            )
            projects: list[GradleProject] = []
            root_added = False
            for line in result.stdout.split("\n"):
                line = line.strip()
                # Add root project (only once)
                if "Root project" in line and not root_added:
                    projects.append(
                        GradleProject(
                            name=":", path=str(self.project_root), description="Root project"
                        )
                    )
                    root_added = True
                    continue
                # Look for subproject lines like "+--- Project ':app'" or "Project ':app'"
                # But skip if it's the root project line we already handled
                if "Project '" in line and "Root project" not in line:
                    match = re.search(r"Project '([^']+)'", line)
                    if match:
                        project_name = match.group(1)
                        # Skip root project if it appears again
                        if project_name != ":":
                            projects.append(
                                GradleProject(
                                    name=project_name,
                                    path=str(self.project_root),
                                )
                            )
            return projects
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to list projects: {e.stderr}") from e

    def list_tasks(
        self,
        project: str = ":",
        include_descriptions: bool = True,
        group: str | None = None,
    ) -> list[GroupedTasks]:
        """List all available tasks for a specific Gradle project.

        Returns a nested structure grouped by task group. When include_descriptions
        is True, each task includes its description. When False, only task names
        are returned for a more compact response.

        Args:
            project: Project name (e.g., ':app'). Use ':' or empty string for root project.
            include_descriptions: If True, include task descriptions. If False, return
                only task names for a more compact response.
            group: Optional group name to filter tasks. If provided, only tasks from
                this group will be returned. Case-insensitive matching.

        Returns:
            List of GroupedTasks objects, each containing a group name and its tasks.

        Raises:
            RuntimeError: If the Gradle command fails.
        """
        try:
            # Use tasks --all to get all tasks including inherited ones.
            # Root project uses plain "tasks"; subprojects use "project:tasks".
            is_root = project == ":" or project == "" or project is None
            task_cmd = "tasks" if is_root else f"{project}:tasks"
            result = subprocess.run(
                [str(self.wrapper_script), task_cmd, "--all"],
                cwd=str(self.project_root),
                capture_output=True,
                text=True,
                check=True,
            )

            # Use dict to group tasks by group name, preserving order
            groups: dict[str, list[TaskWithDescription] | list[str]] = {}
            in_task_section = False
            current_group: str | None = None
            for line in result.stdout.split("\n"):
                line_stripped = line.strip()
                if not line_stripped:
                    continue
                # Task group headers look like "Build tasks" (capitalized, end with " tasks").
                if line_stripped.endswith(" tasks") and line_stripped[0].isupper():
                    in_task_section = True
                    current_group = line_stripped.replace(" tasks", "").strip()
                    if current_group not in groups:
                        groups[current_group] = []
                    continue
                # Skip separators ("----") and rule patterns.
                if line_stripped.startswith("-") or "Pattern:" in line_stripped:
                    continue
                # Stop at trailing help text / build summary.
                if "To see all tasks" in line_stripped or line_stripped.startswith("BUILD"):
                    break
                if in_task_section and current_group is not None:
                    # Task lines format: "taskName - description"
                    task_match = re.match(r"^(\w+)\s+-\s+(.+)$", line_stripped)
                    if task_match:
                        task_name = task_match.group(1)
                        description = task_match.group(2)
                        if include_descriptions:
                            groups[current_group].append(
                                TaskWithDescription(name=task_name, description=description)
                            )
                        else:
                            groups[current_group].append(task_name)
                    # Also handle tasks listed without a description.
                    elif re.match(r"^(\w+)$", line_stripped):
                        task_name = line_stripped
                        if include_descriptions:
                            groups[current_group].append(
                                TaskWithDescription(name=task_name, description="")
                            )
                        else:
                            groups[current_group].append(task_name)

            # Convert to list of GroupedTasks, optionally filtering by group name.
            # NOTE: use a distinct name so the subprocess result is not shadowed.
            grouped: list[GroupedTasks] = []
            for group_name, task_list in groups.items():
                # Filter by group if specified (case-insensitive)
                if group is not None and group_name.lower() != group.lower():
                    continue
                grouped.append(GroupedTasks(group=group_name, tasks=task_list))
            return grouped
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to list tasks for project {project}: {e.stderr}") from e

    async def _stream_process(
        self,
        cmd: list[str],
        ctx: "Context | None",
        log_stderr_as_error: bool = False,
    ) -> tuple[int, str, str]:
        """Run a Gradle command, streaming output and reporting progress.

        Output is read line-by-line while the process runs (via ``select``,
        Unix only; on Windows it falls back to polling and reads everything
        at the end). Lines containing a percentage (e.g. "55% EXECUTING")
        are reported to ``ctx`` as progress.

        Args:
            cmd: Full command line to execute.
            ctx: Optional FastMCP Context for progress reporting.
            log_stderr_as_error: If True, stderr lines are logged at error
                level (used by clean); otherwise at debug level.

        Returns:
            Tuple of (returncode, stdout text, stderr text).
        """
        progress_pattern = re.compile(r"(\d+)%")
        stderr_log = logger.error if log_stderr_as_error else logger.debug

        process = subprocess.Popen(
            cmd,
            cwd=str(self.project_root),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        stdout_lines: list[str] = []
        stderr_lines: list[str] = []

        # Read output in real-time while the process is alive.
        while process.poll() is None:
            # select() on pipe file objects is Unix-only.
            if sys.platform != "win32" and process.stdout and process.stderr:
                readable, _, _ = select.select([process.stdout, process.stderr], [], [], 0.1)
                for stream in readable:
                    line = stream.readline()
                    if not line:
                        continue
                    if stream is process.stderr:
                        stderr_lines.append(line)
                        stderr_log(line.rstrip())
                    else:
                        stdout_lines.append(line)
                        logger.debug(line.rstrip())
                    # Parse Gradle progress markers like "<====> 93% EXECUTING [19s]".
                    if ctx and "%" in line:
                        match = progress_pattern.search(line)
                        if match:
                            await ctx.report_progress(progress=int(match.group(1)), total=100)
            else:
                # Fallback for Windows or if streams are not available.
                await asyncio.sleep(0.1)

        # Drain any remaining buffered output after the process exits.
        if process.stdout:
            remaining_out = process.stdout.read()
            if remaining_out:
                stdout_lines.append(remaining_out)
                for line in remaining_out.split("\n"):
                    if line.strip():
                        logger.debug(line.rstrip())
        if process.stderr:
            remaining_err = process.stderr.read()
            if remaining_err:
                stderr_lines.append(remaining_err)
                for line in remaining_err.split("\n"):
                    if line.strip():
                        stderr_log(line.rstrip())

        return process.returncode, "".join(stdout_lines), "".join(stderr_lines)

    async def run_task(
        self, tasks: str | list[str], args: list[str] | None = None, ctx: "Context | None" = None
    ) -> dict:
        """Run one or more Gradle tasks with real-time progress reporting.

        Args:
            tasks: Task(s) to run. Single string or list of task names.
            args: Additional Gradle arguments. Daemon is enabled by default.
            ctx: Optional FastMCP Context for progress reporting.

        Returns:
            Dictionary with 'success' (bool) and 'error' (str or None).

        Raises:
            ValueError: If any task is a cleaning task or an argument is unsafe.
        """
        # Normalize to list
        task_list = tasks if isinstance(tasks, list) else [tasks]

        # Check all tasks for cleaning patterns
        for task in task_list:
            if self._is_cleaning_task(task):
                raise ValueError(
                    f"Task '{task}' is a cleaning task and cannot be run via run_task. "
                    "Please use the clean tool instead."
                )

        # Validate arguments to prevent command injection
        if args:
            self._validate_gradle_args(args, task_list)

        # Build command with all tasks - daemon is enabled by default in Gradle
        cmd = [str(self.wrapper_script)] + task_list
        if args:
            cmd.extend(args)
        logger.info(f"Executing: {' '.join(cmd)}")

        # BUGFIX: log all requested tasks, not a stale loop variable (which
        # named only the last task and raised NameError on an empty list).
        task_label = ", ".join(task_list)
        try:
            returncode, stdout, stderr = await self._stream_process(cmd, ctx)
            if returncode == 0:
                logger.info(f"Task {task_label} completed successfully")
                return {"success": True, "error": None}
            # Extract comprehensive error message using helper method
            error_message = self._extract_error_message(stdout, stderr, "Task failed")
            logger.error(f"Task {task_label} failed: {error_message}")
            return {"success": False, "error": error_message}
        except Exception as e:
            logger.error(f"Task {task_label} failed with exception: {e}")
            return {"success": False, "error": str(e)}

    async def clean(self, project: str | None = None, ctx: "Context | None" = None) -> dict:
        """Run the clean task for a project.

        Args:
            project: Project path (e.g., ':app'). Use ':' or empty string or None for root project.
            ctx: FastMCP context for progress reporting and logging.

        Returns:
            Dictionary with 'success', 'error' keys.
            - success (bool): True if clean completed successfully
            - error (str or None): Error message if clean failed, None otherwise
        """
        # Root project if project is None, empty, or ":"
        is_root = project is None or project == "" or project == ":"
        project_arg = "" if is_root else f"{project}:"

        # Build command - daemon is enabled by default
        cmd = [str(self.wrapper_script), f"{project_arg}clean"]
        logger.info(f"Executing: {' '.join(cmd)}")
        try:
            # clean logs stderr at error level (unlike run_task).
            returncode, stdout, stderr = await self._stream_process(
                cmd, ctx, log_stderr_as_error=True
            )
            if returncode == 0:
                logger.info(f"Clean completed successfully for project {project or 'root'}")
                return {"success": True, "error": None}
            # Extract comprehensive error message using helper method
            error_message = self._extract_error_message(stdout, stderr, "Clean failed")
            logger.error(f"Clean failed: {error_message}")
            return {"success": False, "error": error_message}
        except Exception as e:
            logger.error(f"Clean failed with exception: {e}")
            return {"success": False, "error": str(e)}