parallel.py•4.37 kB
"""
Example of using Temporal as the execution engine for MCP Agent workflows.
This example demonstrates how to create a workflow using the app.workflow and app.workflow_run
decorators, and how to run it using the Temporal executor.
"""
import asyncio
from mcp_agent.agents.agent import Agent
from mcp_agent.executor.temporal import TemporalExecutor
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.parallel.parallel_llm import ParallelLLM
from main import app
SHORT_STORY = """
The Battle of Glimmerwood
In the heart of Glimmerwood, a mystical forest knowed for its radiant trees, a small village thrived.
The villagers, who were live peacefully, shared their home with the forest's magical creatures,
especially the Glimmerfoxes whose fur shimmer like moonlight.
One fateful evening, the peace was shaterred when the infamous Dark Marauders attack.
Lead by the cunning Captain Thorn, the bandits aim to steal the precious Glimmerstones which was believed to grant immortality.
Amidst the choas, a young girl named Elara stood her ground, she rallied the villagers and devised a clever plan.
Using the forests natural defenses they lured the marauders into a trap.
As the bandits aproached the village square, a herd of Glimmerfoxes emerged, blinding them with their dazzling light,
the villagers seized the opportunity to captured the invaders.
Elara's bravery was celebrated and she was hailed as the "Guardian of Glimmerwood".
The Glimmerstones were secured in a hidden grove protected by an ancient spell.
However, not all was as it seemed. The Glimmerstones true power was never confirm,
and whispers of a hidden agenda linger among the villagers.
"""
@app.workflow
class ParallelWorkflow(Workflow[str]):
"""
A simple workflow that demonstrates the basic structure of a Temporal workflow.
"""
@app.workflow_run
async def run(self, input: str) -> WorkflowResult[str]:
"""
Run the workflow, processing the input data.
Args:
input_data: The data to process
Returns:
A WorkflowResult containing the processed data
"""
proofreader = Agent(
name="proofreader",
instruction=""""Review the short story for grammar, spelling, and punctuation errors.
Identify any awkward phrasing or structural issues that could improve clarity.
Provide detailed feedback on corrections.""",
)
fact_checker = Agent(
name="fact_checker",
instruction="""Verify the factual consistency within the story. Identify any contradictions,
logical inconsistencies, or inaccuracies in the plot, character actions, or setting.
Highlight potential issues with reasoning or coherence.""",
)
style_enforcer = Agent(
name="style_enforcer",
instruction="""Analyze the story for adherence to style guidelines.
Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to
enhance storytelling, readability, and engagement.""",
)
grader = Agent(
name="grader",
instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer
into a structured report. Summarize key issues and categorize them by type.
Provide actionable recommendations for improving the story,
and give an overall grade based on the feedback.""",
)
parallel = ParallelLLM(
fan_in_agent=grader,
fan_out_agents=[proofreader, fact_checker, style_enforcer],
llm_factory=OpenAIAugmentedLLM,
context=app.context,
)
result = await parallel.generate_str(
message=f"Student short story submission: {input}",
)
return WorkflowResult(value=result)
async def main():
async with app.run() as orchestrator_app:
executor: TemporalExecutor = orchestrator_app.executor
handle = await executor.start_workflow(
"ParallelWorkflow",
SHORT_STORY,
)
a = await handle.result()
print(a)
if __name__ == "__main__":
asyncio.run(main())