map_ids
Convert protein identifiers between different biological databases using UniProt's mapping service across 200+ supported namespaces.
Instructions
Map identifiers between UniProt-supported namespaces.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| from_db | Yes | ||
| to_db | Yes | ||
| ids | Yes |
Implementation Reference
- src/uniprot_mcp/server.py:244-271 (handler)The core handler function for the 'map_ids' tool, which also serves as its registration via the @mcp.tool() decorator. It filters input IDs, submits the mapping job to UniProt, polls for results, and returns a parsed MappingResult.@mcp.tool() # type: ignore[misc] async def map_ids( from_db: str, to_db: str, ids: list[str], ctx: Context[ServerSession, None] | None = None, ) -> MappingResult: """Map identifiers between UniProt-supported namespaces.""" filtered_ids = [identifier for identifier in ids if identifier] if not filtered_ids: return MappingResult(from_db=from_db, to_db=to_db, results={}) async with new_client() as client: job_id = await start_id_mapping(client, from_db=from_db, to_db=to_db, ids=filtered_ids) if ctx is not None: await ctx.info( f"Submitted UniProt ID mapping job ({from_db}->{to_db}) for {len(filtered_ids)} IDs." ) payload = await _poll_mapping_job(job_id, ctx=ctx) if ctx is not None: await ctx.info( f"Completed UniProt ID mapping job ({from_db}->{to_db}) for {len(filtered_ids)} IDs." ) return parse_mapping_result(payload, from_db=from_db, to_db=to_db)
- Pydantic BaseModel defining the structured output schema returned by the map_ids tool handler.class MappingResult(BaseModel): """Outcome of UniProt ID mapping operations.""" from_db: str = Field(description="Source identifier namespace.") to_db: str = Field(description="Target identifier namespace.") results: dict[str, list[str]] = Field( default_factory=dict, description="Mapping from input IDs to resolved identifiers (empty list for no match).", )
- src/uniprot_mcp/server.py:198-222 (helper)Internal helper function used by map_ids to poll the UniProt ID mapping job status until completion, with progress reporting and timeout.async def _poll_mapping_job( job_id: str, *, ctx: Context[ServerSession, None] | None = None, ) -> dict[str, Any]: """Poll the UniProt mapping job until completion or timeout.""" elapsed = 0.0 async with new_client() as client: while elapsed < MAPPING_MAX_WAIT: status = await get_mapping_status(client, job_id) if _mapping_is_complete(status): results = await get_mapping_results(client, job_id) return cast(dict[str, Any], results) elapsed += MAPPING_POLL_INTERVAL if ctx is not None: progress = min(1.0, elapsed / MAPPING_MAX_WAIT) await ctx.report_progress( progress=progress, total=1.0, message="Polling UniProt ID mapping job", ) await asyncio.sleep(MAPPING_POLL_INTERVAL) raise UniProtClientError("ID mapping timed out waiting for completion.")
- src/uniprot_mcp/server.py:224-242 (helper)Utility function to check if the UniProt mapping job status indicates completion or failure, used by _poll_mapping_job.def _mapping_is_complete(status: dict[str, Any]) -> bool: """Determine whether a mapping job has completed.""" complete_flags = { "FINISHED", "FINISHED_UNSUCCESSFULLY", "COMPLETE", "COMPLETED", "DONE", } failed_flags = {"FAILED", "ERROR", "CANCELLED"} if status.get("status") in failed_flags or status.get("jobStatus") in failed_flags: raise UniProtClientError(f"ID mapping failed with status: {status}") if status.get("status") in complete_flags: return True if status.get("jobStatus") in complete_flags: return True return bool(status.get("resultsReady") or status.get("ready"))
- Helper parser that transforms the raw JSON response from UniProt ID mapping into the structured MappingResult model used by the tool.def parse_mapping_result( js: dict[str, Any], *, from_db: str, to_db: str, ) -> MappingResult: """Convert an ID mapping response into MappingResult.""" mappings: dict[str, list[str]] = {} def register_result(source: str | None, targets: Iterable[Any]) -> None: if not source: return values: list[str] = [] for target in targets: if isinstance(target, dict): candidate = target.get("id") or target.get("identifier") or target.get("value") if candidate: values.append(str(candidate)) elif target is not None: values.append(str(target)) if source not in mappings: mappings[source] = [] mappings[source].extend(values) for item in js.get("results") or []: if not isinstance(item, dict): continue source = item.get("from") or item.get("fromId") to_value = item.get("to") or item.get("toId") or item.get("mappedTo") if isinstance(to_value, list): register_result(source, to_value) elif to_value is not None: register_result(source, [to_value]) else: register_result(source, []) # Some responses return an explicit mapping dictionary for source, value in (js.get("mappedResults") or {}).items(): if isinstance(value, list): register_result(source, value) else: register_result(source, [value]) # Ensure failed IDs are tracked with empty lists for failed in js.get("failedIds") or []: if failed not in mappings: mappings[failed] = [] # Normalise ordering and remove duplicates per ID for key, values in mappings.items(): deduped = list(dict.fromkeys(values)) mappings[key] = deduped return MappingResult(from_db=from_db, to_db=to_db, results=mappings)