"""Example 2: Manager/Business Logic Logging
Demonstrates manual logging in the logic layer (managers, orchestrators).
This pattern is used for business operations, state changes, and resource management.
Uses standard Python logging with correlation ID support.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

from src.utils import get_global_logger, setup_correlation_logging
from src.utils.context import set_correlation_id, generate_correlation_id
# Install correlation-ID support into the logging framework first, then
# configure a simple root handler so the example's log output is visible.
setup_correlation_logging()
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s")
@dataclass
class Workflow:
    """Example workflow model.

    Attributes:
        workflow_id: Unique identifier of the workflow.
        name: Human-readable workflow name.
        parameters: Mapping of parameter name to its spec/value dict.
        created_at: UTC timestamp of when this object was built
            (set from ``datetime.now(timezone.utc)`` by the loader).
    """
    workflow_id: str
    name: str
    parameters: dict
    created_at: datetime
class WorkflowManager:
    """Example manager class with comprehensive logging.

    Keeps an in-memory cache of loaded workflows and logs every business
    operation (loads, listings, overrides, cache maintenance) with enough
    context (ids, counts, cache size) to trace behavior from logs alone.

    All log calls use lazy ``%``-style arguments so messages are only
    formatted when the corresponding log level is actually enabled.
    """

    def __init__(self, workflows_dir: Path):
        """Initialize the manager.

        Args:
            workflows_dir: Directory holding the workflow JSON files.
        """
        self.logger = get_global_logger(__name__)
        self.workflows_dir = workflows_dir
        # Maps workflow_id -> Workflow; entries are evicted by cleanup_cache().
        self._cache: dict[str, Workflow] = {}

    def load_workflow(self, workflow_id: str) -> Workflow:
        """Load a workflow by id, serving from the in-memory cache when possible.

        Args:
            workflow_id: Identifier of the workflow to load.

        Returns:
            The cached or freshly loaded ``Workflow``.

        Raises:
            FileNotFoundError: If the workflow file is missing.
            json.JSONDecodeError: If the workflow file contains invalid JSON.
        """
        # Check cache first
        if workflow_id in self._cache:
            self.logger.debug(
                "Workflow cache hit: %s (cache_size=%d)",
                workflow_id,
                len(self._cache),
            )
            return self._cache[workflow_id]

        # Cache miss - load from disk
        self.logger.info("Loading workflow from disk: %s", workflow_id)
        try:
            # Simulate file read latency.
            time.sleep(0.1)
            # Mock workflow data (a real implementation would parse the file).
            workflow_data = {
                "workflow_id": workflow_id,
                "name": f"Workflow {workflow_id}",
                "parameters": {
                    "PARAM_PROMPT": {"type": "str", "required": True},
                    "PARAM_INT_STEPS": {"type": "int", "default": 20},
                },
            }
            workflow = Workflow(
                workflow_id=workflow_id,
                name=workflow_data["name"],
                parameters=workflow_data["parameters"],
                created_at=datetime.now(timezone.utc),
            )
            # Cache it so subsequent loads are hits.
            self._cache[workflow_id] = workflow
            self.logger.debug(
                "Workflow loaded successfully: %s (params=%d, cache_size=%d)",
                workflow_id,
                len(workflow.parameters),
                len(self._cache),
            )
            return workflow
        except FileNotFoundError:
            path = self.workflows_dir / f"{workflow_id}.json"
            # logger.exception == logger.error(..., exc_info=True) inside an
            # except block: it attaches the active traceback automatically.
            self.logger.exception(
                "Workflow file not found: %s (path=%s)", workflow_id, path
            )
            raise
        except json.JSONDecodeError as e:
            self.logger.exception(
                "Invalid workflow JSON: %s (error=%s)", workflow_id, e
            )
            raise

    def list_workflows(self) -> list[str]:
        """List available workflows with logging.

        Returns:
            The workflow identifiers found in the workflows directory
            (mocked here with a fixed list).
        """
        self.logger.info("Listing workflows (cache_size=%d)", len(self._cache))
        # Simulate directory scan latency.
        time.sleep(0.05)
        workflow_ids = ["generate_image", "generate_song", "basic_api_test"]
        self.logger.debug("Found %d workflows: %s", len(workflow_ids), workflow_ids)
        return workflow_ids

    def apply_overrides(self, workflow: Workflow, overrides: dict) -> Workflow:
        """Apply parameter overrides, ignoring (and logging) unknown keys.

        Note: mutates ``workflow.parameters`` in place — and therefore the
        cached instance, when *workflow* came from the cache.

        Args:
            workflow: Workflow whose parameters are updated.
            overrides: Mapping of parameter name -> override value.

        Returns:
            The same (mutated) workflow, for convenient chaining.
        """
        self.logger.info(
            "Applying workflow overrides: %s (override_count=%d, keys=%s)",
            workflow.workflow_id,
            len(overrides),
            list(overrides.keys()),
        )
        # Partition the overrides into recognized vs. unknown parameter names.
        valid_overrides = {
            key: value
            for key, value in overrides.items()
            if key in workflow.parameters
        }
        invalid_overrides = [
            key for key in overrides if key not in workflow.parameters
        ]
        if invalid_overrides:
            self.logger.warning(
                "Ignoring invalid overrides for %s: %s (valid=%s)",
                workflow.workflow_id,
                invalid_overrides,
                list(valid_overrides.keys()),
            )
        # Apply valid overrides
        workflow.parameters.update(valid_overrides)
        self.logger.debug(
            "Overrides applied successfully: %s (applied=%d)",
            workflow.workflow_id,
            len(valid_overrides),
        )
        return workflow

    def cleanup_cache(self, max_age_seconds: int = 3600) -> int:
        """Evict cache entries older than *max_age_seconds*.

        Args:
            max_age_seconds: Maximum allowed entry age; 0 evicts everything.

        Returns:
            The number of entries removed.
        """
        self.logger.info(
            "Starting cache cleanup (current_size=%d, max_age=%ds)",
            len(self._cache),
            max_age_seconds,
        )
        now = datetime.now(timezone.utc)
        # Collect ids first so the dict is never mutated while iterating it.
        to_remove = [
            workflow_id
            for workflow_id, workflow in self._cache.items()
            if (now - workflow.created_at).total_seconds() > max_age_seconds
        ]
        for workflow_id in to_remove:
            del self._cache[workflow_id]
            self.logger.debug("Removed expired cache entry: %s", workflow_id)
        self.logger.info(
            "Cache cleanup complete: removed=%d, remaining=%d",
            len(to_remove),
            len(self._cache),
        )
        return len(to_remove)
def main():
    """Run the manager examples end-to-end under one correlation ID."""
    # Tag every log record produced by this run with a single correlation ID.
    cid = generate_correlation_id()
    set_correlation_id(cid)

    mgr = WorkflowManager(Path("./workflows"))

    print("\n=== Example 1: List workflows ===")
    available = mgr.list_workflows()
    print(f"Workflows: {available}\n")

    print("\n=== Example 2: Load workflow (cache miss) ===")
    loaded = mgr.load_workflow("generate_image")
    print(f"Loaded: {loaded.name}\n")

    print("\n=== Example 3: Load workflow (cache hit) ===")
    cached = mgr.load_workflow("generate_image")
    print(f"Loaded (cached): {cached.name}\n")

    print("\n=== Example 4: Apply overrides ===")
    overrides = {
        "PARAM_PROMPT": "a beautiful sunset",
        "PARAM_INT_STEPS": 30,
        "INVALID_PARAM": "should be ignored",
    }
    mgr.apply_overrides(loaded, overrides)
    print(f"Applied overrides: {list(overrides.keys())}\n")

    print("\n=== Example 5: Cache cleanup ===")
    evicted = mgr.cleanup_cache(max_age_seconds=0)  # Remove all
    print(f"Removed {evicted} cache entries\n")
# Allow running this example file directly as a script.
if __name__ == "__main__":
    main()