#!/usr/bin/env python3
"""
ProductMaster MCP Server - 標準MCPプロトコル実装
自然言語での曖昧検索に対応した商品情報管理システム
"""
import asyncio
import json
import logging
import sys
from typing import Any, Dict, List, Optional
from datetime import datetime
# MCP標準ライブラリ
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from pydantic import AnyUrl
import httpx
# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("productmaster-mcp")
# ProductMaster APIベースURL
PRODUCTMASTER_BASE_URL = "http://localhost:8001"
class ProductMasterMCPServer:
"""ProductMaster MCP Server - 標準MCPプロトコル実装"""
def __init__(self):
self.server = Server("productmaster-mcp")
self.http_client = None
self.setup_handlers()
async def __aenter__(self):
self.http_client = httpx.AsyncClient(timeout=30.0)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.http_client:
await self.http_client.aclose()
def setup_handlers(self):
"""MCPハンドラー設定"""
@self.server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""MCPツール一覧を返す"""
return [
Tool(
name="search_products_flexible",
description="柔軟な商品検索 - 自然言語クエリや複数条件での商品検索。例: 'ドル建てで満期日が2027年頃の債券'",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "自然言語での検索クエリ。商品名、発行者、説明文からキーワード検索"
},
"product_types": {
"type": "array",
"items": {"type": "string", "enum": ["bond", "stock", "fund"]},
"description": "商品タイプフィルター: bond(債券), stock(株式), fund(投資信託)"
},
"currencies": {
"type": "array",
"items": {"type": "string"},
"description": "通貨フィルター: USD, JPY, EUR等"
},
"investment_range": {
"type": "object",
"properties": {
"min": {"type": "number", "description": "最小投資額"},
"max": {"type": "number", "description": "最大投資額"}
},
"description": "最小投資額範囲でフィルタリング"
},
"risk_levels": {
"type": "array",
"items": {"type": "integer", "minimum": 1, "maximum": 5},
"description": "リスクレベルフィルター: 1(低リスク) - 5(高リスク)"
},
"issuer_keywords": {
"type": "array",
"items": {"type": "string"},
"description": "発行者キーワード: Apple, トヨタ, 日本国等"
},
"sort_by": {
"type": "string",
"enum": ["relevance", "investment_asc", "investment_desc", "risk_asc", "risk_desc"],
"default": "relevance",
"description": "ソート順序: relevance(関連度), investment_desc(投資額降順), risk_desc(リスク降順)等"
},
"limit": {
"type": "integer",
"default": 10,
"minimum": 1,
"maximum": 50,
"description": "取得件数上限"
}
},
"required": []
}
),
Tool(
name="get_product_details",
description="商品詳細情報取得 - 商品コードを指定して詳細情報を取得",
inputSchema={
"type": "object",
"properties": {
"product_code": {
"type": "string",
"description": "商品コード (例: AAPL, JGB394-10Y, TOYOTA50)"
}
},
"required": ["product_code"]
}
),
Tool(
name="get_product_statistics",
description="商品統計情報取得 - 商品タイプ別、通貨別、リスクレベル別の統計情報",
inputSchema={
"type": "object",
"properties": {
"group_by": {
"type": "string",
"enum": ["product_type", "currency", "risk_level", "issuer"],
"description": "グループ化基準: product_type(商品タイプ別), currency(通貨別), risk_level(リスクレベル別), issuer(発行者別)"
},
"filters": {
"type": "object",
"description": "統計対象を絞り込むためのフィルター条件"
}
},
"required": []
}
)
]
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
"""MCPツール呼び出し処理"""
try:
logger.info(f"Tool called: {name} with arguments: {arguments}")
if name == "search_products_flexible":
result = await self.search_products_flexible(**arguments)
elif name == "get_product_details":
result = await self.get_product_details(**arguments)
elif name == "get_product_statistics":
result = await self.get_product_statistics(**arguments)
else:
raise ValueError(f"Unknown tool: {name}")
# 結果をJSON形式で返す
response_text = json.dumps(result, ensure_ascii=False, indent=2)
logger.info(f"Tool {name} completed successfully")
return [TextContent(type="text", text=response_text)]
except Exception as e:
error_msg = f"Tool {name} error: {str(e)}"
logger.error(error_msg)
return [TextContent(type="text", text=json.dumps({"error": error_msg}, ensure_ascii=False))]
async def get_all_products(self) -> Dict[str, Any]:
"""全商品データ取得"""
try:
response = await self.http_client.get(f"{PRODUCTMASTER_BASE_URL}/api/products")
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Failed to fetch products: {e}")
return {"error": str(e), "products": []}
async def search_products_flexible(self, **kwargs) -> Dict[str, Any]:
"""柔軟な商品検索実装"""
try:
# ProductMaster APIから全商品取得
data = await self.get_all_products()
if "error" in data:
return data
products = data.get("products", [])
# フィルタリング適用
filtered_products = self._apply_filters(products, **kwargs)
# ソート適用
sorted_products = self._apply_sorting(filtered_products, kwargs.get("sort_by", "relevance"))
# 件数制限
limit = kwargs.get("limit", 10)
limited_products = sorted_products[:limit]
return {
"success": True,
"products": limited_products,
"total_found": len(filtered_products),
"returned": len(limited_products),
"filters_applied": self._get_applied_filters(**kwargs),
"query": kwargs.get("query", ""),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Search error: {e}")
return {"success": False, "error": str(e)}
def _apply_filters(self, products: List[Dict], **kwargs) -> List[Dict]:
"""フィルター適用ロジック"""
filtered = products.copy()
# 商品タイプフィルター
if product_types := kwargs.get("product_types"):
filtered = [p for p in filtered if p.get("product_type") in product_types]
# 通貨フィルター
if currencies := kwargs.get("currencies"):
filtered = [p for p in filtered if p.get("currency") in currencies]
# 投資額フィルター
if investment_range := kwargs.get("investment_range"):
min_inv = investment_range.get("min")
max_inv = investment_range.get("max")
if min_inv is not None:
filtered = [p for p in filtered if (p.get("minimum_investment") or 0) >= min_inv]
if max_inv is not None:
filtered = [p for p in filtered if (p.get("minimum_investment") or 0) <= max_inv]
# リスクレベルフィルター
if risk_levels := kwargs.get("risk_levels"):
filtered = [p for p in filtered if p.get("risk_level") in risk_levels]
# 発行者キーワードフィルター
if issuer_keywords := kwargs.get("issuer_keywords"):
filtered = [p for p in filtered
if any(keyword.lower() in (p.get("issuer") or "").lower()
for keyword in issuer_keywords)]
# 自然言語クエリフィルター
if query := kwargs.get("query"):
query_lower = query.lower()
filtered = [p for p in filtered
if any(query_lower in str(p.get(field, "")).lower()
for field in ["product_name", "description", "issuer", "product_code"])]
return filtered
def _apply_sorting(self, products: List[Dict], sort_by: str) -> List[Dict]:
"""ソート適用ロジック"""
if sort_by == "investment_desc":
return sorted(products, key=lambda p: p.get("minimum_investment", 0), reverse=True)
elif sort_by == "investment_asc":
return sorted(products, key=lambda p: p.get("minimum_investment", 0))
elif sort_by == "risk_desc":
return sorted(products, key=lambda p: p.get("risk_level", 0), reverse=True)
elif sort_by == "risk_asc":
return sorted(products, key=lambda p: p.get("risk_level", 0))
else: # relevance or default
return products
def _get_applied_filters(self, **kwargs) -> Dict[str, Any]:
"""適用されたフィルター情報を返す"""
applied = {}
for key, value in kwargs.items():
if value is not None and value != [] and value != {}:
applied[key] = value
return applied
async def get_product_details(self, product_code: str) -> Dict[str, Any]:
"""商品詳細情報取得"""
try:
data = await self.get_all_products()
if "error" in data:
return data
products = data.get("products", [])
product = next((p for p in products if p.get("product_code") == product_code), None)
if not product:
return {
"success": False,
"error": f"Product with code '{product_code}' not found",
"available_codes": [p.get("product_code") for p in products[:5]] # 参考用に5件表示
}
return {
"success": True,
"product": product,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Get product details error: {e}")
return {"success": False, "error": str(e)}
async def get_product_statistics(self, **kwargs) -> Dict[str, Any]:
"""商品統計情報取得"""
try:
data = await self.get_all_products()
if "error" in data:
return data
products = data.get("products", [])
# フィルター適用
if filters := kwargs.get("filters"):
products = self._apply_filters(products, **filters)
# 統計情報生成
stats = {
"success": True,
"total_products": len(products),
"by_type": self._group_by_field(products, "product_type"),
"by_currency": self._group_by_field(products, "currency"),
"by_risk_level": self._group_by_field(products, "risk_level"),
"investment_stats": self._calculate_investment_stats(products),
"timestamp": datetime.now().isoformat()
}
# グループ化指定がある場合
if group_by := kwargs.get("group_by"):
stats["grouped_by"] = group_by
stats["groups"] = self._group_by_field(products, group_by)
return stats
except Exception as e:
logger.error(f"Get statistics error: {e}")
return {"success": False, "error": str(e)}
def _group_by_field(self, products: List[Dict], field: str) -> Dict[str, int]:
"""指定フィールドでグループ化"""
groups = {}
for product in products:
value = product.get(field, "Unknown")
groups[str(value)] = groups.get(str(value), 0) + 1
return groups
def _calculate_investment_stats(self, products: List[Dict]) -> Dict[str, Any]:
"""投資額統計計算"""
investments = [p.get("minimum_investment", 0) for p in products if p.get("minimum_investment")]
if not investments:
return {"count": 0, "message": "No investment data available"}
return {
"count": len(investments),
"min": min(investments),
"max": max(investments),
"average": round(sum(investments) / len(investments), 2)
}
async def run(self):
"""MCPサーバー実行"""
async with self:
# stdio transportでMCPサーバーを起動
from mcp.server.stdio import stdio_server
logger.info("ProductMaster MCP Server starting...")
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="productmaster-mcp",
server_version="1.0.0",
capabilities=self.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
)
)
async def main():
"""MCPサーバーメイン関数"""
server = ProductMasterMCPServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())