# research
Search scientific literature and receive a cited Markdown report synthesizing findings from multiple research papers.
## Instructions
Submit a scientific question to the SPARKIT research agent.
SPARKIT searches the literature, reads relevant papers, and returns a cited Markdown report. Best for questions where a correct answer requires synthesizing across multiple primary sources.
Args:

- `question`: Free-text scientific question. Be specific: "Which kinases are upregulated in pancreatic cancer with evidence from human tissue?" works better than "tell me about pancreatic cancer."
- `response_format`: `"full"` (default) for a multi-paragraph Markdown report, or `"brief"` for a tighter summary.
- `include_citations`: Keep `True` (default) so the report is usable for downstream work; only set `False` if you specifically want unsourced prose.
- `max_wait_seconds`: How long to block waiting for the job before returning the `job_id` with instructions to poll via `get_job_status`. Default 240 s (4 min). Range 30-540.
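The 30-540 s clamp on `max_wait_seconds` can be sketched as a standalone function. This is an illustration of the documented range and default, not the module's real code; the constant and function names here are local to the sketch (the handler itself uses `_MIN_MAX_WAIT_SECONDS`, `_MAX_MAX_WAIT_SECONDS`, and `_DEFAULT_MAX_WAIT_SECONDS`):

```python
# Illustrative clamp mirroring the documented 30-540 s range and 240 s default.
MIN_WAIT, MAX_WAIT, DEFAULT_WAIT = 30, 540, 240

def clamp_wait(seconds: int = DEFAULT_WAIT) -> int:
    """Clamp a requested wait into the supported range."""
    return max(MIN_WAIT, min(MAX_WAIT, seconds))

print(clamp_wait(10), clamp_wait(), clamp_wait(10_000))  # 30 240 540
```

Out-of-range values are silently clamped rather than rejected, so a caller asking for a very long wait simply gets the maximum.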
Returns the cited Markdown report on success. If the job is still running at the wait limit, returns the `job_id` and status so the caller can resume with `get_job_status`.
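The resume behavior described above follows a standard poll-until-terminal pattern. A minimal sketch with a fake client and job (all names here are illustrative stand-ins, not the real `SparkitClient` API):

```python
import asyncio

class FakeJob:
    """Stand-in for a job whose status advances on each poll."""
    def __init__(self, statuses):
        self._statuses = list(statuses)
        self.status = self._statuses.pop(0)

class FakeClient:
    def __init__(self, job):
        self.job = job
    async def get_job(self, job_id):
        if self.job._statuses:
            self.job.status = self.job._statuses.pop(0)
        return self.job

async def await_terminal(client, job, max_wait=3, poll=1):
    """Poll until the job leaves queued/running or the wait budget runs out."""
    elapsed = 0.0
    current = job
    while current.status in {"queued", "running"} and elapsed < max_wait:
        await asyncio.sleep(0)  # stand-in for the real poll interval
        elapsed += poll
        current = await client.get_job("job-1")
    return current.status

job = FakeJob(["queued", "running", "completed"])
print(asyncio.run(await_terminal(FakeClient(job), job)))  # prints "completed"
```

If the budget runs out while the job is still `queued` or `running`, the loop exits with that non-terminal status, which is when the real tool hands back the `job_id` for later polling.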
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| question | Yes | Free-text scientific question. | |
| response_format | No | `"full"` for a multi-paragraph Markdown report, or `"brief"` for a tighter summary. | `full` |
| include_citations | No | Whether the report includes citations. | `True` |
| max_wait_seconds | No | How long to block waiting for the job before returning the `job_id` for polling (range 30-540). | `240` |
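The schema above maps directly onto the JSON body that `submit_research` posts to `/v1/research` (see the client code under Implementation Reference). A minimal illustration, with a made-up question:

```python
import json

# Illustrative request body matching the input schema; the question text
# is invented for the example.
body = {
    "question": "Which kinases are upregulated in pancreatic cancer?",
    "response_format": "full",
    "include_citations": True,
}
print(json.dumps(body, indent=2))
```

`max_wait_seconds` is notably absent: it only controls client-side blocking and is never sent to the API.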
## Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Cited Markdown report on success; the `job_id` and status if the job is still running at the wait limit. | |
## Implementation Reference
- `sparkit_mcp/server.py:130-178` (handler): The main handler for the `research` MCP tool. Defined as an async function decorated with `@mcp.tool()`. It accepts a question, response_format, include_citations, and max_wait_seconds; validates inputs, submits the research job via `SparkitClient.submit_research()`, and polls until completion or timeout via `_await_completion()`.
```python
@mcp.tool()
async def research(
    question: str,
    response_format: str = "full",
    include_citations: bool = True,
    max_wait_seconds: int = _DEFAULT_MAX_WAIT_SECONDS,
) -> str:
    """Submit a scientific question to the SPARKIT research agent.

    SPARKIT searches the literature, reads relevant papers, and returns
    a cited Markdown report. Best for questions where a correct answer
    requires synthesizing across multiple primary sources.

    Args:
        question: Free-text scientific question. Be specific —
            "Which kinases are upregulated in pancreatic cancer with
            evidence from human tissue?" works better than "tell me
            about pancreatic cancer."
        response_format: ``"full"`` (default) for a multi-paragraph
            Markdown report, or ``"brief"`` for a tighter summary.
        include_citations: Keep ``True`` (default) so the report is
            usable for downstream work; only set ``False`` if you
            specifically want unsourced prose.
        max_wait_seconds: How long to block waiting for the job before
            returning the job_id with instructions to poll via
            ``get_job_status``. Default 240s (4 min). Range 30-540.

    Returns the cited Markdown report on success. If the job is still
    running at the wait limit, returns the job_id and status so the
    caller can resume with ``get_job_status``.
    """
    if not question or not question.strip():
        return "Error: `question` is required and cannot be empty."
    wait = max(_MIN_MAX_WAIT_SECONDS, min(_MAX_MAX_WAIT_SECONDS, max_wait_seconds))
    if response_format not in ("full", "brief"):
        return "Error: `response_format` must be 'full' or 'brief'."
    try:
        async with SparkitClient() as client:
            job = await client.submit_research(
                question.strip(),
                response_format=response_format,
                include_citations=include_citations,
            )
            logger.info("Submitted SPARKIT job %s", job.job_id)
            return await _await_completion(client, job, wait)
    except SparkitAPIError as e:
        return _format_api_error(e)
```

- `sparkit_mcp/server.py:130-130` (registration): The `@mcp.tool()` decorator on the `research` function registers it as an MCP tool with the FastMCP server instance.
```python
@mcp.tool()
```

- `sparkit_mcp/server.py:131-136` (schema): The type signature and docstring (including Args) define the input schema for the `research` tool: `question` (str, required), `response_format` (str, default `'full'`), `include_citations` (bool, default `True`), `max_wait_seconds` (int, default 240).

```python
async def research(
    question: str,
    response_format: str = "full",
    include_citations: bool = True,
    max_wait_seconds: int = _DEFAULT_MAX_WAIT_SECONDS,
) -> str:
```

- `sparkit_mcp/server.py:181-205` (helper): Helper async function `_await_completion` that polls the job status until completion, failure, or timeout. Used by the `research` handler to synchronously wait for results.
```python
async def _await_completion(
    client: SparkitClient, job: Job, max_wait_seconds: int
) -> str:
    """Poll until the job is terminal or until ``max_wait_seconds`` elapses."""
    elapsed = 0.0
    current = job
    while current.status in {"queued", "running"} and elapsed < max_wait_seconds:
        await asyncio.sleep(_POLL_INTERVAL_SECONDS)
        elapsed += _POLL_INTERVAL_SECONDS
        try:
            current = await client.get_job(job.job_id)
        except SparkitAPIError as e:
            # Transient lookup error mid-poll; bail out with the job_id so
            # the caller can resume rather than swallowing progress.
            return (
                f"Submitted as `{job.job_id}` but couldn't fetch status: "
                f"{e.message}. Use `get_job_status` to check on it."
            )
    if current.status == "completed":
        return _format_completed(current)
    if current.status in {"failed", "cancelled"}:
        return _format_terminal_failure(current)
    # Still queued or running and we ran out the clock.
    return _format_in_flight(current, elapsed)
```

- `sparkit_mcp/client.py:157-202` (helper): The `SparkitClient.submit_research()` method that sends the `POST /v1/research` API request to submit a research question. Returns a `Job` object.
```python
async def submit_research(
    self,
    question: str,
    *,
    response_format: str = "full",
    include_citations: bool = True,
    max_answer_tokens: int | None = None,
) -> Job:
    body: dict[str, Any] = {
        "question": question,
        "response_format": response_format,
        "include_citations": include_citations,
    }
    if max_answer_tokens is not None:
        body["max_answer_tokens"] = max_answer_tokens
    resp = await self._client.post("/v1/research", json=body)
    return _job_from_dict(_unwrap(resp))

async def get_job(self, job_id: str) -> Job:
    resp = await self._client.get(f"/v1/research/{job_id}")
    return _job_from_dict(_unwrap(resp))


def _unwrap(resp: httpx.Response) -> dict[str, Any]:
    """Raise ``SparkitAPIError`` on non-2xx; otherwise return parsed JSON."""
    if 200 <= resp.status_code < 300:
        try:
            return resp.json()
        except ValueError as e:
            raise SparkitAPIError(
                resp.status_code, f"Malformed JSON response: {e}"
            ) from e
    # Error path — try to extract the API's structured error.
    code: str | None = None
    message = resp.reason_phrase or "Request failed."
    try:
        body = resp.json()
        err = body.get("error") if isinstance(body, dict) else None
        if isinstance(err, dict):
            code = err.get("code")
            message = err.get("message") or message
    except ValueError:
        pass
    raise SparkitAPIError(resp.status_code, message, code=code)
```
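The error path in `_unwrap` tolerates any payload shape: a well-formed `{"error": {...}}` envelope yields a structured code and message, and anything else falls back to a generic message. A sketch that applies the same extraction to a raw JSON string rather than an `httpx.Response` (the function name here is illustrative, not part of the client):

```python
import json

def extract_error(status_code, raw_body, reason="Request failed."):
    """Pull (status, code, message) out of an API error payload,
    falling back to the transport-level reason when the body is not
    the expected {"error": {...}} envelope."""
    code, message = None, reason
    try:
        body = json.loads(raw_body)
        err = body.get("error") if isinstance(body, dict) else None
        if isinstance(err, dict):
            code = err.get("code")
            message = err.get("message") or message
    except ValueError:
        pass
    return status_code, code, message

print(extract_error(429, '{"error": {"code": "rate_limited", "message": "Slow down."}}'))
print(extract_error(502, "<html>Bad Gateway</html>"))
```

Defaulting rather than raising here keeps a malformed error body from masking the underlying HTTP failure.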