"""
OpenAI provider implementation.
This module implements the BaseLLMProvider interface for OpenAI's API,
including support for both standard Chat Completions and the newer Responses API.
Key Features:
- Function calling with automatic tool execution
- Streaming support (optional)
- Token usage tracking
- Error handling with retries
"""
from __future__ import annotations
import json
from typing import Any, Dict, List, Optional
from openai import AsyncOpenAI, APIError, RateLimitError as OpenAIRateLimitError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from .base import (
    BaseLLMProvider,
    LLMResponse,
    ToolCall,
    ToolCallStatus,
    LLMProviderError,
    MaxTurnsExceededError,
    RateLimitError,
    AuthenticationError,
)
class OpenAIProvider(BaseLLMProvider):
"""
OpenAI implementation of the LLM provider interface.
Supports:
- GPT-4, GPT-4 Turbo, GPT-3.5
- Function calling
- Parallel tool calls
- Automatic retries with exponential backoff
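
    Example (illustrative sketch, run inside an async function; the API key
    and model name are placeholders):

        provider = OpenAIProvider({"api_key": "sk-...", "model": "gpt-4o"})
        response = await provider.generate(
            messages=[{"role": "user", "content": "Hello!"}],
        )
        print(response.content, response.usage["total_tokens"])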
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize OpenAI provider.
Args:
config: Must contain:
- api_key: OpenAI API key
- model: Model name (e.g., 'gpt-4o')
- temperature: Optional temperature override
- max_tokens: Optional max tokens override
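
        Example config (illustrative values):

            {
                "api_key": "sk-...",
                "model": "gpt-4o",
                "temperature": 0.2,
                "max_tokens": 2048,
            }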
"""
super().__init__(config)
self.client = AsyncOpenAI(api_key=config["api_key"])
self.model = config["model"]
self.default_temperature = config.get("temperature", 0.0)
self.default_max_tokens = config.get("max_tokens", 4096)
    # Note: generate() converts OpenAIRateLimitError into our own RateLimitError
    # in its except block, so the retry predicate must also match the wrapped
    # exception type; otherwise the retries would never fire.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((OpenAIRateLimitError, RateLimitError)),
    )
async def generate(
self,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
**kwargs
) -> LLMResponse:
"""
Generate a response using OpenAI's Chat Completions API.
This method handles a single generation call. For multi-turn tool calling,
use generate_with_tools() instead.
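
        Example (illustrative sketch; the tool follows the MCP-style shape
        accepted by _convert_mcp_tools_to_openai):

            response = await provider.generate(
                messages=[{"role": "user", "content": "Weather in Paris?"}],
                tools=[{
                    "name": "get_weather",
                    "description": "Get weather for a location",
                    "parameters": {
                        "type": "object",
                        "properties": {"location": {"type": "string"}},
                    },
                }],
            )
            if response.has_tool_calls:
                for tc in response.tool_calls:
                    print(tc.name, tc.arguments)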
"""
try:
# Prepare API call parameters
params = {
"model": self.model,
"messages": messages,
"temperature": temperature if temperature is not None else self.default_temperature,
"max_tokens": max_tokens if max_tokens is not None else self.default_max_tokens,
}
            # Add tools if provided (converted to OpenAI's function-calling "tools" format)
if tools:
params["tools"] = self._convert_mcp_tools_to_openai(tools)
params["tool_choice"] = "auto"
# Make API call
response = await self.client.chat.completions.create(**params)
# Parse response
choice = response.choices[0]
message = choice.message
# Extract text content
content = message.content or ""
# Extract tool calls if present
tool_calls = []
if message.tool_calls:
for tc in message.tool_calls:
try:
args = json.loads(tc.function.arguments)
except json.JSONDecodeError:
args = {}
tool_calls.append(ToolCall(
id=tc.id,
name=tc.function.name,
arguments=args,
status=ToolCallStatus.PENDING,
))
# Extract usage stats
usage = {
"prompt_tokens": response.usage.prompt_tokens if response.usage else 0,
"completion_tokens": response.usage.completion_tokens if response.usage else 0,
"total_tokens": response.usage.total_tokens if response.usage else 0,
}
return LLMResponse(
content=content,
tool_calls=tool_calls,
finish_reason=choice.finish_reason,
usage=usage,
raw_response=response,
)
except OpenAIRateLimitError as e:
raise RateLimitError(
"OpenAI rate limit exceeded",
provider="openai",
original_error=e,
)
except APIError as e:
if "invalid_api_key" in str(e).lower():
raise AuthenticationError(
"Invalid OpenAI API key",
provider="openai",
original_error=e,
)
raise LLMProviderError(
f"OpenAI API error: {str(e)}",
provider="openai",
original_error=e,
)
except Exception as e:
raise LLMProviderError(
f"Unexpected error: {str(e)}",
provider="openai",
original_error=e,
)
async def generate_with_tools(
self,
messages: List[Dict[str, Any]],
tools: List[Dict[str, Any]],
max_turns: int = 10,
**kwargs
) -> tuple[str, List[Dict[str, Any]]]:
"""
Generate response with automatic tool calling loop.
This implements the full agentic workflow:
1. Call LLM with tools
2. Execute any requested tools
3. Feed results back to LLM
4. Repeat until LLM provides final answer
The conversation history is maintained and returned so the caller
can persist it for multi-turn conversations.
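
        Example (illustrative sketch of the orchestrator-side loop;
        execute_mcp_tool is a hypothetical helper standing in for the MCP layer):

            answer, conversation = await provider.generate_with_tools(messages, tools)
            while not answer:
                # Pending calls are recorded in the last assistant message.
                for tc in conversation[-1]["tool_calls"]:
                    result = await execute_mcp_tool(
                        tc["function"]["name"], json.loads(tc["function"]["arguments"])
                    )
                    conversation.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": json.dumps(result),
                    })
                answer, conversation = await provider.generate_with_tools(conversation, tools)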
"""
conversation = list(messages) # Copy to avoid mutating input
for turn in range(max_turns):
# Generate response
response = await self.generate(
messages=conversation,
tools=tools,
**kwargs
)
# If no tool calls, we're done
if not response.has_tool_calls:
return response.content, conversation
# Add assistant's tool call message to conversation
tool_call_message = {
"role": "assistant",
"content": response.content or None,
"tool_calls": [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments),
}
}
for tc in response.tool_calls
]
}
conversation.append(tool_call_message)
            # Tool execution is the orchestrator's job: it sees the recorded
            # tool_calls, runs them via MCP, appends the results to the
            # conversation, and calls this method again. This base
            # implementation therefore returns early when tools are requested,
            # using an empty string to signal "no final answer yet".
            return "", conversation
        # Max turns exceeded without a final answer
        raise MaxTurnsExceededError(
            f"Tool calling loop exceeded {max_turns} turns",
            provider="openai",
        )
def format_tool_result(self, tool_call: ToolCall) -> Dict[str, Any]:
"""
Format tool result for OpenAI's expected format.
OpenAI expects tool results as messages with role="tool".
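
        Example of the returned message (illustrative values):

            {
                "role": "tool",
                "tool_call_id": "call_abc123",
                "name": "get_weather",
                "content": "{\"temperature\": 21}",
            }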
"""
return {
"role": "tool",
"tool_call_id": tool_call.id,
"name": tool_call.name,
"content": json.dumps(tool_call.result) if tool_call.result else "{}",
}
def _convert_mcp_tools_to_openai(self, mcp_tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Convert MCP tool format to OpenAI's function calling format.
MCP tools look like:
{
"type": "function",
"name": "get_weather",
"description": "Get weather for a location",
"parameters": {...}
}
OpenAI expects:
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a location",
"parameters": {...}
}
}
"""
openai_tools = []
for tool in mcp_tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool["name"],
"description": tool.get("description", ""),
"parameters": tool.get("parameters", {}),
}
})
return openai_tools
@property
def provider_name(self) -> str:
return "openai"
@property
def model_name(self) -> str:
return self.model