"""Demonstration workflow showing how to call MCP tools from Temporal workflows."""
import asyncio
import uuid
import logging
from typing import List, Dict, Any
from mcp import ClientSession
from nexusmcp import WorkflowTransport
from pydantic import BaseModel
from temporalio import workflow
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.worker import Worker
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CalculatorDemoInput(BaseModel):
"""Input for the calculator demonstration workflow."""
endpoint: str = "mcp-gateway"
expressions: List[str] = [
"2 + 3 * 4",
"10 / 2",
"2 ** 3",
"abs(-5)",
"round(3.14159, 2)"
]
class CalculatorDemoResult(BaseModel):
"""Result of the calculator demonstration workflow."""
tools_discovered: int
calculations_performed: int
results: List[Dict[str, Any]]
total_sum: float
# The workflow must have the sandbox disabled to make MCP calls
@workflow.defn(sandboxed=False)
class CalculatorDemoWorkflow:
"""Demonstration workflow that uses MCP calculator tools.
This workflow shows how to:
1. Connect to MCP tools through Nexus
2. Discover available tools
3. Call various calculator operations
4. Handle results and errors
"""
@workflow.run
async def run(self, input: CalculatorDemoInput) -> CalculatorDemoResult:
"""Run the calculator demonstration."""
workflow.logger.info(f"Starting calculator demo with endpoint: {input.endpoint}")
# Connect to MCP tools through the Nexus endpoint
transport = WorkflowTransport(input.endpoint)
results = []
total_sum = 0.0
async with transport.connect() as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
# Initialize the MCP session
await session.initialize()
workflow.logger.info("MCP session initialized")
# Discover available tools
list_tools_result = await session.list_tools()
tools_count = len(list_tools_result.tools)
workflow.logger.info(f"Discovered {tools_count} MCP tools")
# Log available tools
for tool in list_tools_result.tools:
workflow.logger.info(f" - {tool.name}: {tool.description}")
# Perform calculations using the expression evaluator
for expression in input.expressions:
try:
workflow.logger.info(f"Evaluating expression: {expression}")
# Call the calculate tool
call_result = await session.call_tool(
name="Calculator/calculate",
arguments={"expression": expression}
)
# Debug: Log the raw call result
workflow.logger.info(f"π Raw call result for '{expression}': {call_result}")
workflow.logger.info(f"π Call result type: {type(call_result)}")
workflow.logger.info(f"π Call result content: {call_result.content}")
workflow.logger.info(f"π Call result isError: {call_result.isError}")
# Parse the result - check structuredContent first, then content
result_value = None
if call_result.structuredContent:
# Data is in structuredContent (direct object)
workflow.logger.info(f"π Found structuredContent: {call_result.structuredContent}")
result_data = call_result.structuredContent
result_value = result_data.get("result", 0.0)
results.append({
"expression": expression,
"result": result_value,
"success": True
})
total_sum += result_value
workflow.logger.info(f" β
Successfully parsed result from structuredContent: {result_value}")
elif call_result.content and len(call_result.content) > 0:
# Fallback to content (legacy format)
content = call_result.content[0]
workflow.logger.info(f"π Content type: {type(content)}, Content: {content}")
if hasattr(content, 'text'):
workflow.logger.info(f"π Content text: '{content.text}'")
import json
try:
result_data = json.loads(content.text)
result_value = result_data.get("result", 0.0)
results.append({
"expression": expression,
"result": result_value,
"success": True
})
total_sum += result_value
workflow.logger.info(f" β
Successfully parsed result from content: {result_value}")
except json.JSONDecodeError as e:
workflow.logger.error(f"π JSON decode error for '{expression}': {e}")
workflow.logger.error(f"π Raw text that failed to parse: '{content.text}'")
results.append({
"expression": expression,
"error": f"JSON decode error: {e}",
"success": False
})
else:
workflow.logger.warning(f"π Content has no 'text' attribute for {expression}: {type(content)}")
else:
workflow.logger.warning(f"π No content or structuredContent returned for {expression}")
workflow.logger.warning(f"π Call result details: isError={call_result.isError}, content={call_result.content}, structuredContent={call_result.structuredContent}")
except Exception as e:
workflow.logger.error(f"Error evaluating '{expression}': {e}")
results.append({
"expression": expression,
"error": str(e),
"success": False
})
# Demonstrate other calculator operations
try:
# Test addition
workflow.logger.info("Testing addition operation")
await session.call_tool(
name="Calculator/add",
arguments={"a": 10, "b": 5}
)
workflow.logger.info("Addition completed successfully")
# Test multiplication
workflow.logger.info("Testing multiplication operation")
await session.call_tool(
name="Calculator/multiply",
arguments={"a": 7, "b": 8}
)
workflow.logger.info("Multiplication completed successfully")
# Test sum_list operation
workflow.logger.info("Testing list sum operation")
await session.call_tool(
name="Calculator/sum_list",
arguments={"numbers": [1, 2, 3, 4, 5]}
)
workflow.logger.info("List sum completed successfully")
except Exception as e:
workflow.logger.error(f"Error in additional operations: {e}")
workflow.logger.info("Calculator demo completed")
return CalculatorDemoResult(
tools_discovered=tools_count,
calculations_performed=len(results),
results=results,
total_sum=total_sum
)
class CalculatorDemoRunner:
"""Runner for the calculator demo workflow."""
def __init__(
self,
temporal_host: str = "localhost:7233",
namespace: str = "my-caller-namespace",
task_queue: str = "calculator-demo",
) -> None:
self.temporal_host = temporal_host
self.namespace = namespace
self.task_queue = task_queue
async def run_demo(self, endpoint: str = "mcp-gateway") -> None:
"""Run the calculator demonstration."""
logger.info(f"Connecting to Temporal at {self.temporal_host}, namespace: {self.namespace}")
client = await Client.connect(
self.temporal_host,
namespace=self.namespace,
data_converter=pydantic_data_converter,
)
logger.info("Starting demo workflow worker...")
async with Worker(
client,
task_queue=self.task_queue,
workflows=[CalculatorDemoWorkflow],
) as worker:
logger.info(f"Worker started on task queue: {self.task_queue}")
# Execute the workflow
workflow_id = f"calculator-demo-{uuid.uuid4()}"
logger.info(f"Executing workflow: {workflow_id}")
result = await client.execute_workflow(
CalculatorDemoWorkflow.run,
CalculatorDemoInput(endpoint=endpoint),
id=workflow_id,
task_queue=worker.task_queue,
)
# Display results
logger.info("=== Demo Results ===")
logger.info(f"Tools discovered: {result.tools_discovered}")
logger.info(f"Calculations performed: {result.calculations_performed}")
logger.info(f"Total sum of results: {result.total_sum}")
logger.info("\nCalculation Results:")
for calc_result in result.results:
if calc_result.get("success", False):
logger.info(f" {calc_result['expression']} = {calc_result['result']}")
else:
logger.info(f" {calc_result['expression']} = ERROR: {calc_result.get('error', 'Unknown error')}")
logger.info("Demo completed successfully!")
async def main() -> None:
"""Main entry point for running the calculator demo."""
import argparse
parser = argparse.ArgumentParser(description="Run the Nexus MCP Calculator demo workflow")
parser.add_argument(
"--temporal-host",
default="localhost:7233",
help="Temporal server host (default: localhost:7233)"
)
parser.add_argument(
"--namespace",
default="my-caller-namespace",
help="Temporal namespace (default: my-caller-namespace)"
)
parser.add_argument(
"--task-queue",
default="calculator-demo",
help="Task queue name (default: calculator-demo)"
)
parser.add_argument(
"--endpoint",
default="mcp-gateway",
help="Nexus endpoint name (default: mcp-gateway)"
)
args = parser.parse_args()
runner = CalculatorDemoRunner(
temporal_host=args.temporal_host,
namespace=args.namespace,
task_queue=args.task_queue,
)
try:
await runner.run_demo(args.endpoint)
except Exception as e:
logger.error(f"Demo failed: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())