"""
Main MCP server entry point for case study generation.
This module implements the Model Context Protocol server that exposes
three main tools for processing documents, analyzing GitHub repositories,
and researching companies using Gemma3.
"""
import asyncio
import logging
import json
from typing import Any, Dict, List, Optional
import os
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from mcp.server.stdio import stdio_server
from document_processor import DocumentProcessor, DocumentProcessorError
from github_analyzer import GitHubAnalyzer, GitHubAnalyzerError
from company_researcher import CompanyResearcher, CompanyResearcherError
from gemma3_client import get_gemma3_client, Gemma3ClientError
# Configure logging.
# basicConfig() is called without a stream/handler argument, so records go to
# the logging module's default StreamHandler (sys.stderr).
# NOTE(review): the server is run over stdio_server() further down in this
# file, so log output presumably has to stay off stdout — confirm before
# redirecting logs anywhere else.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CaseStudyMCPServer:
    """
    MCP Server for case study generation using Gemma3.

    Provides three main tools:
    1. process_document_text - Extract business insights from documents
    2. analyze_github_repo - Analyze GitHub repositories for business value
    3. research_company_basic - Generate basic company research and context

    It also exposes a single resource, ``health://status``, which returns a
    JSON document describing the state of the server's components.
    """

    # URI of the only resource this server exposes.
    HEALTH_RESOURCE_URI = "health://status"

    def __init__(self):
        """Initialize the MCP server with processors.

        Builds the shared Gemma3 client and the three processors that use it,
        then registers the MCP protocol handlers on ``self.server``.

        Raises:
            Exception: whatever the client/processor constructors raise; the
                error is logged and re-raised unchanged.
        """
        self.server = Server("case-study-mcp-server")

        # Initialize processors
        try:
            self.gemma3_client = get_gemma3_client()
            self.document_processor = DocumentProcessor(self.gemma3_client)

            # Get GitHub token from environment (None when the variable is unset)
            github_token = os.getenv("GITHUB_TOKEN")
            self.github_analyzer = GitHubAnalyzer(self.gemma3_client, github_token)

            self.company_researcher = CompanyResearcher(self.gemma3_client)
            logger.info("MCP server initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize MCP server: {e}")
            raise

        # Register handlers
        self._register_handlers()

    @staticmethod
    def _text_response(text: str) -> List[TextContent]:
        """Wrap *text* in the single-element TextContent list every tool returns."""
        return [TextContent(type="text", text=text)]

    @classmethod
    def _json_response(cls, payload: Dict[str, Any]) -> List[TextContent]:
        """Serialize *payload* as indented JSON and wrap it as a tool response."""
        return cls._text_response(json.dumps(payload, indent=2))

    def _register_handlers(self):
        """Register MCP protocol handlers.

        Four handlers are attached to ``self.server`` through its decorators:
        list_tools, call_tool, list_resources and read_resource.
        """

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="process_document_text",
                    description="Process document content with Gemma3 to extract business insights",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "Document content from Google Drive or other sources"
                            },
                            "doc_type": {
                                "type": "string",
                                "enum": ["proposal", "case_study", "contract", "general"],
                                "description": "Type of document being processed",
                                "default": "general"
                            }
                        },
                        "required": ["text"]
                    }
                ),
                Tool(
                    name="analyze_github_repo",
                    description="Analyze GitHub repository for business value and technical insights",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "repo_url": {
                                "type": "string",
                                "description": "GitHub repository URL (e.g., github.com/owner/repo)"
                            }
                        },
                        "required": ["repo_url"]
                    }
                ),
                Tool(
                    name="research_company_basic",
                    description="Basic company research and context generation",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "company_name": {
                                "type": "string",
                                "description": "Name of the company to research"
                            },
                            "company_context": {
                                "type": "string",
                                "description": "Optional additional context about the company",
                                "default": ""
                            }
                        },
                        "required": ["company_name"]
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> List[TextContent]:
            """Handle tool calls.

            Dispatches on the tool name. Any exception escaping a tool handler
            is logged with its traceback and reported back as plain text
            instead of propagating.
            """
            try:
                # Built per call so the methods are looked up on the instance
                # at call time, exactly like the explicit if/elif chain did.
                dispatch = {
                    "process_document_text": self._handle_process_document_text,
                    "analyze_github_repo": self._handle_analyze_github_repo,
                    "research_company_basic": self._handle_research_company_basic,
                }
                handler = dispatch.get(name)
                if handler is None:
                    return self._text_response(f"Unknown tool: {name}")
                return await handler(arguments)
            except Exception as e:
                # logger.exception keeps the traceback, which would otherwise
                # be lost because the error is swallowed here.
                logger.exception("Tool call failed for %s: %s", name, e)
                return self._text_response(f"Tool execution failed: {str(e)}")

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """List available resources."""
            return [
                Resource(
                    uri=self.HEALTH_RESOURCE_URI,
                    name="MCP Server Health Status",
                    description="Current health status of the MCP server and its components",
                    mimeType="application/json"
                )
            ]

        @self.server.read_resource()
        async def handle_read_resource(uri: Any) -> str:
            """Read a resource.

            Returns the health report as a JSON string for ``health://status``.

            Raises:
                ValueError: for any other URI.
            """
            # NOTE(review): depending on the MCP SDK version the handler is
            # given a URL object rather than a plain str, and such an object
            # does not compare equal to a string literal. Comparing the string
            # form works for both.
            if str(uri) == self.HEALTH_RESOURCE_URI:
                health_status = await self._get_health_status()
                return json.dumps(health_status, indent=2)
            raise ValueError(f"Unknown resource: {uri}")

    async def _handle_process_document_text(self, arguments: dict) -> List[TextContent]:
        """Handle document text processing.

        Args:
            arguments: tool arguments; reads ``text`` (required) and
                ``doc_type`` (defaults to ``"general"``). ``None`` is treated
                as an empty mapping.

        Returns:
            A one-element TextContent list: a JSON document on completion, or
            a plain error message when the input is missing or processing
            raised.
        """
        try:
            arguments = arguments or {}
            text = arguments.get("text", "")
            doc_type = arguments.get("doc_type", "general")

            if not text:
                return self._text_response("Error: Document text is required")

            # Process document
            result = await self.document_processor.process_document_content_async(text, doc_type)

            # Format response
            if result.get("processing_success", False):
                response = {
                    "success": True,
                    "document_type": result.get("document_type"),
                    "insights": {
                        "challenges": result.get("challenges", []),
                        "solutions": result.get("solutions", []),
                        "metrics": result.get("metrics", []),
                        "context": result.get("context", ""),
                        "key_stakeholders": result.get("key_stakeholders", []),
                        "timeline": result.get("timeline", "")
                    },
                    "metadata": {
                        "document_length": result.get("document_length"),
                        "processed_length": result.get("processed_length")
                    }
                }
            else:
                # The raw processor result is passed through so the caller can
                # still use whatever partial data it contains.
                response = {
                    "success": False,
                    "error": result.get("error", "Processing failed"),
                    "fallback_insights": result
                }

            return self._json_response(response)
        except DocumentProcessorError as e:
            return self._text_response(f"Document processing error: {str(e)}")
        except Exception as e:
            logger.exception("Unexpected error in document processing: %s", e)
            return self._text_response(f"Unexpected error: {str(e)}")

    async def _handle_analyze_github_repo(self, arguments: dict) -> List[TextContent]:
        """Handle GitHub repository analysis.

        Args:
            arguments: tool arguments; reads ``repo_url`` (required). ``None``
                is treated as an empty mapping.

        Returns:
            A one-element TextContent list: a JSON document on completion, or
            a plain error message when the URL is missing or analysis raised.
        """
        try:
            arguments = arguments or {}
            repo_url = arguments.get("repo_url", "")

            if not repo_url:
                return self._text_response("Error: Repository URL is required")

            # Analyze repository
            result = await self.github_analyzer.analyze_repository_async(repo_url)

            # Format response
            if result.get("analysis_success", False):
                response = {
                    "success": True,
                    "repository": {
                        "url": result.get("repository_url"),
                        "name": result.get("repository_name"),
                        "tech_stack": result.get("tech_stack", []),
                        "stars": result.get("stars"),
                        "language": result.get("language")
                    },
                    "business_analysis": {
                        "problem_solved": result.get("problem_solved", ""),
                        "key_features": result.get("key_features", []),
                        "business_value": result.get("business_value", ""),
                        "technical_benefits": result.get("technical_benefits", []),
                        "target_users": result.get("target_users", ""),
                        "scalability": result.get("scalability", ""),
                        "integration_points": result.get("integration_points", [])
                    },
                    "metadata": {
                        "license": result.get("license"),
                        "topics": result.get("topics", [])
                    }
                }
            else:
                response = {
                    "success": False,
                    "error": result.get("error", "Analysis failed"),
                    "fallback_analysis": result
                }

            return self._json_response(response)
        except GitHubAnalyzerError as e:
            return self._text_response(f"GitHub analysis error: {str(e)}")
        except Exception as e:
            logger.exception("Unexpected error in GitHub analysis: %s", e)
            return self._text_response(f"Unexpected error: {str(e)}")

    async def _handle_research_company_basic(self, arguments: dict) -> List[TextContent]:
        """Handle basic company research.

        Args:
            arguments: tool arguments; reads ``company_name`` (required) and
                ``company_context`` (defaults to ``""``). ``None`` is treated
                as an empty mapping.

        Returns:
            A one-element TextContent list: a JSON document on completion, or
            a plain error message when the name is missing or research raised.
        """
        try:
            arguments = arguments or {}
            company_name = arguments.get("company_name", "")
            company_context = arguments.get("company_context", "")

            if not company_name:
                return self._text_response("Error: Company name is required")

            # Research company
            result = await self.company_researcher.research_company_async(company_name, company_context)

            # Format response
            if result.get("research_success", False):
                response = {
                    "success": True,
                    "company": {
                        "name": result.get("company_name"),
                        "profile": result.get("company_profile", ""),
                        "industry": result.get("industry", ""),
                        "business_model": result.get("business_model", "")
                    },
                    "insights": {
                        "challenges": result.get("challenges", []),
                        "opportunities": result.get("opportunities", []),
                        "technology_needs": result.get("technology_needs", [])
                    },
                    "context_analysis": {
                        "detected_industry": result.get("detected_industry"),
                        "detected_business_model": result.get("detected_business_model"),
                        "enhanced_context": result.get("enhanced_context")
                    }
                }
            else:
                response = {
                    "success": False,
                    "error": result.get("error", "Research failed"),
                    "fallback_research": result
                }

            return self._json_response(response)
        except CompanyResearcherError as e:
            return self._text_response(f"Company research error: {str(e)}")
        except Exception as e:
            logger.exception("Unexpected error in company research: %s", e)
            return self._text_response(f"Unexpected error: {str(e)}")

    async def _get_health_status(self) -> Dict[str, Any]:
        """Get comprehensive health status.

        Calls ``health_check()`` on the Gemma3 client, the GitHub analyzer and
        the company researcher and bundles the results.

        Returns:
            A dict with ``server_status`` (``"healthy"`` or ``"unhealthy"``)
            and ``timestamp`` (UTC, ISO-8601). On success it also carries
            ``components`` and ``capabilities``; if any check raises, it
            carries ``error`` with the exception text instead.
        """
        # Imported locally so the module's import block stays untouched.
        from datetime import datetime, timezone

        checked_at = datetime.now(timezone.utc).isoformat()
        try:
            # NOTE(review): these health_check() calls are made synchronously;
            # if any of them performs network I/O it will block the event
            # loop for its duration — confirm against the implementations.
            # Check Gemma3 client
            gemma3_health = self.gemma3_client.health_check()

            # Check GitHub API
            github_health = self.github_analyzer.health_check()

            # Check document processor (no health check method available)
            doc_health = {"status": "initialized", "processor": "DocumentProcessor"}

            # Check company researcher
            company_health = self.company_researcher.health_check()

            return {
                "server_status": "healthy",
                "timestamp": checked_at,
                "components": {
                    "gemma3_client": gemma3_health,
                    "github_analyzer": github_health,
                    "document_processor": doc_health,
                    "company_researcher": company_health
                },
                "capabilities": {
                    "document_processing": True,
                    "github_analysis": True,
                    "company_research": True
                }
            }
        except Exception as e:
            return {
                "server_status": "unhealthy",
                "error": str(e),
                "timestamp": checked_at
            }

    async def run(self):
        """Run the MCP server.

        Opens the stdio transport and runs ``self.server`` on its read/write
        streams until ``Server.run`` returns.
        """
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="case-study-mcp-server",
                    server_version="0.1.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    )
                )
            )
async def main():
    """Create the case study MCP server and run it until it stops.

    A KeyboardInterrupt is logged as a normal shutdown and suppressed; any
    other exception is logged and re-raised to the caller.
    """
    try:
        # Check for required environment or configuration
        logger.info("Starting Case Study MCP Server...")

        # Construction and serving share one try block, so a failure in
        # either step is reported through the same log message.
        case_study_server = CaseStudyMCPServer()
        await case_study_server.run()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as exc:
        logger.error(f"Server failed to start: {exc}")
        raise
if __name__ == "__main__":
    # Script entry point: start an event loop and run the server coroutine
    # until it returns or raises.
    asyncio.run(main())