"""
PubChem API 客户端模块
"""
import asyncio
import logging
import os
import re
from typing import Dict, List, Optional, Any
from urllib.parse import quote

import aiohttp
from asyncio_throttle import Throttler
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class PubChemClient:
    """Asynchronous client for the PubChem PUG REST and PUG View APIs.

    Use it as an async context manager so the HTTP session is opened and
    closed properly::

        async with PubChemClient() as client:
            cid = await client.get_compound_cid("aspirin")

    HTTP and network failures are not raised to the caller: the request
    methods return a dict containing an ``"error"`` key instead
    (``get_compound_cid`` returns ``None``).
    """

    BASE_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
    VIEW_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
    RATE_LIMIT = 5  # maximum number of requests per second

    # (heading fragment, output key) pairs for "Safety and Hazards"
    # subsections that are copied as plain information items.
    _INFO_SUBSECTIONS = (
        ("Fire Fighting", "fire_fighting"),
        ("Regulatory Information", "regulatory_info"),
        ("Other Safety Information", "other_safety_info"),
    )

    def __init__(self, rate_limit: int = RATE_LIMIT):
        """Create a client.

        Args:
            rate_limit: Maximum number of requests allowed per second.
        """
        self.throttler = Throttler(rate_limit=rate_limit, period=1)
        self.session: Optional[aiohttp.ClientSession] = None
        self._proxy: Optional[str] = None

    async def __aenter__(self):
        """Open the HTTP session and return the client itself."""
        # No explicit Accept-Encoding header: advertising "br" without a
        # brotli decoder installed would make compressed responses
        # undecodable. aiohttp is expected to add a suitable default itself.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive',
            'Referer': 'https://pubchem.ncbi.nlm.nih.gov/',
        }
        # Honour a proxy configured through the environment.
        self._proxy = os.getenv('https_proxy') or os.getenv('http_proxy') or None
        if self._proxy:
            logger.info(f"Using proxy: {self._proxy}")
        # The proxy is passed per request (see _make_request) rather than to
        # the ClientSession constructor, which only accepts a ``proxy``
        # argument in recent aiohttp releases.
        self.session = aiohttp.ClientSession(
            headers=headers,
            connector=aiohttp.TCPConnector(),
            timeout=aiohttp.ClientTimeout(total=30),
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session, if one is open."""
        if self.session:
            await self.session.close()
            # Reset so later use fails with the explicit "not initialized"
            # error instead of an obscure closed-session error.
            self.session = None

    def _name_url(self, name: str, operation: str) -> str:
        """Build a PUG REST URL for a compound looked up by name.

        The name is percent-encoded so that characters such as ``/``, ``?``,
        ``#`` or ``%`` cannot alter the structure of the URL.
        """
        return f"{self.BASE_URL}/compound/name/{quote(name, safe='')}/{operation}"

    def _view_url(self, cid: int) -> str:
        """Build the PUG View URL for the full record of compound ``cid``."""
        return f"{self.VIEW_URL}/data/compound/{cid}/JSON"

    async def _make_request(self, url: str, max_retries: int = 3) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        Every attempt goes through the throttler so that retries respect the
        rate limit too. HTTP 503 responses and raised exceptions are retried
        up to ``max_retries`` times in total.

        Args:
            url: Full URL to request.
            max_retries: Maximum number of attempts.

        Returns:
            The parsed JSON on HTTP 200, otherwise a dict of the form
            ``{"error": "<description>"}``.

        Raises:
            RuntimeError: If the client is used outside ``async with``.
        """
        if not self.session:
            raise RuntimeError("Client not initialized. Use async context manager.")
        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            try:
                async with self.throttler, self.session.get(url, proxy=self._proxy) as response:
                    if response.status == 200:
                        return await response.json()
                    if response.status != 503:
                        logger.error(f"HTTP {response.status}: {url}")
                        return {"error": f"HTTP {response.status}"}
                    # 503: the server is busy; retry unless out of attempts.
                    if is_last_attempt:
                        logger.error(f"Server busy after {max_retries} attempts: {url}")
                        return {"error": f"Server busy after {max_retries} attempts"}
                    wait_time = (attempt + 1) * 2  # linearly increasing back-off
                    logger.warning(f"Server busy, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
            except Exception as e:
                # Best-effort boundary: any failure is reported in the result.
                logger.error(f"Request failed (attempt {attempt + 1}): {e}")
                if is_last_attempt:
                    return {"error": str(e)}
                wait_time = 1
            # Sleep only after the response has been released.
            await asyncio.sleep(wait_time)
        # Only reachable when max_retries <= 0.
        return {"error": "Max retries exceeded"}

    async def get_compound_by_name(self, name: str) -> Dict[str, Any]:
        """Fetch basic properties of a compound looked up by name.

        Returns the raw JSON response, or an ``{"error": ...}`` dict.
        """
        properties = "MolecularFormula,MolecularWeight,IUPACName,IsomericSMILES,InChIKey"
        return await self._make_request(self._name_url(name, f"property/{properties}/JSON"))

    async def get_compound_cid(self, name: str) -> Optional[int]:
        """Resolve a compound name to its first CID.

        Returns ``None`` when the request fails or no CID is listed.
        """
        result = await self._make_request(self._name_url(name, "cids/JSON"))
        if not result or "error" in result:
            return None
        try:
            cids = result.get("IdentifierList", {}).get("CID", [])
            return cids[0] if cids else None
        except (AttributeError, KeyError, IndexError, TypeError):
            # Unexpected response shape.
            return None

    async def get_safety_info(self, cid: int) -> Dict[str, Any]:
        """Fetch and parse GHS safety information for ``cid`` via PUG View.

        Returns the parsed safety dict, or an ``{"error": ...}`` dict.
        """
        result = await self._make_request(self._view_url(cid))
        if "error" in result:
            return result
        return self._parse_safety_info(result)

    async def get_toxicity_data(self, cid: int) -> Dict[str, Any]:
        """Fetch and parse toxicity data for ``cid`` via PUG View.

        Returns the parsed toxicity dict, or an ``{"error": ...}`` dict.
        """
        result = await self._make_request(self._view_url(cid))
        if "error" in result:
            return result
        return self._parse_toxicity_data(result)

    def _parse_safety_info(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse safety information; only fields with actual data are set.

        Args:
            data: Decoded PUG View response (expects a ``"Record"`` entry).

        Returns:
            A dict that always contains ``"cid"`` plus one key per piece of
            safety data found. Parsing errors are logged and whatever was
            collected so far is returned.
        """
        record = data.get("Record") or {}
        safety_info = {"cid": record.get("RecordNumber", 0)}
        try:
            for section in record.get("Section", []):
                heading = section.get("TOCHeading", "")
                if "Safety and Hazards" in heading:
                    for subsection in section.get("Section", []):
                        self._merge_hazard_subsection(subsection, safety_info)
                elif "Primary Hazards" in heading:
                    # GHS pictograms are encoded as icon markup here.
                    ghs_pictograms = self._extract_ghs_pictograms(section)
                    if ghs_pictograms:
                        safety_info["ghs_pictograms"] = ghs_pictograms
                elif "Classification" in heading:
                    for subsection in section.get("Section", []):
                        if "UN GHS Classification" in subsection.get("TOCHeading", ""):
                            items = self._extract_information_items(subsection)
                            if items:
                                safety_info["ghs_classification"] = items
        except Exception as e:
            logger.error(f"Error parsing safety info: {e}")
        return safety_info

    def _merge_hazard_subsection(self, subsection: Dict[str, Any], safety_info: Dict[str, Any]) -> None:
        """Merge one "Safety and Hazards" subsection into ``safety_info``."""
        sub_heading = subsection.get("TOCHeading", "")
        for fragment, key in self._INFO_SUBSECTIONS:
            if fragment in sub_heading:
                items = self._extract_information_items(subsection)
                if items:
                    safety_info[key] = items
                return
        if "GHS Classification" in sub_heading:
            ghs_data = self._extract_ghs_classification(subsection)
            # Skip empty values so they neither add placeholder fields nor
            # overwrite pictograms collected from another section.
            safety_info.update({k: v for k, v in ghs_data.items() if v})
        elif "Hazard Statements" in sub_heading:
            statements = self._extract_hazard_statements(subsection)
            if statements:
                safety_info["ghs_hazard_statements"] = statements
        elif "Precautionary Statements" in sub_heading:
            statements = self._extract_precautionary_statements(subsection)
            if statements:
                safety_info["precautionary_statements"] = statements

    def _extract_ghs_pictograms(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract GHS pictograms from the icon markup of ``section``.

        Returns one ``{"code", "url", "description"}`` dict per markup entry
        of type ``"Icon"``; ``code`` and ``description`` both come from the
        entry's ``"Extra"`` field.
        """
        pictograms = []
        try:
            for info in section.get("Information", []):
                value = info.get("Value", {})
                if not isinstance(value, dict):
                    continue
                markup_list = value.get("StringWithMarkup")
                if not isinstance(markup_list, list):
                    continue
                for markup in markup_list:
                    if not (isinstance(markup, dict) and "Markup" in markup):
                        continue
                    for mark in markup["Markup"]:
                        if isinstance(mark, dict) and mark.get("Type") == "Icon":
                            pictograms.append({
                                "code": mark.get("Extra", ""),
                                "url": mark.get("URL", ""),
                                "description": mark.get("Extra", ""),
                            })
        except Exception as e:
            logger.error(f"Error extracting GHS pictograms: {e}")
        return pictograms

    def _extract_information_items(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract the ``"Information"`` entries of ``section``.

        Each entry becomes a dict with ``name``, ``value``, ``url`` and
        ``reference_number``. The value is the entry's ``"StringValue"`` or,
        when that is empty, the ``"String"`` parts of
        ``Value.StringWithMarkup`` joined with single spaces.
        """
        items = []
        try:
            for info in section.get("Information", []):
                value = info.get("StringValue", "")
                if not value and "Value" in info:
                    value_obj = info["Value"]
                    if isinstance(value_obj, dict) and "StringWithMarkup" in value_obj:
                        markup_list = value_obj["StringWithMarkup"]
                        if isinstance(markup_list, list) and markup_list:
                            value = " ".join(
                                markup["String"]
                                for markup in markup_list
                                if isinstance(markup, dict) and "String" in markup
                            )
                items.append({
                    "name": info.get("Name", "Unknown"),
                    "value": value,
                    "url": info.get("URL", ""),
                    "reference_number": info.get("ReferenceNumber", ""),
                })
        except Exception as e:
            logger.error(f"Error extracting information items: {e}")
        return items

    def _extract_ghs_classification(self, subsection: Dict[str, Any]) -> Dict[str, Any]:
        """Extract the GHS signal word and pictogram list.

        Always returns both keys: ``signal_word`` (``None`` if absent) and
        ``ghs_pictograms`` (empty list if absent).
        """
        result = {"signal_word": None, "ghs_pictograms": []}
        try:
            for item in subsection.get("Information", []):
                item_name = item.get("Name")
                if item_name == "Signal Word":
                    result["signal_word"] = item.get("StringValue", "")
                elif item_name == "Pictogram":
                    result["ghs_pictograms"] = item.get("StringValueList", {}).get("String", [])
        except Exception as e:
            logger.error(f"Error extracting GHS classification: {e}")
        return result

    def _collect_statements(self, subsection: Dict[str, Any], item_name: str, label: str) -> List[str]:
        """Collect ``StringValueList`` strings of entries named ``item_name``.

        ``label`` is only used in the error log message.
        """
        statements = []
        try:
            for item in subsection.get("Information", []):
                if item.get("Name") == item_name:
                    statements.extend(item.get("StringValueList", {}).get("String", []))
        except Exception as e:
            logger.error(f"Error extracting {label}: {e}")
        return statements

    def _extract_hazard_statements(self, subsection: Dict[str, Any]) -> List[str]:
        """Extract hazard statements from ``subsection``."""
        return self._collect_statements(subsection, "Hazard Statement", "hazard statements")

    def _extract_precautionary_statements(self, subsection: Dict[str, Any]) -> List[str]:
        """Extract precautionary statements from ``subsection``."""
        return self._collect_statements(
            subsection, "Precautionary Statement", "precautionary statements"
        )

    def _parse_toxicity_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse toxicity data from the top-level "Toxicity" section.

        Returns a dict that always contains ``"cid"`` plus one key per
        toxicity (sub)section that carries information items. Only the first
        section whose heading is exactly ``"Toxicity"`` is used.
        """
        record = data.get("Record") or {}
        toxicity_info = {"cid": record.get("RecordNumber", 0)}
        try:
            for section in record.get("Section", []):
                if section.get("TOCHeading", "") == "Toxicity":
                    toxicity_info.update(self._extract_toxicity_recursive(section))
                    break
        except Exception as e:
            logger.error(f"Error parsing toxicity data: {e}")
        return toxicity_info

    def _extract_toxicity_recursive(self, section: Dict[str, Any], path: str = "") -> Dict[str, Any]:
        """Recursively collect information items from ``section``.

        Args:
            section: A section dict, possibly with nested ``"Section"`` lists.
            path: Heading of the parent section. Currently unused; kept for
                interface compatibility.

        Returns:
            A mapping from normalized heading to its information items. When
            two sections normalize to the same key, their lists are joined.
        """
        result = {}
        current_heading = section.get("TOCHeading", "")
        if section.get("Information"):
            items = self._extract_information_items(section)
            if items:
                result[self._normalize_section_name(current_heading)] = items
        for subsection in section.get("Section", []):
            sub_result = self._extract_toxicity_recursive(subsection, current_heading)
            for key, value in sub_result.items():
                if key not in result:
                    result[key] = value
                elif isinstance(result[key], list) and isinstance(value, list):
                    # Merge instead of overwriting on key collisions.
                    result[key].extend(value)
        return result

    def _normalize_section_name(self, heading: str) -> str:
        """Convert a section heading into a snake_case field name.

        Slashes, parentheses and hyphens are treated as word separators.
        """
        spaced = re.sub(r'[/()\-]', ' ', heading)
        return re.sub(r'\s+', '_', spaced.strip()).lower()

    def _extract_ecotoxicity(self, subsection: Dict[str, Any]) -> Dict[str, Any]:
        """Extract ecotoxicity values (fish LC50, daphnia EC50)."""
        ecotoxicity = {}
        try:
            for item in subsection.get("Information", []):
                name = item.get("Name", "")
                lowered = name.lower()
                if "fish" in lowered and "LC50" in name:
                    ecotoxicity["fish_LC50"] = item.get("StringValue", "")
                elif "daphnia" in lowered:
                    ecotoxicity["daphnia_EC50"] = item.get("StringValue", "")
        except Exception as e:
            logger.error(f"Error extracting ecotoxicity: {e}")
        return ecotoxicity

    def _extract_carcinogenicity(self, subsection: Dict[str, Any]) -> Optional[str]:
        """Return the first "Carcinogenicity" value, or ``None`` if absent."""
        try:
            for item in subsection.get("Information", []):
                if item.get("Name") == "Carcinogenicity":
                    return item.get("StringValue", "")
        except Exception as e:
            logger.error(f"Error extracting carcinogenicity: {e}")
        return None

    def _extract_reproductive_toxicity(self, subsection: Dict[str, Any]) -> Optional[str]:
        """Return the first "Reproductive Toxicity" value, or ``None``."""
        try:
            for item in subsection.get("Information", []):
                if item.get("Name") == "Reproductive Toxicity":
                    return item.get("StringValue", "")
        except Exception as e:
            logger.error(f"Error extracting reproductive toxicity: {e}")
        return None