# server.py
#!/usr/bin/env python3
"""
AnyDocs MCP Server
Main MCP server implementation that provides document search and retrieval capabilities.
"""
import asyncio
import json
from typing import Any, Dict, List, Optional, Sequence
from pathlib import Path
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from .config import AppConfig, ConfigManager, init_config_manager, get_config_manager
from .database import DatabaseManager, initialize_database_manager
from .adapters import get_adapter, ADAPTER_REGISTRY
from .content.engine import ContentEngine
from .utils import get_logger
logger = get_logger(__name__)
class AnyDocsMCPServer:
    """AnyDocs MCP Server implementation."""

    def __init__(self, config: Optional[AppConfig] = None):
        """Create the server.

        Args:
            config: Application configuration. When omitted, the global
                config manager is initialized and its configuration is used.
        """
        # Fall back to the globally-initialized configuration manager.
        self.config = config if config is not None else init_config_manager().get_config()
        self.server = Server("anydocs-mcp")
        # Populated lazily by initialize(); None until then.
        self.db_manager = None
        self.content_engine = None
        self.adapters: Dict[str, Any] = {}
        self._initialized = False
        # Wire the MCP protocol callbacks onto self.server right away.
        self._register_handlers()
def _register_handlers(self):
    """Register MCP server handlers.

    Wires four protocol callbacks onto self.server:
    - list_resources: expose every document of every active source as an
      ``anydocs://<source>/<path>`` resource
    - read_resource: resolve one such URI to its (processed) content
    - list_tools: advertise the document tool schemas
    - call_tool: dispatch a tool call to the matching ``_<tool>`` coroutine
    """
    @self.server.list_resources()
    async def handle_list_resources() -> List[Resource]:
        """List available document resources."""
        try:
            if not self.db_manager:
                return []
            # Get all active document sources
            sources = self.db_manager.get_document_sources(active_only=True)
            resources = []
            for source in sources:
                # Get documents for this source
                documents = self.db_manager.get_documents_by_source(source.id)
                for doc in documents:
                    resource = Resource(
                        uri=f"anydocs://{source.name}/{doc.path or doc.slug or doc.id}",
                        name=doc.title,
                        description=doc.description or f"Document from {source.name}",
                        mimeType="text/markdown"
                    )
                    resources.append(resource)
            return resources
        except Exception as e:
            logger.error(f"Error listing resources: {e}")
            return []

    @self.server.read_resource()
    async def handle_read_resource(uri: str) -> str:
        """Read a specific document resource.

        Raises:
            ValueError: unknown scheme, malformed URI, or document not found.
            RuntimeError: server not initialized yet.
        """
        try:
            if not uri.startswith("anydocs://"):
                raise ValueError(f"Invalid URI scheme: {uri}")
            # Guard added: siblings check db_manager; this handler previously
            # raised an opaque AttributeError when called before initialize().
            if not self.db_manager:
                raise RuntimeError("Database not initialized")
            # Parse URI: anydocs://source_name/document_path
            parts = uri[10:].split("/", 1)  # Remove 'anydocs://' prefix (10 chars)
            if len(parts) != 2:
                raise ValueError(f"Invalid URI format: {uri}")
            source_name, doc_path = parts
            # Find document
            document = self.db_manager.get_document_by_path(source_name, doc_path)
            if not document:
                raise ValueError(f"Document not found: {uri}")
            # Prefer processed content, fall back to raw content
            content = document.processed_content or document.content or ""
            return content
        except Exception as e:
            logger.error(f"Error reading resource {uri}: {e}")
            raise

    @self.server.list_tools()
    async def handle_list_tools() -> List[Tool]:
        """List available tools."""
        return [
            Tool(
                name="search_documents",
                description="Search for documents across all configured sources",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        },
                        "source": {
                            "type": "string",
                            "description": "Optional: specific source to search in"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of results (default: 10)",
                            "default": 10
                        }
                    },
                    "required": ["query"]
                }
            ),
            Tool(
                name="get_document",
                description="Get full content of a specific document",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "document_id": {
                            "type": "string",
                            "description": "Document ID or URI"
                        }
                    },
                    "required": ["document_id"]
                }
            ),
            Tool(
                name="list_sources",
                description="List all configured document sources",
                inputSchema={
                    "type": "object",
                    "properties": {}
                }
            ),
            Tool(
                name="summarize_content",
                description="Generate a summary of document content",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "document_id": {
                            "type": "string",
                            "description": "Document ID or URI to summarize"
                        },
                        "max_length": {
                            "type": "integer",
                            "description": "Maximum summary length in words (default: 150)",
                            "default": 150
                        }
                    },
                    "required": ["document_id"]
                }
            ),
            Tool(
                name="ask_question",
                description="Ask a question about the documentation",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "question": {
                            "type": "string",
                            "description": "Question to ask about the documentation"
                        },
                        "source": {
                            "type": "string",
                            "description": "Optional: specific source to search for answers"
                        },
                        "context_limit": {
                            "type": "integer",
                            "description": "Maximum number of documents to use as context (default: 5)",
                            "default": 5
                        }
                    },
                    "required": ["question"]
                }
            ),
            # Advanced AI-powered tools (Phase 5)
            Tool(
                name="generate_documentation",
                description="AI-assisted documentation generation from existing content",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "topic": {
                            "type": "string",
                            "description": "Topic or title for the documentation to generate"
                        },
                        "source_documents": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Optional: List of document IDs to use as reference material"
                        },
                        "doc_type": {
                            "type": "string",
                            "enum": ["guide", "tutorial", "reference", "api", "faq"],
                            "description": "Type of documentation to generate (default: guide)",
                            "default": "guide"
                        },
                        "max_length": {
                            "type": "integer",
                            "description": "Maximum length in words (default: 500)",
                            "default": 500
                        }
                    },
                    "required": ["topic"]
                }
            ),
            Tool(
                name="translate_content",
                description="Translate documentation content to different languages",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "document_id": {
                            "type": "string",
                            "description": "Document ID or URI to translate"
                        },
                        "target_language": {
                            "type": "string",
                            "description": "Target language (e.g., 'es', 'fr', 'de', 'zh', 'ja')"
                        },
                        "preserve_formatting": {
                            "type": "boolean",
                            "description": "Whether to preserve markdown formatting (default: true)",
                            # Fixed: was the undefined name `true` (JSON literal),
                            # which raised NameError whenever tools were listed.
                            "default": True
                        }
                    },
                    "required": ["document_id", "target_language"]
                }
            ),
            Tool(
                name="extract_insights",
                description="Extract analytics and insights from documentation",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "source": {
                            "type": "string",
                            "description": "Optional: specific source to analyze"
                        },
                        "insight_type": {
                            "type": "string",
                            "enum": ["overview", "gaps", "topics", "complexity", "structure"],
                            "description": "Type of insights to extract (default: overview)",
                            "default": "overview"
                        },
                        "include_metrics": {
                            "type": "boolean",
                            "description": "Include quantitative metrics (default: true)",
                            # Fixed: `true` -> `True` (same NameError as above).
                            "default": True
                        }
                    }
                }
            ),
            Tool(
                name="suggest_improvements",
                description="AI-powered suggestions for documentation improvements",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "document_id": {
                            "type": "string",
                            "description": "Document ID or URI to analyze for improvements"
                        },
                        "focus_area": {
                            "type": "string",
                            "enum": ["clarity", "completeness", "structure", "examples", "accessibility"],
                            "description": "Area to focus improvement suggestions on"
                        },
                        "audience": {
                            "type": "string",
                            "enum": ["beginner", "intermediate", "advanced", "all"],
                            "description": "Target audience for improvements (default: all)",
                            "default": "all"
                        }
                    },
                    "required": ["document_id"]
                }
            )
        ]

    @self.server.call_tool()
    async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle tool calls by dispatching to the matching implementation."""
        # Table dispatch replaces the previous if/elif chain; each handler's
        # keyword signature matches the corresponding tool's inputSchema.
        tool_handlers = {
            "search_documents": self._search_documents,
            "get_document": self._get_document,
            "list_sources": self._list_sources,
            "summarize_content": self._summarize_content,
            "ask_question": self._ask_question,
            "generate_documentation": self._generate_documentation,
            "translate_content": self._translate_content,
            "extract_insights": self._extract_insights,
            "suggest_improvements": self._suggest_improvements,
        }
        try:
            handler = tool_handlers.get(name)
            if handler is None:
                raise ValueError(f"Unknown tool: {name}")
            return await handler(**arguments)
        except Exception as e:
            logger.error(f"Error calling tool {name}: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _search_documents(self, query: str, source: Optional[str] = None, limit: int = 10) -> List[TextContent]:
    """Search for documents.

    Args:
        query: Search query string.
        source: Optional source name to restrict the search to.
        limit: Maximum number of results (default 10).

    Returns:
        One TextContent item with a formatted result list, or an error/empty
        message.
    """
    try:
        # Guard added: every sibling tool checks for an uninitialized database;
        # this method previously raised AttributeError instead.
        if not self.db_manager:
            return [TextContent(type="text", text="Database not initialized")]
        results = self.db_manager.search_documents(
            query=query,
            source_name=source,
            limit=limit
        )
        if not results:
            return [TextContent(type="text", text="No documents found matching your query.")]
        # Format results as a numbered markdown list.
        response_lines = [f"Found {len(results)} document(s):\n"]
        for i, doc in enumerate(results, 1):
            source_info = f" (from {doc.source.name})" if doc.source else ""
            doc_path = doc.path or doc.slug or doc.id
            source_name = doc.source.name if doc.source else 'unknown'
            response_lines.append(
                f"{i}. **{doc.title}**{source_info}\n"
                f" Path: {doc.path or doc.slug or 'N/A'}\n"
                f" Description: {doc.description or 'No description'}\n"
                f" URI: anydocs://{source_name}/{doc_path}\n"
            )
        return [TextContent(type="text", text="\n".join(response_lines))]
    except Exception as e:
        logger.error(f"Error searching documents: {e}")
        return [TextContent(type="text", text=f"Search error: {str(e)}")]
async def _get_document(self, document_id: str) -> List[TextContent]:
    """Get full document content.

    Args:
        document_id: Either an ``anydocs://<source>/<path>`` URI or a bare
            document ID.

    Returns:
        One TextContent item with the title, content, and source/path
        metadata, or a plain-text error message.
    """
    try:
        if not self.db_manager:
            return [TextContent(type="text", text="Database not initialized")]
        # Handle URI format
        if document_id.startswith("anydocs://"):
            # Parse URI: anydocs://source_name/document_path
            parts = document_id[10:].split("/", 1)  # anydocs:// is 10 characters
            if len(parts) != 2:
                return [TextContent(type="text", text=f"Invalid URI format: {document_id}")]
            source_name, doc_path = parts
            document = self.db_manager.get_document_by_path(source_name, doc_path)
        elif ":" in document_id:
            # Any other scheme-like identifier is rejected; only anydocs:// is
            # valid. (The redundant "not startswith" re-check was removed —
            # the first branch already handled that case.)
            return [TextContent(type="text", text=f"Invalid URI format: {document_id}")]
        else:
            # Handle direct document ID
            document = self.db_manager.get_document_by_id(document_id)
        if not document:
            return [TextContent(type="text", text=f"Document not found: {document_id}")]
        # Format response with document metadata
        content = document.processed_content or document.content or "No content available"
        response = f"**{document.title}**\n\n{content}"
        if document.source:
            response += f"\n\n*Source: {document.source.name}*"
        if document.path:
            response += f"\n*Path: {document.path}*"
        return [TextContent(type="text", text=response)]
    except Exception as e:
        logger.error(f"Error getting document {document_id}: {e}")
        return [TextContent(type="text", text=f"Error retrieving document: {str(e)}")]
async def _list_sources(self) -> List[TextContent]:
    """List all document sources, with status, document count, and sync time."""
    try:
        if not self.db_manager:
            return [TextContent(type="text", text="Database not initialized")]
        sources = self.db_manager.get_document_sources(active_only=False)
        if not sources:
            return [TextContent(type="text", text="No document sources configured")]
        # Older DatabaseManager versions may lack the count helper.
        can_count = hasattr(self.db_manager, 'get_document_count_by_source')
        lines = [f"**Configured Document Sources ({len(sources)}):**\n"]
        for idx, src in enumerate(sources, 1):
            doc_count = self.db_manager.get_document_count_by_source(src.id) if can_count else "Unknown"
            lines.append(
                f"{idx}. **{src.name}** ({src.adapter_type})\n"
                f" Status: {'✓ Active' if src.enabled else '✗ Inactive'}\n"
                f" Base URL: {getattr(src, 'base_url', 'N/A')}\n"
                f" Documents: {doc_count}\n"
                f" Last sync: {getattr(src, 'last_sync_at', 'Never')}\n"
            )
        return [TextContent(type="text", text="\n".join(lines))]
    except Exception as e:
        logger.error(f"Error listing sources: {e}")
        return [TextContent(type="text", text=f"Error listing sources: {str(e)}")]
async def _summarize_content(self, document_id: str, max_length: int = 150) -> List[TextContent]:
    """Generate a summary of document content.

    Uses a simple extractive heuristic: the first sentences (up to 10) whose
    cumulative word count stays within max_length; falls back to the first
    paragraph if none fit.

    Args:
        document_id: Document ID or anydocs:// URI.
        max_length: Maximum summary length in words (default 150).
    """
    try:
        # Get the document content first
        content_result = await self._get_document(document_id)
        if not content_result or not content_result[0].text:
            return [TextContent(type="text", text="Unable to retrieve document for summarization")]
        content = content_result[0].text
        # _get_document reports failures as plain text; recognize those
        # instead of "summarizing" them. "Database not initialized" was
        # previously missing from this list and slipped through as content.
        error_prefixes = (
            "Document not found:",
            "Error retrieving document:",
            "Invalid URI format:",
            "Database not initialized",
        )
        if content.startswith(error_prefixes):
            return [TextContent(type="text", text="Unable to retrieve document for summarization")]
        # Simple extractive summarization (first few sentences)
        # In a production system, you'd use a proper summarization model
        sentences = content.split('. ')
        summary_sentences = []
        word_count = 0
        for sentence in sentences[:10]:  # Limit to first 10 sentences
            sentence_words = len(sentence.split())
            if word_count + sentence_words > max_length:
                break
            summary_sentences.append(sentence.strip())
            word_count += sentence_words
        if not summary_sentences:
            # Fallback: take first paragraph
            paragraphs = content.split('\n\n')
            if paragraphs:
                first_para = paragraphs[0][:500]  # First 500 chars
                summary = first_para + "..." if len(paragraphs[0]) > 500 else first_para
            else:
                summary = "Unable to generate summary"
        else:
            summary = '. '.join(summary_sentences)
            if not summary.endswith('.'):
                summary += '.'
        response = f"**Summary for {document_id}:**\n\n{summary}\n\n*Summary length: {len(summary.split())} words*"
        return [TextContent(type="text", text=response)]
    except Exception as e:
        logger.error(f"Error summarizing content for {document_id}: {e}")
        return [TextContent(type="text", text=f"Error generating summary: {str(e)}")]
async def _ask_question(self, question: str, source: Optional[str] = None, context_limit: int = 5) -> List[TextContent]:
    """Ask a question about the documentation.

    Searches the database for relevant documents and returns their leading
    snippets as context (no LLM involved at this stage).
    """
    try:
        # Guard clauses first.
        if not question.strip():
            return [TextContent(type="text", text="Question cannot be empty")]
        if not self.db_manager:
            return [TextContent(type="text", text="Database not initialized")]
        # Find candidate documents for the question.
        matches = self.db_manager.search_documents(
            query=question,
            source_name=source,
            limit=context_limit
        )
        if not matches:
            return [TextContent(
                type="text",
                text=f"I couldn't find any relevant documentation to answer: '{question}'"
            )]
        # Build a context section from each match's leading snippet.
        snippets = []
        for doc in matches:
            body = doc.processed_content or doc.content or ""
            if not body:
                continue
            # Take first 300 characters as context.
            excerpt = body[:300] + "..." if len(body) > 300 else body
            snippets.append(f"**{doc.title}**: {excerpt}")
        context_text = "\n\n".join(snippets)
        # Assemble the reply (a production system would run an LLM here).
        response = (
            f"**Question:** {question}\n\n"
            f"**Based on the available documentation:**\n\n"
            f"{context_text}\n\n"
            f"*Found {len(matches)} relevant document(s). "
            f"For more detailed information, please refer to the specific documents listed above.*"
        )
        return [TextContent(type="text", text=response)]
    except Exception as e:
        logger.error(f"Error answering question '{question}': {e}")
        return [TextContent(type="text", text=f"Error processing question: {str(e)}")]
async def _generate_documentation(self, topic: str, source_documents: Optional[List[str]] = None, doc_type: str = "guide", max_length: int = 500) -> List[TextContent]:
    """Generate AI-assisted documentation.

    Produces template-based markdown for the given topic. Only "guide" and
    "tutorial" have dedicated templates; any other doc_type gets a generic
    stub.

    Args:
        topic: Topic or title for the documentation to generate.
        source_documents: Optional document IDs; at most the first 5 are
            fetched as reference context (fetch failures are logged and
            skipped).
        doc_type: Template selector ("guide", "tutorial", or other).
        max_length: Accepted for tool-schema compatibility; not used by this
            template-based implementation.

    Returns:
        One TextContent item with the generated markdown and a trailing note
        about how many source documents were gathered.
    """
    try:
        logger.info(f"Generating {doc_type} documentation for topic: {topic}")
        # Gather context from source documents if provided
        context_content = []
        if source_documents:
            for doc_id in source_documents[:5]:  # Limit to 5 documents
                try:
                    doc_result = await self._get_document(doc_id)
                    if doc_result and doc_result[0].text:
                        content = doc_result[0].text[:1000]  # First 1000 chars
                        context_content.append(content)
                except Exception as e:
                    logger.warning(f"Failed to get source document {doc_id}: {e}")
        # Generate template based on document type.
        # NOTE(review): the gathered context_content is only counted in the
        # final note, not woven into the template — confirm that is intended.
        if doc_type == "guide":
            template = f"""# {topic}
## Overview
This guide provides a comprehensive overview of {topic}, designed to help you understand and implement the concepts effectively.
## Prerequisites
- Basic understanding of the related concepts
- Access to necessary tools and resources
## Step-by-Step Instructions
### Step 1: Getting Started
Begin by familiarizing yourself with the fundamental concepts of {topic}.
### Step 2: Implementation
Implement the core functionality following these guidelines:
1. Set up your environment
2. Configure necessary parameters
3. Execute the implementation
### Step 3: Verification
Verify your implementation by testing and validating results.
## Best Practices
- Follow established conventions
- Document your implementation
- Test thoroughly before deployment
## Troubleshooting
**Common Issue**: Description of common problem
**Solution**: How to resolve the issue
## Additional Resources
- Related documentation
- Community resources
- Further reading
"""
        elif doc_type == "tutorial":
            template = f"""# {topic} Tutorial
## What You'll Learn
By the end of this tutorial, you will understand the basics of {topic} and be able to implement practical examples.
## Before You Begin
Ensure you have:
- Required software installed
- Basic knowledge of prerequisites
- Access to example files
## Tutorial Steps
### 1. Introduction
Let's start with a simple example to understand {topic}.
### 2. Hands-on Exercise
Follow along with this practical exercise.
### 3. Advanced Usage
Now let's explore more advanced features.
### 4. Real-world Example
Here's how to apply {topic} in a real-world scenario.
## Next Steps
- Explore advanced features
- Check out related tutorials
- Join the community
"""
        else:
            # Fallback stub for "reference"/"api"/"faq" or any other value.
            template = f"""# {topic} Documentation
This is AI-generated {doc_type} documentation for {topic}.
Please customize this template based on your specific needs and requirements.
"""
        response = (
            f"**Generated {doc_type.title()} Documentation: {topic}**\n\n"
            f"{template}\n\n"
            f"*This documentation was AI-generated based on {len(context_content)} source document(s). "
            f"Please review and edit as needed for accuracy and completeness.*"
        )
        return [TextContent(type="text", text=response)]
    except Exception as e:
        logger.error(f"Error generating documentation for '{topic}': {e}")
        return [TextContent(type="text", text=f"Error generating documentation: {str(e)}")]
async def _translate_content(self, document_id: str, target_language: str, preserve_formatting: bool = True) -> List[TextContent]:
    """Translate documentation content to different languages.

    This is a simulated translation: the original content is wrapped in
    translation markers together with metadata notes; no translation service
    is called.

    Args:
        document_id: Document ID or anydocs:// URI to translate.
        target_language: Language code (e.g. 'es', 'fr'); mapped to a full
            name when known, otherwise echoed as-is.
        preserve_formatting: Reported in the output notes only; it does not
            change processing in this simulated implementation.
    """
    try:
        logger.info(f"Translating document {document_id} to {target_language}")
        # Get the document content
        doc_result = await self._get_document(document_id)
        if not doc_result or not doc_result[0].text:
            return [TextContent(type="text", text=f"Unable to retrieve document {document_id} for translation")]
        content = doc_result[0].text
        # Language mapping for full names
        language_names = {
            'es': 'Spanish', 'fr': 'French', 'de': 'German', 'it': 'Italian',
            'pt': 'Portuguese', 'ru': 'Russian', 'zh': 'Chinese', 'ja': 'Japanese',
            'ko': 'Korean', 'ar': 'Arabic', 'hi': 'Hindi', 'nl': 'Dutch'
        }
        # Unknown codes fall back to the raw target_language string.
        target_lang_name = language_names.get(target_language.lower(), target_language)
        # Simulate translation (in production, use a proper translation service)
        translated_content = f"""[TRANSLATION TO {target_lang_name.upper()}]
{content}
[END TRANSLATION]
---
**Translation Notes:**
- Original document: {document_id}
- Target language: {target_lang_name} ({target_language})
- Formatting preserved: {preserve_formatting}
- Translation method: AI-assisted (requires manual review)
**Important**: This is a simulated translation. In a production environment,
this would be processed by a professional translation service such as:
- Google Translate API
- Azure Translator
- AWS Translate
- DeepL API
Please review and edit the translation for accuracy and cultural appropriateness."""
        return [TextContent(type="text", text=translated_content)]
    except Exception as e:
        logger.error(f"Error translating document {document_id}: {e}")
        return [TextContent(type="text", text=f"Error translating content: {str(e)}")]
async def _extract_insights(self, source: Optional[str] = None, insight_type: str = "overview", include_metrics: bool = True) -> List[TextContent]:
    """Extract analytics and insights from documentation.

    Args:
        source: Optional source name; when omitted, all documents across all
            sources are analyzed.
        insight_type: "overview" and "gaps" have dedicated reports; any other
            value produces a basic-metrics report.
        include_metrics: Accepted for tool-schema compatibility; not used by
            the current implementation.
    """
    # Replaces the previous `__import__('datetime')` hack used inline in the
    # report f-strings; output format is unchanged.
    from datetime import datetime
    try:
        logger.info(f"Extracting {insight_type} insights for source: {source or 'all sources'}")
        if not self.db_manager:
            return [TextContent(type="text", text="Database not initialized")]
        # Get documents for analysis
        if source:
            documents = self.db_manager.get_documents_by_source_name(source)
            sources = [source]
        else:
            documents = self.db_manager.get_all_documents()
            source_objs = self.db_manager.get_document_sources()
            sources = [s.name for s in source_objs]
        if not documents:
            return [TextContent(type="text", text="No documents found for analysis")]
        total_docs = len(documents)
        total_words = sum(len((doc.processed_content or doc.content or "").split()) for doc in documents)
        avg_words = total_words // total_docs if total_docs > 0 else 0
        # Generate insights based on type
        if insight_type == "overview":
            insights = f"""# Documentation Insights - Overview
## Summary
Analyzed documentation across {len(sources)} source(s) with {total_docs} total documents.
## Key Metrics
- **Total Documents**: {total_docs}
- **Total Word Count**: {total_words:,}
- **Average Words per Document**: {avg_words:,}
- **Sources**: {', '.join(sources)}
## Content Analysis
- **Average Document Length**: {'Detailed' if avg_words > 500 else 'Concise' if avg_words > 200 else 'Brief'}
- **Documentation Coverage**: {'Comprehensive' if total_docs > 50 else 'Moderate' if total_docs > 20 else 'Basic'}
## Recommendations
- Consider expanding {'brief' if avg_words < 200 else 'detailed'} documentation
- {'Add more comprehensive content' if total_docs < 20 else 'Good documentation coverage'}
*Analysis generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
        elif insight_type == "gaps":
            # Analyze for common documentation gaps (title or body mentions).
            expected_topics = ['installation', 'configuration', 'authentication', 'troubleshooting', 'examples']
            covered_topics = []
            missing_topics = []
            for topic in expected_topics:
                found = any(topic in (doc.title or "").lower() or topic in (doc.content or "").lower() for doc in documents)
                if found:
                    covered_topics.append(topic)
                else:
                    missing_topics.append(topic)
            insights = f"""# Documentation Insights - Gap Analysis
## Coverage Analysis
### Covered Topics ({len(covered_topics)})
{chr(10).join(f'✓ {topic.title()}' for topic in covered_topics)}
### Potential Gaps ({len(missing_topics)})
{chr(10).join(f'⚠️ {topic.title()}' for topic in missing_topics)}
## Content Quality Score
**{len(covered_topics)}/{len(expected_topics)} topics covered ({len(covered_topics)/len(expected_topics)*100:.1f}%)**
*Gap analysis completed on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
        else:
            # Generic report for "topics"/"complexity"/"structure" etc.
            insights = f"""# Documentation Insights - {insight_type.title()}
Analyzed {total_docs} documents from {len(sources)} source(s).
## Basic Metrics
- Total documents: {total_docs}
- Average words per document: {avg_words}
- Sources analyzed: {', '.join(sources)}
*Analysis type '{insight_type}' completed on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
        return [TextContent(type="text", text=insights)]
    except Exception as e:
        logger.error(f"Error extracting insights: {e}")
        return [TextContent(type="text", text=f"Error extracting insights: {str(e)}")]
async def _suggest_improvements(self, document_id: str, focus_area: Optional[str] = None, audience: str = "all") -> List[TextContent]:
    """AI-powered suggestions for documentation improvements.

    Runs simple heuristics (word count, presence of examples/headings/links/
    lists) over the document and emits suggestions, optionally restricted to
    one focus area.

    Args:
        document_id: Document ID or anydocs:// URI to analyze.
        focus_area: Optional filter ("clarity", "completeness", "structure",
            "examples", "accessibility"); None enables all checks.
        audience: Target audience label; only "beginner"/"all" change the
            accessibility checks.
    """
    # Replaces the previous `__import__('datetime')` hack used inline in the
    # report f-string; output format is unchanged.
    from datetime import datetime
    try:
        logger.info(f"Generating improvement suggestions for document {document_id}")
        # Get the document content
        doc_result = await self._get_document(document_id)
        if not doc_result or not doc_result[0].text:
            return [TextContent(type="text", text=f"Unable to retrieve document {document_id} for analysis")]
        content = doc_result[0].text
        word_count = len(content.split())
        # Analyze document characteristics (cheap textual heuristics).
        has_examples = '```' in content or 'example' in content.lower()
        has_headings = '#' in content or any(f'<h{i}>' in content for i in range(1, 7))
        has_links = '](' in content or 'http' in content
        has_lists = any(marker in content for marker in ['-', '*', '1.'])
        # Generate suggestions based on focus area
        suggestions = []
        if not focus_area or focus_area == "clarity":
            if word_count > 1000:
                suggestions.append("Consider breaking this document into smaller, more focused sections")
            if not has_headings:
                suggestions.append("Add clear headings to improve document structure and navigation")
            if word_count < 100:
                suggestions.append("Consider expanding the content with more detailed explanations")
        if not focus_area or focus_area == "completeness":
            if not has_examples:
                suggestions.append("Add practical examples to illustrate key concepts")
            if 'todo' in content.lower() or 'tbd' in content.lower():
                suggestions.append("Complete sections marked as TODO or TBD")
        if not focus_area or focus_area == "structure":
            if not has_lists and word_count > 200:
                suggestions.append("Use bullet points or numbered lists to break up dense text")
            if not has_headings:
                suggestions.append("Add section headings to improve document organization")
        if not focus_area or focus_area == "examples":
            if not has_examples:
                suggestions.append("Include code examples or practical demonstrations")
            if has_examples and 'output' not in content.lower():
                suggestions.append("Show expected outputs for code examples")
        if not focus_area or focus_area == "accessibility":
            if audience in ["beginner", "all"] and not any(term in content.lower() for term in ['basic', 'introduction', 'getting started']):
                suggestions.append("Add introductory content for beginners")
            if not has_links:
                suggestions.append("Add links to related resources and references")
        # If no specific suggestions, provide general ones
        if not suggestions:
            suggestions = [
                "Document appears well-structured",
                "Consider adding more interactive elements if applicable",
                "Review content for current accuracy and relevance"
            ]
        response = f"""# Improvement Suggestions for {document_id}
## Document Analysis
- **Word Count**: {word_count}
- **Has Examples**: {'Yes' if has_examples else 'No'}
- **Has Headings**: {'Yes' if has_headings else 'No'}
- **Has Links**: {'Yes' if has_links else 'No'}
- **Target Audience**: {audience.title()}
- **Focus Area**: {focus_area.title() if focus_area else 'General'}
## Suggested Improvements
{chr(10).join(f'{i}. {suggestion}' for i, suggestion in enumerate(suggestions, 1))}
## Priority Recommendations
**High Priority**: {suggestions[0] if suggestions else 'No immediate issues identified'}
**Medium Priority**: {suggestions[1] if len(suggestions) > 1 else 'Consider regular content reviews'}
---
*Suggestions generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
*These are AI-generated suggestions. Please review and adapt based on your specific requirements.*
"""
        return [TextContent(type="text", text=response)]
    except Exception as e:
        logger.error(f"Error generating suggestions for {document_id}: {e}")
        return [TextContent(type="text", text=f"Error generating suggestions: {str(e)}")]
async def initialize(self):
    """Initialize the server: database, content engine, and adapters.

    Idempotent — a second call returns immediately. On failure the error is
    logged and re-raised, leaving _initialized False.
    """
    if self._initialized:
        return
    try:
        logger.info("Initializing AnyDocs MCP Server...")
        # Database layer: connect, then create the schema.
        self.db_manager = DatabaseManager(
            database_url=self.config.database.url,
            echo=self.config.database.echo
        )
        self.db_manager.initialize_database()
        # Content processing engine.
        self.content_engine = ContentEngine()
        # Per-source document adapters.
        await self._initialize_adapters()
        self._initialized = True
        logger.info("AnyDocs MCP Server initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize server: {e}")
        raise
async def _initialize_adapters(self) -> None:
    """Initialize document adapters based on configuration.

    One adapter per active source; a failing adapter is logged and skipped
    so the remaining sources still come up.
    """
    try:
        for source in self.db_manager.get_document_sources(active_only=True):
            try:
                # Look up the adapter class for this source's type.
                adapter_cls = ADAPTER_REGISTRY.get(source.adapter_type)
                if adapter_cls is None:
                    logger.warning(f"Unknown adapter type: {source.adapter_type}")
                    continue
                adapter = adapter_cls(source.config or {})
                await adapter.initialize()
                self.adapters[source.name] = adapter
                logger.info(f"Initialized adapter: {source.name} ({source.adapter_type})")
            except Exception as e:
                logger.error(f"Failed to initialize adapter {source.name}: {e}")
    except Exception as e:
        logger.error(f"Error initializing adapters: {e}")
async def run(self):
    """Run the MCP server over stdio until the client disconnects."""
    try:
        # Lazy initialization — safe if initialize() already ran.
        await self.initialize()
        logger.info("Starting AnyDocs MCP server...")
        init_options = InitializationOptions(
            server_name="anydocs-mcp",
            server_version="0.1.0",
            capabilities={
                "resources": {},
                "tools": {},
                "logging": {}
            }
        )
        # stdio transport: streams are bound to this process's stdin/stdout.
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(read_stream, write_stream, init_options)
    except Exception as e:
        logger.error(f"Server error: {e}")
        raise
# Global server instance (lazy singleton; created on first get_server() call)
_server_instance: Optional[AnyDocsMCPServer] = None
def get_server() -> AnyDocsMCPServer:
    """Get the global server instance, creating it on first use.

    Note: not thread-safe; intended for single-threaded startup.
    """
    global _server_instance
    if _server_instance is None:
        _server_instance = AnyDocsMCPServer()
    return _server_instance
async def main(config_path: Optional[str] = None):
    """Main entry point.

    Args:
        config_path: Optional config file path used as the default for the
            -c/--config option.

    Fix: the `__main__` block passes `sys.argv[1]` as config_path, but the
    parser previously declared no positional argument, so any bare
    `python server.py config.yaml` invocation died with "unrecognized
    arguments". An optional positional now absorbs it (backward compatible;
    -c/--config still wins when both are given).
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="AnyDocs MCP Server - Transform documentation into MCP-compatible server"
    )
    parser.add_argument(
        "config_file",
        nargs="?",
        default=None,
        help="Optional positional path to configuration file"
    )
    parser.add_argument(
        "-c", "--config",
        help="Path to configuration file",
        type=str,
        default=config_path
    )
    parser.add_argument(
        "--debug",
        help="Enable debug mode",
        action="store_true"
    )
    args = parser.parse_args()
    # -c/--config (or its default from config_path) takes precedence over the
    # positional form.
    chosen_config = args.config or args.config_file
    try:
        # Initialize configuration
        if chosen_config:
            config_manager = init_config_manager(chosen_config)
        else:
            config_manager = init_config_manager(config_file="config.yaml")
        config = config_manager.get_config()
        # Enable debug mode if requested
        if args.debug:
            config.server.debug = True
            config.logging.level = "DEBUG"
        # Create and run server
        server = AnyDocsMCPServer(config)
        await server.run()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server failed: {e}")
        raise
if __name__ == "__main__":
    import sys
    # Forward a bare first CLI argument as the default config path.
    # NOTE(review): the same token is also still present in sys.argv when
    # main()'s argparse runs — confirm that parser accepts a bare positional
    # argument, otherwise this invocation form exits with an argparse error.
    config_path = sys.argv[1] if len(sys.argv) > 1 else None
    asyncio.run(main(config_path))