Gitingest-MCP
by puravparab
- Gitingest-MCP
- src
- gitingest_mcp
import re
import asyncio
from gitingest import ingest
from typing import Any, Dict, List, Optional
class GitIngester:
    """Fetch a repository digest via ``ingest`` and expose its parts.

    After ``fetch_repo_data`` has completed, the instance holds the parsed
    summary, the tree and the content returned by ``ingest``. Before that,
    all three are ``None``.
    """

    # Rule line written above and below each "File: ..." header in the
    # output of ``_get_files_content``.
    _HEADER_RULE = "=================================================="

    # Header patterns tried in order, strictest first; the first pattern that
    # matches anything in the content is the only one used.
    _FILE_HEADER_PATTERNS = (
        # Standard pattern with exactly 50 equals signs
        re.compile(r"={50}\nFile: ([^\n]+)\n={50}"),
        # More flexible pattern with varying number of equals signs
        re.compile(r"={10,}\nFile: ([^\n]+)\n={10,}"),
        # Extra flexible pattern
        re.compile(r"=+\s*File:\s*([^\n]+)\s*\n=+"),
    )

    def __init__(self, url: str, branch: Optional[str] = None):
        """Initialize the GitIngester with a repository URL.

        Args:
            url: Repository URL passed to ``ingest``.
            branch: Optional branch name. When given (and non-empty), the
                stored URL becomes ``<url>/tree/<branch>``.
        """
        self.branch: Optional[str] = branch
        if branch:
            # Strip trailing slashes so the result never contains "//tree/".
            self.url: str = f"{url.rstrip('/')}/tree/{branch}"
        else:
            self.url = url
        self.summary: Optional[Dict[str, Any]] = None
        self.tree: Optional[Any] = None
        self.content: Optional[Any] = None

    async def fetch_repo_data(self) -> None:
        """Asynchronously fetch and process repository data.

        Runs ``ingest(self.url)`` in the event loop's default executor so the
        call does not block the loop, then stores the tree and content and
        the parsed summary on the instance.
        """
        loop = asyncio.get_running_loop()
        summary, self.tree, self.content = await loop.run_in_executor(
            None, ingest, self.url
        )
        self.summary = self._parse_summary(summary)

    def _parse_summary(self, summary_str: str) -> Dict[str, Any]:
        """Parse the summary string into a structured dictionary.

        Args:
            summary_str: Summary text containing ``Repository: ...``,
                ``Files analyzed: <n>`` and ``Estimated tokens: ...`` lines.

        Returns:
            A dict with keys ``repository`` (str, ``""`` if absent),
            ``num_files`` (int, ``None`` if absent), ``token_count`` (str,
            ``""`` if absent) and ``raw`` (the input, unchanged).
        """
        defaults: Dict[str, Any] = {
            "repository": "",
            "num_files": None,
            "token_count": "",
        }
        summary_dict = dict(defaults)
        try:
            repo_match = re.search(r"Repository: (.+)", summary_str)
            if repo_match:
                summary_dict["repository"] = repo_match.group(1).strip()

            files_match = re.search(r"Files analyzed: (\d+)", summary_str)
            if files_match:
                summary_dict["num_files"] = int(files_match.group(1))

            tokens_match = re.search(r"Estimated tokens: (.+)", summary_str)
            if tokens_match:
                summary_dict["token_count"] = tokens_match.group(1).strip()
        except (TypeError, ValueError):
            # Best effort: a non-string summary (TypeError) or an unparsable
            # file count (ValueError) resets every field to its default.
            summary_dict = dict(defaults)

        # Store the original string as well
        summary_dict["raw"] = summary_str
        return summary_dict

    def get_summary(self) -> str:
        """Return the raw repository summary, or ``""`` if nothing was fetched yet."""
        if self.summary is None:
            return ""
        return self.summary["raw"]

    def get_tree(self) -> Any:
        """Return the repository tree structure (``None`` before a fetch)."""
        return self.tree

    def get_content(self, file_paths: Optional[List[str]] = None) -> str:
        """Return the repository content.

        Args:
            file_paths: When ``None``, the stored content is returned as is
                (``None`` before a fetch). Otherwise only the sections for
                the requested files are returned.

        Returns:
            The stored content, or the concatenated sections of the
            requested files (``""`` when none of them is found).
        """
        if file_paths is None:
            return self.content
        return self._get_files_content(file_paths)

    def _get_files_content(self, file_paths: List[str]) -> str:
        """Extract the sections of specific files from the repository content.

        The content is split on ``File: <name>`` headers framed by lines of
        equals signs. A requested path selects a section when it equals the
        header's file name, or when it ends with ``"/" + <name>``. An exact
        match always takes precedence over a suffix match; among matches of
        the same kind the last section wins.

        Args:
            file_paths: Paths to look up.

        Returns:
            The found sections, in the order the paths were requested, each
            preceded by a regenerated header and separated by a blank line.
            ``""`` when there is no content or nothing matched.
        """
        if not self.content or not file_paths:
            return ""

        content_str = str(self.content)

        headers = []
        for pattern in self._FILE_HEADER_PATTERNS:
            headers = list(pattern.finditer(content_str))
            if headers:
                # If we found matches with this pattern, no need to try others
                break

        found: Dict[str, str] = {}
        exact_hits = set()
        for index, header in enumerate(headers):
            filename = header.group(1).strip()
            # A section runs from the end of its header to the start of the
            # next header (or to the end of the content for the last one).
            if index + 1 < len(headers):
                section_end = headers[index + 1].start()
            else:
                section_end = len(content_str)
            file_content = content_str[header.end():section_end].strip()

            suffix = "/" + filename
            for path in file_paths:
                if path == filename:
                    found[path] = file_content
                    exact_hits.add(path)
                elif path not in exact_hits and path.endswith(suffix):
                    # Suffix match (this also covers "basename == filename");
                    # never allowed to replace an exact match.
                    found[path] = file_content

        rule = self._HEADER_RULE
        # dict.fromkeys keeps request order while dropping duplicate paths.
        sections = [
            f"{rule}\nFile: {path}\n{rule}\n{found[path]}"
            for path in dict.fromkeys(file_paths)
            if path in found
        ]
        return "\n\n".join(sections)