PubChem Chemical Safety MCP Server

by liueic
pubchem_client.py (17.4 kB)
""" PubChem API 客户端模块 """ import asyncio import logging import os from typing import Dict, List, Optional, Any import aiohttp from asyncio_throttle import Throttler logger = logging.getLogger(__name__) class PubChemClient: """PubChem REST API 客户端""" BASE_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug" RATE_LIMIT = 5 # 每秒请求数限制 def __init__(self, rate_limit: int = RATE_LIMIT): self.throttler = Throttler(rate_limit=rate_limit, period=1) self.session: Optional[aiohttp.ClientSession] = None async def __aenter__(self): headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Referer': 'https://pubchem.ncbi.nlm.nih.gov/', } # 检查环境变量中的代理设置 proxy = None if os.getenv('https_proxy') or os.getenv('http_proxy'): proxy = os.getenv('https_proxy') or os.getenv('http_proxy') logger.info(f"Using proxy: {proxy}") connector = aiohttp.TCPConnector() timeout = aiohttp.ClientTimeout(total=30) self.session = aiohttp.ClientSession( headers=headers, connector=connector, timeout=timeout, proxy=proxy ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() async def _make_request(self, url: str, max_retries: int = 3) -> Dict[str, Any]: """发送HTTP请求,带重试机制""" async with self.throttler: if not self.session: raise RuntimeError("Client not initialized. Use async context manager.") for attempt in range(max_retries): try: async with self.session.get(url) as response: if response.status == 200: return await response.json() elif response.status == 503: # 服务器繁忙,等待后重试 if attempt < max_retries - 1: wait_time = (attempt + 1) * 2 # 递增等待时间 logger.warning(f"Server busy, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})") await asyncio.sleep(wait_time) continue else: logger.error(f"Server busy after {max_retries} attempts: {url}") return {"error": f"Server busy after {max_retries} attempts"} else: logger.error(f"HTTP {response.status}: {url}") return {"error": f"HTTP {response.status}"} except Exception as e: logger.error(f"Request failed (attempt {attempt + 1}): {e}") if attempt < max_retries - 1: await asyncio.sleep(1) continue return {"error": str(e)} return {"error": "Max retries exceeded"} async def get_compound_by_name(self, name: str) -> Dict[str, Any]: """根据化合物名称获取基础信息""" url = f"{self.BASE_URL}/compound/name/{name}/property/MolecularFormula,MolecularWeight,IUPACName,IsomericSMILES,InChIKey/JSON" return await self._make_request(url) async def get_compound_cid(self, name: str) -> Optional[int]: """根据化合物名称获取CID""" url = f"{self.BASE_URL}/compound/name/{name}/cids/JSON" result = await self._make_request(url) if "error" in result or not result: return None try: cids = result.get("IdentifierList", {}).get("CID", []) return cids[0] if cids else None except (KeyError, IndexError): return None async def get_safety_info(self, cid: int) -> Dict[str, Any]: """获取GHS安全分类信息""" # 使用PubChem PUG View API获取详细的安全信息 # 正确的URL格式:https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{cid}/JSON url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{cid}/JSON" result = await self._make_request(url) if "error" in result: return result return self._parse_safety_info(result) async def get_toxicity_data(self, cid: int) -> Dict[str, Any]: """获取毒性数据""" # 使用PubChem PUG View API获取详细的毒性信息 url = 
f"https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{cid}/JSON" result = await self._make_request(url) if "error" in result: return result return self._parse_toxicity_data(result) def _parse_safety_info(self, data: Dict[str, Any]) -> Dict[str, Any]: """解析安全信息 - 只设置有实际数据的字段""" safety_info = {"cid": data.get("Record", {}).get("RecordNumber", 0)} try: sections = data.get("Record", {}).get("Section", []) for section in sections: heading = section.get("TOCHeading", "") # 处理Safety and Hazards部分 if "Safety and Hazards" in heading: subsections = section.get("Section", []) for subsection in subsections: sub_heading = subsection.get("TOCHeading", "") if "Fire Fighting" in sub_heading: items = self._extract_information_items(subsection) if items: # 只保留有数据的部分 safety_info["fire_fighting"] = items elif "Regulatory Information" in sub_heading: items = self._extract_information_items(subsection) if items: safety_info["regulatory_info"] = items elif "Other Safety Information" in sub_heading: items = self._extract_information_items(subsection) if items: safety_info["other_safety_info"] = items elif "GHS Classification" in sub_heading: ghs_data = self._extract_ghs_classification(subsection) if ghs_data: safety_info.update(ghs_data) elif "Hazard Statements" in sub_heading: statements = self._extract_hazard_statements(subsection) if statements: safety_info["ghs_hazard_statements"] = statements elif "Precautionary Statements" in sub_heading: statements = self._extract_precautionary_statements(subsection) if statements: safety_info["precautionary_statements"] = statements # 处理Primary Hazards部分 - 查找GHS象形图 elif "Primary Hazards" in heading: ghs_pictograms = self._extract_ghs_pictograms(section) if ghs_pictograms: safety_info["ghs_pictograms"] = ghs_pictograms # 处理Classification部分 - 查找UN GHS Classification elif "Classification" in heading: subsections = section.get("Section", []) for subsection in subsections: sub_heading = subsection.get("TOCHeading", "") if "UN GHS Classification" in sub_heading: items = self._extract_information_items(subsection) if items: safety_info["ghs_classification"] = items except Exception as e: logger.error(f"Error parsing safety info: {e}") return safety_info def _extract_ghs_pictograms(self, section: Dict[str, Any]) -> List[Dict[str, Any]]: """提取GHS象形图信息""" pictograms = [] try: for info in section.get("Information", []): value = info.get("Value", {}) if isinstance(value, dict) and "StringWithMarkup" in value: markup_list = value["StringWithMarkup"] if isinstance(markup_list, list): for markup in markup_list: if isinstance(markup, dict) and "Markup" in markup: for mark in markup["Markup"]: if isinstance(mark, dict) and mark.get("Type") == "Icon": pictogram = { "code": mark.get("Extra", ""), "url": mark.get("URL", ""), "description": mark.get("Extra", "") } pictograms.append(pictogram) except Exception as e: logger.error(f"Error extracting GHS pictograms: {e}") return pictograms def _extract_information_items(self, section: Dict[str, Any]) -> List[Dict[str, Any]]: """提取信息项""" items = [] try: for info in section.get("Information", []): # 提取StringValue或从StringWithMarkup中提取 value = info.get("StringValue", "") if not value and "Value" in info: value_obj = info["Value"] if isinstance(value_obj, dict) and "StringWithMarkup" in value_obj: markup_list = value_obj["StringWithMarkup"] if isinstance(markup_list, list) and markup_list: # 提取所有String值并连接 strings = [] for markup in markup_list: if isinstance(markup, dict) and "String" in markup: strings.append(markup["String"]) value = " 
".join(strings) item = { "name": info.get("Name", "Unknown"), "value": value, "url": info.get("URL", ""), "reference_number": info.get("ReferenceNumber", "") } items.append(item) except Exception as e: logger.error(f"Error extracting information items: {e}") return items def _extract_ghs_classification(self, subsection: Dict[str, Any]) -> Dict[str, Any]: """提取GHS分类信息""" result = {"signal_word": None, "ghs_pictograms": []} try: for item in subsection.get("Information", []): if item.get("Name") == "Signal Word": result["signal_word"] = item.get("StringValue", "") elif item.get("Name") == "Pictogram": pictograms = item.get("StringValueList", {}).get("String", []) result["ghs_pictograms"] = pictograms except Exception as e: logger.error(f"Error extracting GHS classification: {e}") return result def _extract_hazard_statements(self, subsection: Dict[str, Any]) -> List[str]: """提取危害声明""" statements = [] try: for item in subsection.get("Information", []): if item.get("Name") == "Hazard Statement": statements.extend(item.get("StringValueList", {}).get("String", [])) except Exception as e: logger.error(f"Error extracting hazard statements: {e}") return statements def _extract_precautionary_statements(self, subsection: Dict[str, Any]) -> List[str]: """提取预防措施声明""" statements = [] try: for item in subsection.get("Information", []): if item.get("Name") == "Precautionary Statement": statements.extend(item.get("StringValueList", {}).get("String", [])) except Exception as e: logger.error(f"Error extracting precautionary statements: {e}") return statements def _parse_toxicity_data(self, data: Dict[str, Any]) -> Dict[str, Any]: """解析毒性数据 - 递归提取所有Toxicity相关数据""" toxicity_info = {"cid": data.get("Record", {}).get("RecordNumber", 0)} try: sections = data.get("Record", {}).get("Section", []) for section in sections: heading = section.get("TOCHeading", "") # 找到Toxicity主section if "Toxicity" == heading: # 递归提取所有子部分的数据 all_toxicity_data = self._extract_toxicity_recursive(section) toxicity_info.update(all_toxicity_data) break except Exception as e: logger.error(f"Error parsing toxicity data: {e}") return toxicity_info def _extract_toxicity_recursive(self, section: Dict[str, Any], path: str = "") -> Dict[str, Any]: """递归提取Toxicity section的所有数据""" result = {} current_heading = section.get("TOCHeading", "") # 如果当前section有Information,提取数据 if "Information" in section and section["Information"]: items = self._extract_information_items(section) if items: # 使用更简洁的键名 key_name = self._normalize_section_name(current_heading) result[key_name] = items # 递归处理子sections if "Section" in section: for subsection in section["Section"]: sub_result = self._extract_toxicity_recursive(subsection, current_heading) # 合并结果,避免覆盖 for key, value in sub_result.items(): if key in result: # 如果键已存在,合并数据(如果是列表) if isinstance(result[key], list) and isinstance(value, list): result[key].extend(value) else: result[key] = value return result def _normalize_section_name(self, heading: str) -> str: """将section名称转换为合适的字段名""" # 移除特殊字符,转换为snake_case import re # 移除括号和斜杠 normalized = re.sub(r'[/()\-]', ' ', heading) # 转换为snake_case normalized = re.sub(r'\s+', '_', normalized.strip()) normalized = normalized.lower() return normalized def _extract_ecotoxicity(self, subsection: Dict[str, Any]) -> Dict[str, Any]: """提取生态毒性数据""" ecotoxicity = {} try: for item in subsection.get("Information", []): name = item.get("Name", "") if "fish" in name.lower() and "LC50" in name: ecotoxicity["fish_LC50"] = item.get("StringValue", "") elif "daphnia" in name.lower(): 
ecotoxicity["daphnia_EC50"] = item.get("StringValue", "") except Exception as e: logger.error(f"Error extracting ecotoxicity: {e}") return ecotoxicity def _extract_carcinogenicity(self, subsection: Dict[str, Any]) -> Optional[str]: """提取致癌性信息""" try: for item in subsection.get("Information", []): if item.get("Name") == "Carcinogenicity": return item.get("StringValue", "") except Exception as e: logger.error(f"Error extracting carcinogenicity: {e}") return None def _extract_reproductive_toxicity(self, subsection: Dict[str, Any]) -> Optional[str]: """提取生殖毒性信息""" try: for item in subsection.get("Information", []): if item.get("Name") == "Reproductive Toxicity": return item.get("StringValue", "") except Exception as e: logger.error(f"Error extracting reproductive toxicity: {e}") return None
