"""
Materialized View Engine (MVE) API client for accessing materialized view data.
Uses configuration-based credentials instead of hardcoded values.
"""
import base64
import requests
from typing import Dict, Any, Optional
from .config import ServerConfig
class MVEClient:
"""Client for accessing Materialized View Engine API."""
def __init__(self, config: ServerConfig):
self.config = config
self.mve_api_base = config.get_mve_api_base()
self.credentials = config.get_mve_credentials()
if not self.mve_api_base:
raise ValueError("MVE API base URL not configured. Set MVE_API_BASE environment variable or add mve_api_base to config file.")
if not self.credentials:
raise ValueError("MVE API credentials not configured. Set MVE_USERNAME and MVE_PASSWORD environment variables or add mve_username and mve_password to config file.")
def _get_headers(self) -> Dict[str, str]:
"""Get authentication headers for MVE API."""
username, password = self.credentials
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
return {
'Authorization': f'Basic {encoded_credentials}',
'Content-Type': 'application/json',
'X-API-Key': '7ba11aa3-6df2-4d19-bb02-86da4f7a9d9d' # This might need to be configurable too
}
def query_materialized_view(self, job_id: int, mv_name: str, limit: int = 100) -> Dict[str, Any]:
"""
Query a materialized view by job ID and MV name.
Args:
job_id: The SSB job ID
mv_name: The materialized view name
limit: Maximum number of records to return
Returns:
Dictionary containing the query results
"""
if not self.mve_api_base:
raise ValueError("MVE API base URL not configured")
url = f"{self.mve_api_base}/query/{job_id}/{mv_name}"
params = {
'key': '7ba11aa3-6df2-4d19-bb02-86da4f7a9d9d', # This might need to be configurable too
'limit': limit
}
try:
response = requests.get(
url,
headers=self._get_headers(),
params=params,
verify=False, # Use config.verify_ssl instead
timeout=30
)
if response.status_code == 200:
return {
'status': 'success',
'data': response.json(),
'record_count': len(response.json()) if isinstance(response.json(), list) else 1
}
else:
return {
'status': 'error',
'error_code': response.status_code,
'error_message': response.text
}
except Exception as e:
return {
'status': 'error',
'error_code': 'exception',
'error_message': str(e)
}
def list_materialized_views(self, job_id: int) -> Dict[str, Any]:
"""
List materialized views for a specific job.
Args:
job_id: The SSB job ID
Returns:
Dictionary containing the list of materialized views
"""
if not self.mve_api_base:
raise ValueError("MVE API base URL not configured")
url = f"{self.mve_api_base}/jobs/{job_id}/mvs"
try:
response = requests.get(
url,
headers=self._get_headers(),
verify=False, # Use config.verify_ssl instead
timeout=30
)
if response.status_code == 200:
return {
'status': 'success',
'data': response.json()
}
else:
return {
'status': 'error',
'error_code': response.status_code,
'error_message': response.text
}
except Exception as e:
return {
'status': 'error',
'error_code': 'exception',
'error_message': str(e)
}