MCPunk
by jurasofish
- mcpunk
import inspect
import json
import logging
import pathlib
import textwrap
from collections.abc import Sequence
from typing import Annotated, Any, Literal, Self, assert_never
import mcp.types as mcp_types
from fastmcp import FastMCP
from git import Repo
from pydantic import (
BaseModel,
ConfigDict,
Field,
model_validator,
)
from pydantic_core import to_jsonable_python
from mcpunk.dependencies import deps
from mcpunk.file_breakdown import (
File,
)
from mcpunk.file_breakdown import (
Project as FileBreakdownProject,
)
from mcpunk.file_chunk import Chunk
from mcpunk.git_analysis import get_recent_branches
from mcpunk.util import create_file_tree, log_inputs_outputs
logger = logging.getLogger(__name__)
PROJECTS: dict[str, "ToolProject"] = {}
mcp = FastMCP("Code Analysis")
ToolResponseSingleItem = mcp_types.TextContent | mcp_types.ImageContent | mcp_types.EmbeddedResource
ToolResponseSequence = Sequence[ToolResponseSingleItem]
ToolResponse = ToolResponseSequence | ToolResponseSingleItem
FilterType = Annotated[
str | list[str] | None,
Field(
description=(
"Match if any of these strings appear. Match all if None/null. "
"Single empty string or empty list will match all."
),
),
]
class ToolProject(BaseModel):
"""A project containing files split into chunks and so on.
These are created by the `configure_project` tool, and can be referenced by name
(which is the key in the `PROJECTS` global dict) when calling other tools.
"""
chunk_project: FileBreakdownProject
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def root(self) -> pathlib.Path:
return self.chunk_project.root
@property
def git_path(self) -> pathlib.Path:
if str(self.root).endswith(".git"):
git_dir_path = self.root
else:
git_dir_path = self.root / ".git"
if not git_dir_path.exists():
raise ValueError(f"git dir not found at {git_dir_path}")
return git_dir_path
class ProjectFile(BaseModel):
project_name: str
rel_path: Annotated[pathlib.Path, Field(description="Relative to project root")]
@property
def project(self) -> ToolProject:
return _get_project_or_error(self.project_name)
@property
def abs_path(self) -> pathlib.Path:
return self.project.chunk_project.root / self.rel_path
@property
def file(self) -> File:
abs_path = self.abs_path
matching_files = [f for f in self.project.chunk_project.files if f.abs_path == abs_path]
if len(matching_files) != 1:
raise ValueError(f"File {self.abs_path} not found in project {self.project_name}")
return matching_files[0]
@model_validator(mode="after")
def validate_misc(self) -> Self:
assert self.project is not None
assert self.file is not None
return self
class MCPToolOutput(BaseModel):
"""The output of a tool.
You can specify any of the items in here, and they will all be rendered and
returned to the client. If you specify NOTHING then the default response
will be returned.
"""
is_error: bool = False
# Anything that pydantic core to_jsonable_python can handle - that's a lot of stuff!
jsonable: Any | None = None
raw: ToolResponse | None = None
text: str | None = None
# You might like this set to 2/4 for debugging, makes things look nice!
# But that means more token usage I guess.
# an int will do what you expect. None for compact. If unset, will default to
# the value from settings.
indent: int | Literal["no_indent"] = Field(
default_factory=lambda: deps.settings().default_response_indent,
)
default_response: str = "No response provided. This is not an error."
# If the sum of the length of all text responses is greater than this
# then an error will be returned to the caller. non-text responses (image, etc)
# are not counted.
max_chars: int = Field(
default_factory=lambda: deps.settings().default_response_max_chars,
)
# Whether to include the number of characters in the response in the response.
# So like `[DEBUG INFO: Response is 1234 chars]` prefixed to the response.
include_chars_in_response: bool = Field(
default_factory=lambda: deps.settings().include_chars_in_response,
)
def render(self) -> ToolResponse:
indent: int | None
if self.indent == "no_indent":
indent = None
elif isinstance(self.indent, int):
assert isinstance(self.indent, int)
indent = self.indent
else:
assert_never(self.indent)
assert indent is None or isinstance(indent, int)
out: list[ToolResponseSingleItem] = []
if self.is_error:
out.append(mcp_types.TextContent(type="text", text="An error occurred."))
if self.jsonable is not None:
logger.debug(
"Jsonable response\n"
+ textwrap.indent(json.dumps(to_jsonable_python(self.jsonable), indent=2), " "),
)
out.append(
mcp_types.TextContent(
type="text",
text=json.dumps(to_jsonable_python(self.jsonable), indent=indent),
),
)
if self.raw is not None:
if isinstance(self.raw, ToolResponseSingleItem):
out.append(self.raw)
elif isinstance(self.raw, Sequence):
assert all(isinstance(x, ToolResponseSingleItem) for x in self.raw)
out.extend(self.raw)
else:
assert_never(self.raw)
if self.text is not None:
out.append(mcp_types.TextContent(type="text", text=self.text))
if len(out) == 0:
# Use default response if no data was provided
assert not self.is_error # Don't want to say there's an error if there was!
out.append(mcp_types.TextContent(type="text", text=self.default_response))
total_chars = sum(len(x.text) for x in out if isinstance(x, mcp_types.TextContent))
if total_chars > self.max_chars:
msg = (
f"Response is {total_chars} chars which exceed the maximum allowed "
f"of {self.max_chars}. Please adjust your request and try again."
)
logger.warning(msg)
out = [mcp_types.TextContent(type="text", text=msg)]
if self.include_chars_in_response:
len_msg = f"[DEBUG INFO: Response is {total_chars} chars]"
if len(out) == 1 and isinstance(out[0], mcp_types.TextContent):
out[0].text = f"{len_msg}\n\n{out[0].text}"
else:
out.insert(
0,
mcp_types.TextContent(type="text", text=f"Response is {total_chars} chars"),
)
final_out: ToolResponse
if len(out) == 1:
final_out = out[0]
else:
final_out = out
# logger.debug(f"Response {final_out}")
logger.debug(
"Final response\n"
+ textwrap.indent(json.dumps(to_jsonable_python(final_out), indent=2), " "),
)
return final_out
@mcp.tool()
@log_inputs_outputs()
def get_a_joke(animal: Annotated[str, Field(max_length=20)]) -> ToolResponse:
"""Get a really funny joke! For testing :)"""
return MCPToolOutput(
text=(
f"Why did the {animal} cross the road?\n"
f"To get to the other side!\n"
f"Because it was a {animal}."
),
).render()
@mcp.tool()
@log_inputs_outputs()
def configure_project(
root_path: Annotated[pathlib.Path, Field(description="Root path of the project")],
project_name: Annotated[
str,
Field(
description=(
"Name of the project, for you to pick buddy, "
"something short and sweet and memorable and unique"
),
),
],
) -> ToolResponse:
"""Configure a new project containing files.
Each file in the project is split into 'chunks' - logical sections like functions,
classes, markdown sections, and import blocks.
After configuring, a common workflow is:
1. list_all_files_in_project to get an overview of the project (with
an initial limit on the depth of the search)
2. Find files by function/class definition:
find_files_by_chunk_content(... ["def my_funk"])
3. Find files by function/class usage:
find_files_by_chunk_content(... ["my_funk"])
4. Determine which chunks in the found files are relevant:
find_matching_chunks_in_file(...)
5. Get details about the chunks:
chunk_details(...)
Use ~ (tilde) literally if the user specifies it in paths.
"""
path = root_path.expanduser().absolute()
if project_name in PROJECTS:
raise ValueError(f"Project {project_name} already exists")
project = ToolProject(
chunk_project=FileBreakdownProject(
root=path,
file_watch_refresh_freq_seconds=deps.settings().file_watch_refresh_freq_seconds,
max_chunk_size=deps.settings().max_chunk_size,
),
)
PROJECTS[project_name] = project
return MCPToolOutput(
text=(
inspect.cleandoc(f"""\
Project {path} configured with {len(project.chunk_project.files)} files.
Files are split into 'chunks' - logical sections like:
- Functions (e.g. 'def my_function')
- Classes (e.g. 'class MyClass')
- Markdown sections (e.g. '# Section')
- Import blocks
After configuring, a common workflow is:
1. list_all_files_in_project to get an overview of the project (with
an initial limit on the depth of the search)
2. Find files by function/class definition:
find_files_by_chunk_content(... ["def my_funk"])
3. Find files by function/class usage:
find_files_by_chunk_content(... ["my_funk"])
4. Determine which chunks in the found files are relevant:
find_matching_chunks_in_file(...)
5. Get details about the chunks:
chunk_details(...)
Do not immediately list files or otherwise use the project
unless explicitly told to do so.
""")
),
).render()
@mcp.tool()
@log_inputs_outputs()
def list_all_files_in_project(
project_name: str,
path_filter: FilterType = None,
limit_depth_from_root: Annotated[
int | None,
Field(
description=(
"Limit the depth of the search to this many directories from the root. "
"Typically,start with 1 to get an overview of the project."
"If None, search all directories from the root."
),
),
] = None,
) -> ToolResponse:
"""List all files in a project, returning a file tree.
This is useful for getting an overview of the project, or specific
subdirectories of the project.
A project may have many files, so you are suggested
to start with a depth limit to get an overview, and then continue increasing
the depth limit with a filter to look at specific subdirectories.
"""
project = _get_project_or_error(project_name)
data = create_file_tree(
project_root=project.root,
paths={x.abs_path for x in project.chunk_project.files},
limit_depth_from_root=limit_depth_from_root,
filter_=path_filter,
)
if data is None:
return MCPToolOutput(text="No paths").render()
elif isinstance(data, str):
return MCPToolOutput(text=data).render()
else:
assert_never(data)
@mcp.tool()
@log_inputs_outputs()
def find_files_by_chunk_content(
project_name: str,
chunk_contents_filter: FilterType,
) -> ToolResponse:
"""Step 1: Find files containing chunks with matching text.
Returns file tree only showing which files contain matches.
You must use find_matching_chunks_in_file on each relevant file
to see the actual matches.
Example workflow:
1. Find files:
files = find_files_by_chunk_content(project, ["MyClass"])
2. For each file, find actual matches:
matches = find_matching_chunks_in_file(file, ["MyClass"])
3. Get content:
content = chunk_details(file, match_id)
"""
return _filter_files_by_chunk(project_name, chunk_contents_filter, "name_or_content").render()
@mcp.tool()
@log_inputs_outputs()
def find_matching_chunks_in_file(
project_name: str,
rel_path: Annotated[pathlib.Path, Field(description="Relative to project root")],
filter_: FilterType,
) -> ToolResponse:
"""Step 2: Find the actual matching chunks in a specific file.
Required after find_files_by_chunk_content or list_all_files_in_project to see
matches, as those tools only show files, not their contents.
This can be used for things like:
- Finding all chunks in a file that make reference to a specific function
(e.g. find_matching_chunks_in_file(..., ["my_funk"])
- Finding a chunk where a specific function is defined
(e.g. find_matching_chunks_in_file(..., ["def my_funk"])
Some chunks are split into multiple parts, because they are too large. This
will look like 'chunkx_part1', 'chunkx_part2', ...
"""
proj_file = ProjectFile(project_name=project_name, rel_path=rel_path)
return _list_chunks_in_file(proj_file, filter_, "name_or_content").render()
@mcp.tool()
@log_inputs_outputs()
def chunk_details(
chunk_id: str,
) -> ToolResponse:
"""Get full content of a specific chunk.
Returns chunk content as string.
Common patterns:
1. Final step after find_matching_chunks_in_file finds relevant chunks
2. Examining implementations after finding definitions/uses
"""
# Yeah this is an awful brute force "search" - if it is even deserving of the
# name "search"! Ah well.
the_chunk: Chunk | None = None
for project in PROJECTS.values():
for file in project.chunk_project.files:
for chunk in file.chunks:
if chunk.id_(file.abs_path) == chunk_id:
the_chunk = chunk
break
if the_chunk is None:
return MCPToolOutput(
text="No matching chunks. Please use other tools to find available chunks.",
).render()
return MCPToolOutput(text=inspect.cleandoc(the_chunk.content)).render()
@mcp.tool()
@log_inputs_outputs()
def list_most_recently_checked_out_branches(
project_name: str,
n: Annotated[int, Field(ge=20, le=50)] = 20,
) -> ToolResponse:
"""List the n most recently checked out branches in the project"""
project = _get_project_or_error(project_name)
return MCPToolOutput(text="\n".join(get_recent_branches(project.git_path, n))).render()
@mcp.tool()
@log_inputs_outputs()
def diff_with_ref(
project_name: str,
ref: Annotated[str, Field(max_length=100)],
) -> ToolResponse:
"""Return a summary of the diff between HEAD and the given ref.
You probably want the ref to be the 'base' branch like develop or main, off which
PRs are made - and you can likely determine this by viewing the most recently
checked out branches.
"""
project = _get_project_or_error(project_name)
repo = Repo(project.git_path)
# head = repo.head.commit
# compare_from = repo.commit(ref)
# diffs = compare_from.diff(head, create_patch=True)
# print(repo.git.diff(f"{ref}s...HEAD", ignore_blank_lines=True, ignore_space_at_eol=True))
diff = repo.git.diff(
f"{ref}...HEAD",
ignore_blank_lines=True,
ignore_space_at_eol=True,
)
return MCPToolOutput(
text=diff,
max_chars=deps.settings().default_git_diff_response_max_chars,
).render()
def _get_project_or_error(project_name: str) -> ToolProject:
if project_name not in PROJECTS:
raise ValueError(
f"Project {project_name} not configured. Either double check the project name "
f"or run the tool to set up a new project. The server may have been restarted "
f"causing it to no longer be configured.",
)
return PROJECTS[project_name]
def _list_chunks_in_file(
proj_file: ProjectFile,
filter_: FilterType,
filter_on: Literal["name", "name_or_content"],
) -> MCPToolOutput:
target_file = proj_file.file
chunks = [x for x in target_file.chunks if x.matches_filter(filter_, filter_on)]
resp_data = [
f"id={x.id_(path=target_file.abs_path)} (category={x.category} chars={len(x.content)})"
for x in chunks
]
resp_text = "\n".join(resp_data)
chunk_info = f"({len(chunks)} of {len(target_file.chunks)} chunks)"
return MCPToolOutput(text=f"{chunk_info}\n{resp_text}")
def _filter_files_by_chunk(
project_name: str,
filter_: FilterType,
filter_on: Literal["name", "name_or_content"],
) -> MCPToolOutput:
project = _get_project_or_error(project_name)
matching_files: set[pathlib.Path] = set()
for file in project.chunk_project.files:
if any(c.matches_filter(filter_, filter_on) for c in file.chunks):
matching_files.add(file.abs_path)
data = create_file_tree(project_root=project.root, paths=matching_files)
if data is None:
return MCPToolOutput(text="No files found")
elif isinstance(data, str):
return MCPToolOutput(text=data)
else:
assert_never(data)
if __name__ == "__main__":
import time
t1 = time.monotonic()
configure_project(
root_path=pathlib.Path("~/git/mcpunk"),
project_name="mcpunk",
)
t2 = time.monotonic()
print(f"Configured project in {(t2 - t1) * 1000:.2f}ms")
_proj = PROJECTS["mcpunk"].chunk_project
print(len([f for f in _proj.files if f.ext == ".py"]), "files")
print(sum(len(f.contents.splitlines()) for f in _proj.files if f.ext == ".py"), "lines")
print(sum(len(f.contents) for f in _proj.files if f.ext == ".py"), "chars")
find_files_by_chunk_content(
project_name="mcpunk",
chunk_contents_filter=["desktop"],
)
_list_chunks_in_file(
proj_file=ProjectFile(
project_name="mcpunk",
rel_path=pathlib.Path("README.md"),
),
filter_=None,
filter_on="name",
)
chunk_details(chunk_id="xxx")
# f = [
# x
# for x in PROJECTS["mcpunk"].chunk_project.files
# if x.abs_path == pathlib.Path(PROJECTS["mcpunk"].root / "docs/infrastructure.md")
# ][0]
diff_with_ref(
project_name="mcpunk",
ref="main",
)