jobs_submit_blueprint
Submit a demo background job that simulates long-running orchestration workflows. Configure duration in seconds and optional modules to test background task execution.
Instructions
Submit a demo background job that models long-running orchestration work.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| duration_seconds | No | Approximate runtime of the simulated job, in seconds. | 3.0 |
| modules | No | Module names to include; when omitted, all registered modules are used. | None |
Implementation Reference
- src/mcp_template/modules/jobs.py:11-16 (handler): The MCP tool handler for 'jobs_submit_blueprint'. It is an async function registered via @server.tool that delegates to container.jobs.submit_blueprint().
@server.tool(name="jobs_submit_blueprint") async def jobs_submit_blueprint(duration_seconds: float = 3.0, modules: list[str] | None = None) -> object: """Submit a demo background job that models long-running orchestration work.""" with container.metrics.observe_tool("jobs_submit_blueprint"): module_names = modules or container.registry.names() return await container.jobs.submit_blueprint(module_names=module_names, duration_seconds=duration_seconds) - src/mcp_template/services/jobs.py:35-56 (handler)The core business logic for submit_blueprint: creates a JobStatus, enqueues it, and starts background execution via _run_blueprint_job.
async def submit_blueprint(self, module_names: Sequence[str], duration_seconds: float = 3.0) -> JobSubmission: if self._task_group is None: raise RuntimeError("Job service is not ready until server lifespan starts") job = JobStatus( id=str(uuid4()), name="blueprint", status="queued", submitted_at=utc_now(), metadata={ "modules": list(module_names), "duration_seconds": float(duration_seconds), }, ) async with self._lock: self._jobs[job.id] = job queue_depth = sum(1 for existing in self._jobs.values() if existing.status == "queued") self._metrics.record_job_submitted(job.name) self._refresh_metrics() self._task_group.start_soon(self._run_blueprint_job, job.id, list(module_names), float(duration_seconds)) return JobSubmission(job=job.model_copy(deep=True), queue_depth=queue_depth) - JobStatus and JobSubmission Pydantic models used as the schema for the job submission input/output.
class JobStatus(TemplateModel): id: str name: str status: Literal["queued", "running", "succeeded", "failed"] submitted_at: datetime started_at: datetime | None = None completed_at: datetime | None = None progress: float = 0.0 metadata: dict[str, Any] = Field(default_factory=dict) result: dict[str, Any] | None = None error: str | None = None class JobSubmission(TemplateModel): job: JobStatus queue_depth: int - src/mcp_template/modules/jobs.py:55-64 (registration)ModuleDescriptor returned by the register() function listing 'jobs_submit_blueprint' as one of the tools exposed by this module.
return ModuleDescriptor( name="jobs", title="Jobs", summary="Long-running task scaffolding and status resources for orchestration-heavy servers.", tags=["background", "async", "orchestration"], maturity="beta", tools=["jobs_submit_blueprint", "jobs_get_status", "jobs_list"], resources=["job://{job_id}"], prompts=["jobs_postmortem"], ) - The private _run_blueprint_job method that simulates a long-running background job with progress reporting and result generation.
async def _run_blueprint_job(self, job_id: str, module_names: list[str], duration_seconds: float) -> None: kind = "blueprint" try: await self._patch(job_id, status="running", started_at=utc_now()) steps = max(1, min(12, int(max(duration_seconds, 0.5) * 2))) sleep_for = max(duration_seconds / steps, 0.05) modules = module_names or ["system", "workspace", "jobs", "design"] for step in range(steps): await anyio.sleep(sleep_for) await self._patch(job_id, progress=round((step + 1) / steps, 4)) recommendations = [ f"Expose {module} behind a dedicated service layer before adding domain-specific tools." for module in modules ] result = { "summary": "Blueprint job completed successfully.", "recommended_next_actions": recommendations, "modules": modules, "operational_notes": [ "Keep transport-specific concerns at the edge.", "Prefer structured output for tools that may be chained by hosts.", "Add auth only when the external trust boundary is clear.", ], } await self._patch( job_id, status="succeeded", progress=1.0, completed_at=utc_now(), result=result, ) self._metrics.record_job_finished(kind, "succeeded") except Exception as exc: await self._patch(job_id, status="failed", completed_at=utc_now(), error=str(exc)) self._metrics.record_job_finished(kind, "failed") finally: self._refresh_metrics() async def _patch(self, job_id: str, **changes: object) -> None: async with self._lock: job = self._jobs[job_id] for field_name, value in changes.items(): setattr(job, field_name, value) def _refresh_metrics(self) -> None: self._metrics.set_active_jobs(self.active_count_snapshot())