import base64
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Load settings from a local .env file into the process environment so that
# GitHubClient can pick them up through os.getenv().
load_dotenv()

# MCP server instance; the functions decorated with @mcp.tool() below are
# registered on it.
mcp = FastMCP("simple-code-review-assistant")

# Base URL prepended to every GitHub REST API path built by GitHubClient.
GITHUB_API = "https://api.github.com"

# Folder of local markdown docs scanned by _search_docs(); lives next to this file.
DOCS_DIR = Path(__file__).parent / "docs"
class GitHubClient:
    """Small GitHub REST client bound to the single repository named in the environment.

    The constructor reads ``GITHUB_TOKEN``, ``GITHUB_REPO_OWNER`` and
    ``GITHUB_REPO_NAME`` with ``os.getenv`` and sends every request through one
    ``requests.Session`` carrying the auth, media-type and API-version headers.

    The client can be used as a context manager (or closed with ``close()``) to
    release the underlying HTTP session.

    Raises:
        ValueError: from the constructor when any of the three environment
            variables is missing or blank.
    """

    # Per-request timeout in seconds, applied unless the caller passes its own.
    _TIMEOUT_S = 30
    # Longest rate-limit wait (seconds) we are willing to sleep through before
    # the single retry; longer waits are reported to the caller immediately.
    _MAX_RETRY_WAIT_S = 10
    # HTTP statuses on which the rate-limit headers are inspected.
    _RATE_LIMIT_STATUSES = (403, 429)

    def __init__(self) -> None:
        self.token = os.getenv("GITHUB_TOKEN", "").strip()
        self.owner = os.getenv("GITHUB_REPO_OWNER", "").strip()
        self.repo = os.getenv("GITHUB_REPO_NAME", "").strip()
        if not self.token or not self.owner or not self.repo:
            raise ValueError(
                "Missing env vars. Please set GITHUB_TOKEN, GITHUB_REPO_OWNER, GITHUB_REPO_NAME in .env"
            )
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "User-Agent": "mcp-code-review-assistant",
            }
        )

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "GitHubClient":
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        self.close()

    @staticmethod
    def _header_int(headers: Any, name: str) -> Optional[int]:
        """Return header ``name`` parsed as an int, or None when absent or malformed."""
        try:
            return int(headers.get(name))
        except (TypeError, ValueError):
            return None

    @classmethod
    def _rate_limit_wait(cls, resp: Any) -> Optional[int]:
        """Return how many seconds to wait before retrying ``resp``, or None if it is not rate-limited.

        ``Retry-After`` wins when present; otherwise the wait is derived from
        ``X-RateLimit-Reset``, but only when ``X-RateLimit-Remaining`` is 0.
        """
        if resp.status_code not in cls._RATE_LIMIT_STATUSES:
            return None
        retry_after = cls._header_int(resp.headers, "Retry-After")
        if retry_after is not None:
            return max(0, retry_after)
        if cls._header_int(resp.headers, "X-RateLimit-Remaining") != 0:
            return None
        reset_epoch = cls._header_int(resp.headers, "X-RateLimit-Reset")
        if not reset_epoch:
            return None
        # +1 second of slack so the retry lands after the reset instant.
        return max(0, reset_epoch - int(time.time()) + 1)

    def _request(self, method: str, url: str, **kwargs: Any) -> "requests.Response":
        """Send one request through the session, retrying once after a short rate-limit wait.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            url: Absolute URL to request.
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults to 30 s.

        Returns:
            The final response. Status handling is left to the caller.
        """
        # setdefault (rather than a hard-coded keyword) lets callers override the
        # timeout without a duplicate-keyword TypeError.
        kwargs.setdefault("timeout", self._TIMEOUT_S)
        resp = self.session.request(method, url, **kwargs)

        wait_s = self._rate_limit_wait(resp)
        # Only retry when the limit lifts soon. Re-sending while still limited
        # cannot succeed, so a long wait is surfaced to the caller right away.
        if wait_s is not None and wait_s <= self._MAX_RETRY_WAIT_S:
            time.sleep(wait_s)
            resp = self.session.request(method, url, **kwargs)
        return resp

    def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """GET ``url`` and return the decoded JSON body; raise RuntimeError on any non-200 status."""
        resp = self._request("GET", url, params=params)
        if resp.status_code != 200:
            raise RuntimeError(f"GitHub error {resp.status_code}: {resp.text}")
        return resp.json()

    def get_repo(self) -> Dict[str, Any]:
        """Return a summary of the configured repository.

        Returns:
            Dict with keys ``full_name``, ``description``, ``default_branch``,
            ``private``, ``stars``, ``forks``, ``open_issues`` and ``html_url``.

        Raises:
            RuntimeError: when GitHub answers with a non-200 status.
        """
        data = self._get_json(f"{GITHUB_API}/repos/{self.owner}/{self.repo}")
        return {
            "full_name": data.get("full_name"),
            "description": data.get("description"),
            "default_branch": data.get("default_branch"),
            "private": data.get("private"),
            "stars": data.get("stargazers_count"),
            "forks": data.get("forks_count"),
            "open_issues": data.get("open_issues_count"),
            "html_url": data.get("html_url"),
        }

    def get_file_content(self, path: str, ref: Optional[str] = None) -> Dict[str, Any]:
        """Fetch a file, or list a directory, through the repository contents endpoint.

        Args:
            path: Repository-relative path; leading slashes are ignored, so
                ``"/"`` addresses the repository root.
            ref: Optional branch, tag or commit SHA, sent as the ``ref`` query parameter.

        Returns:
            For a directory: ``{"path", "type": "directory", "items": [...]}``.
            For a file: ``{"path", "type": "file", "size", "sha", "download_url",
            "content"}``, where ``content`` is the base64 payload decoded as
            UTF-8 (undecodable bytes replaced). It is an empty string when the
            response carries no base64 content.
            For any other entry type: ``{"path", "type", "raw"}`` with the
            unmodified API payload under ``raw``.

        Raises:
            RuntimeError: when GitHub answers with a non-200 status.
        """
        clean_path = path.lstrip("/")
        # Percent-encode the path (keeping "/" separators) so names containing
        # "#", "?" or "%" are not mistaken for a fragment, query or escape.
        encoded_path = quote(clean_path)
        url = f"{GITHUB_API}/repos/{self.owner}/{self.repo}/contents/{encoded_path}"
        data = self._get_json(url, params={"ref": ref} if ref else None)

        # A JSON array means the path is a directory: return a listing of its entries.
        if isinstance(data, list):
            return {
                "path": clean_path,
                "type": "directory",
                "items": [
                    {"name": entry.get("name"), "path": entry.get("path"), "type": entry.get("type")}
                    for entry in data
                ],
            }

        if data.get("type") != "file":
            return {"path": clean_path, "type": data.get("type"), "raw": data}

        content_b64 = data.get("content", "")
        if data.get("encoding", "base64") == "base64" and content_b64:
            decoded = base64.b64decode(content_b64).decode("utf-8", errors="replace")
        else:
            # No inline base64 payload in the response; report empty content.
            decoded = ""

        return {
            "path": data.get("path"),
            "type": "file",
            "size": data.get("size"),
            "sha": data.get("sha"),
            "download_url": data.get("download_url"),
            "content": decoded,
        }
def _search_docs(
    keyword: str,
    max_results: int = 5,
    max_chars_per_hit: int = 800,
    docs_dir: Optional[Path] = None,
) -> Dict[str, Any]:
    """Case-insensitively search markdown files for ``keyword``.

    A file is a hit when the keyword occurs in its text or in its file name.
    Files are visited in sorted path order so the hits kept under
    ``max_results`` are the same on every run.

    Args:
        keyword: Text to look for; surrounding whitespace is ignored.
        max_results: Maximum number of hits to return; values <= 0 yield no hits.
        max_chars_per_hit: Maximum snippet length taken from each matching file.
        docs_dir: Folder to search recursively for ``*.md`` files. Defaults to
            the module-level ``DOCS_DIR``.

    Returns:
        ``{"keyword", "results", "count"}``, plus an ``"error"`` message when
        the folder is missing or the keyword is blank. Each result holds
        ``file`` (path relative to the docs folder's parent), ``match_index``
        (offset of the first match in the lower-cased text, or -1 when only
        the file name matched) and ``snippet``.
    """
    root = DOCS_DIR if docs_dir is None else Path(docs_dir)
    if not root.exists():
        return {
            "keyword": keyword,
            "results": [],
            "count": 0,
            "error": f"docs folder not found at {str(root)}",
        }

    needle = keyword.strip().lower()
    if not needle:
        return {"keyword": keyword, "results": [], "count": 0, "error": "keyword is empty"}

    hits: List[Dict[str, Any]] = []
    # Guard up front: checking the limit only after appending would let one
    # hit through even when the caller asked for zero.
    if max_results <= 0:
        return {"keyword": keyword, "results": hits, "count": 0}

    for md in sorted(root.rglob("*.md")):
        try:
            text = md.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Unreadable file (permissions, vanished mid-scan): skip it, best-effort.
            continue

        lowered = text.lower()
        idx = lowered.find(needle)
        if idx < 0 and needle not in md.name.lower():
            continue

        # Start up to 200 characters before the match for context; a
        # name-only match (idx == -1) starts at the beginning of the file.
        start = max(0, idx - 200) if idx >= 0 else 0
        end = min(len(text), start + max_chars_per_hit)
        hits.append(
            {
                "file": str(md.relative_to(root.parent)),
                "match_index": idx,
                "snippet": text[start:end].replace("\n", " ").strip(),
            }
        )
        if len(hits) >= max_results:
            break

    return {"keyword": keyword, "results": hits, "count": len(hits)}
@mcp.tool()
def get_repository() -> Dict[str, Any]:
    """
    Get repository info from GitHub.
    """
    # Returns the summary dict built by GitHubClient.get_repo().
    # Raises ValueError when the GitHub env vars are missing and RuntimeError
    # when GitHub answers with a non-200 status (both raised by GitHubClient).
    gh = GitHubClient()
    try:
        return gh.get_repo()
    finally:
        # A fresh client (and HTTP session) is created per call; close the
        # session so its pooled connections are not left open.
        gh.session.close()
@mcp.tool()
def get_file_content(path: str, ref: Optional[str] = None) -> Dict[str, Any]:
    """
    Read file (or directory listing) from GitHub repo using the GitHub Contents API.
    - path: "README.md" or "src/main.py" or "/"
    - ref: optional branch/tag/sha
    """
    # Delegates to GitHubClient.get_file_content() and returns its dict as-is.
    # Raises ValueError when the GitHub env vars are missing and RuntimeError
    # when GitHub answers with a non-200 status (both raised by GitHubClient).
    gh = GitHubClient()
    try:
        return gh.get_file_content(path=path, ref=ref)
    finally:
        # A fresh client (and HTTP session) is created per call; close the
        # session so its pooled connections are not left open.
        gh.session.close()
@mcp.tool()
def search_docs(keyword: str, max_results: int = 5) -> Dict[str, Any]:
    """
    Search local markdown files under ./docs with simple keyword matching.
    """
    # Thin tool wrapper: the matching itself lives in _search_docs(), whose
    # result dict is handed back unchanged. The snippet length is left at that
    # helper's default.
    found = _search_docs(keyword, max_results)
    return found
if __name__ == "__main__":
    # Executed as a script: start the MCP server (stdio transport) and serve
    # the tools registered above.
    mcp.run()