"""
HTTP client wrapper for TaskManager API with x402 payment integration.
Maintains the same interface as the original TaskManager class for drop-in replacement.
"""
import os
import asyncio
from typing import List, Dict, Any, Optional
from eth_account import Account
from x402.clients.httpx import x402HttpxClient
from x402.clients.base import decode_x_payment_response
from ..utils.logging_config import get_logger
class TaskManager:
    """
    HTTP client wrapper that maintains the same interface as the original TaskManager class.

    Every public method issues one HTTP call to the FastAPI TaskManager service
    through ``x402HttpxClient``. All requests require a configured payment
    account; without one, ``_make_async_request`` raises before any network
    traffic happens.

    Error-handling convention used by the public methods:
        * ``create_task`` and ``create_execution_record`` log and re-raise.
        * Every other method logs and returns a neutral value
          (``None``, ``[]`` or ``False``) instead of raising.
    """

    def __init__(self, base_url: Optional[str] = None, private_key: Optional[str] = None):
        """
        Initialize TaskManager HTTP client with x402 payment support.

        Args:
            base_url: Base URL of the TaskManager API service.
                Defaults to environment variable TASK_MANAGER_API_URL or http://localhost:8000
            private_key: Ethereum private key for payments.
                Defaults to environment variable PRIVATE_KEY
        """
        self.logger = get_logger(__name__)

        # The trailing `or` also covers an environment variable that is set but empty.
        resolved_url = base_url or os.getenv('TASK_MANAGER_API_URL') or 'http://localhost:8000'
        self.base_url = resolved_url.rstrip('/')  # Remove trailing slash

        # Setup Ethereum account for x402 payments
        private_key = private_key or os.getenv('PRIVATE_KEY')
        if not private_key:
            self.logger.warning("No PRIVATE_KEY provided - payment-required endpoints will fail")
            self.account = None
        else:
            try:
                self.account = Account.from_key(private_key)
                self.logger.info(f"Initialized payment account: {self.account.address}")
            except Exception as e:
                # A malformed key must not prevent construction; requests will
                # fail later with a clear "No PRIVATE_KEY configured" error.
                self.logger.error(f"Failed to initialize payment account: {e}")
                self.account = None

        self.logger.info(f"TaskManager client initialized with base URL: {self.base_url}")

    def _log_payment_response(self, response: Any) -> None:
        """Log the transaction of an x402 payment if the response carries one."""
        if "X-Payment-Response" not in response.headers:
            return
        try:
            payment_response = decode_x_payment_response(
                response.headers["X-Payment-Response"]
            )
            self.logger.info(f"Payment made - Transaction: {payment_response.get('transaction')}")
        except Exception as e:
            # Payment logging is best-effort and must never fail the request.
            self.logger.warning(f"Failed to decode payment response: {e}")

    async def _make_async_request(self, method: str, endpoint: str, **kwargs) -> Any:
        """
        Make HTTP request with x402 payment support.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint path, appended to the configured base URL
            **kwargs: Additional arguments forwarded to the client's ``request`` call

        Returns:
            The parsed JSON body when the response content type starts with
            ``application/json``, otherwise the body decoded as text.

        Raises:
            Exception: If no payment account is configured, the request fails,
                or the server answers with an HTTP status >= 400. For HTTP
                error statuses the raised exception carries the status in a
                ``status_code`` attribute (``None`` for every other failure).
        """
        import json

        url = f"{self.base_url}{endpoint}"
        try:
            if not self.account:
                raise Exception("No PRIVATE_KEY configured - cannot make requests to x402-enabled API")

            # Use x402HttpxClient for all requests (handles both paid and free endpoints)
            async with x402HttpxClient(account=self.account, base_url=self.base_url) as client:
                response = await client.request(method, endpoint, **kwargs)

                # Log payment information if present
                self._log_payment_response(response)

                content = await response.aread()

                # Error bodies must not be handed back as if the call succeeded.
                if response.status_code >= 400:
                    http_error = Exception(
                        f"HTTP {response.status_code}: {content.decode(errors='replace')}"
                    )
                    http_error.status_code = response.status_code
                    raise http_error

                # Handle different response types
                if response.headers.get('content-type', '').startswith('application/json'):
                    return json.loads(content.decode())
                return content.decode()
        except Exception as e:
            self.logger.error(f"Error calling {method} {url}: {e}")
            wrapped = Exception(f"Task Manager API error: {e}")
            # Propagate the HTTP status (if any) so callers can branch on it
            # without parsing the message text.
            wrapped.status_code = getattr(e, "status_code", None)
            raise wrapped from e

    async def create_task(self, task_description: str, frequency_minutes: int, runtime_minutes: int,
                          recipient_email: str, sender_email: Optional[str] = None) -> int:
        """
        Create a new monitoring task.

        Args:
            task_description: Natural language description of what to search for
            frequency_minutes: How often to check for updates (in minutes)
            runtime_minutes: How long the task should run (in minutes)
            recipient_email: Email address to send notifications to
            sender_email: Email address to send notifications from. Falls back
                to environment variable DEFAULT_SENDER_EMAIL when omitted.

        Returns:
            Task ID of the created task

        Raises:
            Exception: If the request fails or the response has no ``task_id``.
        """
        try:
            data = {
                "task_description": task_description,
                "frequency_minutes": frequency_minutes,
                "runtime_minutes": runtime_minutes,
                "recipient_email": recipient_email,
                "sender_email": sender_email or os.getenv("DEFAULT_SENDER_EMAIL"),
            }
            response = await self._make_async_request("POST", "/tasks/", json=data)
            return response["task_id"]
        except Exception as e:
            self.logger.error(f"Error creating task: {e}")
            raise

    async def get_task(self, task_id: int) -> Optional[Dict[str, Any]]:
        """
        Get task configuration by ID.

        Args:
            task_id: ID of the task to retrieve

        Returns:
            Task configuration as dictionary, or None if the task was not
            found or the request failed for any other reason.
        """
        try:
            response = await self._make_async_request("GET", f"/tasks/{task_id}")
            return dict(response)
        except Exception as e:
            # A missing task is an expected outcome, so it is not logged as an error.
            if getattr(e, "status_code", None) == 404:
                return None
            self.logger.error(f"Error getting task {task_id}: {e}")
            return None

    async def get_previous_search_results(self, task_id: int, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get previous search results for a task.

        Args:
            task_id: ID of the task
            limit: Maximum number of results to return

        Returns:
            List of previous search results; empty list on any failure.
        """
        try:
            params = {"limit": limit}
            response = await self._make_async_request("GET", f"/tasks/{task_id}/results", params=params)
            return response.get("results", [])
        except Exception as e:
            self.logger.error(f"Error getting previous results for task {task_id}: {e}")
            return []

    async def save_search_results(self, task_id: int, search_results: List[Dict[str, Any]]) -> bool:
        """
        Save current search results to database.

        Args:
            task_id: ID of the task
            search_results: List of search results to save

        Returns:
            True if successful, False otherwise
        """
        try:
            data = {"search_results": search_results}
            response = await self._make_async_request("POST", f"/tasks/{task_id}/results", json=data)
            return response.get("success", False)
        except Exception as e:
            self.logger.error(f"Error saving search results for task {task_id}: {e}")
            return False

    async def create_execution_record(self, task_id: int) -> int:
        """
        Create a new execution record.

        Args:
            task_id: ID of the task being executed

        Returns:
            Execution ID

        Raises:
            Exception: If the request fails or the response has no ``execution_id``.
        """
        try:
            # The task id is sent as a query parameter, not in the request body.
            params = {"task_id": task_id}
            response = await self._make_async_request("POST", "/executions/", params=params)
            return response["execution_id"]
        except Exception as e:
            self.logger.error(f"Error creating execution record for task {task_id}: {e}")
            raise

    async def update_execution_record(self, execution_id: int, status: str, items_found: int = 0,
                                      new_items_count: int = 0, notification_sent: bool = False,
                                      error_message: Optional[str] = None) -> bool:
        """
        Update an execution record.

        Args:
            execution_id: ID of the execution record
            status: New status (completed, failed, timeout)
            items_found: Total number of items found
            new_items_count: Number of new items found
            notification_sent: Whether notification was sent
            error_message: Error message if any

        Returns:
            True if successful, False otherwise
        """
        try:
            data = {
                "status": status,
                "items_found": items_found,
                "new_items_count": new_items_count,
                "notification_sent": notification_sent,
                "error_message": error_message,
            }
            response = await self._make_async_request("PUT", f"/executions/{execution_id}", json=data)
            return response.get("success", False)
        except Exception as e:
            self.logger.error(f"Error updating execution record {execution_id}: {e}")
            return False

    async def send_email_notification(self, task_config: Dict[str, Any], new_items: List[Dict[str, Any]]) -> bool:
        """
        Send email notification about new items found.

        Args:
            task_config: Task configuration dictionary; its ``"id"`` entry
                selects the task endpoint to notify.
            new_items: List of new items to include in email

        Returns:
            True if email was sent successfully, False otherwise
        """
        try:
            task_id = task_config.get("id")
            if task_id is None:
                # Without an id the URL would become "/tasks/None/notify".
                self.logger.error("Error sending email notification: task_config has no 'id'")
                return False

            data = {
                "task_config": task_config,
                "new_items": new_items,
            }
            response = await self._make_async_request("POST", f"/tasks/{task_id}/notify", json=data)
            return response.get("success", False)
        except Exception as e:
            self.logger.error(f"Error sending email notification: {e}")
            return False

    async def should_task_run(self, task_id: int) -> bool:
        """
        Ask the service whether a task should run now.

        Args:
            task_id: ID of the task to check

        Returns:
            The service's ``should_run`` answer; False if it is missing or
            the request fails.
        """
        try:
            response = await self._make_async_request("GET", f"/tasks/{task_id}/should-run")
            return response.get("should_run", False)
        except Exception as e:
            self.logger.error(f"Error checking if task {task_id} should run: {e}")
            return False

    async def deactivate_task(self, task_id: int) -> bool:
        """
        Deactivate a task (soft delete).

        Args:
            task_id: ID of the task to deactivate

        Returns:
            True if successful, False otherwise
        """
        try:
            response = await self._make_async_request("DELETE", f"/tasks/{task_id}")
            return response.get("success", False)
        except Exception as e:
            self.logger.error(f"Error deactivating task {task_id}: {e}")
            return False

    async def get_active_tasks(self) -> List[Dict[str, Any]]:
        """
        Get all active tasks.

        Returns:
            List of active task configurations; empty list on any failure.
        """
        try:
            response = await self._make_async_request("GET", "/tasks/")
            return response.get("tasks", [])
        except Exception as e:
            self.logger.error(f"Error getting active tasks: {e}")
            return []