Skip to main content
Glama
takehig

ProductMaster MCP Server

by takehig
mcp_server.py (16.9 kB)
#!/usr/bin/env python3
"""
ProductMaster MCP Server - standard MCP protocol implementation.

Product information management system that supports fuzzy,
natural-language product search.
"""

import asyncio
import json
import logging
import sys
from typing import Any, Dict, List, Optional
from datetime import datetime

# MCP standard library
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel
)
from pydantic import AnyUrl
import httpx

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("productmaster-mcp")

# Base URL of the ProductMaster API
PRODUCTMASTER_BASE_URL = "http://localhost:8001"


class ProductMasterMCPServer:
    """ProductMaster MCP server built on the standard MCP protocol.

    Exposes three tools (flexible search, product details, statistics).
    Every tool fetches the full product list from the ProductMaster API
    and filters / aggregates it in memory.

    Use as an async context manager (or call ``run()``) so the HTTP
    client is created and closed properly.
    """

    # Bounds advertised for ``limit`` in the search tool's input schema.
    DEFAULT_LIMIT = 10
    MIN_LIMIT = 1
    MAX_LIMIT = 50

    def __init__(self):
        self.server = Server("productmaster-mcp")
        # Created in __aenter__; stays None until the context is entered.
        self.http_client = None
        self.setup_handlers()

    async def __aenter__(self):
        self.http_client = httpx.AsyncClient(timeout=30.0)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.http_client:
            await self.http_client.aclose()
            # Drop the reference so a closed client is never reused.
            self.http_client = None

    def setup_handlers(self):
        """Register the MCP ``list_tools`` and ``call_tool`` handlers."""

        @self.server.list_tools()
        async def handle_list_tools() -> list[Tool]:
            """Return the list of tools offered by this server."""
            return [
                Tool(
                    name="search_products_flexible",
                    description="柔軟な商品検索 - 自然言語クエリや複数条件での商品検索。例: 'ドル建てで満期日が2027年頃の債券'",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "自然言語での検索クエリ。商品名、発行者、説明文からキーワード検索"
                            },
                            "product_types": {
                                "type": "array",
                                "items": {"type": "string", "enum": ["bond", "stock", "fund"]},
                                "description": "商品タイプフィルター: bond(債券), stock(株式), fund(投資信託)"
                            },
                            "currencies": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "通貨フィルター: USD, JPY, EUR等"
                            },
                            "investment_range": {
                                "type": "object",
                                "properties": {
                                    "min": {"type": "number", "description": "最小投資額"},
                                    "max": {"type": "number", "description": "最大投資額"}
                                },
                                "description": "最小投資額範囲でフィルタリング"
                            },
                            "risk_levels": {
                                "type": "array",
                                "items": {"type": "integer", "minimum": 1, "maximum": 5},
                                "description": "リスクレベルフィルター: 1(低リスク) - 5(高リスク)"
                            },
                            "issuer_keywords": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "発行者キーワード: Apple, トヨタ, 日本国等"
                            },
                            "sort_by": {
                                "type": "string",
                                "enum": ["relevance", "investment_asc", "investment_desc", "risk_asc", "risk_desc"],
                                "default": "relevance",
                                "description": "ソート順序: relevance(関連度), investment_desc(投資額降順), risk_desc(リスク降順)等"
                            },
                            "limit": {
                                "type": "integer",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 50,
                                "description": "取得件数上限"
                            }
                        },
                        "required": []
                    }
                ),
                Tool(
                    name="get_product_details",
                    description="商品詳細情報取得 - 商品コードを指定して詳細情報を取得",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "product_code": {
                                "type": "string",
                                "description": "商品コード (例: AAPL, JGB394-10Y, TOYOTA50)"
                            }
                        },
                        "required": ["product_code"]
                    }
                ),
                Tool(
                    name="get_product_statistics",
                    description="商品統計情報取得 - 商品タイプ別、通貨別、リスクレベル別の統計情報",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "group_by": {
                                "type": "string",
                                "enum": ["product_type", "currency", "risk_level", "issuer"],
                                "description": "グループ化基準: product_type(商品タイプ別), currency(通貨別), risk_level(リスクレベル別), issuer(発行者別)"
                            },
                            "filters": {
                                "type": "object",
                                "description": "統計対象を絞り込むためのフィルター条件"
                            }
                        },
                        "required": []
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
            """Dispatch a tool call and return its result as JSON text.

            Any exception (unknown tool, bad arguments, internal failure)
            is reported to the client as a JSON ``{"error": ...}`` payload
            rather than propagated.
            """
            try:
                logger.info(f"Tool called: {name} with arguments: {arguments}")

                # A client may omit arguments entirely; ``**None`` would raise.
                arguments = arguments or {}

                if name == "search_products_flexible":
                    result = await self.search_products_flexible(**arguments)
                elif name == "get_product_details":
                    result = await self.get_product_details(**arguments)
                elif name == "get_product_statistics":
                    result = await self.get_product_statistics(**arguments)
                else:
                    raise ValueError(f"Unknown tool: {name}")

                # Return the result as JSON text
                response_text = json.dumps(result, ensure_ascii=False, indent=2)
                logger.info(f"Tool {name} completed successfully")
                return [TextContent(type="text", text=response_text)]

            except Exception as e:
                error_msg = f"Tool {name} error: {str(e)}"
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(error_msg)
                return [TextContent(type="text", text=json.dumps({"error": error_msg}, ensure_ascii=False))]

    async def get_all_products(self) -> Dict[str, Any]:
        """Fetch the full product payload from the ProductMaster API.

        Returns:
            The decoded JSON body of ``GET /api/products`` on success.
            On any failure, a dict with ``success`` False, an ``error``
            message and an empty ``products`` list.
        """
        if self.http_client is None:
            # Fail with a clear message instead of an AttributeError on None.
            message = "HTTP client is not initialized; use 'async with' or run()"
            logger.error(f"Failed to fetch products: {message}")
            return {"success": False, "error": message, "products": []}

        try:
            response = await self.http_client.get(f"{PRODUCTMASTER_BASE_URL}/api/products")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Failed to fetch products: {e}")
            # "success" matches the error payloads returned by the tool methods.
            return {"success": False, "error": str(e), "products": []}

    async def search_products_flexible(self, **kwargs) -> Dict[str, Any]:
        """Search products using the filters given as keyword arguments.

        Recognised keys: ``query``, ``product_types``, ``currencies``,
        ``investment_range``, ``risk_levels``, ``issuer_keywords``,
        ``sort_by`` and ``limit``.

        Returns:
            A dict with the matching products (truncated to ``limit``),
            the total match count and the filters applied, or an error
            payload with ``success`` False.
        """
        try:
            # Fetch all products from the ProductMaster API
            data = await self.get_all_products()
            if "error" in data:
                return data

            products = data.get("products", [])

            # Apply filters
            filtered_products = self._apply_filters(products, **kwargs)

            # Apply sorting
            sorted_products = self._apply_sorting(filtered_products, kwargs.get("sort_by", "relevance"))

            # Limit the number of results; a raw negative or None limit would
            # otherwise slice from the end or return everything.
            limit = self._normalize_limit(kwargs.get("limit", self.DEFAULT_LIMIT))
            limited_products = sorted_products[:limit]

            return {
                "success": True,
                "products": limited_products,
                "total_found": len(filtered_products),
                "returned": len(limited_products),
                "filters_applied": self._get_applied_filters(**kwargs),
                "query": kwargs.get("query", ""),
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"Search error: {e}")
            return {"success": False, "error": str(e)}

    def _normalize_limit(self, limit: Any) -> int:
        """Coerce ``limit`` to an int within the advertised 1-50 range.

        ``None``, booleans and values that cannot be converted to an
        integer fall back to the default of 10.
        """
        if limit is None or isinstance(limit, bool):
            return self.DEFAULT_LIMIT
        try:
            value = int(limit)
        except (TypeError, ValueError, OverflowError):
            return self.DEFAULT_LIMIT
        return max(self.MIN_LIMIT, min(value, self.MAX_LIMIT))

    def _apply_filters(self, products: List[Dict], **kwargs) -> List[Dict]:
        """Return the products that satisfy every supplied filter.

        Filters that are absent or empty are skipped. The input list is
        never mutated; a new list is always returned.
        """
        filtered = list(products)

        # Product type filter
        if product_types := kwargs.get("product_types"):
            filtered = [p for p in filtered if p.get("product_type") in product_types]

        # Currency filter
        if currencies := kwargs.get("currencies"):
            filtered = [p for p in filtered if p.get("currency") in currencies]

        # Minimum-investment range filter (missing/null amounts count as 0)
        if investment_range := kwargs.get("investment_range"):
            min_inv = investment_range.get("min")
            max_inv = investment_range.get("max")
            if min_inv is not None:
                filtered = [p for p in filtered if (p.get("minimum_investment") or 0) >= min_inv]
            if max_inv is not None:
                filtered = [p for p in filtered if (p.get("minimum_investment") or 0) <= max_inv]

        # Risk level filter
        if risk_levels := kwargs.get("risk_levels"):
            filtered = [p for p in filtered if p.get("risk_level") in risk_levels]

        # Issuer keyword filter (case-insensitive substring, any keyword)
        if issuer_keywords := kwargs.get("issuer_keywords"):
            lowered_keywords = [keyword.lower() for keyword in issuer_keywords]
            filtered = [
                p for p in filtered
                if any(keyword in (p.get("issuer") or "").lower() for keyword in lowered_keywords)
            ]

        # Free-text query filter (case-insensitive substring over text fields)
        if query := kwargs.get("query"):
            query_lower = query.lower()
            search_fields = ("product_name", "description", "issuer", "product_code")
            # ``or ""`` keeps a null field from being searched as "none".
            filtered = [
                p for p in filtered
                if any(query_lower in str(p.get(field) or "").lower() for field in search_fields)
            ]

        return filtered

    def _apply_sorting(self, products: List[Dict], sort_by: str) -> List[Dict]:
        """Return ``products`` ordered according to ``sort_by``.

        Missing or null sort values are treated as 0 so that records with
        incomplete data cannot make the comparison raise ``TypeError``.
        ``relevance`` and unrecognised values return the list unchanged.
        """
        if sort_by == "investment_desc":
            return sorted(products, key=lambda p: p.get("minimum_investment") or 0, reverse=True)
        elif sort_by == "investment_asc":
            return sorted(products, key=lambda p: p.get("minimum_investment") or 0)
        elif sort_by == "risk_desc":
            return sorted(products, key=lambda p: p.get("risk_level") or 0, reverse=True)
        elif sort_by == "risk_asc":
            return sorted(products, key=lambda p: p.get("risk_level") or 0)
        else:  # relevance or default
            return products

    def _get_applied_filters(self, **kwargs) -> Dict[str, Any]:
        """Return the keyword arguments that carry a non-empty value."""
        return {
            key: value
            for key, value in kwargs.items()
            if value is not None and value != [] and value != {}
        }

    async def get_product_details(self, product_code: str) -> Dict[str, Any]:
        """Look up a single product by its exact product code.

        Returns:
            ``{"success": True, "product": ...}`` when found; otherwise a
            failure payload that lists up to five existing codes as a hint.
        """
        try:
            data = await self.get_all_products()
            if "error" in data:
                return data

            products = data.get("products", [])
            product = next((p for p in products if p.get("product_code") == product_code), None)

            if not product:
                return {
                    "success": False,
                    "error": f"Product with code '{product_code}' not found",
                    # Show up to five codes for reference
                    "available_codes": [p.get("product_code") for p in products[:5]]
                }

            return {
                "success": True,
                "product": product,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"Get product details error: {e}")
            return {"success": False, "error": str(e)}

    async def get_product_statistics(self, **kwargs) -> Dict[str, Any]:
        """Compute product counts and investment statistics.

        Recognised keys: ``filters`` (a dict accepted by the search
        filters) and ``group_by`` (a product field name to count by).
        """
        try:
            data = await self.get_all_products()
            if "error" in data:
                return data

            products = data.get("products", [])

            # Apply filters
            if filters := kwargs.get("filters"):
                products = self._apply_filters(products, **filters)

            # Build the statistics
            stats = {
                "success": True,
                "total_products": len(products),
                "by_type": self._group_by_field(products, "product_type"),
                "by_currency": self._group_by_field(products, "currency"),
                "by_risk_level": self._group_by_field(products, "risk_level"),
                "investment_stats": self._calculate_investment_stats(products),
                "timestamp": datetime.now().isoformat()
            }

            # Extra grouping when explicitly requested
            if group_by := kwargs.get("group_by"):
                stats["grouped_by"] = group_by
                stats["groups"] = self._group_by_field(products, group_by)

            return stats

        except Exception as e:
            logger.error(f"Get statistics error: {e}")
            return {"success": False, "error": str(e)}

    def _group_by_field(self, products: List[Dict], field: str) -> Dict[str, int]:
        """Count products per value of ``field``.

        Values are stringified for use as keys. A missing key and an
        explicit null are both counted under ``"Unknown"``.
        """
        groups: Dict[str, int] = {}
        for product in products:
            value = product.get(field)
            label = "Unknown" if value is None else str(value)
            groups[label] = groups.get(label, 0) + 1
        return groups

    def _calculate_investment_stats(self, products: List[Dict]) -> Dict[str, Any]:
        """Summarise ``minimum_investment`` over products with a truthy value.

        Returns count/min/max/average (rounded to 2 places), or a count
        of 0 with a message when no product carries an amount.
        """
        investments = [
            amount for p in products
            if (amount := p.get("minimum_investment"))
        ]

        if not investments:
            return {"count": 0, "message": "No investment data available"}

        return {
            "count": len(investments),
            "min": min(investments),
            "max": max(investments),
            "average": round(sum(investments) / len(investments), 2)
        }

    async def run(self):
        """Run the MCP server over the stdio transport."""
        async with self:
            # Start the MCP server on the stdio transport
            from mcp.server.stdio import stdio_server

            logger.info("ProductMaster MCP Server starting...")

            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name="productmaster-mcp",
                        server_version="1.0.0",
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={}
                        )
                    )
                )


async def main():
    """Entry point: create the server and run it."""
    server = ProductMasterMCPServer()
    await server.run()


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/takehig/ProductMaster-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.