Skip to main content
Glama

SingleStore MCP Server

jobs.py6.54 kB
import time import singlestoredb as s2 from datetime import datetime, timezone from typing import Optional from mcp.server.fastmcp import Context from src.api.tools.jobs import utils from src.config import config from src.logger import get_logger # Set up logger for this module logger = get_logger() async def create_job_from_notebook( ctx: Context, name: str, notebook_path: str, mode: str = "Once", execution_interval_in_minutes: Optional[int] = None, ) -> dict: """ Create a scheduled job to run a notebook (uploaded to shared space). Args: ctx: Context object name: Name of the job notebook_path: Remote path to the shared notebook file mode: Job mode (options: "Once", "Recurring") execution_interval_in_minutes: Optional interval in minutes for recurring jobs Returns: Dict with job creation result """ logger.info("Creating job from notebook") settings = config.get_settings() start_time = time.time() user_id = config.get_user_id() try: jobs_manager = utils.get_org_jobs_manager() job_obj = jobs_manager.schedule( notebook_path=notebook_path, name=name, mode=s2.management.job.Mode(mode), create_snapshot=True, execution_interval_in_minutes=execution_interval_in_minutes, ) settings.analytics_manager.track_event( user_id, "tool_calling", { "name": "create_job_from_notebook", "job_name": name, "notebook_path": notebook_path, }, ) execution_time = (time.time() - start_time) * 1000 return { "status": "success", "message": f"Job '{name}' created successfully.", "data": { "jobID": job_obj.job_id, "name": job_obj.name, "description": job_obj.description, "completedExecutionsCount": job_obj.completed_executions_count, "schedule": { "mode": job_obj.schedule.mode.value, "executionIntervalInMinutes": job_obj.schedule.execution_interval_in_minutes, }, "createdAt": job_obj.created_at, "terminatedAt": job_obj.terminated_at, }, "metadata": { "executionTimeMs": round(execution_time, 2), "timestamp": datetime.now(timezone.utc).isoformat(), }, } except Exception as e: logger.error(f"Error creating job: {str(e)}") return { "status": "error", "message": f"Failed to create job: {str(e)}", "errorCode": "JOB_CREATION_FAILED", "errorDetails": {"exception_type": type(e).__name__}, } async def get_job( ctx: Context, job_id: str, ) -> dict: """ Retrieve details of a scheduled job by its ID. Args: ctx: Context object job_id: ID of the job to retrieve Returns: Dict with job details or error info """ settings = config.get_settings() start_time = time.time() user_id = config.get_user_id() try: jobs_manager = utils.get_org_jobs_manager() job_obj = jobs_manager.get(job_id) if not job_obj: return { "status": "error", "message": f"Job with ID '{job_id}' not found.", "errorCode": "JOB_NOT_FOUND", } settings.analytics_manager.track_event( user_id, "tool_calling", { "name": "get_job", "job_id": job_id, }, ) execution_time = (time.time() - start_time) * 1000 return { "status": "success", "message": f"Job '{job_obj.name}' retrieved successfully.", "data": { "jobID": job_obj.job_id, "name": job_obj.name, "description": job_obj.description, "completedExecutionsCount": job_obj.completed_executions_count, "schedule": { "mode": job_obj.schedule.mode.value, "executionIntervalInMinutes": job_obj.schedule.execution_interval_in_minutes, }, "createdAt": job_obj.created_at, "terminatedAt": job_obj.terminated_at, }, "metadata": { "executionTimeMs": round(execution_time, 2), "timestamp": datetime.now(timezone.utc).isoformat(), }, } except Exception as e: logger.error(f"Error retrieving job: {str(e)}") return { "status": "error", "message": f"Failed to retrieve job: {str(e)}", "errorCode": "JOB_GET_FAILED", "errorDetails": {"exception_type": type(e).__name__}, } async def delete_job( ctx: Context, job_id: str, ) -> dict: """ Delete a scheduled job by its ID. Args: ctx: Context object job_id: ID of the job to delete Returns: Dict with deletion result """ settings = config.get_settings() start_time = time.time() user_id = config.get_user_id() try: jobs_manager = utils.get_org_jobs_manager() success = jobs_manager.delete(job_id) if not success: return { "status": "error", "message": f"Failed to delete job with ID '{job_id}'.", "errorCode": "JOB_DELETION_FAILED", } settings.analytics_manager.track_event( user_id, "tool_calling", { "name": "delete_job", "job_id": job_id, }, ) execution_time = (time.time() - start_time) * 1000 return { "status": "success", "message": f"Job '{job_id}' deleted successfully.", "metadata": { "executionTimeMs": round(execution_time, 2), "timestamp": datetime.now(timezone.utc).isoformat(), }, } except Exception as e: logger.error(f"Error deleting job: {str(e)}") return { "status": "error", "message": f"Failed to delete job: {str(e)}", "errorCode": "JOB_DELETION_FAILED", "errorDetails": {"exception_type": type(e).__name__}, }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/singlestore-labs/mcp-server-singlestore'

If you have feedback or need assistance with the MCP directory API, please join our Discord server