Skip to main content
Glama
steveandroulakis

Temporal Nexus Calculator MCP Server

demo_workflow.py • 12.5 kB
"""Demonstration workflow showing how to call MCP tools from Temporal workflows.""" import asyncio import uuid import logging from typing import List, Dict, Any from mcp import ClientSession from nexusmcp import WorkflowTransport from pydantic import BaseModel from temporalio import workflow from temporalio.client import Client from temporalio.contrib.pydantic import pydantic_data_converter from temporalio.worker import Worker # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CalculatorDemoInput(BaseModel): """Input for the calculator demonstration workflow.""" endpoint: str = "mcp-gateway" expressions: List[str] = [ "2 + 3 * 4", "10 / 2", "2 ** 3", "abs(-5)", "round(3.14159, 2)" ] class CalculatorDemoResult(BaseModel): """Result of the calculator demonstration workflow.""" tools_discovered: int calculations_performed: int results: List[Dict[str, Any]] total_sum: float # The workflow must have the sandbox disabled to make MCP calls @workflow.defn(sandboxed=False) class CalculatorDemoWorkflow: """Demonstration workflow that uses MCP calculator tools. This workflow shows how to: 1. Connect to MCP tools through Nexus 2. Discover available tools 3. Call various calculator operations 4. 
Handle results and errors """ @workflow.run async def run(self, input: CalculatorDemoInput) -> CalculatorDemoResult: """Run the calculator demonstration.""" workflow.logger.info(f"Starting calculator demo with endpoint: {input.endpoint}") # Connect to MCP tools through the Nexus endpoint transport = WorkflowTransport(input.endpoint) results = [] total_sum = 0.0 async with transport.connect() as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize the MCP session await session.initialize() workflow.logger.info("MCP session initialized") # Discover available tools list_tools_result = await session.list_tools() tools_count = len(list_tools_result.tools) workflow.logger.info(f"Discovered {tools_count} MCP tools") # Log available tools for tool in list_tools_result.tools: workflow.logger.info(f" - {tool.name}: {tool.description}") # Perform calculations using the expression evaluator for expression in input.expressions: try: workflow.logger.info(f"Evaluating expression: {expression}") # Call the calculate tool call_result = await session.call_tool( name="Calculator/calculate", arguments={"expression": expression} ) # Debug: Log the raw call result workflow.logger.info(f"πŸ” Raw call result for '{expression}': {call_result}") workflow.logger.info(f"πŸ” Call result type: {type(call_result)}") workflow.logger.info(f"πŸ” Call result content: {call_result.content}") workflow.logger.info(f"πŸ” Call result isError: {call_result.isError}") # Parse the result - check structuredContent first, then content result_value = None if call_result.structuredContent: # Data is in structuredContent (direct object) workflow.logger.info(f"πŸ” Found structuredContent: {call_result.structuredContent}") result_data = call_result.structuredContent result_value = result_data.get("result", 0.0) results.append({ "expression": expression, "result": result_value, "success": True }) total_sum += result_value workflow.logger.info(f" βœ… 
Successfully parsed result from structuredContent: {result_value}") elif call_result.content and len(call_result.content) > 0: # Fallback to content (legacy format) content = call_result.content[0] workflow.logger.info(f"πŸ” Content type: {type(content)}, Content: {content}") if hasattr(content, 'text'): workflow.logger.info(f"πŸ” Content text: '{content.text}'") import json try: result_data = json.loads(content.text) result_value = result_data.get("result", 0.0) results.append({ "expression": expression, "result": result_value, "success": True }) total_sum += result_value workflow.logger.info(f" βœ… Successfully parsed result from content: {result_value}") except json.JSONDecodeError as e: workflow.logger.error(f"πŸ” JSON decode error for '{expression}': {e}") workflow.logger.error(f"πŸ” Raw text that failed to parse: '{content.text}'") results.append({ "expression": expression, "error": f"JSON decode error: {e}", "success": False }) else: workflow.logger.warning(f"πŸ” Content has no 'text' attribute for {expression}: {type(content)}") else: workflow.logger.warning(f"πŸ” No content or structuredContent returned for {expression}") workflow.logger.warning(f"πŸ” Call result details: isError={call_result.isError}, content={call_result.content}, structuredContent={call_result.structuredContent}") except Exception as e: workflow.logger.error(f"Error evaluating '{expression}': {e}") results.append({ "expression": expression, "error": str(e), "success": False }) # Demonstrate other calculator operations try: # Test addition workflow.logger.info("Testing addition operation") await session.call_tool( name="Calculator/add", arguments={"a": 10, "b": 5} ) workflow.logger.info("Addition completed successfully") # Test multiplication workflow.logger.info("Testing multiplication operation") await session.call_tool( name="Calculator/multiply", arguments={"a": 7, "b": 8} ) workflow.logger.info("Multiplication completed successfully") # Test sum_list operation 
workflow.logger.info("Testing list sum operation") await session.call_tool( name="Calculator/sum_list", arguments={"numbers": [1, 2, 3, 4, 5]} ) workflow.logger.info("List sum completed successfully") except Exception as e: workflow.logger.error(f"Error in additional operations: {e}") workflow.logger.info("Calculator demo completed") return CalculatorDemoResult( tools_discovered=tools_count, calculations_performed=len(results), results=results, total_sum=total_sum ) class CalculatorDemoRunner: """Runner for the calculator demo workflow.""" def __init__( self, temporal_host: str = "localhost:7233", namespace: str = "my-caller-namespace", task_queue: str = "calculator-demo", ) -> None: self.temporal_host = temporal_host self.namespace = namespace self.task_queue = task_queue async def run_demo(self, endpoint: str = "mcp-gateway") -> None: """Run the calculator demonstration.""" logger.info(f"Connecting to Temporal at {self.temporal_host}, namespace: {self.namespace}") client = await Client.connect( self.temporal_host, namespace=self.namespace, data_converter=pydantic_data_converter, ) logger.info("Starting demo workflow worker...") async with Worker( client, task_queue=self.task_queue, workflows=[CalculatorDemoWorkflow], ) as worker: logger.info(f"Worker started on task queue: {self.task_queue}") # Execute the workflow workflow_id = f"calculator-demo-{uuid.uuid4()}" logger.info(f"Executing workflow: {workflow_id}") result = await client.execute_workflow( CalculatorDemoWorkflow.run, CalculatorDemoInput(endpoint=endpoint), id=workflow_id, task_queue=worker.task_queue, ) # Display results logger.info("=== Demo Results ===") logger.info(f"Tools discovered: {result.tools_discovered}") logger.info(f"Calculations performed: {result.calculations_performed}") logger.info(f"Total sum of results: {result.total_sum}") logger.info("\nCalculation Results:") for calc_result in result.results: if calc_result.get("success", False): logger.info(f" {calc_result['expression']} = 
{calc_result['result']}") else: logger.info(f" {calc_result['expression']} = ERROR: {calc_result.get('error', 'Unknown error')}") logger.info("Demo completed successfully!") async def main() -> None: """Main entry point for running the calculator demo.""" import argparse parser = argparse.ArgumentParser(description="Run the Nexus MCP Calculator demo workflow") parser.add_argument( "--temporal-host", default="localhost:7233", help="Temporal server host (default: localhost:7233)" ) parser.add_argument( "--namespace", default="my-caller-namespace", help="Temporal namespace (default: my-caller-namespace)" ) parser.add_argument( "--task-queue", default="calculator-demo", help="Task queue name (default: calculator-demo)" ) parser.add_argument( "--endpoint", default="mcp-gateway", help="Nexus endpoint name (default: mcp-gateway)" ) args = parser.parse_args() runner = CalculatorDemoRunner( temporal_host=args.temporal_host, namespace=args.namespace, task_queue=args.task_queue, ) try: await runner.run_demo(args.endpoint) except Exception as e: logger.error(f"Demo failed: {e}") raise if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/steveandroulakis/temporal-nexus-mcp-demo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server