Skip to main content
Glama

AnyDocs MCP Server

by funky1688
server.py44 kB
#!/usr/bin/env python3 """ AnyDocs MCP Server Main MCP server implementation that provides document search and retrieval capabilities. """ import asyncio import json from typing import Any, Dict, List, Optional, Sequence from pathlib import Path from mcp.server import Server from mcp.server.models import InitializationOptions from mcp.server.stdio import stdio_server from mcp.types import ( Resource, Tool, TextContent, ImageContent, EmbeddedResource, LoggingLevel ) from .config import AppConfig, ConfigManager, init_config_manager, get_config_manager from .database import DatabaseManager, initialize_database_manager from .adapters import get_adapter, ADAPTER_REGISTRY from .content.engine import ContentEngine from .utils import get_logger logger = get_logger(__name__) class AnyDocsMCPServer: """AnyDocs MCP Server implementation.""" def __init__(self, config: Optional[AppConfig] = None): # Initialize config if not provided if config is None: config_manager = init_config_manager() config = config_manager.get_config() self.config = config self.server = Server("anydocs-mcp") self.db_manager = None self.content_engine = None self.adapters: Dict[str, Any] = {} self._initialized = False # Register handlers self._register_handlers() def _register_handlers(self): """Register MCP server handlers.""" @self.server.list_resources() async def handle_list_resources() -> List[Resource]: """List available document resources.""" try: if not self.db_manager: return [] # Get all active document sources sources = self.db_manager.get_document_sources(active_only=True) resources = [] for source in sources: # Get documents for this source documents = self.db_manager.get_documents_by_source(source.id) for doc in documents: resource = Resource( uri=f"anydocs://{source.name}/{doc.path or doc.slug or doc.id}", name=doc.title, description=doc.description or f"Document from {source.name}", mimeType="text/markdown" ) resources.append(resource) return resources except Exception as e: logger.error(f"Error listing resources: {e}") return [] @self.server.read_resource() async def handle_read_resource(uri: str) -> str: """Read a specific document resource.""" try: if not uri.startswith("anydocs://"): raise ValueError(f"Invalid URI scheme: {uri}") # Parse URI: anydocs://source_name/document_path parts = uri[10:].split("/", 1) # Remove 'anydocs://' prefix (10 chars) if len(parts) != 2: raise ValueError(f"Invalid URI format: {uri}") source_name, doc_path = parts # Find document document = self.db_manager.get_document_by_path(source_name, doc_path) if not document: raise ValueError(f"Document not found: {uri}") # Return processed content or raw content content = document.processed_content or document.content or "" return content except Exception as e: logger.error(f"Error reading resource {uri}: {e}") raise @self.server.list_tools() async def handle_list_tools() -> List[Tool]: """List available tools.""" return [ Tool( name="search_documents", description="Search for documents across all configured sources", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query" }, "source": { "type": "string", "description": "Optional: specific source to search in" }, "limit": { "type": "integer", "description": "Maximum number of results (default: 10)", "default": 10 } }, "required": ["query"] } ), Tool( name="get_document", description="Get full content of a specific document", inputSchema={ "type": "object", "properties": { "document_id": { "type": "string", "description": "Document ID or URI" } }, "required": ["document_id"] } ), Tool( name="list_sources", description="List all configured document sources", inputSchema={ "type": "object", "properties": {} } ), Tool( name="summarize_content", description="Generate a summary of document content", inputSchema={ "type": "object", "properties": { "document_id": { "type": "string", "description": "Document ID or URI to summarize" }, "max_length": { "type": "integer", "description": "Maximum summary length in words (default: 150)", "default": 150 } }, "required": ["document_id"] } ), Tool( name="ask_question", description="Ask a question about the documentation", inputSchema={ "type": "object", "properties": { "question": { "type": "string", "description": "Question to ask about the documentation" }, "source": { "type": "string", "description": "Optional: specific source to search for answers" }, "context_limit": { "type": "integer", "description": "Maximum number of documents to use as context (default: 5)", "default": 5 } }, "required": ["question"] } ), # Advanced AI-powered tools (Phase 5) Tool( name="generate_documentation", description="AI-assisted documentation generation from existing content", inputSchema={ "type": "object", "properties": { "topic": { "type": "string", "description": "Topic or title for the documentation to generate" }, "source_documents": { "type": "array", "items": {"type": "string"}, "description": "Optional: List of document IDs to use as reference material" }, "doc_type": { "type": "string", "enum": ["guide", "tutorial", "reference", "api", "faq"], "description": "Type of documentation to generate (default: guide)", "default": "guide" }, "max_length": { "type": "integer", "description": "Maximum length in words (default: 500)", "default": 500 } }, "required": ["topic"] } ), Tool( name="translate_content", description="Translate documentation content to different languages", inputSchema={ "type": "object", "properties": { "document_id": { "type": "string", "description": "Document ID or URI to translate" }, "target_language": { "type": "string", "description": "Target language (e.g., 'es', 'fr', 'de', 'zh', 'ja')" }, "preserve_formatting": { "type": "boolean", "description": "Whether to preserve markdown formatting (default: true)", "default": true } }, "required": ["document_id", "target_language"] } ), Tool( name="extract_insights", description="Extract analytics and insights from documentation", inputSchema={ "type": "object", "properties": { "source": { "type": "string", "description": "Optional: specific source to analyze" }, "insight_type": { "type": "string", "enum": ["overview", "gaps", "topics", "complexity", "structure"], "description": "Type of insights to extract (default: overview)", "default": "overview" }, "include_metrics": { "type": "boolean", "description": "Include quantitative metrics (default: true)", "default": true } } } ), Tool( name="suggest_improvements", description="AI-powered suggestions for documentation improvements", inputSchema={ "type": "object", "properties": { "document_id": { "type": "string", "description": "Document ID or URI to analyze for improvements" }, "focus_area": { "type": "string", "enum": ["clarity", "completeness", "structure", "examples", "accessibility"], "description": "Area to focus improvement suggestions on" }, "audience": { "type": "string", "enum": ["beginner", "intermediate", "advanced", "all"], "description": "Target audience for improvements (default: all)", "default": "all" } }, "required": ["document_id"] } ) ] @self.server.call_tool() async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls.""" try: if name == "search_documents": return await self._search_documents(**arguments) elif name == "get_document": return await self._get_document(**arguments) elif name == "list_sources": return await self._list_sources(**arguments) elif name == "summarize_content": return await self._summarize_content(**arguments) elif name == "ask_question": return await self._ask_question(**arguments) elif name == "generate_documentation": return await self._generate_documentation(**arguments) elif name == "translate_content": return await self._translate_content(**arguments) elif name == "extract_insights": return await self._extract_insights(**arguments) elif name == "suggest_improvements": return await self._suggest_improvements(**arguments) else: raise ValueError(f"Unknown tool: {name}") except Exception as e: logger.error(f"Error calling tool {name}: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def _search_documents(self, query: str, source: Optional[str] = None, limit: int = 10) -> List[TextContent]: """Search for documents.""" try: results = self.db_manager.search_documents( query=query, source_name=source, limit=limit ) if not results: return [TextContent(type="text", text="No documents found matching your query.")] # Format results response_lines = [f"Found {len(results)} document(s):\n"] for i, doc in enumerate(results, 1): source_info = f" (from {doc.source.name})" if doc.source else "" doc_path = doc.path or doc.slug or doc.id source_name = doc.source.name if doc.source else 'unknown' response_lines.append( f"{i}. **{doc.title}**{source_info}\n" f" Path: {doc.path or doc.slug or 'N/A'}\n" f" Description: {doc.description or 'No description'}\n" f" URI: anydocs://{source_name}/{doc_path}\n" ) return [TextContent(type="text", text="\n".join(response_lines))] except Exception as e: logger.error(f"Error searching documents: {e}") return [TextContent(type="text", text=f"Search error: {str(e)}")] async def _get_document(self, document_id: str) -> List[TextContent]: """Get full document content.""" try: if not self.db_manager: return [TextContent(type="text", text="Database not initialized")] # Handle URI format if document_id.startswith("anydocs://"): # Parse URI: anydocs://source_name/document_path parts = document_id[10:].split("/", 1) # anydocs:// is 10 characters if len(parts) != 2: return [TextContent(type="text", text=f"Invalid URI format: {document_id}")] source_name, doc_path = parts document = self.db_manager.get_document_by_path(source_name, doc_path) elif ":" in document_id and not document_id.startswith("anydocs://"): # Invalid URI scheme return [TextContent(type="text", text=f"Invalid URI format: {document_id}")] else: # Handle direct document ID document = self.db_manager.get_document_by_id(document_id) if not document: return [TextContent(type="text", text=f"Document not found: {document_id}")] # Format response with document metadata content = document.processed_content or document.content or "No content available" response = f"**{document.title}**\n\n{content}" if document.source: response += f"\n\n*Source: {document.source.name}*" if document.path: response += f"\n*Path: {document.path}*" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error getting document {document_id}: {e}") return [TextContent(type="text", text=f"Error retrieving document: {str(e)}")] async def _list_sources(self) -> List[TextContent]: """List all document sources.""" try: if not self.db_manager: return [TextContent(type="text", text="Database not initialized")] sources = self.db_manager.get_document_sources(active_only=False) if not sources: return [TextContent(type="text", text="No document sources configured")] response_lines = [f"**Configured Document Sources ({len(sources)}):**\n"] for i, source in enumerate(sources, 1): status = "✓ Active" if source.enabled else "✗ Inactive" doc_count = self.db_manager.get_document_count_by_source(source.id) if hasattr(self.db_manager, 'get_document_count_by_source') else "Unknown" response_lines.append( f"{i}. **{source.name}** ({source.adapter_type})\n" f" Status: {status}\n" f" Base URL: {getattr(source, 'base_url', 'N/A')}\n" f" Documents: {doc_count}\n" f" Last sync: {getattr(source, 'last_sync_at', 'Never')}\n" ) return [TextContent(type="text", text="\n".join(response_lines))] except Exception as e: logger.error(f"Error listing sources: {e}") return [TextContent(type="text", text=f"Error listing sources: {str(e)}")] async def _summarize_content(self, document_id: str, max_length: int = 150) -> List[TextContent]: """Generate a summary of document content.""" try: # Get the document content first content_result = await self._get_document(document_id) if not content_result or not content_result[0].text: return [TextContent(type="text", text="Unable to retrieve document for summarization")] content = content_result[0].text # Check if the result was an error message if content.startswith("Document not found:") or content.startswith("Error retrieving document:") or content.startswith("Invalid URI format:"): return [TextContent(type="text", text="Unable to retrieve document for summarization")] # Simple extractive summarization (first few sentences) # In a production system, you'd use a proper summarization model sentences = content.split('. ') summary_sentences = [] word_count = 0 for sentence in sentences[:10]: # Limit to first 10 sentences sentence_words = len(sentence.split()) if word_count + sentence_words > max_length: break summary_sentences.append(sentence.strip()) word_count += sentence_words if not summary_sentences: # Fallback: take first paragraph paragraphs = content.split('\n\n') if paragraphs: first_para = paragraphs[0][:500] # First 500 chars summary = first_para + "..." if len(paragraphs[0]) > 500 else first_para else: summary = "Unable to generate summary" else: summary = '. '.join(summary_sentences) if not summary.endswith('.'): summary += '.' response = f"**Summary for {document_id}:**\n\n{summary}\n\n*Summary length: {len(summary.split())} words*" return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error summarizing content for {document_id}: {e}") return [TextContent(type="text", text=f"Error generating summary: {str(e)}")] async def _ask_question(self, question: str, source: Optional[str] = None, context_limit: int = 5) -> List[TextContent]: """Ask a question about the documentation.""" try: if not question.strip(): return [TextContent(type="text", text="Question cannot be empty")] if not self.db_manager: return [TextContent(type="text", text="Database not initialized")] # Search for relevant documents search_results = self.db_manager.search_documents( query=question, source_name=source, limit=context_limit ) if not search_results: return [TextContent( type="text", text=f"I couldn't find any relevant documentation to answer: '{question}'" )] # Compile context from search results context_pieces = [] for doc in search_results: content = doc.processed_content or doc.content or "" if content: # Take first 300 characters as context snippet = content[:300] + "..." if len(content) > 300 else content context_pieces.append(f"**{doc.title}**: {snippet}") context_text = "\n\n".join(context_pieces) # Generate response based on context # In a production system, you'd use an LLM here response = ( f"**Question:** {question}\n\n" f"**Based on the available documentation:**\n\n" f"{context_text}\n\n" f"*Found {len(search_results)} relevant document(s). " f"For more detailed information, please refer to the specific documents listed above.*" ) return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error answering question '{question}': {e}") return [TextContent(type="text", text=f"Error processing question: {str(e)}")] async def _generate_documentation(self, topic: str, source_documents: Optional[List[str]] = None, doc_type: str = "guide", max_length: int = 500) -> List[TextContent]: """Generate AI-assisted documentation.""" try: logger.info(f"Generating {doc_type} documentation for topic: {topic}") # Gather context from source documents if provided context_content = [] if source_documents: for doc_id in source_documents[:5]: # Limit to 5 documents try: doc_result = await self._get_document(doc_id) if doc_result and doc_result[0].text: content = doc_result[0].text[:1000] # First 1000 chars context_content.append(content) except Exception as e: logger.warning(f"Failed to get source document {doc_id}: {e}") # Generate template based on document type if doc_type == "guide": template = f"""# {topic} ## Overview This guide provides a comprehensive overview of {topic}, designed to help you understand and implement the concepts effectively. ## Prerequisites - Basic understanding of the related concepts - Access to necessary tools and resources ## Step-by-Step Instructions ### Step 1: Getting Started Begin by familiarizing yourself with the fundamental concepts of {topic}. ### Step 2: Implementation Implement the core functionality following these guidelines: 1. Set up your environment 2. Configure necessary parameters 3. Execute the implementation ### Step 3: Verification Verify your implementation by testing and validating results. ## Best Practices - Follow established conventions - Document your implementation - Test thoroughly before deployment ## Troubleshooting **Common Issue**: Description of common problem **Solution**: How to resolve the issue ## Additional Resources - Related documentation - Community resources - Further reading """ elif doc_type == "tutorial": template = f"""# {topic} Tutorial ## What You'll Learn By the end of this tutorial, you will understand the basics of {topic} and be able to implement practical examples. ## Before You Begin Ensure you have: - Required software installed - Basic knowledge of prerequisites - Access to example files ## Tutorial Steps ### 1. Introduction Let's start with a simple example to understand {topic}. ### 2. Hands-on Exercise Follow along with this practical exercise. ### 3. Advanced Usage Now let's explore more advanced features. ### 4. Real-world Example Here's how to apply {topic} in a real-world scenario. ## Next Steps - Explore advanced features - Check out related tutorials - Join the community """ else: template = f"""# {topic} Documentation This is AI-generated {doc_type} documentation for {topic}. Please customize this template based on your specific needs and requirements. """ response = ( f"**Generated {doc_type.title()} Documentation: {topic}**\n\n" f"{template}\n\n" f"*This documentation was AI-generated based on {len(context_content)} source document(s). " f"Please review and edit as needed for accuracy and completeness.*" ) return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error generating documentation for '{topic}': {e}") return [TextContent(type="text", text=f"Error generating documentation: {str(e)}")] async def _translate_content(self, document_id: str, target_language: str, preserve_formatting: bool = True) -> List[TextContent]: """Translate documentation content to different languages.""" try: logger.info(f"Translating document {document_id} to {target_language}") # Get the document content doc_result = await self._get_document(document_id) if not doc_result or not doc_result[0].text: return [TextContent(type="text", text=f"Unable to retrieve document {document_id} for translation")] content = doc_result[0].text # Language mapping for full names language_names = { 'es': 'Spanish', 'fr': 'French', 'de': 'German', 'it': 'Italian', 'pt': 'Portuguese', 'ru': 'Russian', 'zh': 'Chinese', 'ja': 'Japanese', 'ko': 'Korean', 'ar': 'Arabic', 'hi': 'Hindi', 'nl': 'Dutch' } target_lang_name = language_names.get(target_language.lower(), target_language) # Simulate translation (in production, use a proper translation service) translated_content = f"""[TRANSLATION TO {target_lang_name.upper()}] {content} [END TRANSLATION] --- **Translation Notes:** - Original document: {document_id} - Target language: {target_lang_name} ({target_language}) - Formatting preserved: {preserve_formatting} - Translation method: AI-assisted (requires manual review) **Important**: This is a simulated translation. In a production environment, this would be processed by a professional translation service such as: - Google Translate API - Azure Translator - AWS Translate - DeepL API Please review and edit the translation for accuracy and cultural appropriateness.""" return [TextContent(type="text", text=translated_content)] except Exception as e: logger.error(f"Error translating document {document_id}: {e}") return [TextContent(type="text", text=f"Error translating content: {str(e)}")] async def _extract_insights(self, source: Optional[str] = None, insight_type: str = "overview", include_metrics: bool = True) -> List[TextContent]: """Extract analytics and insights from documentation.""" try: logger.info(f"Extracting {insight_type} insights for source: {source or 'all sources'}") if not self.db_manager: return [TextContent(type="text", text="Database not initialized")] # Get documents for analysis if source: documents = self.db_manager.get_documents_by_source_name(source) sources = [source] else: documents = self.db_manager.get_all_documents() source_objs = self.db_manager.get_document_sources() sources = [s.name for s in source_objs] if not documents: return [TextContent(type="text", text="No documents found for analysis")] total_docs = len(documents) total_words = sum(len((doc.processed_content or doc.content or "").split()) for doc in documents) avg_words = total_words // total_docs if total_docs > 0 else 0 # Generate insights based on type if insight_type == "overview": insights = f"""# Documentation Insights - Overview ## Summary Analyzed documentation across {len(sources)} source(s) with {total_docs} total documents. ## Key Metrics - **Total Documents**: {total_docs} - **Total Word Count**: {total_words:,} - **Average Words per Document**: {avg_words:,} - **Sources**: {', '.join(sources)} ## Content Analysis - **Average Document Length**: {'Detailed' if avg_words > 500 else 'Concise' if avg_words > 200 else 'Brief'} - **Documentation Coverage**: {'Comprehensive' if total_docs > 50 else 'Moderate' if total_docs > 20 else 'Basic'} ## Recommendations - Consider expanding {'brief' if avg_words < 200 else 'detailed'} documentation - {'Add more comprehensive content' if total_docs < 20 else 'Good documentation coverage'} *Analysis generated on {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* """ elif insight_type == "gaps": # Analyze for common documentation gaps expected_topics = ['installation', 'configuration', 'authentication', 'troubleshooting', 'examples'] covered_topics = [] missing_topics = [] for topic in expected_topics: found = any(topic in (doc.title or "").lower() or topic in (doc.content or "").lower() for doc in documents) if found: covered_topics.append(topic) else: missing_topics.append(topic) insights = f"""# Documentation Insights - Gap Analysis ## Coverage Analysis ### Covered Topics ({len(covered_topics)}) {chr(10).join(f'✓ {topic.title()}' for topic in covered_topics)} ### Potential Gaps ({len(missing_topics)}) {chr(10).join(f'⚠️ {topic.title()}' for topic in missing_topics)} ## Content Quality Score **{len(covered_topics)}/{len(expected_topics)} topics covered ({len(covered_topics)/len(expected_topics)*100:.1f}%)** *Gap analysis completed on {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* """ else: insights = f"""# Documentation Insights - {insight_type.title()} Analyzed {total_docs} documents from {len(sources)} source(s). ## Basic Metrics - Total documents: {total_docs} - Average words per document: {avg_words} - Sources analyzed: {', '.join(sources)} *Analysis type '{insight_type}' completed on {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* """ return [TextContent(type="text", text=insights)] except Exception as e: logger.error(f"Error extracting insights: {e}") return [TextContent(type="text", text=f"Error extracting insights: {str(e)}")] async def _suggest_improvements(self, document_id: str, focus_area: Optional[str] = None, audience: str = "all") -> List[TextContent]: """AI-powered suggestions for documentation improvements.""" try: logger.info(f"Generating improvement suggestions for document {document_id}") # Get the document content doc_result = await self._get_document(document_id) if not doc_result or not doc_result[0].text: return [TextContent(type="text", text=f"Unable to retrieve document {document_id} for analysis")] content = doc_result[0].text word_count = len(content.split()) # Analyze document characteristics has_examples = '```' in content or 'example' in content.lower() has_headings = '#' in content or any(f'<h{i}>' in content for i in range(1, 7)) has_links = '](' in content or 'http' in content has_lists = any(marker in content for marker in ['-', '*', '1.']) # Generate suggestions based on focus area suggestions = [] if not focus_area or focus_area == "clarity": if word_count > 1000: suggestions.append("Consider breaking this document into smaller, more focused sections") if not has_headings: suggestions.append("Add clear headings to improve document structure and navigation") if word_count < 100: suggestions.append("Consider expanding the content with more detailed explanations") if not focus_area or focus_area == "completeness": if not has_examples: suggestions.append("Add practical examples to illustrate key concepts") if 'todo' in content.lower() or 'tbd' in content.lower(): suggestions.append("Complete sections marked as TODO or TBD") if not focus_area or focus_area == "structure": if not has_lists and word_count > 200: suggestions.append("Use bullet points or numbered lists to break up dense text") if not has_headings: suggestions.append("Add section headings to improve document organization") if not focus_area or focus_area == "examples": if not has_examples: suggestions.append("Include code examples or practical demonstrations") if has_examples and 'output' not in content.lower(): suggestions.append("Show expected outputs for code examples") if not focus_area or focus_area == "accessibility": if audience in ["beginner", "all"] and not any(term in content.lower() for term in ['basic', 'introduction', 'getting started']): suggestions.append("Add introductory content for beginners") if not has_links: suggestions.append("Add links to related resources and references") # If no specific suggestions, provide general ones if not suggestions: suggestions = [ "Document appears well-structured", "Consider adding more interactive elements if applicable", "Review content for current accuracy and relevance" ] response = f"""# Improvement Suggestions for {document_id} ## Document Analysis - **Word Count**: {word_count} - **Has Examples**: {'Yes' if has_examples else 'No'} - **Has Headings**: {'Yes' if has_headings else 'No'} - **Has Links**: {'Yes' if has_links else 'No'} - **Target Audience**: {audience.title()} - **Focus Area**: {focus_area.title() if focus_area else 'General'} ## Suggested Improvements {chr(10).join(f'{i+1}. {suggestion}' for i, suggestion in enumerate(suggestions))} ## Priority Recommendations **High Priority**: {suggestions[0] if suggestions else 'No immediate issues identified'} **Medium Priority**: {suggestions[1] if len(suggestions) > 1 else 'Consider regular content reviews'} --- *Suggestions generated on {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* *These are AI-generated suggestions. Please review and adapt based on your specific requirements.* """ return [TextContent(type="text", text=response)] except Exception as e: logger.error(f"Error generating suggestions for {document_id}: {e}") return [TextContent(type="text", text=f"Error generating suggestions: {str(e)}")] async def initialize(self): """Initialize the server.""" if self._initialized: return try: logger.info("Initializing AnyDocs MCP Server...") # Initialize database manager self.db_manager = DatabaseManager( database_url=self.config.database.url, echo=self.config.database.echo ) # Initialize database schema self.db_manager.initialize_database() # Initialize content engine self.content_engine = ContentEngine() # Load and initialize adapters await self._initialize_adapters() self._initialized = True logger.info("AnyDocs MCP Server initialized successfully") except Exception as e: logger.error(f"Failed to initialize server: {e}") raise async def _initialize_adapters(self) -> None: """Initialize document adapters based on configuration.""" try: # Get configured sources from database sources = self.db_manager.get_document_sources(active_only=True) for source in sources: try: # Get adapter from registry if source.adapter_type not in ADAPTER_REGISTRY: logger.warning(f"Unknown adapter type: {source.adapter_type}") continue adapter_class = ADAPTER_REGISTRY[source.adapter_type] adapter = adapter_class(source.config or {}) # Initialize adapter await adapter.initialize() self.adapters[source.name] = adapter logger.info(f"Initialized adapter: {source.name} ({source.adapter_type})") except Exception as e: logger.error(f"Failed to initialize adapter {source.name}: {e}") except Exception as e: logger.error(f"Error initializing adapters: {e}") async def run(self): """Run the MCP server.""" try: # Initialize if not already done await self.initialize() logger.info("Starting AnyDocs MCP server...") # Run server with stdio transport async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="anydocs-mcp", server_version="0.1.0", capabilities={ "resources": {}, "tools": {}, "logging": {} } ) ) except Exception as e: logger.error(f"Server error: {e}") raise # Global server instance _server_instance: Optional[AnyDocsMCPServer] = None def get_server() -> AnyDocsMCPServer: """Get the global server instance.""" global _server_instance if _server_instance is None: _server_instance = AnyDocsMCPServer() return _server_instance async def main(config_path: Optional[str] = None): """Main entry point.""" import argparse parser = argparse.ArgumentParser( description="AnyDocs MCP Server - Transform documentation into MCP-compatible server" ) parser.add_argument( "-c", "--config", help="Path to configuration file", type=str, default=config_path ) parser.add_argument( "--debug", help="Enable debug mode", action="store_true" ) args = parser.parse_args() try: # Initialize configuration if args.config: config_manager = init_config_manager(args.config) else: config_manager = init_config_manager(config_file="config.yaml") config = config_manager.get_config() # Enable debug mode if requested if args.debug: config.server.debug = True config.logging.level = "DEBUG" # Create and run server server = AnyDocsMCPServer(config) await server.run() except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server failed: {e}") raise if __name__ == "__main__": import sys config_path = sys.argv[1] if len(sys.argv) > 1 else None asyncio.run(main(config_path))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server