Skip to main content
Glama
wikidata_client.py10 kB
"""Wikidata API client implementation.""" import asyncio import json from typing import Any, Dict, List, Optional from urllib.parse import quote import httpx from .config import Config class WikidataClient: def __init__(self, config: Config) -> None: self.config = config self.session = httpx.AsyncClient( headers={"User-Agent": config.user_agent}, timeout=config.timeout, ) self._rate_limiter = asyncio.Semaphore(config.rate_limit) async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.session.aclose() async def _make_request(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]: async with self._rate_limiter: try: response = await self.session.get(url, params=params) response.raise_for_status() return response.json() except httpx.TimeoutException: raise Exception(f"Request timed out after {self.config.timeout} seconds") except httpx.HTTPStatusError as e: if e.response.status_code == 429: raise Exception("Wikidata API is rate limiting requests. Please wait and try again.") elif e.response.status_code >= 500: raise Exception(f"Wikidata server error ({e.response.status_code}). Please try again later.") else: raise Exception(f"Wikidata API error {e.response.status_code}: {e.response.text[:200]}") except httpx.ConnectError: raise Exception(f"Cannot connect to Wikidata API. Check your internet connection.") async def search_entities( self, query: str, language: str = "en", limit: int = 10, type: Optional[str] = None ) -> Dict[str, Any]: params = { "action": "wbsearchentities", "search": query, "language": language, "limit": min(limit, self.config.max_results), "format": "json", } if type: params["type"] = type data = await self._make_request(self.config.wikibase_api_url, params) entities = [] for item in data.get("search", []): entities.append({ "id": item.get("id"), "label": item.get("label"), "description": item.get("description", ""), "url": f"https://www.wikidata.org/entity/{item.get('id')}" }) return {"entities": entities} async def get_entity( self, entity_id: str, language: str = "en", properties: Optional[List[str]] = None, simplified: bool = False ) -> Dict[str, Any]: params = { "action": "wbgetentities", "ids": entity_id, "languages": language, "format": "json", } if properties: params["props"] = "|".join(properties) data = await self._make_request(self.config.wikibase_api_url, params) if "entities" not in data or entity_id not in data["entities"]: raise ValueError(f"Entity {entity_id} not found") entity_data = data["entities"][entity_id] if simplified: return self._simplify_entity(entity_data, language) return {"entity": entity_data} def _simplify_entity(self, entity_data: Dict[str, Any], language: str) -> Dict[str, Any]: entity = { "id": entity_data.get("id"), "labels": {}, "descriptions": {}, "properties": {} } if "labels" in entity_data: entity["labels"] = { lang: label["value"] for lang, label in entity_data["labels"].items() } if "descriptions" in entity_data: entity["descriptions"] = { lang: desc["value"] for lang, desc in entity_data["descriptions"].items() } if "claims" in entity_data: for prop_id, claims in entity_data["claims"].items(): prop_values = [] for claim in claims: if "mainsnak" in claim and "datavalue" in claim["mainsnak"]: value = claim["mainsnak"]["datavalue"]["value"] if isinstance(value, dict) and "id" in value: prop_values.append({ "value": value["id"], "label": value.get("label", value["id"]) }) else: prop_values.append({"value": str(value)}) if prop_values: entity["properties"][prop_id] = prop_values return {"entity": entity} async def sparql_query( self, query: str, format: str = "json", limit: int = 100 ) -> Dict[str, Any]: if "LIMIT" not in query.upper(): query += f" LIMIT {min(limit, 1000)}" params = { "query": query, "format": format } try: response = await self.session.get( self.config.sparql_endpoint, params=params, headers={ "Accept": f"application/sparql-results+{format}", "User-Agent": self.config.user_agent } ) response.raise_for_status() if format == "json": return response.json() else: return {"result": response.text} except asyncio.TimeoutError: raise Exception(f"SPARQL query timed out after {self.config.timeout} seconds. Try simplifying your query or increasing the timeout.") except httpx.TimeoutException: raise Exception(f"SPARQL query timed out after {self.config.timeout} seconds. Try simplifying your query or increasing the timeout.") except httpx.HTTPStatusError as e: if e.response.status_code == 500: raise Exception(f"SPARQL server error (500). Your query may have syntax errors or be too complex: {query[:100]}...") elif e.response.status_code == 429: raise Exception("SPARQL service is rate limiting requests. Please wait and try again.") else: raise Exception(f"SPARQL HTTP error {e.response.status_code}: {e.response.text[:200]}") except httpx.ConnectError: raise Exception(f"Cannot connect to SPARQL endpoint {self.config.sparql_endpoint}. Check your internet connection.") except Exception as e: if "timeout" in str(e).lower(): raise Exception(f"SPARQL query timed out after {self.config.timeout} seconds. Try simplifying your query or increasing the timeout.") raise Exception(f"SPARQL query failed: {str(e)}") async def get_relations( self, entity_id: str, relation_type: str = "outgoing", property_filter: Optional[List[str]] = None, limit: int = 20 ) -> Dict[str, Any]: if relation_type == "outgoing": query = f""" SELECT ?property ?propertyLabel ?target ?targetLabel WHERE {{ wd:{entity_id} ?property ?target . ?prop wikibase:directClaim ?property . SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }} }} LIMIT {limit} """ elif relation_type == "incoming": query = f""" SELECT ?property ?propertyLabel ?source ?sourceLabel WHERE {{ ?source ?property wd:{entity_id} . ?prop wikibase:directClaim ?property . SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en" . }} }} LIMIT {limit} """ else: raise ValueError(f"Invalid relation_type: {relation_type}") result = await self.sparql_query(query) relations = [] for binding in result.get("results", {}).get("bindings", []): relation = { "property": binding.get("property", {}).get("value", "").split("/")[-1], "property_label": binding.get("propertyLabel", {}).get("value", ""), "direction": relation_type } if relation_type == "outgoing": relation["target"] = { "id": binding.get("target", {}).get("value", "").split("/")[-1], "label": binding.get("targetLabel", {}).get("value", "") } else: relation["source"] = { "id": binding.get("source", {}).get("value", "").split("/")[-1], "label": binding.get("sourceLabel", {}).get("value", "") } relations.append(relation) return {"relations": relations} async def find_by_property( self, property: str, value: str, language: str = "en", limit: int = 10 ) -> Dict[str, Any]: query = f""" SELECT ?item ?itemLabel WHERE {{ ?item wdt:{property} "{value}" . SERVICE wikibase:label {{ bd:serviceParam wikibase:language "{language}" . }} }} LIMIT {limit} """ result = await self.sparql_query(query) entities = [] for binding in result.get("results", {}).get("bindings", []): entities.append({ "id": binding.get("item", {}).get("value", "").split("/")[-1], "label": binding.get("itemLabel", {}).get("value", "") }) return {"entities": entities}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/joelgombin/mcp-wikidata'

If you have feedback or need assistance with the MCP directory API, please join our Discord server