#!/usr/bin/env python3
"""
Confluence Document Adapter
Adapter for integrating with Atlassian Confluence spaces.
"""
import asyncio
from typing import Any, Dict, List, Optional
import base64
import httpx
from bs4 import BeautifulSoup
from mcp.types import Resource
from .base import BaseDocumentAdapter, DocumentContent, SearchResult
from ..utils import get_logger
logger = get_logger(__name__)
class ConfluenceAdapter(BaseDocumentAdapter):
"""Adapter for Confluence spaces.
Supports both Confluence Cloud and Server instances.
    Configuration:
    - base_url: Base URL of the Confluence instance (required)
    - username: Confluence username or email (required)
    - api_token: Confluence API token (Cloud) or password (Server) (required)
    - space_key: Confluence space key (optional; restricts the adapter to a single space)
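    Example (illustrative values):
        {
            "base_url": "https://your-site.atlassian.net/wiki",
            "username": "user@example.com",
            "api_token": "<api-token>",
            "space_key": "DOCS"
        }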
"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.client: Optional[httpx.AsyncClient] = None
self.is_cloud: bool = False
self.space_info: Dict[str, Any] = {}
self.pages_cache: List[Dict[str, Any]] = []
async def initialize(self) -> None:
"""Initialize the Confluence adapter."""
logger.info(f"Initializing Confluence adapter for {self.config.get('base_url')}")
# Validate required configuration
self._validate_config(["base_url", "username", "api_token"])
# Determine if this is Confluence Cloud or Server
self.is_cloud = "atlassian.net" in self.config["base_url"]
# Setup HTTP client with authentication
auth_header = self._get_auth_header()
self.client = httpx.AsyncClient(
base_url=self.config["base_url"].rstrip("/"),
headers={
"Authorization": auth_header,
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "AnyDocs-MCP/0.1.0"
},
timeout=30.0,
follow_redirects=True
)
# Test connection
await self._test_connection()
# Get space information
await self._fetch_space_info()
# Build pages cache
await self._build_pages_cache()
self._initialized = True
logger.info(f"Confluence adapter initialized successfully for space: {self.space_info.get('name', 'All Spaces')}")
async def list_resources(self) -> List[Resource]:
"""List all pages in the Confluence space(s)."""
self._ensure_initialized()
resources = []
for page in self.pages_cache:
resource = Resource(
uri=self.get_resource_uri(page["id"]),
name=page["title"],
description=self._get_page_excerpt(page),
mimeType="text/markdown"
)
resources.append(resource)
logger.debug(f"Listed {len(resources)} resources from Confluence")
return resources
async def get_content(self, resource_path: str) -> DocumentContent:
"""Get content for a specific Confluence page."""
self._ensure_initialized()
# Find page in cache
page = self._find_page_by_id(resource_path)
if not page:
raise FileNotFoundError(f"Page not found: {resource_path}")
try:
# Fetch full page content
full_page = await self._fetch_page_content(resource_path)
# Convert Confluence storage format to markdown
content = self._storage_to_markdown(full_page.get("body", {}).get("storage", {}).get("value", ""))
return DocumentContent(
title=full_page["title"],
content=content,
url=self._get_page_url(full_page),
source=self.name,
content_type="text/markdown",
last_modified=full_page.get("version", {}).get("when"),
metadata={
"page_id": full_page["id"],
"space_key": full_page.get("space", {}).get("key"),
"version": full_page.get("version", {}).get("number"),
"created_by": full_page.get("version", {}).get("by", {}).get("displayName"),
"created_date": full_page.get("createdDate"),
"parent_id": full_page.get("ancestors", [{}])[-1].get("id") if full_page.get("ancestors") else None
}
)
except Exception as e:
logger.error(f"Error fetching content for page {resource_path}: {e}")
raise
async def search(self, query: str, limit: int = 10) -> List[SearchResult]:
"""Search Confluence pages using CQL (Confluence Query Language)."""
self._ensure_initialized()
if not query.strip():
raise ValueError("Search query cannot be empty")
try:
# Use Confluence search API with CQL
            # Escape embedded double quotes so user input cannot break out of the CQL string
            safe_query = query.replace('"', '\\"')
            cql_query = f'text ~ "{safe_query}"'
            if "space_key" in self.config:
                cql_query += f' AND space = "{self.config["space_key"]}"'
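            # Resulting CQL looks like: text ~ "install guide" AND space = "DOCS"  (illustrative values)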
params = {
"cql": cql_query,
"limit": limit,
"expand": "content.space,content.version,content.body.view"
}
response = await self.client.get("/rest/api/content/search", params=params)
response.raise_for_status()
data = response.json()
results = []
            for content in data.get("results", []):
                # /rest/api/content/search returns content objects directly (no wrapper)
                if content.get("type") == "page":
# Get content snippet
body_view = content.get("body", {}).get("view", {}).get("value", "")
                    full_text = self._html_to_text(body_view)
                    snippet = full_text[:200]
                    if len(full_text) > 200:
                        snippet += "..."
results.append(SearchResult(
title=content["title"],
content=snippet,
url=self._get_page_url(content),
source=self.name,
score=1.0, # Confluence API doesn't provide relevance scores
metadata={
"page_id": content["id"],
"space_key": content.get("space", {}).get("key"),
"version": content.get("version", {}).get("number"),
"last_modified": content.get("version", {}).get("when")
}
))
return results
except Exception as e:
logger.error(f"Error searching Confluence: {e}")
# Fallback to local cache search
return await self._search_local_cache(query, limit)
async def get_structure(self) -> str:
"""Get the structure of the Confluence space."""
self._ensure_initialized()
structure_lines = [f"# {self.space_info.get('name', 'Confluence Space')}"]
structure_lines.append("")
        description = self.space_info.get("description")
        if isinstance(description, dict):
            # The space API nests plain-text descriptions under description.plain.value
            description = description.get("plain", {}).get("value", "")
        if description:
            structure_lines.append(description)
            structure_lines.append("")
# Build hierarchical structure
root_pages = [p for p in self.pages_cache if not p.get("ancestors")]
def add_page_to_structure(page: Dict[str, Any], indent: int = 0):
prefix = " " * indent + "- "
url = self._get_page_url(page)
structure_lines.append(f"{prefix}[{page['title']}]({url})")
# Add child pages
child_pages = [p for p in self.pages_cache
if p.get("ancestors") and p["ancestors"][-1]["id"] == page["id"]]
for child in child_pages:
add_page_to_structure(child, indent + 1)
for page in root_pages:
add_page_to_structure(page)
return "\n".join(structure_lines)
def _get_auth_header(self) -> str:
"""Get the appropriate authentication header."""
username = self.config["username"]
api_token = self.config["api_token"]
# For Confluence Cloud, use email:api_token
# For Confluence Server, use username:password
credentials = f"{username}:{api_token}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
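        # e.g. base64("user:secret") -> "dXNlcjpzZWNyZXQ=", sent as "Basic dXNlcjpzZWNyZXQ="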
return f"Basic {encoded_credentials}"
async def _test_connection(self) -> None:
"""Test the Confluence API connection."""
try:
response = await self.client.get("/rest/api/user/current")
response.raise_for_status()
logger.debug("Confluence API connection test successful")
except Exception as e:
logger.error(f"Confluence API connection test failed: {e}")
raise ConnectionError(f"Failed to connect to Confluence API: {e}")
async def _fetch_space_info(self) -> None:
"""Fetch information about the Confluence space."""
try:
if "space_key" in self.config:
                # Get specific space info, expanding the plain-text description
                # so get_structure() can render it
                response = await self.client.get(
                    f"/rest/api/space/{self.config['space_key']}",
                    params={"expand": "description.plain"}
                )
response.raise_for_status()
self.space_info = response.json()
else:
# Get all accessible spaces
response = await self.client.get("/rest/api/space")
response.raise_for_status()
data = response.json()
spaces = data.get("results", [])
if spaces:
self.space_info = {
"name": f"All Spaces ({len(spaces)} spaces)",
"key": "all",
"description": f"Access to {len(spaces)} Confluence spaces"
}
else:
self.space_info = {"name": "No Accessible Spaces", "key": "none"}
except Exception as e:
logger.error(f"Error fetching space info: {e}")
self.space_info = {"name": "Confluence Space", "key": "unknown"}
async def _build_pages_cache(self) -> None:
"""Build cache of all pages in the space(s)."""
try:
params = {
"limit": 100,
"expand": "space,version,ancestors"
}
# Add space filter if specified
if "space_key" in self.config:
params["spaceKey"] = self.config["space_key"]
all_pages = []
start = 0
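            # /rest/api/content pages results via start/limit offsets; a batch
            # shorter than `limit` signals the final page.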
while True:
params["start"] = start
response = await self.client.get("/rest/api/content", params=params)
response.raise_for_status()
data = response.json()
pages = data.get("results", [])
if not pages:
break
# Filter for pages only (exclude blog posts, etc.)
page_results = [p for p in pages if p.get("type") == "page"]
all_pages.extend(page_results)
# Check if there are more pages
if len(pages) < params["limit"]:
break
start += params["limit"]
# Safety limit to prevent infinite loops
if len(all_pages) > 1000:
logger.warning("Reached page limit of 1000, stopping cache build")
break
self.pages_cache = all_pages
logger.debug(f"Cached {len(self.pages_cache)} pages")
except Exception as e:
logger.error(f"Error building pages cache: {e}")
self.pages_cache = []
async def _fetch_page_content(self, page_id: str) -> Dict[str, Any]:
"""Fetch full content for a specific page."""
try:
params = {
"expand": "body.storage,version,space,ancestors"
}
response = await self.client.get(f"/rest/api/content/{page_id}", params=params)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error fetching page content: {e}")
raise
def _storage_to_markdown(self, storage_content: str) -> str:
"""Convert Confluence storage format to markdown."""
if not storage_content:
return ""
try:
soup = BeautifulSoup(storage_content, 'html.parser')
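            # html.parser lowercases but otherwise preserves namespaced tag names
            # (e.g. <ac:structured-macro>), which the macro branches below rely on.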
# Convert common Confluence elements to markdown
markdown_lines = []
for element in soup.find_all(recursive=False):
if element.name == 'h1':
markdown_lines.append(f"# {element.get_text()}")
elif element.name == 'h2':
markdown_lines.append(f"## {element.get_text()}")
elif element.name == 'h3':
markdown_lines.append(f"### {element.get_text()}")
elif element.name == 'h4':
markdown_lines.append(f"#### {element.get_text()}")
elif element.name == 'h5':
markdown_lines.append(f"##### {element.get_text()}")
elif element.name == 'h6':
markdown_lines.append(f"###### {element.get_text()}")
elif element.name == 'p':
text = self._process_inline_elements(element)
if text.strip():
markdown_lines.append(text)
elif element.name == 'ul':
for li in element.find_all('li', recursive=False):
text = self._process_inline_elements(li)
markdown_lines.append(f"- {text}")
elif element.name == 'ol':
for i, li in enumerate(element.find_all('li', recursive=False), 1):
text = self._process_inline_elements(li)
markdown_lines.append(f"{i}. {text}")
elif element.name == 'blockquote':
text = self._process_inline_elements(element)
for line in text.split('\n'):
if line.strip():
markdown_lines.append(f"> {line}")
elif element.name in ['ac:structured-macro', 'structured-macro']:
# Handle Confluence macros
macro_name = element.get('ac:name') or element.get('name')
if macro_name == 'code':
# Code block macro
code_content = element.find('ac:plain-text-body') or element.find('plain-text-body')
if code_content:
                            # The language is an <ac:parameter ac:name="language"> child, not a tag attribute
                            lang_param = (element.find('ac:parameter', attrs={'ac:name': 'language'})
                                          or element.find('parameter', attrs={'name': 'language'}))
                            language = lang_param.get_text() if lang_param else ''
markdown_lines.append(f"```{language}")
markdown_lines.append(code_content.get_text())
markdown_lines.append("```")
elif macro_name == 'info':
# Info panel
info_content = element.find('ac:rich-text-body') or element.find('rich-text-body')
if info_content:
text = self._process_inline_elements(info_content)
markdown_lines.append(f"> โน๏ธ {text}")
elif macro_name == 'warning':
# Warning panel
warning_content = element.find('ac:rich-text-body') or element.find('rich-text-body')
if warning_content:
text = self._process_inline_elements(warning_content)
markdown_lines.append(f"> โ ๏ธ {text}")
else:
# Generic macro
macro_body = element.find('ac:rich-text-body') or element.find('rich-text-body')
if macro_body:
text = self._process_inline_elements(macro_body)
markdown_lines.append(f"**{macro_name.title()}:** {text}")
elif element.name == 'table':
# Convert table to markdown
table_md = self._table_to_markdown(element)
if table_md:
markdown_lines.append(table_md)
else:
# Fallback: get text content
text = element.get_text(strip=True)
if text:
markdown_lines.append(text)
markdown_lines.append("") # Add spacing between elements
return "\n".join(markdown_lines).strip()
except Exception as e:
logger.error(f"Error converting storage format to markdown: {e}")
# Fallback: return plain text
return BeautifulSoup(storage_content, 'html.parser').get_text()
def _process_inline_elements(self, element) -> str:
"""Process inline elements within a block element."""
result = []
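        # bs4 Tag objects expose a .name attribute; plain NavigableStrings generally
        # do not, so text nodes fall through to the else branch and are kept verbatim.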
for content in element.contents:
if hasattr(content, 'name'):
if content.name == 'strong':
result.append(f"**{content.get_text()}**")
elif content.name == 'em':
result.append(f"*{content.get_text()}*")
elif content.name == 'code':
result.append(f"`{content.get_text()}`")
elif content.name == 'a':
href = content.get('href', '')
text = content.get_text()
result.append(f"[{text}]({href})")
elif content.name == 'br':
result.append("\n")
else:
result.append(content.get_text())
else:
result.append(str(content))
return "".join(result)
def _table_to_markdown(self, table_element) -> str:
"""Convert HTML table to markdown table."""
try:
rows = table_element.find_all('tr')
if not rows:
return ""
markdown_rows = []
for i, row in enumerate(rows):
cells = row.find_all(['th', 'td'])
cell_texts = [cell.get_text(strip=True) for cell in cells]
markdown_rows.append("| " + " | ".join(cell_texts) + " |")
# Add header separator after first row
if i == 0:
separator = "| " + " | ".join(["---"] * len(cell_texts)) + " |"
markdown_rows.append(separator)
return "\n".join(markdown_rows)
except Exception as e:
logger.error(f"Error converting table to markdown: {e}")
return ""
def _html_to_text(self, html_content: str) -> str:
"""Convert HTML to plain text."""
if not html_content:
return ""
try:
soup = BeautifulSoup(html_content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
return soup.get_text(separator=" ", strip=True)
except Exception:
return html_content
def _get_page_url(self, page: Dict[str, Any]) -> str:
"""Get the web URL for a page."""
base_url = self.config["base_url"].rstrip("/")
page_id = page["id"]
if self.is_cloud:
# Confluence Cloud URL format
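            # e.g. https://your-site.atlassian.net/wiki/spaces/DOCS/pages/12345  (illustrative)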
space_key = page.get("space", {}).get("key", "")
return f"{base_url}/wiki/spaces/{space_key}/pages/{page_id}"
else:
# Confluence Server URL format
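            # e.g. https://confluence.example.com/pages/viewpage.action?pageId=12345  (illustrative)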
return f"{base_url}/pages/viewpage.action?pageId={page_id}"
def _get_page_excerpt(self, page: Dict[str, Any]) -> str:
"""Get a brief excerpt for a page."""
# Try to get excerpt from page metadata
        excerpt = page.get("excerpt", "")
        if excerpt:
            text = self._html_to_text(excerpt)
            return text[:100] + "..." if len(text) > 100 else text
# Fallback to basic info
space_name = page.get("space", {}).get("name", "Unknown Space")
version = page.get("version", {}).get("number", "1")
return f"Page in {space_name} (v{version})"
def _find_page_by_id(self, page_id: str) -> Optional[Dict[str, Any]]:
"""Find a page in the cache by ID."""
for page in self.pages_cache:
if page["id"] == page_id:
return page
return None
async def _search_local_cache(self, query: str, limit: int) -> List[SearchResult]:
"""Fallback search using local cache."""
results = []
query_lower = query.lower()
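        # Naive scoring heuristic: a title match weighs twice as much as an excerpt match.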
for page in self.pages_cache:
score = 0.0
# Check title match
if query_lower in page["title"].lower():
score += 10.0
# Check excerpt match
excerpt = self._get_page_excerpt(page)
if query_lower in excerpt.lower():
score += 5.0
if score > 0:
results.append(SearchResult(
title=page["title"],
content=excerpt,
url=self._get_page_url(page),
source=self.name,
score=score,
metadata={
"page_id": page["id"],
"space_key": page.get("space", {}).get("key"),
"version": page.get("version", {}).get("number"),
"last_modified": page.get("version", {}).get("when")
}
))
# Sort by score and limit results
results.sort(key=lambda x: x.score, reverse=True)
return results[:limit]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.client:
await self.client.aclose()
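

if __name__ == "__main__":
    # Minimal usage sketch (illustrative, not part of the adapter API): the URL,
    # credentials, and space key below are placeholders, and a real run needs a
    # reachable Confluence instance plus any extra fields BaseDocumentAdapter expects.
    async def _demo() -> None:
        config = {
            "base_url": "https://your-site.atlassian.net/wiki",
            "username": "user@example.com",
            "api_token": "your-api-token",
            "space_key": "DOCS",
        }
        async with ConfluenceAdapter(config) as adapter:
            await adapter.initialize()
            print(await adapter.get_structure())
            for result in await adapter.search("getting started", limit=5):
                print(f"{result.score:.1f}  {result.title}  {result.url}")

    asyncio.run(_demo())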