legal_search_patents_by_keyword
Search patents by keyword across EP, US, or WO databases to retrieve matching records with title, applicant, publication date, and IPC codes in AI-ready format.
Instructions
Search patents by keyword across EP, US, or WO patent databases. Returns a list of matching patent records (up to 10) with title, applicant, publication date, and IPC codes in AI-Ready Markdown. Verified sources: EPO OPS (EP/WO), USPTO PatentsView (US). Token-efficient. Example: search_patents_by_keyword('CRISPR gene editing', 'EP', '2020-01-01') Parameters: date_from — ISO date string (YYYY-MM-DD), optional filter.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| keywords | Yes | Keyword phrase to search for (full-text match). | |
| jurisdiction | No | Patent database to search: EP, US, or WO. | EP |
| date_from | No | ISO date string (YYYY-MM-DD), optional publication-date filter. | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- datanexus/tools/t11.py:469-628 (handler)The main handler function 'search_patents_by_keyword' — performs patent keyword search against EPO OPS and USPTO PatentsView APIs, with caching, circuit breaker, audit, and markdown formatting.
@mcp.tool()
@with_timeout
@verify_entitlement("T11")
async def search_patents_by_keyword(
    keywords: str,
    jurisdiction: str = "EP",
    date_from: str = "",
) -> dict:
    """Use this to search for patents by keyword to find prior art before filing. Provide keywords and optional jurisdiction. Returns matching patents with numbers, titles, and filing dates."""
    # Normalise inputs once; the cleaned values feed both the params hash
    # (cache/audit key) and the outbound API queries.
    kw_clean = keywords.strip()
    juris_clean = jurisdiction.strip().upper()
    params = {
        "keywords": kw_clean,
        "jurisdiction": juris_clean,
        "date_from": date_from.strip(),
    }
    async with AuditContext("T11", params, "1.0") as _:
        _incr_calls("T11")
        # Cache hit short-circuits before any network call.
        phash = make_params_hash(params)
        cached = get_cached("T11", phash)
        if cached:
            return cached
        results: list[dict] = []
        source_used = ""
        staleness: list[str] = []
        # ── EPO OPS full-text search ──────────────────────────────────────────
        # EP/WO searches go to EPO OPS first, guarded by its circuit breaker.
        # NOTE(review): if _get_epo_token() yields no token we silently fall
        # through to the US fallback with no staleness notice — confirm intended.
        if juris_clean in ("EP", "WO") and not is_tripped("epo_ops"):
            token = _get_epo_token()
            if token:
                try:
                    # Build the OPS CQL query: full-text keyword match, optional
                    # WO publication-number filter, and a publication-date floor
                    # (OPS wants YYYYMMDD, so the dashes are stripped).
                    cql_parts = [f'txt="{kw_clean}"']
                    if juris_clean == "WO":
                        cql_parts.append('pn=WO')
                    if date_from:
                        cql_parts.append(f'pd>={date_from.replace("-", "")}')
                    cql = " AND ".join(cql_parts)
                    url = f"{EPO_OPS_URL}/published-data/search/biblio"
                    async with httpx.AsyncClient(
                        timeout=_HTTP_TIMEOUT, headers=_HEADERS
                    ) as client:
                        resp = await client.get(
                            url,
                            params={"q": cql, "Range": "1-10"},
                            headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
                        )
                        resp.raise_for_status()
                    _track_epo_bytes(len(resp.content))  # OPS quota accounting
                    data = resp.json()
                    docs = (
                        data.get("ops:world-patent-data", {})
                        .get("ops:biblio-search", {})
                        .get("ops:search-result", {})
                        .get("ops:publication-reference", [])
                    )
                    # OPS collapses a single hit into a bare dict.
                    if isinstance(docs, dict):
                        docs = [docs]
                    for doc in docs[:10]:
                        entry = _epo_doc_to_dict(doc)
                        entry["jurisdiction"] = juris_clean
                        results.append(entry)
                    source_used = "EPO OPS"
                    record_success_sync("epo_ops")
                except Exception as exc:
                    # Any failure feeds breaker accounting and records a
                    # staleness notice; execution continues to the US fallback.
                    log.warning("EPO search_patents_by_keyword failed: %s", exc)
                    record_failure_sync("epo_ops")
                    staleness.append(get_staleness_notice("epo_ops", "unknown"))
        # ── USPTO PatentsView fallback ────────────────────────────────────────
        # Runs for US searches and whenever EPO produced no results.  Note the
        # returned records are tagged jurisdiction "US" even for an EP/WO
        # request that fell back here.
        if not results and not is_tripped("patentsview"):
            try:
                q_parts: list[dict] = [{"_text_any": {"patent_title": kw_clean}}]
                if date_from:
                    q_parts.append({"_gte": {"patent_date": date_from}})
                query = {"_and": q_parts} if len(q_parts) > 1 else q_parts[0]
                payload = {
                    "q": query,
                    "f": [
                        "patent_id", "patent_title", "patent_date",
                        "inventor_first_name", "inventor_last_name",
                        "assignee_organization",
                    ],
                    "o": {"per_page": 10},
                }
                async with httpx.AsyncClient(
                    timeout=_HTTP_TIMEOUT, headers=_HEADERS
                ) as client:
                    resp = await client.post(
                        PATENTSVIEW_URL,
                        json=payload,
                        headers={"Content-Type": "application/json"},
                    )
                    resp.raise_for_status()
                data = resp.json()
                for p in (data.get("patents") or []):
                    entry = _patentsview_doc_to_dict(p)
                    entry["jurisdiction"] = "US"
                    results.append(entry)
                source_used = "USPTO PatentsView"
                record_success_sync("patentsview")
            except Exception as exc:
                log.warning("PatentsView search_patents_by_keyword failed: %s", exc)
                record_failure_sync("patentsview")
                staleness.append(get_staleness_notice("patentsview", "unknown"))
        # Both sources empty (or breakers tripped): structured NOT_FOUND error.
        if not results:
            resp = error_response(
                ErrorCode.NOT_FOUND,
                f"No patent results found for '{kw_clean}'.",
                params,
            )
            if staleness:
                resp["staleness_notices"] = staleness
            return resp
        # Render results as a Markdown table.  Records come from either
        # normaliser helper, so each field falls back across both key schemes.
        rows = []
        for r in results:
            t = r.get("title") or r.get("patent_title", "—")
            appl = (", ".join(r.get("applicants", r.get("assignees", []))) or "—")[:60]
            date = r.get("pub_date", r.get("date", "—"))
            ipc = ", ".join(r.get("ipc_codes", [])) or "—"
            pid = r.get("patent_number", r.get("patent_id", ""))
            rows.append(f"| {pid} | {t[:60]} | {appl} | {date} | {ipc} |")
        table = (
            "| Patent | Title | Applicant | Published | IPC |\n"
            "|---|---|---|---|---|\n" + "\n".join(rows)
        )
        md = f"""## Patent Search: {kw_clean} ({juris_clean})

**Source:** {source_used}
**Results:** {len(results)}

{table}

{DISCLAIMER}"""
        _validate_canary(md)
        # NOTE(review): params is unchanged since phash was computed above, so
        # phash_out should equal phash — recomputation looks redundant.
        phash_out = make_params_hash(params)
        out = {
            "keywords": kw_clean,
            "jurisdiction": juris_clean,
            "count": len(results),
            "results": results,
            "source": source_used,
            "markdown": md,
            "disclaimer": DISCLAIMER,
            **standard_response_fields("T11", phash_out, "1.0"),
        }
        if staleness:
            out["staleness_notices"] = staleness
        set_cached("T11", phash_out, out, T11_TTL)
        return out
- datanexus/tools/legal.py:1-19 (registration)The tool registration file — imports search_patents_by_keyword from t11.py and registers it on the 'DataNexus Legal' FastMCP server via legal.tool().
""" DataNexus Legal sub-server — T11 tools. Sprint 3 P01: mcp-tool registrations only. Tool logic lives in t11.py. """ from fastmcp import FastMCP from datanexus.tools.t11 import ( fetch_patent_by_number, search_patents_by_keyword, fetch_patent_citations, fetch_inventor_portfolio, ) legal = FastMCP("DataNexus Legal") legal.tool()(fetch_patent_by_number) legal.tool()(search_patents_by_keyword) legal.tool()(fetch_patent_citations) legal.tool()(fetch_inventor_portfolio) - datanexus/tools/meta.py:16-43 (registration)The TOOL_REGISTRY entry listing 'legal_search_patents_by_keyword' with its task description 'search for patents by keyword to find prior art' — used for keyword-based tool discovery.
TOOL_REGISTRY = [ {"name": "nonprofit_fetch_nonprofit_by_ein", "task": "research a US charity or nonprofit by EIN number"}, {"name": "nonprofit_search_nonprofits_by_name", "task": "search for nonprofits or charities by organisation name"}, {"name": "nonprofit_fetch_charity_uk", "task": "look up a UK registered charity by number or name"}, {"name": "security_fetch_package_vulnerabilities", "task": "check a software package for known CVEs and security vulnerabilities"}, {"name": "security_fetch_dependency_graph", "task": "get the full dependency tree for a software package"}, {"name": "security_fetch_cve_detail", "task": "get full detail on a specific CVE vulnerability by ID"}, {"name": "security_audit_sbom_vulnerabilities", "task": "audit a software bill of materials for known vulnerabilities"}, {"name": "security_fetch_package_licence", "task": "check the open source licence for a package version"}, {"name": "compliance_fetch_npi_provider", "task": "verify a US healthcare provider by NPI number"}, {"name": "compliance_search_npi_by_name", "task": "search for a healthcare provider by name and state"}, {"name": "compliance_fetch_finra_broker", "task": "verify a financial broker or advisor registration with FINRA"}, {"name": "compliance_check_sam_exclusion", "task": "check whether a person or company is excluded from federal contracting"}, {"name": "domain_fetch_domain_rdap", "task": "look up domain registration and ownership details"}, {"name": "domain_fetch_ssl_certificate_chain", "task": "inspect the SSL certificate chain for a domain"}, {"name": "domain_fetch_dns_records", "task": "get DNS records for a domain"}, {"name": "domain_fetch_domain_history", "task": "get historical SSL certificate records for a domain"}, {"name": "legal_fetch_patent_by_number", "task": "look up a specific patent by number across US EP or WO"}, {"name": "legal_search_patents_by_keyword", "task": "search for patents by keyword to find prior art"}, {"name": "legal_fetch_patent_citations", 
"task": "get forward and backward citation chains for a patent"}, {"name": "legal_fetch_inventor_portfolio", "task": "get all patents filed by a specific inventor or assignee"}, {"name": "govcon_search_contract_awards", "task": "search government contract awards by keyword or agency"}, {"name": "govcon_fetch_vendor_contract_history", "task": "get the full government contract history for a specific vendor"}, {"name": "govcon_fetch_open_solicitations", "task": "find currently open government procurement opportunities"}, {"name": "regulatory_search_open_rulemakings", "task": "find open regulatory rulemakings and comment periods"}, {"name": "regulatory_fetch_docket_details", "task": "get full details for a specific regulatory docket by ID"}, {"name": "regulatory_fetch_federal_register_notices","task": "fetch recent Federal Register notices for an agency"}, ] - datanexus/tools/t11.py:469-628 (schema)The function signature serves as the schema: accepts 'keywords' (str), 'jurisdiction' (str, default 'EP'), and 'date_from' (str, default ''). Returns a dict with markdown, results, source, disclaimer, and standard_response_fields.
@mcp.tool()
@with_timeout
@verify_entitlement("T11")
async def search_patents_by_keyword(
    keywords: str,
    jurisdiction: str = "EP",
    date_from: str = "",
) -> dict:
    """Use this to search for patents by keyword to find prior art before filing. Provide keywords and optional jurisdiction. Returns matching patents with numbers, titles, and filing dates."""
    # Normalise inputs once; the cleaned values feed both the params hash
    # (cache/audit key) and the outbound API queries.
    kw_clean = keywords.strip()
    juris_clean = jurisdiction.strip().upper()
    params = {
        "keywords": kw_clean,
        "jurisdiction": juris_clean,
        "date_from": date_from.strip(),
    }
    async with AuditContext("T11", params, "1.0") as _:
        _incr_calls("T11")
        # Cache hit short-circuits before any network call.
        phash = make_params_hash(params)
        cached = get_cached("T11", phash)
        if cached:
            return cached
        results: list[dict] = []
        source_used = ""
        staleness: list[str] = []
        # ── EPO OPS full-text search ──────────────────────────────────────────
        # EP/WO searches go to EPO OPS first, guarded by its circuit breaker.
        # NOTE(review): if _get_epo_token() yields no token we silently fall
        # through to the US fallback with no staleness notice — confirm intended.
        if juris_clean in ("EP", "WO") and not is_tripped("epo_ops"):
            token = _get_epo_token()
            if token:
                try:
                    # Build the OPS CQL query: full-text keyword match, optional
                    # WO publication-number filter, and a publication-date floor
                    # (OPS wants YYYYMMDD, so the dashes are stripped).
                    cql_parts = [f'txt="{kw_clean}"']
                    if juris_clean == "WO":
                        cql_parts.append('pn=WO')
                    if date_from:
                        cql_parts.append(f'pd>={date_from.replace("-", "")}')
                    cql = " AND ".join(cql_parts)
                    url = f"{EPO_OPS_URL}/published-data/search/biblio"
                    async with httpx.AsyncClient(
                        timeout=_HTTP_TIMEOUT, headers=_HEADERS
                    ) as client:
                        resp = await client.get(
                            url,
                            params={"q": cql, "Range": "1-10"},
                            headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
                        )
                        resp.raise_for_status()
                    _track_epo_bytes(len(resp.content))  # OPS quota accounting
                    data = resp.json()
                    docs = (
                        data.get("ops:world-patent-data", {})
                        .get("ops:biblio-search", {})
                        .get("ops:search-result", {})
                        .get("ops:publication-reference", [])
                    )
                    # OPS collapses a single hit into a bare dict.
                    if isinstance(docs, dict):
                        docs = [docs]
                    for doc in docs[:10]:
                        entry = _epo_doc_to_dict(doc)
                        entry["jurisdiction"] = juris_clean
                        results.append(entry)
                    source_used = "EPO OPS"
                    record_success_sync("epo_ops")
                except Exception as exc:
                    # Any failure feeds breaker accounting and records a
                    # staleness notice; execution continues to the US fallback.
                    log.warning("EPO search_patents_by_keyword failed: %s", exc)
                    record_failure_sync("epo_ops")
                    staleness.append(get_staleness_notice("epo_ops", "unknown"))
        # ── USPTO PatentsView fallback ────────────────────────────────────────
        # Runs for US searches and whenever EPO produced no results.  Note the
        # returned records are tagged jurisdiction "US" even for an EP/WO
        # request that fell back here.
        if not results and not is_tripped("patentsview"):
            try:
                q_parts: list[dict] = [{"_text_any": {"patent_title": kw_clean}}]
                if date_from:
                    q_parts.append({"_gte": {"patent_date": date_from}})
                query = {"_and": q_parts} if len(q_parts) > 1 else q_parts[0]
                payload = {
                    "q": query,
                    "f": [
                        "patent_id", "patent_title", "patent_date",
                        "inventor_first_name", "inventor_last_name",
                        "assignee_organization",
                    ],
                    "o": {"per_page": 10},
                }
                async with httpx.AsyncClient(
                    timeout=_HTTP_TIMEOUT, headers=_HEADERS
                ) as client:
                    resp = await client.post(
                        PATENTSVIEW_URL,
                        json=payload,
                        headers={"Content-Type": "application/json"},
                    )
                    resp.raise_for_status()
                data = resp.json()
                for p in (data.get("patents") or []):
                    entry = _patentsview_doc_to_dict(p)
                    entry["jurisdiction"] = "US"
                    results.append(entry)
                source_used = "USPTO PatentsView"
                record_success_sync("patentsview")
            except Exception as exc:
                log.warning("PatentsView search_patents_by_keyword failed: %s", exc)
                record_failure_sync("patentsview")
                staleness.append(get_staleness_notice("patentsview", "unknown"))
        # Both sources empty (or breakers tripped): structured NOT_FOUND error.
        if not results:
            resp = error_response(
                ErrorCode.NOT_FOUND,
                f"No patent results found for '{kw_clean}'.",
                params,
            )
            if staleness:
                resp["staleness_notices"] = staleness
            return resp
        # Render results as a Markdown table.  Records come from either
        # normaliser helper, so each field falls back across both key schemes.
        rows = []
        for r in results:
            t = r.get("title") or r.get("patent_title", "—")
            appl = (", ".join(r.get("applicants", r.get("assignees", []))) or "—")[:60]
            date = r.get("pub_date", r.get("date", "—"))
            ipc = ", ".join(r.get("ipc_codes", [])) or "—"
            pid = r.get("patent_number", r.get("patent_id", ""))
            rows.append(f"| {pid} | {t[:60]} | {appl} | {date} | {ipc} |")
        table = (
            "| Patent | Title | Applicant | Published | IPC |\n"
            "|---|---|---|---|---|\n" + "\n".join(rows)
        )
        md = f"""## Patent Search: {kw_clean} ({juris_clean})

**Source:** {source_used}
**Results:** {len(results)}

{table}

{DISCLAIMER}"""
        _validate_canary(md)
        # NOTE(review): params is unchanged since phash was computed above, so
        # phash_out should equal phash — recomputation looks redundant.
        phash_out = make_params_hash(params)
        out = {
            "keywords": kw_clean,
            "jurisdiction": juris_clean,
            "count": len(results),
            "results": results,
            "source": source_used,
            "markdown": md,
            "disclaimer": DISCLAIMER,
            **standard_response_fields("T11", phash_out, "1.0"),
        }
        if staleness:
            out["staleness_notices"] = staleness
        set_cached("T11", phash_out, out, T11_TTL)
        return out
- datanexus/tools/t11.py:212-270 (helper)Helper '_epo_doc_to_dict' normalizes EPO OPS API responses to a standard patent dict format used by search_patents_by_keyword.
def _epo_doc_to_dict(doc: dict) -> dict: """Extract key fields from a single EPO OPS bibliographic document.""" bib = doc.get("bibliographic-data", doc) # Title title_obj = bib.get("invention-title", {}) if isinstance(title_obj, list): title_obj = title_obj[0] if title_obj else {} title = title_obj.get("$", title_obj.get("#text", "")) if isinstance(title_obj, dict) else str(title_obj) # Applicants/assignees parties = bib.get("parties", {}) applicants_raw = parties.get("applicants", {}).get("applicant", []) if isinstance(applicants_raw, dict): applicants_raw = [applicants_raw] applicants = [] for ap in applicants_raw: name_obj = ap.get("applicant-name", {}).get("name", {}) name = name_obj.get("$", "") if isinstance(name_obj, dict) else str(name_obj) if name: applicants.append(name) # Inventors inventors_raw = parties.get("inventors", {}).get("inventor", []) if isinstance(inventors_raw, dict): inventors_raw = [inventors_raw] inventors = [] for inv in inventors_raw: name_obj = inv.get("inventor-name", {}).get("name", {}) name = name_obj.get("$", "") if isinstance(name_obj, dict) else str(name_obj) if name: inventors.append(name) # Dates pub_ref = bib.get("publication-reference", {}).get("document-id", {}) if isinstance(pub_ref, list): pub_ref = pub_ref[0] if pub_ref else {} pub_date = pub_ref.get("date", {}) pub_date = pub_date.get("$", "") if isinstance(pub_date, dict) else str(pub_date) # IPC classification ipc_raw = bib.get("classifications-ipcr", {}).get("classification-ipcr", []) if isinstance(ipc_raw, dict): ipc_raw = [ipc_raw] ipc_codes = [] for ipc in ipc_raw[:3]: section = ipc.get("section", {}).get("$", "") cls = ipc.get("class", {}).get("$", "") sub = ipc.get("subclass", {}).get("$", "") if section: ipc_codes.append(f"{section}{cls}{sub}".strip()) return { "title": title, "applicants": applicants, "inventors": inventors, "pub_date": pub_date, "ipc_codes": ipc_codes, }