"""
FastAPI service for TaskManager operations with x402 payment integration.
Converts TaskManager class methods to HTTP endpoints.
"""
import os
from datetime import datetime
from typing import Dict, Any
from fastapi import FastAPI, HTTPException, status
from dotenv import load_dotenv
from x402.fastapi.middleware import require_payment
from ..core.task_manager import TaskManager
from ..utils.logging_config import setup_logging
from ..utils.error_handling import handle_database_error
# Load environment variables (python-dotenv) before any setting is read.
load_dotenv()

# Module-level logger shared by every endpoint below.
logger = setup_logging(__name__)

# Payment settings; each value can be overridden through the environment.
PAY_TO_ADDRESS = os.environ.get("PAY_TO_ADDRESS", "0x671cE47E4F38051ba3A990Ba306E2885C2Fe4102")
X402_NETWORK = os.environ.get("X402_NETWORK", "base-sepolia")
TASK_CREATION_PRICE = os.environ.get("TASK_CREATION_PRICE", "$0.01")

# Bind address and port used when the module is run as a script.
API_HOST = os.environ.get("API_HOST", "0.0.0.0")
API_PORT = int(os.environ.get("API_PORT", "8000"))

# The FastAPI application object that all routes and middleware attach to.
app = FastAPI(
    title="TimeLooker Task Manager API",
    description="API service for managing TimeLooker search monitoring tasks with x402 payment integration",
    version="1.0.0",
)
# Build the x402 payment gate for the task-creation path. It is configured
# with a path only, and GET /tasks/ (list active tasks) shares that path with
# POST /tasks/ (create task).
# NOTE(review): presumably the gate matches on path regardless of HTTP method;
# confirm against the installed x402 version.
_task_creation_payment_gate = require_payment(
    path="/tasks/",
    amount=TASK_CREATION_PRICE,
    pay_to_address=PAY_TO_ADDRESS,
    network_id=X402_NETWORK,
)


@app.middleware("http")
async def _charge_for_task_creation(request, call_next):
    """Require payment only for task creation (POST requests).

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response for non-POST requests; otherwise whatever the
        x402 payment gate returns for this request.
    """
    # Only creating a task is meant to be paid for, so every other method
    # (including GET on the same path) bypasses the gate.
    if request.method != "POST":
        return await call_next(request)
    return await _task_creation_payment_gate(request, call_next)


# Initialize TaskManager instance
task_manager = TaskManager()
@app.get("/health")
async def health_check():
    """Report that the service is up, together with the current local time.

    Returns:
        A dict with ``status`` set to ``"healthy"`` and ``timestamp`` set to
        the ISO-8601 rendering of ``datetime.now()``.
    """
    current_time = datetime.now().isoformat()
    return dict(status="healthy", timestamp=current_time)
@app.post("/tasks/")
async def create_task(request: Dict[str, Any]):
    """Create a new monitoring task from a JSON request body.

    Args:
        request: Request body. Must contain ``task_description``,
            ``frequency_minutes`` and ``recipient_email``. Optional keys:
            ``runtime_minutes`` (defaults to 1440) and ``sender_email``
            (defaults to ``fortnightlydevs@gmail.com``).

    Returns:
        ``{"task_id": ...}`` with the value returned by
        ``task_manager.create_task``.

    Raises:
        HTTPException: 400 when a required field is missing from the body.
        Exception: Whatever ``handle_database_error`` builds when task
            creation itself fails.
    """
    # Reject incomplete bodies up front: a missing key is a client error and
    # must not be reported as a database failure by the handler below.
    required_fields = ("task_description", "frequency_minutes", "recipient_email")
    missing = [field for field in required_fields if field not in request]
    if missing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    try:
        task_id = task_manager.create_task(
            task_description=request["task_description"],
            frequency_minutes=request["frequency_minutes"],
            runtime_minutes=request.get("runtime_minutes", 1440),
            recipient_email=request["recipient_email"],
            sender_email=request.get("sender_email", "fortnightlydevs@gmail.com"),
        )
    except Exception as e:
        logger.error(f"Error creating task: {e}")
        raise handle_database_error(e, "create task") from e
    return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task(task_id: int):
    """Look up a single task's configuration.

    Args:
        task_id: ID of the task to fetch.

    Returns:
        The value returned by ``task_manager.get_task`` when it is truthy.

    Raises:
        HTTPException: 404 when the lookup yields a falsy value.
        Exception: Whatever ``handle_database_error`` builds for any other
            failure during the lookup.
    """
    try:
        found = task_manager.get_task(task_id)
        if found:
            return found
        # Falsy lookup result: report the task as missing.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task {task_id} not found",
        )
    except HTTPException:
        # Let HTTP errors (such as the 404 above) through untouched.
        raise
    except Exception as err:
        logger.error(f"Error getting task {task_id}: {err}")
        raise handle_database_error(err, f"get task {task_id}")
@app.get("/tasks/{task_id}/results")
async def get_previous_search_results(task_id: int, limit: int = 100):
    """Fetch previously stored search results for a task.

    Args:
        task_id: ID of the task whose results are requested.
        limit: Passed through to ``task_manager.get_previous_search_results``;
            defaults to 100.

    Returns:
        ``{"results": ...}`` wrapping the value returned by the task manager.

    Raises:
        HTTPException: 500 carrying the error text when the lookup fails.
    """
    try:
        return {"results": task_manager.get_previous_search_results(task_id, limit)}
    except Exception as err:
        logger.error(f"Error getting previous results for task {task_id}: {err}")
        failure_detail = f"Error getting previous results: {str(err)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@app.post("/tasks/{task_id}/results")
async def save_search_results(task_id: int, request: Dict[str, Any]):
    """Save current search results for a task.

    Args:
        task_id: ID of the task the results belong to.
        request: Request body; must contain ``search_results``.

    Returns:
        A dict with ``success`` (the value returned by
        ``task_manager.save_search_results``) and a ``message`` that reports
        the number of results on success.

    Raises:
        HTTPException: 400 when ``search_results`` is missing from the body;
            500 carrying the error text when saving fails.
    """
    # A missing key is a client error; check it before the broad handler
    # below can turn it into a 500.
    if "search_results" not in request:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing required field(s): search_results",
        )
    search_results = request["search_results"]

    try:
        success = task_manager.save_search_results(task_id, search_results)
        return {
            "success": success,
            "message": f"Saved {len(search_results)} results" if success else "Failed to save results",
        }
    except Exception as e:
        logger.error(f"Error saving search results for task {task_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error saving search results: {str(e)}",
        ) from e
@app.post("/executions/")
async def create_execution_record(task_id: int):
    """Open a new execution record for a task.

    Args:
        task_id: ID of the task the execution belongs to.

    Returns:
        ``{"execution_id": ...}`` with the value returned by
        ``task_manager.create_execution_record``.

    Raises:
        HTTPException: 500 carrying the error text when creation fails.
    """
    try:
        new_execution_id = task_manager.create_execution_record(task_id)
    except Exception as err:
        logger.error(f"Error creating execution record for task {task_id}: {err}")
        failure_detail = f"Error creating execution record: {str(err)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
    else:
        return {"execution_id": new_execution_id}
@app.put("/executions/{execution_id}")
async def update_execution_record(execution_id: int, request: Dict[str, Any]):
    """Update an execution record.

    Args:
        execution_id: ID of the execution record to update.
        request: Request body. Must contain ``status``. Optional keys:
            ``items_found`` (default 0), ``new_items_count`` (default 0),
            ``notification_sent`` (default False) and ``error_message``
            (default None).

    Returns:
        A dict with ``success`` (the value returned by
        ``task_manager.update_execution_record``) and a matching ``message``.

    Raises:
        HTTPException: 400 when ``status`` is missing from the body; 500
            carrying the error text when the update fails.
    """
    # A missing key is a client error; check it before the broad handler
    # below can turn it into a 500.
    if "status" not in request:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing required field(s): status",
        )

    try:
        success = task_manager.update_execution_record(
            execution_id=execution_id,
            status=request["status"],
            items_found=request.get("items_found", 0),
            new_items_count=request.get("new_items_count", 0),
            notification_sent=request.get("notification_sent", False),
            error_message=request.get("error_message"),
        )
        return {
            "success": success,
            "message": "Execution record updated" if success else "Failed to update execution record",
        }
    except Exception as e:
        logger.error(f"Error updating execution record {execution_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error updating execution record: {str(e)}",
        ) from e
@app.post("/tasks/{task_id}/notify")
async def send_email_notification(task_id: int, request: Dict[str, Any]):
    """Send an email notification about new items found.

    Args:
        task_id: ID of the task; used only in the error log message here.
        request: Request body; must contain ``task_config`` and ``new_items``,
            which are passed positionally to
            ``task_manager.send_email_notification``.

    Returns:
        A dict with ``success`` (the value returned by the task manager) and
        a matching ``message``.

    Raises:
        HTTPException: 400 when a required field is missing from the body;
            500 carrying the error text when sending fails.
    """
    # A missing key is a client error; check it before the broad handler
    # below can turn it into a 500.
    missing = [field for field in ("task_config", "new_items") if field not in request]
    if missing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    try:
        success = task_manager.send_email_notification(
            request["task_config"],
            request["new_items"],
        )
        return {
            "success": success,
            "message": "Email notification sent" if success else "Failed to send email notification",
        }
    except Exception as e:
        logger.error(f"Error sending email notification for task {task_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error sending email notification: {str(e)}",
        ) from e
@app.get("/tasks/{task_id}/should-run")
async def should_task_run(task_id: int):
    """Ask the task manager whether a task is due to run.

    Args:
        task_id: ID of the task to check.

    Returns:
        ``{"should_run": ...}`` with the value returned by
        ``task_manager.should_task_run``.

    Raises:
        HTTPException: 500 carrying the error text when the check fails.
    """
    try:
        return {"should_run": task_manager.should_task_run(task_id)}
    except Exception as err:
        logger.error(f"Error checking if task {task_id} should run: {err}")
        failure_detail = f"Error checking task run status: {str(err)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@app.delete("/tasks/{task_id}")
async def deactivate_task(task_id: int):
    """Deactivate a task (soft delete).

    Args:
        task_id: ID of the task to deactivate.

    Returns:
        A dict with ``success`` (the value returned by
        ``task_manager.deactivate_task``) and a matching ``message``.

    Raises:
        HTTPException: 500 carrying the error text when deactivation fails.
    """
    try:
        deactivated = task_manager.deactivate_task(task_id)
        if deactivated:
            outcome = "Task deactivated"
        else:
            outcome = "Failed to deactivate task"
        return {"success": deactivated, "message": outcome}
    except Exception as err:
        logger.error(f"Error deactivating task {task_id}: {err}")
        failure_detail = f"Error deactivating task: {str(err)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
@app.get("/tasks/")
async def get_active_tasks():
    """List all active tasks.

    Returns:
        ``{"tasks": ...}`` wrapping the value returned by
        ``task_manager.get_active_tasks``.

    Raises:
        HTTPException: 500 carrying the error text when the lookup fails.
    """
    try:
        return {"tasks": task_manager.get_active_tasks()}
    except Exception as err:
        logger.error(f"Error getting active tasks: {err}")
        failure_detail = f"Error getting active tasks: {str(err)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
if __name__ == "__main__":
    # Script entry point: log the effective configuration, then serve the app.
    import uvicorn

    for startup_line in (
        "Starting TimeLooker Task Manager API with x402 payment integration",
        f"Payment required for POST /tasks/ - {TASK_CREATION_PRICE} to {PAY_TO_ADDRESS}",
        f"Server starting on {API_HOST}:{API_PORT}",
    ):
        logger.info(startup_line)

    uvicorn.run(app, host=API_HOST, port=API_PORT)