Skip to main content
Glama
liueic

PubChem Chemical Safety MCP Server

by liueic
mcp_server.py5.69 kB
""" PubChem Chemical Safety MCP Server 基于MCP协议的化学安全信息服务器 """ import logging from typing import Dict, Any, List from mcp.server import FastMCP from mcp.server.fastmcp import Context from pubchem_mcp.services.pubchem_client import PubChemClient from pubchem_mcp.services.cache_service import CacheService from pubchem_mcp.models.schemas import CompoundInfo, SafetyInfo, ToxicityData # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 创建FastMCP应用 app = FastMCP('pubchem-chemical-safety') @app.tool() async def get_compound_info(ctx: Context, name: str) -> Dict[str, Any]: """ 获取化合物基础信息 Args: name: 化合物名称 Returns: 包含CID、分子式、分子量等基础信息的字典 """ try: # 初始化缓存服务 cache_service = CacheService() await cache_service.initialize() # 检查缓存 cached_data = await cache_service.get_compound_info(name) if cached_data: await cache_service.close() return cached_data # 从PubChem获取数据 async with PubChemClient() as client: # 获取CID cid = await client.get_compound_cid(name) if not cid: await cache_service.close() return {"name": name, "error": "Compound not found"} # 获取基础信息 compound_data = await client.get_compound_by_name(name) if "error" in compound_data: await cache_service.close() return {"name": name, "error": compound_data["error"]} # 解析数据 info = _parse_compound_data(name, cid, compound_data) # 缓存结果 await cache_service.set_compound_info(name, info.model_dump()) await cache_service.close() return info.model_dump() except Exception as e: logger.error(f"Error getting compound info for {name}: {e}") return {"name": name, "error": str(e)} @app.tool() async def get_safety_info(ctx: Context, cid: int) -> Dict[str, Any]: """ 获取GHS安全分类信息 Args: cid: PubChem化合物ID Returns: 包含信号词、GHS象形图、危害声明等安全信息的字典 """ try: # 初始化缓存服务 cache_service = CacheService() await cache_service.initialize() # 检查缓存 cached_data = await cache_service.get_safety_info(cid) if cached_data: await cache_service.close() return {"cid": cid, **cached_data} # 从PubChem获取数据 async with PubChemClient() as client: safety_data = await client.get_safety_info(cid) if "error" in safety_data: await cache_service.close() return {"cid": cid, "error": safety_data["error"]} # 缓存结果 await cache_service.set_safety_info(cid, safety_data) await cache_service.close() return {"cid": cid, **safety_data} except Exception as e: logger.error(f"Error getting safety info for CID {cid}: {e}") return {"cid": cid, "error": str(e)} @app.tool() async def get_toxicity_data(ctx: Context, cid: int) -> Dict[str, Any]: """ 获取毒性实验数据 Args: cid: PubChem化合物ID Returns: 包含急性毒性、生态毒性、致癌性等毒性数据的字典 """ try: # 初始化缓存服务 cache_service = CacheService() await cache_service.initialize() # 检查缓存 cached_data = await cache_service.get_toxicity_data(cid) if cached_data: await cache_service.close() return {"cid": cid, **cached_data} # 从PubChem获取数据 async with PubChemClient() as client: toxicity_data = await client.get_toxicity_data(cid) if "error" in toxicity_data: await cache_service.close() return {"cid": cid, "error": toxicity_data["error"]} # 缓存结果 await cache_service.set_toxicity_data(cid, toxicity_data) await cache_service.close() return {"cid": cid, **toxicity_data} except Exception as e: logger.error(f"Error getting toxicity data for CID {cid}: {e}") return {"cid": cid, "error": str(e)} def _parse_compound_data(name: str, cid: int, data: Dict[str, Any]) -> CompoundInfo: """解析化合物数据""" try: properties = data.get("PropertyTable", {}).get("Properties", []) if not properties: return CompoundInfo(name=name, cid=cid) props = properties[0] return CompoundInfo( cid=cid, name=name, molecular_formula=props.get("MolecularFormula"), molecular_weight=props.get("MolecularWeight"), iupac_name=props.get("IUPACName"), smiles=props.get("IsomericSMILES"), inchi_key=props.get("InChIKey"), synonyms=[] # 需要额外API调用获取 ) except Exception as e: logger.error(f"Error parsing compound data: {e}") return CompoundInfo(name=name, cid=cid) if __name__ == "__main__": app.run(transport='stdio')

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liueic/PubChem-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server