Skip to main content
Glama

PubTator-MCP-Server

MIT License
8
  • Linux
  • Apple
pubtator_server.py4.93 kB
from typing import List, Dict, Optional, Any, Union import asyncio import logging from mcp.server.fastmcp import FastMCP from pubtator_search import PubTator3API # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Initialize FastMCP server mcp = FastMCP("pubtator") # Initialize PubTator3API client client = PubTator3API(max_retries=3, timeout=30) @mcp.tool() async def search_pubtator(query: str, max_pages: Optional[int] = 3) -> List[Dict[str, Any]]: logging.info(f"Searching PubTator with query: {query}, max_pages: {max_pages}") """ Search for papers on PubTator using a query string. Args: query: Search query string max_pages: Maximum number of pages to retrieve (default: 3) Returns: List of dictionaries containing paper information """ try: results = await asyncio.to_thread(list, client.search(query, max_pages=max_pages)) return results except Exception as e: logging.error(f"An error occurred while searching: {str(e)}") return [{"error": f"An error occurred while searching: {str(e)}"}] @mcp.tool() async def export_publications(ids: List[str], id_type: str = "pmid", format: str = "biocjson", full_text: bool = False) -> Union[Dict, str]: logging.info(f"Exporting publications: {ids}, id_type: {id_type}, format: {format}, full_text: {full_text}") """ Export publications from PubTator. Args: ids: List of publication IDs id_type: Type of IDs (pmid or pmcid) format: Export format (pubtator, biocxml, or biocjson) full_text: Whether to include full text (only for biocxml/biocjson) Returns: Dictionary or string containing exported publication data """ try: result = await asyncio.to_thread(client.export_publications, ids, id_type, format, full_text) return result except Exception as e: return {"error": f"An error occurred while exporting publications: {str(e)}"} @mcp.tool() async def find_entity_id(query: str, concept: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]: logging.info(f"Finding entity ID for query: {query}, concept: {concept}, limit: {limit}") """ Find entity ID using a free text query. Args: query: Query text concept: Optional, specify biological concept type limit: Optional, limit the number of returned results Returns: Dictionary containing entity IDs """ try: result = await asyncio.to_thread(client.find_entity_id, query, concept, limit) return result except Exception as e: return {"error": f"An error occurred while finding entity ID: {str(e)}"} @mcp.tool() async def find_related_entities(entity_id: str, relation_type: Optional[str] = None, target_entity_type: Optional[str] = None, max_results: Optional[int] = None) -> Dict[str, Any]: logging.info(f"Finding related entities for: {entity_id}, relation_type: {relation_type}, target_entity_type: {target_entity_type}, max_results: {max_results}") """ Find related entities. Args: entity_id: Entity ID relation_type: Optional, specify relation type target_entity_type: Optional, specify target entity type max_results: Optional, limit the number of returned results Returns: Dictionary containing related entities """ try: result = await asyncio.to_thread(client.find_related_entities, entity_id, relation_type, target_entity_type, max_results) return result except Exception as e: return {"error": f"An error occurred while finding related entities: {str(e)}"} @mcp.tool() async def batch_export_from_search(query: str, format: str = "biocjson", max_pages: Optional[int] = 3, full_text: bool = False, batch_size: int = 100) -> List[Union[Dict, str]]: logging.info(f"Batch exporting from search: {query}, format: {format}, max_pages: {max_pages}, full_text: {full_text}, batch_size: {batch_size}") """ Search and batch export publications. Args: query: Search query format: Export format max_pages: Maximum number of search pages full_text: Whether to export full text batch_size: Number of PMIDs to process in each batch Returns: List of dictionaries or strings containing exported publication data """ try: results = await asyncio.to_thread(list, client.batch_export_from_search(query, format, max_pages, full_text, batch_size)) return results except Exception as e: logging.error(f"An error occurred during batch export: {str(e)}") return [{"error": f"An error occurred during batch export: {str(e)}"}] if __name__ == "__main__": logging.info("Starting PubTator MCP server") # Initialize and run the server mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JackKuo666/PubTator-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server