Skip to main content
Glama

mcp-run-python

Official
by pydantic
steps.md11.8 kB
# Steps Steps are the fundamental units of work in a graph. They're async functions that receive a [`StepContext`][pydantic_graph.beta.step.StepContext] and return a value. ## Creating Steps Steps are created using the [`@g.step`][pydantic_graph.beta.graph_builder.GraphBuilder.step] decorator on the [`GraphBuilder`][pydantic_graph.beta.graph_builder.GraphBuilder]: ```python {title="basic_step.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class MyState: counter: int = 0 g = GraphBuilder(state_type=MyState, output_type=int) @g.step async def increment(ctx: StepContext[MyState, None, None]) -> int: ctx.state.counter += 1 return ctx.state.counter g.add( g.edge_from(g.start_node).to(increment), g.edge_from(increment).to(g.end_node), ) graph = g.build() async def main(): state = MyState() result = await graph.run(state=state) print(result) #> 1 ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ ## Step Context Every step function receives a [`StepContext`][pydantic_graph.beta.step.StepContext] as its first parameter. The context provides access to: - `ctx.state` - The mutable graph state (type: `StateT`) - `ctx.deps` - Injected dependencies (type: `DepsT`) - `ctx.inputs` - Input data for this step (type: `InputT`) ### Accessing State State is shared across all steps in a graph and can be freely mutated: ```python {title="state_access.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class AppState: messages: list[str] async def main(): g = GraphBuilder(state_type=AppState, output_type=list[str]) @g.step async def add_hello(ctx: StepContext[AppState, None, None]) -> None: ctx.state.messages.append('Hello') @g.step async def add_world(ctx: StepContext[AppState, None, None]) -> None: ctx.state.messages.append('World') @g.step async def get_messages(ctx: StepContext[AppState, None, None]) -> list[str]: return ctx.state.messages g.add( g.edge_from(g.start_node).to(add_hello), g.edge_from(add_hello).to(add_world), g.edge_from(add_world).to(get_messages), g.edge_from(get_messages).to(g.end_node), ) graph = g.build() state = AppState(messages=[]) result = await graph.run(state=state) print(result) #> ['Hello', 'World'] ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ ### Working with Inputs Steps can receive and transform input data: ```python {title="step_inputs.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class SimpleState: pass async def main(): g = GraphBuilder( state_type=SimpleState, input_type=int, output_type=str, ) @g.step async def double_it(ctx: StepContext[SimpleState, None, int]) -> int: """Double the input value.""" return ctx.inputs * 2 @g.step async def stringify(ctx: StepContext[SimpleState, None, int]) -> str: """Convert to a formatted string.""" return f'Result: {ctx.inputs}' g.add( g.edge_from(g.start_node).to(double_it), g.edge_from(double_it).to(stringify), g.edge_from(stringify).to(g.end_node), ) graph = g.build() result = await graph.run(state=SimpleState(), inputs=21) print(result) #> Result: 42 ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ ## Dependency Injection Steps can access injected dependencies through `ctx.deps`: ```python {title="dependencies.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class AppState: pass @dataclass class AppDeps: """Dependencies injected into the graph.""" multiplier: int async def main(): g = GraphBuilder( state_type=AppState, deps_type=AppDeps, input_type=int, output_type=int, ) @g.step async def multiply(ctx: StepContext[AppState, AppDeps, int]) -> int: """Multiply input by the injected multiplier.""" return ctx.inputs * ctx.deps.multiplier g.add( g.edge_from(g.start_node).to(multiply), g.edge_from(multiply).to(g.end_node), ) graph = g.build() deps = AppDeps(multiplier=10) result = await graph.run(state=AppState(), deps=deps, inputs=5) print(result) #> 50 ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ ## Customizing Steps ### Custom Node IDs By default, step node IDs are inferred from the function name. You can override this: ```python {title="custom_id.py" requires="basic_step.py"} from pydantic_graph.beta import StepContext from basic_step import MyState, g @g.step(node_id='my_custom_id') async def my_step(ctx: StepContext[MyState, None, None]) -> int: return 42 # The node ID is now 'my_custom_id' instead of 'my_step' ``` ### Human-Readable Labels Labels provide documentation for diagram generation: ```python {title="labels.py" requires="basic_step.py"} from pydantic_graph.beta import StepContext from basic_step import MyState, g @g.step(label='Increment the counter') async def increment(ctx: StepContext[MyState, None, None]) -> int: ctx.state.counter += 1 return ctx.state.counter # Access the label programmatically print(increment.label) #> Increment the counter ``` ## Sequential Steps Multiple steps can be chained sequentially: ```python {title="sequential.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class MathState: operations: list[str] async def main(): g = GraphBuilder( state_type=MathState, input_type=int, output_type=int, ) @g.step async def add_five(ctx: StepContext[MathState, None, int]) -> int: ctx.state.operations.append('add 5') return ctx.inputs + 5 @g.step async def multiply_by_two(ctx: StepContext[MathState, None, int]) -> int: ctx.state.operations.append('multiply by 2') return ctx.inputs * 2 @g.step async def subtract_three(ctx: StepContext[MathState, None, int]) -> int: ctx.state.operations.append('subtract 3') return ctx.inputs - 3 # Connect steps sequentially g.add( g.edge_from(g.start_node).to(add_five), g.edge_from(add_five).to(multiply_by_two), g.edge_from(multiply_by_two).to(subtract_three), g.edge_from(subtract_three).to(g.end_node), ) graph = g.build() state = MathState(operations=[]) result = await graph.run(state=state, inputs=10) print(f'Result: {result}') #> Result: 27 print(f'Operations: {state.operations}') #> Operations: ['add 5', 'multiply by 2', 'subtract 3'] ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ The computation is: `(10 + 5) * 2 - 3 = 27` ## Streaming Steps In addition to regular steps that return a single value, you can create streaming steps that yield multiple values over time using the [`@g.stream`][pydantic_graph.beta.graph_builder.GraphBuilder.stream] decorator: ```python {title="streaming_step.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext from pydantic_graph.beta.join import reduce_list_append @dataclass class SimpleState: pass g = GraphBuilder(state_type=SimpleState, output_type=list[int]) @g.stream async def generate_stream(ctx: StepContext[SimpleState, None, None]): """Stream numbers from 1 to 5.""" for i in range(1, 6): yield i @g.step async def square(ctx: StepContext[SimpleState, None, int]) -> int: return ctx.inputs * ctx.inputs collect = g.join(reduce_list_append, initial_factory=list[int]) g.add( g.edge_from(g.start_node).to(generate_stream), # The stream output is an AsyncIterable, so we can map over it g.edge_from(generate_stream).map().to(square), g.edge_from(square).to(collect), g.edge_from(collect).to(g.end_node), ) graph = g.build() async def main(): result = await graph.run(state=SimpleState()) print(sorted(result)) #> [1, 4, 9, 16, 25] ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ ### How Streaming Steps Work Streaming steps return an `AsyncIterable` that yields values over time. When you use `.map()` on a streaming step's output, the graph processes each yielded value as it becomes available, creating parallel tasks dynamically. This is particularly useful for: - Processing data from APIs that stream responses - Handling real-time data feeds - Progressive processing of large datasets - Any scenario where you want to start processing results before all data is available Like regular steps, streaming steps can also have custom node IDs and labels: ```python {title="labeled_stream.py" requires="streaming_step.py"} from pydantic_graph.beta import StepContext from streaming_step import SimpleState, g @g.stream(node_id='my_stream', label='Generate numbers progressively') async def labeled_stream(ctx: StepContext[SimpleState, None, None]): for i in range(10): yield i ``` ## Edge Building Convenience Methods The builder provides helper methods for common edge patterns: ### Simple Edges with `add_edge()` ```python {title="add_edge_example.py"} from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class SimpleState: pass async def main(): g = GraphBuilder(state_type=SimpleState, output_type=int) @g.step async def step_a(ctx: StepContext[SimpleState, None, None]) -> int: return 10 @g.step async def step_b(ctx: StepContext[SimpleState, None, int]) -> int: return ctx.inputs + 5 # Using add_edge() for simple connections g.add_edge(g.start_node, step_a) g.add_edge(step_a, step_b, label='from a to b') g.add_edge(step_b, g.end_node) graph = g.build() result = await graph.run(state=SimpleState()) print(result) #> 15 ``` _(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_ ## Type Safety The beta graph API provides strong type checking through generics. Type parameters on [`StepContext`][pydantic_graph.beta.step.StepContext] ensure: - State access is properly typed - Dependencies are correctly typed - Input/output types match across edges ```python from dataclasses import dataclass from pydantic_graph.beta import GraphBuilder, StepContext @dataclass class MyState: pass g = GraphBuilder(state_type=MyState, output_type=str) # Type checker will catch mismatches @g.step async def expects_int(ctx: StepContext[MyState, None, int]) -> str: return str(ctx.inputs) @g.step async def returns_str(ctx: StepContext[MyState, None, None]) -> str: return 'hello' # This would be a type error - expects_int needs int input, but returns_str outputs str # g.add(g.edge_from(returns_str).to(expects_int)) # Type error! ``` ## Next Steps - Learn about [parallel execution](parallel.md) with broadcasting and mapping - Understand [join nodes](joins.md) for aggregating parallel results - Explore [conditional branching](decisions.md) with decision nodes

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pydantic/pydantic-ai'

If you have feedback or need assistance with the MCP directory API, please join our Discord server