# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/89jobrien/mcp-joecc'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Core training pipeline for OpenPipe ART.
Implements the training workflow:
1. Generate training inputs from task descriptions
2. Execute rollouts to collect trajectories
3. Score trajectories using RULER
4. Train the model using GRPO
"""
from __future__ import annotations
import os
from datetime import datetime
from typing import TYPE_CHECKING
import art
from litellm import acompletion
from loguru import logger
from pydantic import BaseModel, Field
from mcp_task_aggregator.models.training import (
TaskInput,
TrainingMetrics,
TrainingRun,
TrainingStatus,
TrajectoryMessage,
TrajectoryRecord,
)
if TYPE_CHECKING:
from mcp_task_aggregator.storage.database import Database
# NOTE: this model is passed as `response_format` to litellm (see
# TrainingPipeline.generate_training_inputs), so its docstring and the
# Field description become part of the JSON schema shown to the LLM —
# do not edit them casually.
class TrainingDataset(BaseModel):
    """Generated training inputs."""
    # Parsed from the LLM's structured JSON response ({"inputs": [...]}).
    inputs: list[str] = Field(description="List of training input strings")
class TrainingPipeline:
    """OpenPipe ART training pipeline for task management agents.
    Handles the full training workflow from input generation through
    model training using GRPO.
    """
    def __init__(
        self,
        model_name: str,
        project_name: str,
        *,
        api_key: str | None = None,
        input_generation_model: str = "openrouter/openai/gpt-4o-mini",
        ruler_model: str = "openrouter/openai/gpt-4o-mini",
        system_prompt_model: str = "openrouter/openai/gpt-4o-mini",
    ) -> None:
        """Initialize the training pipeline.
        Args:
            model_name: Name for the trained model.
            project_name: ART project name for tracking.
            api_key: OpenRouter API key (or use OPENROUTER_API_KEY env var).
            input_generation_model: Model for generating training inputs.
            ruler_model: Model for RULER scoring.
            system_prompt_model: Model for generating system prompts.
        """
        self.model_name = model_name
        self.project_name = project_name
        # Explicit argument wins; fall back to the environment variable.
        self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
        self.input_generation_model = input_generation_model
        self.ruler_model = ruler_model
        self.system_prompt_model = system_prompt_model
        # Lazily populated by initialize() / generate_system_prompt();
        # most public methods raise if these are still None.
        self._model: art.TrainableModel | None = None
        self._backend: art.LocalBackend | None = None
        self._system_prompt: str | None = None
    async def initialize(self, base_model: str = "Qwen/Qwen2.5-1.5B-Instruct") -> None:
        """Initialize the ART model and backend.
        Args:
            base_model: HuggingFace model ID for the base model.
        Raises:
            ValueError: If no OpenRouter API key was provided or found in
                the environment.
        """
        if not self.api_key:
            raise ValueError("OPENROUTER_API_KEY is required for training")
        # Exported to the process environment — presumably so litellm/ART
        # pick the key up implicitly; TODO confirm against litellm docs.
        os.environ["OPENROUTER_API_KEY"] = self.api_key
        self._model = art.TrainableModel(
            name=self.model_name,
            project=self.project_name,
            base_model=base_model,
        )
        # Local backend stores checkpoints/state under ./.art in-process.
        self._backend = art.LocalBackend(
            in_process=True,
            path="./.art",
        )
        await self._model.register(self._backend)
        logger.info(f"Initialized model {self.model_name} with base {base_model}")
    async def generate_system_prompt(self, task_description: str) -> str:
        """Generate a system prompt for the task.
        Args:
            task_description: Description of the task to train for.
        Returns:
            Generated system prompt (also cached on self._system_prompt
            for use by rollout()).
        """
        messages = [
            {
                "role": "system",
                "content": (
                    "Generate a clear, concise system prompt for a model that will "
                    "perform the following task. The prompt should be direct and instructional."
                ),
            },
            {
                "role": "user",
                "content": f"Task: {task_description}\n\nGenerate a system prompt for this task.",
            },
        ]
        # Low temperature: we want a stable, deterministic-ish prompt.
        response = await acompletion(
            model=self.system_prompt_model,
            messages=messages,
            temperature=0.3,
        )
        self._system_prompt = response.choices[0].message.content.strip()
        logger.info(f"Generated system prompt: {self._system_prompt[:100]}...")
        return self._system_prompt
    async def generate_training_inputs(
        self,
        task_description: str,
        num_examples: int = 50,
    ) -> list[str]:
        """Generate diverse training inputs for the task.
        Args:
            task_description: Description of the task.
            num_examples: Number of examples to generate.
        Returns:
            List of exactly num_examples training input strings.
        Raises:
            ValueError: If the LLM never returned enough inputs within
                the retry budget.
        """
        system_prompt = f"""You are a helpful assistant that generates diverse, high-quality training inputs.
Task: {task_description}
Generate {num_examples} diverse INPUT examples that someone might provide for this task.
Make sure the inputs:
1. Cover a wide range of cases and edge cases
2. Are realistic and practical
3. Vary in length and complexity
4. Represent real-world scenarios
Only generate the INPUTS, not the outputs. RULER will evaluate the model's attempts automatically.
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": (
                    f"Generate {num_examples} input examples for the task described above. "
                    "Return them as a JSON object with key 'inputs' containing a list of strings."
                ),
            },
        ]
        logger.info(f"Generating {num_examples} training inputs...")
        inputs: list[str] = []
        max_retries = 5
        for attempt in range(max_retries):
            if len(inputs) >= num_examples:
                break
            try:
                # Structured output: response_format constrains the model
                # to TrainingDataset's JSON schema.
                response = await acompletion(
                    model=self.input_generation_model,
                    messages=messages,
                    response_format=TrainingDataset,
                    temperature=1.0,
                )
                dataset = TrainingDataset.model_validate_json(response.choices[0].message.content)
                # NOTE(review): each retry REPLACES the previous batch
                # rather than accumulating — a short batch is discarded
                # and the full set re-requested. Confirm this is intended.
                inputs = dataset.inputs
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                continue
        if len(inputs) < num_examples:
            raise ValueError(f"Failed to generate {num_examples} training inputs after {max_retries} attempts")
        logger.info(f"Generated {len(inputs)} training inputs")
        # The model may have over-produced; truncate to the requested count.
        return inputs[:num_examples]
    async def rollout(self, task_input: TaskInput) -> art.Trajectory:
        """Execute a single rollout for a task input.
        Args:
            task_input: The input to process.
        Returns:
            Trajectory containing the conversation and metadata.
        Raises:
            RuntimeError: If initialize() or generate_system_prompt()
                has not been called yet.
        """
        if not self._model:
            raise RuntimeError("Pipeline not initialized. Call initialize() first.")
        if not self._system_prompt:
            raise RuntimeError("System prompt not set. Call generate_system_prompt() first.")
        # Reward starts at 0.0; RULER assigns the real score later.
        traj = art.Trajectory(
            reward=0.0,
            messages_and_choices=[],
            metadata={
                "step": task_input.step,
                "input": task_input.input_text,
            },
        )
        # Build the conversation
        traj.messages_and_choices = [
            {"role": "system", "content": self._system_prompt},
            {"role": "user", "content": task_input.input_text},
        ]
        # Get model response
        # Trainable models are served by ART's local vLLM, so litellm needs
        # the "hosted_vllm/" provider prefix to route to inference_base_url.
        litellm_model_name = f"hosted_vllm/{self._model.name}" if self._model.trainable else self._model.name
        response = await acompletion(
            model=litellm_model_name,
            base_url=self._model.inference_base_url,
            api_key=self._model.inference_api_key,
            temperature=0.7,
            messages=traj.messages(),
            caching=False,
        )
        # Add the model's response
        # Local import keeps the art.utils dependency off the module path
        # until a rollout actually happens.
        from art.utils.litellm import convert_litellm_choice_to_openai
        traj.messages_and_choices.append(convert_litellm_choice_to_openai(response.choices[0]))
        return traj
    async def gather_trajectories(
        self,
        task_inputs: list[TaskInput],
        rollouts_per_group: int = 4,
    ) -> list[art.TrajectoryGroup]:
        """Gather trajectory groups for a batch of inputs.
        Args:
            task_inputs: List of task inputs to process.
            rollouts_per_group: Number of rollouts per input for RULER comparison.
        Returns:
            List of trajectory groups for training.
        """
        groups = []
        for task_input in task_inputs:
            # One group per input: rollouts_per_group samples of the same
            # prompt, so RULER can rank them against each other.
            groups.append(
                art.TrajectoryGroup(self.rollout(task_input) for _ in range(rollouts_per_group))
            )
        # NOTE(review): max_exceptions equals the total rollout count, i.e.
        # every single rollout may fail without aborting — confirm that is
        # the intended tolerance.
        finished_groups = await art.gather_trajectory_groups(
            groups,
            pbar_desc="Generating responses",
            max_exceptions=rollouts_per_group * len(task_inputs),
        )
        return finished_groups
    async def score_with_ruler(
        self,
        groups: list[art.TrajectoryGroup],
        max_retries: int = 10,
    ) -> list[art.TrajectoryGroup]:
        """Score trajectory groups using RULER.
        Args:
            groups: Trajectory groups to score.
            max_retries: Maximum retries for API rate limiting.
        Returns:
            Scored trajectory groups.
        Raises:
            RuntimeError: If any single group cannot be scored within
                max_retries attempts.
        """
        from art.rewards import ruler_score_group
        judged_groups = []
        for group in groups:
            judged_group = None
            # Retry loop: broad catch because rate-limit/transient API
            # errors are expected here; failures are logged and retried.
            for _ in range(max_retries):
                try:
                    judged_group = await ruler_score_group(group, self.ruler_model, debug=False)
                    break
                except Exception as e:
                    logger.warning(f"Error scoring group: {e}")
                    continue
            if judged_group is None:
                raise RuntimeError("Failed to score trajectory group after max retries")
            judged_groups.append(judged_group)
        return judged_groups
    async def train_step(
        self,
        trajectory_groups: list[art.TrajectoryGroup],
        learning_rate: float = 1e-5,
    ) -> None:
        """Execute a single training step.
        Args:
            trajectory_groups: Scored trajectory groups.
            learning_rate: Learning rate for training.
        Raises:
            RuntimeError: If initialize() has not been called yet.
        """
        if not self._model:
            raise RuntimeError("Pipeline not initialized. Call initialize() first.")
        # Old checkpoints are purged before each step — presumably to cap
        # disk usage under ./.art; TODO confirm against ART docs.
        await self._model.delete_checkpoints()
        await self._model.train(
            trajectory_groups,
            config=art.TrainConfig(learning_rate=learning_rate),
        )
    async def run_training(
        self,
        training_run: TrainingRun,
        _db: Database | None = None,
    ) -> TrainingRun:
        """Execute a complete training run.
        Drives the full loop: system-prompt generation, input generation,
        rollout collection, RULER scoring, and GRPO training, mutating
        training_run's status/metrics fields as it progresses.
        Args:
            training_run: Configuration for the training run.
            _db: Optional database for persisting progress (reserved for future use).
        Returns:
            Updated training run with results.
        Raises:
            Exception: Re-raises any failure after marking the run FAILED
                and recording the error message.
        """
        from art.utils import iterate_dataset
        try:
            # Initialize
            training_run.status = TrainingStatus.GENERATING
            # NOTE(review): naive local timestamp — consider tz-aware UTC.
            training_run.started_at = datetime.now()
            # Generate system prompt
            self._system_prompt = await self.generate_system_prompt(training_run.task_description)
            training_run.system_prompt = self._system_prompt
            # Generate training inputs
            training_inputs = await self.generate_training_inputs(
                training_run.task_description,
                num_examples=training_run.num_examples,
            )
            # Convert to TaskInput objects
            task_inputs = [TaskInput(step=0, input_text=inp) for inp in training_inputs]
            # Create training iterator
            training_run.status = TrainingStatus.COLLECTING
            # Resume from the model's last recorded step if already trained.
            training_iterator = iterate_dataset(
                task_inputs,
                groups_per_step=1,
                num_epochs=training_run.num_epochs,
                initial_step=await self._model.get_step() if self._model else 0,
            )
            total_trajectories = 0
            total_reward = 0.0
            for batch in training_iterator:
                logger.info(f"Training step {batch.step}, epoch {batch.epoch}")
                # Update step in task inputs
                for task_input in batch.items:
                    task_input.step = batch.step
                # Collect trajectories
                # NOTE(review): status cycles SCORING -> TRAINING inside the
                # loop but never returns to COLLECTING on later iterations.
                finished_groups = await self.gather_trajectories(
                    batch.items,
                    rollouts_per_group=training_run.rollouts_per_group,
                )
                # Score with RULER
                training_run.status = TrainingStatus.SCORING
                judged_groups = await self.score_with_ruler(finished_groups)
                # Train
                training_run.status = TrainingStatus.TRAINING
                await self.train_step(judged_groups, learning_rate=training_run.learning_rate)
                # Update metrics
                for group in judged_groups:
                    for traj in group.trajectories:
                        total_trajectories += 1
                        total_reward += traj.reward
                logger.info(f"Completed training step {batch.step}")
                # Check max steps
                if training_run.max_steps and batch.step >= training_run.max_steps:
                    logger.info(f"Reached maximum training steps ({training_run.max_steps})")
                    break
            # Complete
            training_run.status = TrainingStatus.COMPLETED
            training_run.completed_at = datetime.now()
            training_run.metrics = TrainingMetrics(
                total_trajectories=total_trajectories,
                # None (not 0.0) when no trajectories were collected.
                avg_reward=total_reward / total_trajectories if total_trajectories > 0 else None,
                epoch=training_run.num_epochs,
            )
            logger.info("Training completed successfully")
        except Exception as e:
            # Record the failure on the run object, then re-raise so the
            # caller still sees the original exception.
            training_run.status = TrainingStatus.FAILED
            training_run.error = str(e)
            training_run.completed_at = datetime.now()
            logger.error(f"Training failed: {e}")
            raise
        return training_run
    def trajectory_to_record(
        self,
        trajectory: art.Trajectory,
        run_id: int,
        task_input: TaskInput,
    ) -> TrajectoryRecord:
        """Convert an ART trajectory to a record for storage.
        Args:
            trajectory: ART trajectory to convert.
            run_id: Training run ID.
            task_input: Original task input.
        Returns:
            TrajectoryRecord for database storage.
        """
        # trajectory.messages() is assumed to yield plain dicts with
        # "role"/"content" keys — TODO confirm for choice entries.
        messages = [
            TrajectoryMessage(role=m["role"], content=m["content"]) for m in trajectory.messages()
        ]
        return TrajectoryRecord(
            run_id=run_id,
            task_input=task_input,
            messages=messages,
            reward=trajectory.reward,
            metadata=trajectory.metadata or {},
        )