AnyDocs MCP Server

by funky1688
confluence.py • 22.9 kB
#!/usr/bin/env python3
"""
Confluence Document Adapter

Adapter for integrating with Atlassian Confluence spaces.
"""

import base64
from typing import Any, Dict, List, Optional

import httpx
from bs4 import BeautifulSoup
from mcp.types import Resource

from .base import BaseDocumentAdapter, DocumentContent, SearchResult
from ..utils import get_logger

logger = get_logger(__name__)


class ConfluenceAdapter(BaseDocumentAdapter):
    """Adapter for Confluence spaces.

    Supports both Confluence Cloud and Server instances.

    Required configuration:
    - base_url: Base URL of the Confluence instance
    - username: Confluence username or email
    - api_token: Confluence API token (Cloud) or password (Server)
    - space_key: Confluence space key (optional, for specific space)
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.client: Optional[httpx.AsyncClient] = None
        self.is_cloud: bool = False
        self.space_info: Dict[str, Any] = {}
        self.pages_cache: List[Dict[str, Any]] = []

    async def initialize(self) -> None:
        """Initialize the Confluence adapter."""
        logger.info(f"Initializing Confluence adapter for {self.config.get('base_url')}")

        # Validate required configuration
        self._validate_config(["base_url", "username", "api_token"])

        # Determine if this is Confluence Cloud or Server
        self.is_cloud = "atlassian.net" in self.config["base_url"]

        # Setup HTTP client with authentication
        auth_header = self._get_auth_header()
        self.client = httpx.AsyncClient(
            base_url=self.config["base_url"].rstrip("/"),
            headers={
                "Authorization": auth_header,
                "Accept": "application/json",
                "Content-Type": "application/json",
                "User-Agent": "AnyDocs-MCP/0.1.0",
            },
            timeout=30.0,
            follow_redirects=True,
        )

        # Test connection
        await self._test_connection()

        # Get space information
        await self._fetch_space_info()

        # Build pages cache
        await self._build_pages_cache()

        self._initialized = True
        logger.info(
            f"Confluence adapter initialized successfully for space: "
            f"{self.space_info.get('name', 'All Spaces')}"
        )

    async def list_resources(self) -> List[Resource]:
        """List all pages in the Confluence space(s)."""
        self._ensure_initialized()

        resources = []
        for page in self.pages_cache:
            resource = Resource(
                uri=self.get_resource_uri(page["id"]),
                name=page["title"],
                description=self._get_page_excerpt(page),
                mimeType="text/markdown",
            )
            resources.append(resource)

        logger.debug(f"Listed {len(resources)} resources from Confluence")
        return resources

    async def get_content(self, resource_path: str) -> DocumentContent:
        """Get content for a specific Confluence page."""
        self._ensure_initialized()

        # Find page in cache
        page = self._find_page_by_id(resource_path)
        if not page:
            raise FileNotFoundError(f"Page not found: {resource_path}")

        try:
            # Fetch full page content
            full_page = await self._fetch_page_content(resource_path)

            # Convert Confluence storage format to markdown
            content = self._storage_to_markdown(
                full_page.get("body", {}).get("storage", {}).get("value", "")
            )

            return DocumentContent(
                title=full_page["title"],
                content=content,
                url=self._get_page_url(full_page),
                source=self.name,
                content_type="text/markdown",
                last_modified=full_page.get("version", {}).get("when"),
                metadata={
                    "page_id": full_page["id"],
                    "space_key": full_page.get("space", {}).get("key"),
                    "version": full_page.get("version", {}).get("number"),
                    "created_by": full_page.get("version", {}).get("by", {}).get("displayName"),
                    "created_date": full_page.get("createdDate"),
                    "parent_id": (
                        full_page.get("ancestors", [{}])[-1].get("id")
                        if full_page.get("ancestors")
                        else None
                    ),
                },
            )
        except Exception as e:
            logger.error(f"Error fetching content for page {resource_path}: {e}")
            raise

    async def search(self, query: str, limit: int = 10) -> List[SearchResult]:
        """Search Confluence pages using CQL (Confluence Query Language)."""
        self._ensure_initialized()

        if not query.strip():
            raise ValueError("Search query cannot be empty")

        try:
            # Use Confluence search API with CQL
            cql_query = f'text ~ "{query}"'
            if "space_key" in self.config:
                cql_query += f' AND space = "{self.config["space_key"]}"'

            params = {
                "cql": cql_query,
                "limit": limit,
                "expand": "content.space,content.version,content.body.view",
            }

            response = await self.client.get("/rest/api/content/search", params=params)
            response.raise_for_status()

            data = response.json()
            results = []

            for item in data.get("results", []):
                content = item.get("content", {})
                if content.get("type") == "page":
                    # Get content snippet
                    body_view = content.get("body", {}).get("view", {}).get("value", "")
                    snippet = self._html_to_text(body_view)[:200]
                    if len(snippet) == 200:
                        snippet += "..."

                    results.append(
                        SearchResult(
                            title=content["title"],
                            content=snippet,
                            url=self._get_page_url(content),
                            source=self.name,
                            score=1.0,  # Confluence API doesn't provide relevance scores
                            metadata={
                                "page_id": content["id"],
                                "space_key": content.get("space", {}).get("key"),
                                "version": content.get("version", {}).get("number"),
                                "last_modified": content.get("version", {}).get("when"),
                            },
                        )
                    )

            return results

        except Exception as e:
            logger.error(f"Error searching Confluence: {e}")
            # Fallback to local cache search
            return await self._search_local_cache(query, limit)

    async def get_structure(self) -> str:
        """Get the structure of the Confluence space."""
        self._ensure_initialized()

        structure_lines = [f"# {self.space_info.get('name', 'Confluence Space')}"]
        structure_lines.append("")

        if self.space_info.get("description"):
            structure_lines.append(self.space_info["description"])
            structure_lines.append("")

        # Build hierarchical structure
        root_pages = [p for p in self.pages_cache if not p.get("ancestors")]

        def add_page_to_structure(page: Dict[str, Any], indent: int = 0):
            prefix = "  " * indent + "- "
            url = self._get_page_url(page)
            structure_lines.append(f"{prefix}[{page['title']}]({url})")

            # Add child pages (pages whose nearest ancestor is this page)
            child_pages = [
                p
                for p in self.pages_cache
                if p.get("ancestors") and p["ancestors"][-1]["id"] == page["id"]
            ]
            for child in child_pages:
                add_page_to_structure(child, indent + 1)

        for page in root_pages:
            add_page_to_structure(page)

        return "\n".join(structure_lines)

    def _get_auth_header(self) -> str:
        """Get the appropriate authentication header."""
        username = self.config["username"]
        api_token = self.config["api_token"]

        # For Confluence Cloud, use email:api_token
        # For Confluence Server, use username:password
        credentials = f"{username}:{api_token}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()

        return f"Basic {encoded_credentials}"

    async def _test_connection(self) -> None:
        """Test the Confluence API connection."""
        try:
            response = await self.client.get("/rest/api/user/current")
            response.raise_for_status()
            logger.debug("Confluence API connection test successful")
        except Exception as e:
            logger.error(f"Confluence API connection test failed: {e}")
            raise ConnectionError(f"Failed to connect to Confluence API: {e}")

    async def _fetch_space_info(self) -> None:
        """Fetch information about the Confluence space."""
        try:
            if "space_key" in self.config:
                # Get specific space info
                response = await self.client.get(f"/rest/api/space/{self.config['space_key']}")
                response.raise_for_status()
                self.space_info = response.json()
            else:
                # Get all accessible spaces
                response = await self.client.get("/rest/api/space")
                response.raise_for_status()
                data = response.json()
                spaces = data.get("results", [])
                if spaces:
                    self.space_info = {
                        "name": f"All Spaces ({len(spaces)} spaces)",
                        "key": "all",
                        "description": f"Access to {len(spaces)} Confluence spaces",
                    }
                else:
                    self.space_info = {"name": "No Accessible Spaces", "key": "none"}
        except Exception as e:
            logger.error(f"Error fetching space info: {e}")
            self.space_info = {"name": "Confluence Space", "key": "unknown"}

    async def _build_pages_cache(self) -> None:
        """Build cache of all pages in the space(s)."""
        try:
            params = {
                "limit": 100,
                "expand": "space,version,ancestors",
            }

            # Add space filter if specified
            if "space_key" in self.config:
                params["spaceKey"] = self.config["space_key"]

            all_pages = []
            start = 0

            while True:
                params["start"] = start
                response = await self.client.get("/rest/api/content", params=params)
                response.raise_for_status()

                data = response.json()
                pages = data.get("results", [])

                if not pages:
                    break

                # Filter for pages only (exclude blog posts, etc.)
                page_results = [p for p in pages if p.get("type") == "page"]
                all_pages.extend(page_results)

                # Check if there are more pages
                if len(pages) < params["limit"]:
                    break

                start += params["limit"]

                # Safety limit to prevent infinite loops
                if len(all_pages) > 1000:
                    logger.warning("Reached page limit of 1000, stopping cache build")
                    break

            self.pages_cache = all_pages
            logger.debug(f"Cached {len(self.pages_cache)} pages")

        except Exception as e:
            logger.error(f"Error building pages cache: {e}")
            self.pages_cache = []

    async def _fetch_page_content(self, page_id: str) -> Dict[str, Any]:
        """Fetch full content for a specific page."""
        try:
            params = {"expand": "body.storage,version,space,ancestors"}
            response = await self.client.get(f"/rest/api/content/{page_id}", params=params)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error fetching page content: {e}")
            raise

    def _storage_to_markdown(self, storage_content: str) -> str:
        """Convert Confluence storage format to markdown."""
        if not storage_content:
            return ""

        try:
            soup = BeautifulSoup(storage_content, "html.parser")

            # Convert common Confluence elements to markdown
            markdown_lines = []

            for element in soup.find_all(recursive=False):
                if element.name == "h1":
                    markdown_lines.append(f"# {element.get_text()}")
                elif element.name == "h2":
                    markdown_lines.append(f"## {element.get_text()}")
                elif element.name == "h3":
                    markdown_lines.append(f"### {element.get_text()}")
                elif element.name == "h4":
                    markdown_lines.append(f"#### {element.get_text()}")
                elif element.name == "h5":
                    markdown_lines.append(f"##### {element.get_text()}")
                elif element.name == "h6":
                    markdown_lines.append(f"###### {element.get_text()}")
                elif element.name == "p":
                    text = self._process_inline_elements(element)
                    if text.strip():
                        markdown_lines.append(text)
                elif element.name == "ul":
                    for li in element.find_all("li", recursive=False):
                        text = self._process_inline_elements(li)
                        markdown_lines.append(f"- {text}")
                elif element.name == "ol":
                    for i, li in enumerate(element.find_all("li", recursive=False), 1):
                        text = self._process_inline_elements(li)
                        markdown_lines.append(f"{i}. {text}")
                elif element.name == "blockquote":
                    text = self._process_inline_elements(element)
                    for line in text.split("\n"):
                        if line.strip():
                            markdown_lines.append(f"> {line}")
                elif element.name in ["ac:structured-macro", "structured-macro"]:
                    # Handle Confluence macros
                    macro_name = element.get("ac:name") or element.get("name")

                    if macro_name == "code":
                        # Code block macro
                        code_content = element.find("ac:plain-text-body") or element.find("plain-text-body")
                        if code_content:
                            # The language is stored in an <ac:parameter ac:name="language">
                            # child element, not in an attribute on the macro itself.
                            lang_param = (
                                element.find("ac:parameter", attrs={"ac:name": "language"})
                                or element.find("parameter", attrs={"name": "language"})
                            )
                            language = lang_param.get_text(strip=True) if lang_param else ""
                            markdown_lines.append(f"```{language}")
                            markdown_lines.append(code_content.get_text())
                            markdown_lines.append("```")
                    elif macro_name == "info":
                        # Info panel
                        info_content = element.find("ac:rich-text-body") or element.find("rich-text-body")
                        if info_content:
                            text = self._process_inline_elements(info_content)
                            markdown_lines.append(f"> ℹ️ {text}")
                    elif macro_name == "warning":
                        # Warning panel
                        warning_content = element.find("ac:rich-text-body") or element.find("rich-text-body")
                        if warning_content:
                            text = self._process_inline_elements(warning_content)
                            markdown_lines.append(f"> ⚠️ {text}")
                    else:
                        # Generic macro
                        macro_body = element.find("ac:rich-text-body") or element.find("rich-text-body")
                        if macro_body:
                            text = self._process_inline_elements(macro_body)
                            markdown_lines.append(f"**{macro_name.title()}:** {text}")
                elif element.name == "table":
                    # Convert table to markdown
                    table_md = self._table_to_markdown(element)
                    if table_md:
                        markdown_lines.append(table_md)
                else:
                    # Fallback: get text content
                    text = element.get_text(strip=True)
                    if text:
                        markdown_lines.append(text)

                markdown_lines.append("")  # Add spacing between elements

            return "\n".join(markdown_lines).strip()

        except Exception as e:
            logger.error(f"Error converting storage format to markdown: {e}")
            # Fallback: return plain text
            return BeautifulSoup(storage_content, "html.parser").get_text()

    def _process_inline_elements(self, element) -> str:
        """Process inline elements within a block element."""
        result = []

        for content in element.contents:
            if hasattr(content, "name"):
                if content.name == "strong":
                    result.append(f"**{content.get_text()}**")
                elif content.name == "em":
                    result.append(f"*{content.get_text()}*")
                elif content.name == "code":
                    result.append(f"`{content.get_text()}`")
                elif content.name == "a":
                    href = content.get("href", "")
                    text = content.get_text()
                    result.append(f"[{text}]({href})")
                elif content.name == "br":
                    result.append("\n")
                else:
                    result.append(content.get_text())
            else:
                result.append(str(content))

        return "".join(result)

    def _table_to_markdown(self, table_element) -> str:
        """Convert HTML table to markdown table."""
        try:
            rows = table_element.find_all("tr")
            if not rows:
                return ""

            markdown_rows = []

            for i, row in enumerate(rows):
                cells = row.find_all(["th", "td"])
                cell_texts = [cell.get_text(strip=True) for cell in cells]
                markdown_rows.append("| " + " | ".join(cell_texts) + " |")

                # Add header separator after first row
                if i == 0:
                    separator = "| " + " | ".join(["---"] * len(cell_texts)) + " |"
                    markdown_rows.append(separator)

            return "\n".join(markdown_rows)

        except Exception as e:
            logger.error(f"Error converting table to markdown: {e}")
            return ""

    def _html_to_text(self, html_content: str) -> str:
        """Convert HTML to plain text."""
        if not html_content:
            return ""

        try:
            soup = BeautifulSoup(html_content, "html.parser")
            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
            return soup.get_text(separator=" ", strip=True)
        except Exception:
            return html_content

    def _get_page_url(self, page: Dict[str, Any]) -> str:
        """Get the web URL for a page."""
        base_url = self.config["base_url"].rstrip("/")
        page_id = page["id"]

        if self.is_cloud:
            # Confluence Cloud URL format
            space_key = page.get("space", {}).get("key", "")
            return f"{base_url}/wiki/spaces/{space_key}/pages/{page_id}"
        else:
            # Confluence Server URL format
            return f"{base_url}/pages/viewpage.action?pageId={page_id}"

    def _get_page_excerpt(self, page: Dict[str, Any]) -> str:
        """Get a brief excerpt for a page."""
        # Try to get excerpt from page metadata
        excerpt = page.get("excerpt", "")
        if excerpt:
            return self._html_to_text(excerpt)[:100] + "..."

        # Fallback to basic info
        space_name = page.get("space", {}).get("name", "Unknown Space")
        version = page.get("version", {}).get("number", "1")
        return f"Page in {space_name} (v{version})"

    def _find_page_by_id(self, page_id: str) -> Optional[Dict[str, Any]]:
        """Find a page in the cache by ID."""
        for page in self.pages_cache:
            if page["id"] == page_id:
                return page
        return None

    async def _search_local_cache(self, query: str, limit: int) -> List[SearchResult]:
        """Fallback search using local cache."""
        results = []
        query_lower = query.lower()

        for page in self.pages_cache:
            score = 0.0

            # Check title match
            if query_lower in page["title"].lower():
                score += 10.0

            # Check excerpt match
            excerpt = self._get_page_excerpt(page)
            if query_lower in excerpt.lower():
                score += 5.0

            if score > 0:
                results.append(
                    SearchResult(
                        title=page["title"],
                        content=excerpt,
                        url=self._get_page_url(page),
                        source=self.name,
                        score=score,
                        metadata={
                            "page_id": page["id"],
                            "space_key": page.get("space", {}).get("key"),
                            "version": page.get("version", {}).get("number"),
                            "last_modified": page.get("version", {}).get("when"),
                        },
                    )
                )

        # Sort by score and limit results
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:limit]

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.aclose()
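For quick orientation, a minimal usage sketch follows. It is not part of confluence.py: the import path, the __main__ driver, and all credential values are placeholder assumptions, and BaseDocumentAdapter may require additional config keys (such as a name field) beyond those shown.

# usage_sketch.py — hypothetical driver for ConfluenceAdapter
import asyncio

# Assumed import path; adjust to the actual AnyDocs-MCP package layout.
from anydocs_mcp.adapters.confluence import ConfluenceAdapter


async def main() -> None:
    config = {
        "base_url": "https://example.atlassian.net",  # placeholder Cloud instance
        "username": "user@example.com",               # placeholder email
        "api_token": "<api-token>",                   # placeholder token
        "space_key": "DOCS",                          # optional: limit to one space
    }
    # The adapter is an async context manager, but __aenter__ only returns
    # self, so initialize() must still be called explicitly.
    async with ConfluenceAdapter(config) as adapter:
        await adapter.initialize()
        resources = await adapter.list_resources()
        print(f"Listed {len(resources)} pages")
        for result in await adapter.search("installation guide", limit=5):
            print(f"{result.score:>5.1f}  {result.title}  {result.url}")


if __name__ == "__main__":
    asyncio.run(main())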
