# openai_client.py
"""
OpenAI client for AI-powered issue analysis and code generation
"""
import asyncio
import json
import logging
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
try:
import openai
except ImportError:
openai = None
from utils.error_handler import retry_on_failure
logger = logging.getLogger(__name__)
class OpenAIError(Exception):
    """Error raised for OpenAI-specific failures.

    Carries an optional HTTP status code (e.g. 429 for rate limiting,
    401 for authentication failures) alongside the message.
    """

    def __init__(self, message: str, status_code: Optional[int] = None):
        self.status_code = status_code
        super().__init__(message)
@dataclass
class AIAnalysisResult:
    """Result of AI analysis of a single Jira issue."""
    issue_key: str  # Jira issue key, e.g. "PROJ-123"
    analysis: str  # free-text root-cause analysis produced by the model
    suggested_fixes: List[Dict[str, Any]]  # each fix has "path", "content", "action" (plus optional "language", "description")
    confidence_score: float  # model-reported confidence in range 0.0-1.0
    model_used: str  # name of the OpenAI model that produced this result
    tokens_used: int  # total tokens (prompt + completion) consumed by the call
    cost_estimate: float  # approximate USD cost derived from tokens_used
class OpenAIClient:
    """OpenAI client for issue analysis and code generation."""

    # Cost per 1K tokens (approximate, as of 2024)
    COST_PER_1K_TOKENS = {
        "gpt-4-turbo-preview": {"input": 0.01, "output": 0.03},
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
    }

    def __init__(self, config: Dict[str, Any]):
        """Initialize the async OpenAI client.

        Args:
            config: Client settings. Requires "api_key"; honours optional
                "base_url", "timeout", "model", "max_tokens", "temperature"
                and "max_retries" keys.

        Raises:
            OpenAIError: If the openai package is not installed.
            KeyError: If "api_key" is missing from config.
        """
        if not openai:
            raise OpenAIError("OpenAI library not installed. Run: pip install openai")
        self.config = config
        self.client = openai.AsyncOpenAI(
            api_key=config["api_key"],
            base_url=config.get("base_url", "https://api.openai.com/v1"),
            timeout=config.get("timeout", 30)
        )
        self.model = config.get("model", "gpt-4-turbo-preview")
        self.max_tokens = config.get("max_tokens", 2000)
        self.temperature = config.get("temperature", 0.7)
        self.max_retries = config.get("max_retries", 3)
        logger.info(f"OpenAI client initialized with model: {self.model}")

    @retry_on_failure(max_retries=3, delay=1.0)
    async def test_connection(self) -> bool:
        """Test OpenAI API connection.

        Returns:
            True when a minimal chat completion round-trip succeeds.

        Raises:
            OpenAIError: If the API call fails (after decorator retries).
        """
        try:
            # Minimal, cheap round-trip to verify credentials and connectivity.
            await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=10
            )
            logger.info("OpenAI connection test successful")
            return True
        except Exception as e:
            logger.error(f"OpenAI connection test failed: {e}")
            raise OpenAIError(f"Failed to connect to OpenAI: {str(e)}") from e

    async def analyze_issue(self, issue: Dict[str, Any],
                            include_context: bool = True,
                            validation_level: str = "basic") -> AIAnalysisResult:
        """
        Analyze a Jira issue and generate code fixes.

        Args:
            issue: Jira issue data (REST API shape: "key" plus a "fields" dict).
            include_context: Whether to include repository context.
            validation_level: Validation strictness level.

        Returns:
            AIAnalysisResult with analysis and suggested fixes.

        Raises:
            OpenAIError: If the API call fails or the response is malformed.
        """
        try:
            # Build the analysis prompt
            prompt = self._build_analysis_prompt(issue, include_context, validation_level)
            # Call OpenAI API
            response = await self._call_openai_api(prompt)
            # Parse and validate response
            analysis_data = self._parse_ai_response(response.choices[0].message.content)
            # Calculate cost
            tokens_used = response.usage.total_tokens if response.usage else 0
            cost_estimate = self._calculate_cost(tokens_used)
            # Create result
            result = AIAnalysisResult(
                issue_key=issue["key"],
                analysis=analysis_data["analysis"],
                suggested_fixes=analysis_data["suggested_fixes"],
                confidence_score=analysis_data.get("confidence_score", 0.8),
                model_used=self.model,
                tokens_used=tokens_used,
                cost_estimate=cost_estimate
            )
            logger.info(f"Successfully analyzed issue {issue['key']} using {self.model}")
            return result
        except OpenAIError:
            # Already a domain error, possibly carrying an HTTP status code;
            # re-wrapping here would discard that detail.
            logger.error(f"Failed to analyze issue {issue.get('key', 'unknown')}")
            raise
        except Exception as e:
            # .get() keeps the error path safe even when "key" is missing,
            # so logging cannot raise a second exception inside the handler.
            logger.error(f"Failed to analyze issue {issue.get('key', 'unknown')}: {e}")
            raise OpenAIError(f"Issue analysis failed: {str(e)}") from e

    def _build_analysis_prompt(self, issue: Dict[str, Any],
                               include_context: bool, validation_level: str) -> str:
        """Build the prompt for issue analysis.

        Jira commonly returns explicit nulls (e.g. "description": null), so
        `or`-fallbacks are used instead of dict.get() defaults — a .get()
        default only applies when the key is absent, not when it is None.
        """
        fields = issue.get("fields") or {}
        issue_key = issue.get("key") or "Unknown"
        summary = fields.get("summary") or "No summary"
        description = fields.get("description") or "No description"
        status = (fields.get("status") or {}).get("name") or "Unknown"
        priority = (fields.get("priority") or {}).get("name") or "Unknown"
        prompt = f"""You are an expert software engineer and SRE specialist. Analyze this Jira issue and provide code fixes.
ISSUE DETAILS:
- Key: {issue_key}
- Summary: {summary}
- Description: {description}
- Status: {status}
- Priority: {priority}
REQUIREMENTS:
1. Analyze the issue and identify the root cause
2. Suggest specific code changes to fix the issue
3. Provide confidence score (0.0 to 1.0)
4. Ensure fixes are secure and follow best practices
5. Validation level: {validation_level}
RESPONSE FORMAT (JSON):
{{
    "analysis": "Detailed analysis of the issue and root cause",
    "suggested_fixes": [
        {{
            "path": "relative/path/to/file.py",
            "content": "Complete file content with fixes",
            "action": "update|create|delete",
            "language": "python|javascript|java|etc",
            "description": "What this fix does"
        }}
    ],
    "confidence_score": 0.85,
    "security_considerations": "Any security implications",
    "testing_recommendations": "How to test the fix"
}}
IMPORTANT:
- Only suggest changes you're confident about
- Include complete file content, not just diffs
- Follow secure coding practices
- Consider performance implications
- Provide actionable, specific fixes"""
        if include_context:
            prompt += "\n\nCONTEXT: Consider this is part of a larger codebase. Ensure compatibility."
        return prompt

    async def _call_openai_api(self, prompt: str) -> Any:
        """Make the actual API call to OpenAI.

        Raises:
            OpenAIError: Wrapping the underlying SDK error; carries the HTTP
                status code where one is known.
        """
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are an expert software engineer specializing in bug fixes and code analysis. Always respond with valid JSON."
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                response_format={"type": "json_object"}  # Ensure JSON response
            )
            return response
        # Order matters: the specific SDK errors must be handled before the
        # generic openai.APIError they derive from.
        except openai.RateLimitError as e:
            raise OpenAIError(f"Rate limit exceeded: {str(e)}", 429) from e
        except openai.AuthenticationError as e:
            raise OpenAIError(f"Authentication failed: {str(e)}", 401) from e
        except openai.APIError as e:
            raise OpenAIError(f"API error: {str(e)}", getattr(e, 'status_code', None)) from e
        except Exception as e:
            raise OpenAIError(f"Unexpected error: {str(e)}") from e

    def _parse_ai_response(self, response_content: str) -> Dict[str, Any]:
        """Parse and validate the model's JSON response.

        Args:
            response_content: Raw message content returned by the model.

        Returns:
            The parsed dict, guaranteed to contain "analysis", a well-formed
            "suggested_fixes" list and a "confidence_score" in [0.0, 1.0].

        Raises:
            OpenAIError: On invalid JSON or a schema violation.
        """
        try:
            # Parse JSON
            data = json.loads(response_content)
            # Validate required fields
            required_fields = ["analysis", "suggested_fixes", "confidence_score"]
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")
            # Validate suggested_fixes structure
            if not isinstance(data["suggested_fixes"], list):
                raise ValueError("suggested_fixes must be a list")
            for fix in data["suggested_fixes"]:
                if not isinstance(fix, dict):
                    raise ValueError("Each fix must be a dictionary")
                required_fix_fields = ["path", "content", "action"]
                for field in required_fix_fields:
                    if field not in fix:
                        raise ValueError(f"Fix missing required field: {field}")
                # Validate action
                if fix["action"] not in ["create", "update", "delete"]:
                    raise ValueError(f"Invalid action: {fix['action']}")
            # Validate confidence score
            confidence = data["confidence_score"]
            if not isinstance(confidence, (int, float)) or not (0.0 <= confidence <= 1.0):
                raise ValueError("confidence_score must be a number between 0.0 and 1.0")
            return data
        except json.JSONDecodeError as e:
            raise OpenAIError(f"Invalid JSON response: {str(e)}") from e
        except ValueError as e:
            raise OpenAIError(f"Invalid response format: {str(e)}") from e

    def _calculate_cost(self, tokens_used: int) -> float:
        """Calculate the estimated USD cost for an API call.

        Args:
            tokens_used: Total tokens reported by the API.

        Returns:
            Estimated cost rounded to 6 decimal places; 0.0 for models
            without a known price.
        """
        if self.model not in self.COST_PER_1K_TOKENS:
            return 0.0
        # Rough estimate (assuming 70% input, 30% output)
        input_tokens = int(tokens_used * 0.7)
        output_tokens = int(tokens_used * 0.3)
        costs = self.COST_PER_1K_TOKENS[self.model]
        input_cost = (input_tokens / 1000) * costs["input"]
        output_cost = (output_tokens / 1000) * costs["output"]
        return round(input_cost + output_cost, 6)

    async def generate_commit_message(self, issue_key: str, summary: str,
                                      files_changed: List[str]) -> str:
        """Generate a commit message for the fix.

        Falls back to a deterministic "fix(KEY): summary..." message when
        the API call fails; never raises.
        """
        try:
            prompt = f"""Generate a concise, professional commit message for this fix:
Issue: {issue_key}
Summary: {summary}
Files changed: {', '.join(files_changed)}
Format: "fix({issue_key}): brief description"
Keep it under 72 characters.
Be specific about what was fixed."""
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=50,
                temperature=0.3
            )
            commit_msg = response.choices[0].message.content.strip()
            # Clean up the message
            commit_msg = re.sub(r'^["\']|["\']$', '', commit_msg)  # Remove quotes
            commit_msg = commit_msg[:72]  # Limit length
            return commit_msg
        except Exception as e:
            # Best-effort: a generated message is nice-to-have, not critical.
            logger.warning(f"Failed to generate commit message: {e}")
            return f"fix({issue_key}): {summary[:50]}..."

    async def generate_mr_description(self, issue: Dict[str, Any],
                                      analysis_result: AIAnalysisResult) -> str:
        """Generate a merge request description.

        Purely string formatting (no API call); falls back to a one-line
        description on any error and never raises.
        """
        try:
            issue_key = issue["key"]
            summary = issue.get("fields", {}).get("summary", "")
            description = f"""## AI-Generated Fix for {issue_key}
**Issue Summary:** {summary}
**Analysis:**
{analysis_result.analysis}
**Changes Made:**
"""
            for fix in analysis_result.suggested_fixes:
                description += f"- `{fix['path']}`: {fix.get('description', 'Updated')}\n"
            description += f"""
**Confidence Score:** {analysis_result.confidence_score:.2f}
**Model Used:** {analysis_result.model_used}
**Testing Recommendations:**
Please verify the fix by testing the affected functionality.
**Security Considerations:**
This fix has been generated with security best practices in mind.
---
*This fix was generated automatically by the SRE AI Assistant*
"""
            return description
        except Exception as e:
            logger.warning(f"Failed to generate MR description: {e}")
            return f"AI-generated fix for {issue.get('key', 'issue')}"