Skip to main content
Glama
divar-ir

Sourcegraph MCP Server

by divar-ir
fetcher.py (8.64 kB)
"""Sourcegraph content fetcher implementation.""" import json from typing import Any, List, Optional from urllib.parse import urljoin import requests from .content_fetcher_protocol import MAX_FILE_SIZE, ContentFetcherProtocol class SourcegraphContentFetcher(ContentFetcherProtocol): """Fetches content from Sourcegraph repositories.""" def __init__(self, endpoint: str, token: str = ""): """Initialize Sourcegraph content fetcher. Args: endpoint: Sourcegraph API endpoint token: Authentication token (optional for public instances) Raises: ValueError: If endpoint is not provided """ if not endpoint: raise ValueError("Sourcegraph endpoint is required") self.endpoint = endpoint self.token = token self.src_url = urljoin(self.endpoint, ".api/graphql") def get_content(self, repository: str, path: str = "", depth: int = 2, ref: str = "HEAD") -> str: """Get content from Sourcegraph repository. Args: repository: Repository path (e.g., "github.com/org/project") path: File or directory path (e.g., "src/main.py") depth: Tree depth for directory listings ref: Git reference (branch, tag, or commit SHA) Returns: File content if path is a file, directory tree if path is a directory Raises: ValueError: If repository or path does not exist """ # Clean repository path for Sourcegraph repository = self._clean_repository_path(repository) if not path: try: tree = self._get_sourcegraph_tree(repository, ".", depth) return tree except ValueError: raise ValueError("invalid arguments the given path or repository does not exist") # Try file first, then directory try: file_content = self._get_sourcegraph_file_content(repository, path) if file_content: return file_content except ValueError: pass # File not found, try as directory try: tree = self._get_sourcegraph_tree(repository, path, depth) return tree except ValueError: # Only raise "not found" if both file and directory lookups fail raise ValueError("invalid arguments the given path or repository does not exist") def 
_get_sourcegraph_file_content(self, repo_name: str, path: str) -> Optional[str]: """Get file content from Sourcegraph.""" query = """ query GetFileContent($name: String!, $path: String!) { repository(name: $name) { commit(rev: "HEAD") { file(path: $path) { path name content totalLines binary contentType richHTML languages } } } } """ variables = {"name": repo_name, "path": path} headers = { "Content-Type": "application/json", } if self.token: headers["Authorization"] = f"token {self.token}" payload = {"query": query, "variables": variables} try: response = requests.post(self.src_url, json=payload, headers=headers) response.raise_for_status() data = response.json() if "errors" in data: raise ValueError("invalid arguments the given path or repository does not exist") content = self._safe_get(data, ["data", "repository", "commit", "file", "content"], default=None) if content and len(content) > MAX_FILE_SIZE: total_lines = self._safe_get( data, ["data", "repository", "commit", "file", "totalLines"], default="unknown" ) truncated_content = content[:MAX_FILE_SIZE] # Find last complete line last_newline = truncated_content.rfind("\n") if last_newline > 0: truncated_content = truncated_content[:last_newline] return ( f"{truncated_content}\n\n" f"[FILE TRUNCATED: File too large ({len(content):,} chars, {total_lines} lines). " f"Showing first {len(truncated_content):,} chars]" ) return content except (requests.exceptions.RequestException, json.JSONDecodeError): raise ValueError("invalid arguments the given path or repository does not exist") def _get_sourcegraph_tree(self, repo_name: str, path: str, depth: int) -> str: """Get directory tree from Sourcegraph.""" query = """ query GetRepositoryTree($name: String!, $path: String = ".") { repository(name: $name) { name commit(rev: "HEAD") { message tree(path: $path) { entries { name isDirectory ... 
on GitTree { entries { name isDirectory } } } } languages } } } """ variables = {"name": repo_name, "path": path} headers = { "Content-Type": "application/json", } if self.token: headers["Authorization"] = f"token {self.token}" payload = {"query": query, "variables": variables} try: response = requests.post(self.src_url, json=payload, headers=headers) response.raise_for_status() data = response.json() if "errors" in data: raise ValueError("invalid arguments the given path or repository does not exist") # Check if repository exists repository = self._safe_get(data, ["data", "repository"], default=None) if repository is None: raise ValueError("invalid arguments the given path or repository does not exist") tree_data = self._safe_get(data, ["data", "repository", "commit", "tree"], default=None) if tree_data is None: raise ValueError("invalid arguments the given path or repository does not exist") entries = self._safe_get(tree_data, ["entries"], default=[]) return self._format_sourcegraph_tree(entries, depth, 0) except (requests.exceptions.RequestException, json.JSONDecodeError): raise ValueError("invalid arguments the given path or repository does not exist") def _format_sourcegraph_tree(self, entries: list, max_depth: int, current_depth: int) -> str: """Format Sourcegraph tree entries into a string representation.""" if current_depth >= max_depth: return "" output_lines = [] for entry in sorted(entries, key=lambda x: x.get("name", "")): indent = " " * current_depth name = entry.get("name", "") is_dir = entry.get("isDirectory", False) if is_dir: name += "/" output_lines.append(f"{indent}{name}") # If it's a directory and has nested entries, process them recursively if is_dir and "entries" in entry and entry["entries"] and current_depth < max_depth - 1: nested_output = self._format_sourcegraph_tree(entry["entries"], max_depth, current_depth + 1) if nested_output: output_lines.append(nested_output) return "\n".join(output_lines) def _clean_repository_path(self, 
repository: str) -> str: """Clean repository path for Sourcegraph.""" # Remove protocol prefixes repository = repository.replace("https://", "").replace("http://", "") return repository def _safe_get(self, data: dict, keys: List[str], default: Any = None) -> Any: result = data for key in keys: if not isinstance(result, dict): return default if key in result: result = result[key] else: return default return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/divar-ir/sourcegraph-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.