rag_service.py•4.21 kB
import os
import tempfile
from pathlib import Path
from typing import Any
import requests
class RagService:
"""Service class for RAG operations, equivalent to the Java RagService"""
def __init__(self):
self.content_service_url = os.getenv('CONTENT_SERVICE_URL', 'http://localhost:8080')
def get_all_contents(self, name: str | None = None, user_id: str = "invalid") -> list[dict[str, Any]]:
"""List all content names from the organization's database optionally filtered by name"""
url = f"{self.content_service_url}/contents"
params = {}
if name and name.strip():
params['name'] = name
headers = {'userId': user_id}
try:
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error fetching contents: {e}")
return []
def search_contents(self, query: str, user_id: str = "invalid") -> list[str]:
"""Search through the organization's content database using semantic search"""
url = f"{self.content_service_url}/contents/search"
params = {'query': query}
headers = {'userId': user_id}
try:
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error searching contents: {e}")
return []
def delete_content(self, content_id: str, user_id: str = "invalid") -> None:
"""Delete specific content from the organization's database"""
url = f"{self.content_service_url}/contents/{content_id}"
headers = {'userId': user_id}
try:
response = requests.delete(url, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
print(f"Error deleting content: {e}")
raise
def upload_file(self, content: str, file_name: str, role: str, user_id: str = "invalid") -> None:
"""Upload content file about the organization"""
url = f"{self.content_service_url}/contents/file"
headers = {'userId': user_id}
# Create temporary file
temp_file = self._convert_string_to_file(content, file_name)
try:
with open(temp_file, 'rb') as f:
files = {'file': (file_name, f, 'text/plain')}
data = {
'role': role,
'userId': user_id
}
response = requests.post(url, files=files, data=data, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
print(f"Error uploading file: {e}")
raise
finally:
# Clean up temporary file
if temp_file.exists():
temp_file.unlink()
def upload_url(self, content_url: str, role: str, user_id: str = "invalid") -> None:
"""Upload content url about the organization"""
url = f"{self.content_service_url}/contents/url"
headers = {'userId': user_id}
data = {
'url': content_url,
'role': role,
'userId': user_id
}
try:
response = requests.post(url, data=data, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
print(f"Error uploading URL: {e}")
raise
def _convert_string_to_file(self, content: str, file_name: str) -> Path:
"""Convert string content to a temporary file"""
try:
# Create a temporary file
temp_file = tempfile.NamedTemporaryFile(
mode='w',
suffix='.txt',
prefix=file_name.replace('.', '_'),
delete=False
)
temp_file.write(content)
temp_file.close()
return Path(temp_file.name)
except Exception as e:
raise RuntimeError(f"Error converting string to file: {e}") from e