We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/liueic/PubChem-MCP-Server'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
PubChem API client module.
"""
import asyncio
import logging
import os
import re
from typing import Dict, List, Optional, Any
from urllib.parse import quote

import aiohttp
from asyncio_throttle import Throttler
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class PubChemClient:
    """Asynchronous client for the PubChem PUG REST and PUG View APIs.

    The client must be used as an async context manager so that the HTTP
    session is created and closed around the requests::

        async with PubChemClient() as client:
            cid = await client.get_compound_cid("aspirin")

    Failed requests are reported as ``{"error": "<message>"}`` dictionaries
    instead of being raised.
    """

    BASE_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
    PUG_VIEW_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
    RATE_LIMIT = 5  # maximum number of requests per second

    # (heading marker, result key) pairs for "Safety and Hazards" subsections
    # whose content is copied as plain information items.
    _SAFETY_ITEM_FIELDS = (
        ("Fire Fighting", "fire_fighting"),
        ("Regulatory Information", "regulatory_info"),
        ("Other Safety Information", "other_safety_info"),
    )

    def __init__(self, rate_limit: int = RATE_LIMIT):
        """Create a client limited to *rate_limit* requests per second.

        The HTTP session is not created here; it is opened when the client
        is entered as an async context manager.
        """
        self.throttler = Throttler(rate_limit=rate_limit, period=1)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open the HTTP session and return the client itself."""
        # NOTE(review): 'br' is advertised in Accept-Encoding; this presumably
        # requires a Brotli decoder to be installed for aiohttp - confirm.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Referer': 'https://pubchem.ncbi.nlm.nih.gov/',
        }

        # Pick up a proxy from the environment, preferring the HTTPS one.
        proxy = os.getenv('https_proxy') or os.getenv('http_proxy') or None
        if proxy:
            logger.info(f"Using proxy: {proxy}")

        connector = aiohttp.TCPConnector()
        timeout = aiohttp.ClientTimeout(total=30)
        # NOTE(review): a session-level ``proxy`` argument presumably needs a
        # recent aiohttp release - confirm against the pinned version.
        self.session = aiohttp.ClientSession(
            headers=headers,
            connector=connector,
            timeout=timeout,
            proxy=proxy
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session, if one is open."""
        if self.session:
            await self.session.close()
            # Drop the closed session so that later requests fail with the
            # explicit "Client not initialized" error.
            self.session = None

    async def _make_request(self, url: str, max_retries: int = 3) -> Dict[str, Any]:
        """Send a GET request to *url*, retrying on failure.

        Args:
            url: Absolute URL to fetch.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON body on HTTP 200, otherwise a dictionary with a
            single ``"error"`` key describing the failure.

        Raises:
            RuntimeError: If the client is used outside ``async with``.
        """
        if not self.session:
            raise RuntimeError("Client not initialized. Use async context manager.")

        for attempt in range(max_retries):
            can_retry = attempt < max_retries - 1
            try:
                # Enter the throttler for every attempt so that retries are
                # rate limited too, not only the first request.
                async with self.throttler:
                    async with self.session.get(url) as response:
                        status = response.status
                        if status == 200:
                            return await response.json()
            except Exception as e:
                logger.error(f"Request failed (attempt {attempt + 1}): {e}")
                if can_retry:
                    await asyncio.sleep(1)
                    continue
                # Some exceptions (e.g. timeouts) have an empty message; fall
                # back to the class name so the error is never blank.
                return {"error": str(e) or type(e).__name__}

            if status != 503:
                logger.error(f"HTTP {status}: {url}")
                return {"error": f"HTTP {status}"}

            # 503: the server is busy; back off a little longer each time.
            if not can_retry:
                logger.error(f"Server busy after {max_retries} attempts: {url}")
                return {"error": f"Server busy after {max_retries} attempts"}
            wait_time = (attempt + 1) * 2
            logger.warning(f"Server busy, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
            await asyncio.sleep(wait_time)

        return {"error": "Max retries exceeded"}

    def _compound_name_url(self, name: str, operation: str) -> str:
        """Build a PUG REST URL that addresses a compound by *name*.

        The name is percent-encoded so that characters such as spaces, ``/``,
        ``?`` and ``#`` cannot change the structure of the URL.
        """
        return f"{self.BASE_URL}/compound/name/{quote(str(name), safe='')}/{operation}"

    async def _fetch_compound_record(self, cid: int) -> Dict[str, Any]:
        """Fetch the full PUG View record for compound *cid*."""
        url = f"{self.PUG_VIEW_URL}/data/compound/{cid}/JSON"
        return await self._make_request(url)

    async def get_compound_by_name(self, name: str) -> Dict[str, Any]:
        """Return basic properties of the compound called *name*.

        Returns the raw response of the property request, or an
        ``{"error": ...}`` dictionary if the request failed.
        """
        url = self._compound_name_url(
            name,
            "property/MolecularFormula,MolecularWeight,IUPACName,IsomericSMILES,InChIKey/JSON",
        )
        return await self._make_request(url)

    async def get_compound_cid(self, name: str) -> Optional[int]:
        """Return the first CID listed for *name*, or ``None`` if unavailable."""
        result = await self._make_request(self._compound_name_url(name, "cids/JSON"))
        if "error" in result or not result:
            return None
        try:
            cids = result.get("IdentifierList", {}).get("CID", [])
            return cids[0] if cids else None
        except (KeyError, IndexError, AttributeError, TypeError):
            # Unexpected response shape: treat as "not found".
            return None

    async def get_safety_info(self, cid: int) -> Dict[str, Any]:
        """Return GHS safety information for compound *cid*.

        The data comes from the PUG View record; request errors are returned
        unchanged as ``{"error": ...}``.
        """
        result = await self._fetch_compound_record(cid)
        if "error" in result:
            return result
        return self._parse_safety_info(result)

    async def get_toxicity_data(self, cid: int) -> Dict[str, Any]:
        """Return toxicity data for compound *cid*.

        The data comes from the PUG View record; request errors are returned
        unchanged as ``{"error": ...}``.
        """
        result = await self._fetch_compound_record(cid)
        if "error" in result:
            return result
        return self._parse_toxicity_data(result)

    def _parse_safety_info(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract safety information from a PUG View record.

        Only fields for which data was actually found are set; ``"cid"`` is
        always present. Parsing errors are logged and whatever was collected
        so far is returned.
        """
        safety_info = {"cid": data.get("Record", {}).get("RecordNumber", 0)}
        try:
            for section in data.get("Record", {}).get("Section", []):
                heading = section.get("TOCHeading", "")
                if "Safety and Hazards" in heading:
                    for subsection in section.get("Section", []):
                        self._collect_safety_subsection(subsection, safety_info)
                elif "Primary Hazards" in heading:
                    # The GHS pictograms are stored as icon markup here.
                    ghs_pictograms = self._extract_ghs_pictograms(section)
                    if ghs_pictograms:
                        safety_info["ghs_pictograms"] = ghs_pictograms
                elif "Classification" in heading:
                    for subsection in section.get("Section", []):
                        if "UN GHS Classification" in subsection.get("TOCHeading", ""):
                            items = self._extract_information_items(subsection)
                            if items:
                                safety_info["ghs_classification"] = items
        except Exception as e:
            logger.error(f"Error parsing safety info: {e}")
        return safety_info

    def _collect_safety_subsection(self, subsection: Dict[str, Any], safety_info: Dict[str, Any]) -> None:
        """Copy the data of one "Safety and Hazards" subsection into *safety_info*."""
        sub_heading = subsection.get("TOCHeading", "")

        for marker, field in self._SAFETY_ITEM_FIELDS:
            if marker in sub_heading:
                items = self._extract_information_items(subsection)
                if items:
                    safety_info[field] = items
                return

        if "GHS Classification" in sub_heading:
            ghs_data = self._extract_ghs_classification(subsection)
            # Merge only values that carry data, so empty placeholders never
            # overwrite information gathered from another section.
            safety_info.update({key: value for key, value in ghs_data.items() if value})
        elif "Hazard Statements" in sub_heading:
            statements = self._extract_hazard_statements(subsection)
            if statements:
                safety_info["ghs_hazard_statements"] = statements
        elif "Precautionary Statements" in sub_heading:
            statements = self._extract_precautionary_statements(subsection)
            if statements:
                safety_info["precautionary_statements"] = statements

    def _extract_ghs_pictograms(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the GHS pictograms found as ``Icon`` markup in *section*.

        Each entry has the keys ``code``, ``url`` and ``description``.
        """
        pictograms = []
        try:
            for info in section.get("Information", []):
                value = info.get("Value", {})
                if not (isinstance(value, dict) and "StringWithMarkup" in value):
                    continue
                markup_list = value["StringWithMarkup"]
                if not isinstance(markup_list, list):
                    continue
                for markup in markup_list:
                    if not (isinstance(markup, dict) and "Markup" in markup):
                        continue
                    for mark in markup["Markup"]:
                        if isinstance(mark, dict) and mark.get("Type") == "Icon":
                            pictograms.append({
                                "code": mark.get("Extra", ""),
                                "url": mark.get("URL", ""),
                                "description": mark.get("Extra", ""),
                            })
        except Exception as e:
            logger.error(f"Error extracting GHS pictograms: {e}")
        return pictograms

    @staticmethod
    def _information_text(info: Dict[str, Any]) -> Any:
        """Return the text of one information entry.

        ``StringValue`` is used when it is set; otherwise the ``String`` parts
        of ``Value.StringWithMarkup`` are joined with single spaces.
        """
        value = info.get("StringValue", "")
        if value or "Value" not in info:
            return value
        value_obj = info["Value"]
        if not (isinstance(value_obj, dict) and "StringWithMarkup" in value_obj):
            return value
        markup_list = value_obj["StringWithMarkup"]
        if not (isinstance(markup_list, list) and markup_list):
            return value
        return " ".join(
            markup["String"]
            for markup in markup_list
            if isinstance(markup, dict) and "String" in markup
        )

    def _extract_information_items(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the ``Information`` entries of *section* as flat dictionaries.

        Each entry has the keys ``name``, ``value``, ``url`` and
        ``reference_number``. Errors are logged and the entries collected so
        far are returned.
        """
        items = []
        try:
            for info in section.get("Information", []):
                items.append({
                    "name": info.get("Name", "Unknown"),
                    "value": self._information_text(info),
                    "url": info.get("URL", ""),
                    "reference_number": info.get("ReferenceNumber", ""),
                })
        except Exception as e:
            logger.error(f"Error extracting information items: {e}")
        return items

    def _extract_ghs_classification(self, subsection: Dict[str, Any]) -> Dict[str, Any]:
        """Return the signal word and pictogram list of a GHS subsection.

        The result always contains ``signal_word`` (``None`` when absent) and
        ``ghs_pictograms`` (empty list when absent).
        """
        result = {"signal_word": None, "ghs_pictograms": []}
        try:
            for item in subsection.get("Information", []):
                item_name = item.get("Name")
                if item_name == "Signal Word":
                    result["signal_word"] = item.get("StringValue", "")
                elif item_name == "Pictogram":
                    result["ghs_pictograms"] = item.get("StringValueList", {}).get("String", [])
        except Exception as e:
            logger.error(f"Error extracting GHS classification: {e}")
        return result

    def _collect_statements(self, subsection: Dict[str, Any], item_name: str, label: str) -> List[str]:
        """Collect the ``StringValueList`` strings of entries named *item_name*.

        *label* is only used in the message logged when extraction fails.
        """
        statements = []
        try:
            for item in subsection.get("Information", []):
                if item.get("Name") == item_name:
                    statements.extend(item.get("StringValueList", {}).get("String", []))
        except Exception as e:
            logger.error(f"Error extracting {label}: {e}")
        return statements

    def _extract_hazard_statements(self, subsection: Dict[str, Any]) -> List[str]:
        """Return the hazard statements listed in *subsection*."""
        return self._collect_statements(subsection, "Hazard Statement", "hazard statements")

    def _extract_precautionary_statements(self, subsection: Dict[str, Any]) -> List[str]:
        """Return the precautionary statements listed in *subsection*."""
        return self._collect_statements(subsection, "Precautionary Statement", "precautionary statements")

    def _parse_toxicity_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract all data below the top-level "Toxicity" section of a record.

        ``"cid"`` is always present; every other key is a normalized section
        heading mapped to its list of information items. Parsing errors are
        logged and whatever was collected so far is returned.
        """
        toxicity_info = {"cid": data.get("Record", {}).get("RecordNumber", 0)}
        try:
            for section in data.get("Record", {}).get("Section", []):
                # Only the section whose heading is exactly "Toxicity" is used.
                if section.get("TOCHeading", "") == "Toxicity":
                    toxicity_info.update(self._extract_toxicity_recursive(section))
                    break
        except Exception as e:
            logger.error(f"Error parsing toxicity data: {e}")
        return toxicity_info

    def _extract_toxicity_recursive(self, section: Dict[str, Any], path: str = "") -> Dict[str, Any]:
        """Recursively collect the information items of *section* and its children.

        Args:
            section: A PUG View section dictionary.
            path: Heading of the parent section; accepted for compatibility
                and not used in the result.

        Returns:
            A mapping from normalized section heading to information items.
            Items of sections that normalize to the same key are concatenated.
        """
        result = {}
        current_heading = section.get("TOCHeading", "")

        if section.get("Information"):
            items = self._extract_information_items(section)
            if items:
                result[self._normalize_section_name(current_heading)] = items

        # A missing or null "Section" entry simply means there are no children.
        for subsection in section.get("Section") or []:
            sub_result = self._extract_toxicity_recursive(subsection, current_heading)
            for key, value in sub_result.items():
                existing = result.get(key)
                if key in result and isinstance(existing, list) and isinstance(value, list):
                    # Same key seen before: append instead of overwriting.
                    existing.extend(value)
                else:
                    result[key] = value
        return result

    def _normalize_section_name(self, heading: str) -> str:
        """Convert a section heading into a snake_case field name.

        Slashes, parentheses and hyphens are treated as word separators.
        """
        spaced = re.sub(r'[/()\-]', ' ', heading)
        return re.sub(r'\s+', '_', spaced.strip()).lower()

    def _extract_ecotoxicity(self, subsection: Dict[str, Any]) -> Dict[str, Any]:
        """Return the fish LC50 and daphnia EC50 values found in *subsection*."""
        ecotoxicity = {}
        try:
            for item in subsection.get("Information", []):
                name = item.get("Name", "")
                lowered = name.lower()
                if "fish" in lowered and "LC50" in name:
                    ecotoxicity["fish_LC50"] = item.get("StringValue", "")
                elif "daphnia" in lowered:
                    ecotoxicity["daphnia_EC50"] = item.get("StringValue", "")
        except Exception as e:
            logger.error(f"Error extracting ecotoxicity: {e}")
        return ecotoxicity

    def _extract_carcinogenicity(self, subsection: Dict[str, Any]) -> Optional[str]:
        """Return the first "Carcinogenicity" value in *subsection*, or ``None``."""
        try:
            for item in subsection.get("Information", []):
                if item.get("Name") == "Carcinogenicity":
                    return item.get("StringValue", "")
        except Exception as e:
            logger.error(f"Error extracting carcinogenicity: {e}")
        return None

    def _extract_reproductive_toxicity(self, subsection: Dict[str, Any]) -> Optional[str]:
        """Return the first "Reproductive Toxicity" value in *subsection*, or ``None``."""
        try:
            for item in subsection.get("Information", []):
                if item.get("Name") == "Reproductive Toxicity":
                    return item.get("StringValue", "")
        except Exception as e:
            logger.error(f"Error extracting reproductive toxicity: {e}")
        return None