"""LLM client for OpenAI-compatible APIs."""
import logging
from typing import AsyncIterator
import httpx
from .providers import ProviderConfig
logger = logging.getLogger(__name__)


class LLMClient:
    """Client for OpenAI-compatible LLM APIs.
def __init__(self, config: ProviderConfig):
self.config = config
self.system_prompt: str | None = None
self.history: list[dict] = []
# Build headers
headers = {"Content-Type": "application/json"}
if config.api_key:
headers["Authorization"] = f"Bearer {config.api_key}"
self.client = httpx.AsyncClient(
base_url=config.api_url,
headers=headers,
timeout=120.0,
)

    @property
    def provider_name(self) -> str:
        return self.config.name

    def set_system_prompt(self, prompt: str) -> None:
        """Set the system prompt and clear the conversation history."""
        self.system_prompt = prompt
        self.history.clear()

    def reset(self) -> None:
        """Reset conversation history."""
        self.history.clear()

    async def send_message(self, message: str) -> tuple[str, int, int]:
        """Send a message and return the full response.

        Args:
            message: User message.

        Returns:
            Tuple of ``(response_text, input_tokens, output_tokens)``.
        """
# Build messages
messages = []
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
messages.extend(self.history)
messages.append({"role": "user", "content": message})
# Make request
try:
response = await self.client.post(
"/chat/completions",
json={
"model": self.config.model,
"messages": messages,
},
)
response.raise_for_status()
data = response.json()
            # Extract the assistant reply; "usage" may be absent on some
            # OpenAI-compatible servers, so fall back to zero counts.
            content = data["choices"][0]["message"]["content"] or ""
            usage = data.get("usage", {})
            tokens_in = usage.get("prompt_tokens", 0)
            tokens_out = usage.get("completion_tokens", 0)
# Update history
self.history.append({"role": "user", "content": message})
self.history.append({"role": "assistant", "content": content})
return content, tokens_in, tokens_out
except httpx.HTTPStatusError as e:
logger.error(f"LLM API error: {e.response.status_code} - {e.response.text}")
raise
except Exception as e:
logger.error(f"LLM request failed: {e}")
raise

    async def send_message_streaming(self, message: str) -> AsyncIterator[str]:
        """Send a message and stream the response.

        Args:
            message: User message.

        Yields:
            Response text chunks as they arrive.
        """
# Build messages
messages = []
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
messages.extend(self.history)
messages.append({"role": "user", "content": message})
# Make streaming request
full_response = ""
async with self.client.stream(
"POST",
"/chat/completions",
json={
"model": self.config.model,
"messages": messages,
"stream": True,
},
) as response:
response.raise_for_status()
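            # OpenAI-compatible servers stream Server-Sent Events: each event
            # is a line of the form "data: {JSON chunk}", and the stream ends
            # with the sentinel line "data: [DONE]".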
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    # Ignore keep-alives and other non-JSON lines.
                    continue
                # Some servers send chunks with an empty "choices" list
                # (e.g. a trailing usage-only chunk); skip them.
                choices = chunk.get("choices")
                if not choices:
                    continue
                content = choices[0].get("delta", {}).get("content") or ""
                if content:
                    full_response += content
                    yield content
# Update history
self.history.append({"role": "user", "content": message})
self.history.append({"role": "assistant", "content": full_response})

    async def close(self) -> None:
        """Close the HTTP client."""
        await self.client.aclose()
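

# Minimal usage sketch, not part of the module's API: it assumes that
# ProviderConfig accepts the name/api_url/api_key/model fields this client
# reads (the actual constructor lives in .providers) and that an
# OpenAI-compatible server is listening at the URL below.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        config = ProviderConfig(
            name="local",                        # assumed field names
            api_url="http://localhost:8000/v1",  # provider root, not an endpoint
            api_key=None,
            model="my-model",
        )
        client = LLMClient(config)
        client.set_system_prompt("You are a helpful assistant.")
        try:
            # Stream a single reply and echo chunks as they arrive.
            async for chunk in client.send_message_streaming("Say hello."):
                print(chunk, end="", flush=True)
            print()
        finally:
            await client.close()

    asyncio.run(_demo())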