"""
Structured logging system for Marketing Automation MCP
Provides consistent, searchable logs with performance tracking
"""
import logging
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from contextlib import contextmanager
from logging.handlers import RotatingFileHandler
import structlog
from pythonjsonlogger import jsonlogger
# Performance metrics storage
_performance_metrics: Dict[str, list] = {}
class PerformanceFilter(logging.Filter):
"""Add performance metrics to log records"""
def filter(self, record):
# Add timestamp
record.timestamp = datetime.utcnow().isoformat()
# Add performance context if available
if hasattr(record, 'duration_ms'):
record.performance = {
'duration_ms': record.duration_ms,
'category': getattr(record, 'category', 'general')
}
return True
class CustomJsonFormatter(jsonlogger.JsonFormatter):
"""Custom JSON formatter with additional fields"""
def add_fields(self, log_record, record, message_dict):
super().add_fields(log_record, record, message_dict)
# Add standard fields
log_record['timestamp'] = datetime.utcnow().isoformat()
log_record['level'] = record.levelname
log_record['logger'] = record.name
# Add location info
log_record['location'] = {
'file': record.filename,
'line': record.lineno,
'function': record.funcName
}
        # Add structured exception info only when a real exception is attached
        # (exc_info can be (None, None, None) if exc_info=True was passed with no active exception)
        if record.exc_info and record.exc_info[0] is not None:
            log_record['exception'] = {
                'type': record.exc_info[0].__name__,
                'message': str(record.exc_info[1]),
                'traceback': traceback.format_exception(*record.exc_info)
            }
# Add custom fields from extra
if hasattr(record, 'extra_fields'):
log_record.update(record.extra_fields)
def setup_logging(
level: str = "INFO",
log_file: Optional[str] = None,
structured: bool = True,
performance_tracking: bool = True
) -> logging.Logger:
"""Setup structured logging system"""
# Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer() if structured else structlog.dev.ConsoleRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
# Setup root logger
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
# Remove existing handlers
root_logger.handlers = []
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
if structured:
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(name)s %(message)s',
json_ensure_ascii=False
)
else:
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
console_handler.addFilter(PerformanceFilter())
root_logger.addHandler(console_handler)
# File handler if specified
if log_file:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10485760, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
file_handler.addFilter(PerformanceFilter())
root_logger.addHandler(file_handler)
    # Set up performance tracking
if performance_tracking:
setup_performance_tracking()
return root_logger
def get_logger(name: str) -> structlog.BoundLogger:
"""Get a structured logger instance"""
return structlog.get_logger(name)
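# Example usage (illustrative sketch; the "logs/app.log" path, logger name, and
# log fields below are placeholders, not part of this module's contract):
#
#     setup_logging(level="DEBUG", log_file="logs/app.log", structured=True)
#     logger = get_logger("marketing_automation")
#     logger.info("sync_started", platform="google_ads", campaign_count=12)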
@contextmanager
def log_performance(
operation: str,
logger: Optional[structlog.BoundLogger] = None,
category: str = "general",
**extra_fields
):
"""Context manager for logging performance metrics"""
if logger is None:
logger = get_logger(__name__)
start_time = time.time()
# Log start
logger.info(
f"Starting {operation}",
operation=operation,
category=category,
**extra_fields
)
try:
yield
# Calculate duration
duration_ms = (time.time() - start_time) * 1000
# Store metric
if category not in _performance_metrics:
_performance_metrics[category] = []
_performance_metrics[category].append({
'operation': operation,
'duration_ms': duration_ms,
'timestamp': datetime.utcnow().isoformat(),
'success': True
})
# Log completion
logger.info(
f"Completed {operation}",
operation=operation,
category=category,
duration_ms=round(duration_ms, 2),
**extra_fields
)
except Exception as e:
# Calculate duration
duration_ms = (time.time() - start_time) * 1000
# Store metric
if category not in _performance_metrics:
_performance_metrics[category] = []
_performance_metrics[category].append({
'operation': operation,
'duration_ms': duration_ms,
'timestamp': datetime.utcnow().isoformat(),
'success': False,
'error': str(e)
})
# Log error
logger.error(
f"Failed {operation}",
operation=operation,
category=category,
duration_ms=round(duration_ms, 2),
error=str(e),
**extra_fields
)
raise
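# Example usage (illustrative sketch; the operation name, category, "platform"
# extra field, and fetch_campaigns() call are hypothetical):
#
#     with log_performance("fetch_campaign_stats", category="api_sync", platform="google_ads"):
#         campaigns = fetch_campaigns()  # any block of work being timed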
def log_api_call(
logger: structlog.BoundLogger,
method: str,
endpoint: str,
status_code: Optional[int] = None,
duration_ms: Optional[float] = None,
platform: Optional[str] = None,
**extra_fields
):
"""Log API call with standardized format"""
log_data = {
'api_call': {
'method': method,
'endpoint': endpoint,
'platform': platform
}
}
    if status_code is not None:
        log_data['api_call']['status_code'] = status_code
    if duration_ms is not None:
        log_data['api_call']['duration_ms'] = round(duration_ms, 2)
    log_data.update(extra_fields)
    # 2xx responses log as info; anything else, including a missing status code, logs as an error
    if status_code is not None and 200 <= status_code < 300:
        logger.info("API call succeeded", **log_data)
    else:
        logger.error("API call failed", **log_data)
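# Example usage (illustrative sketch; endpoint, platform, and timing values are hypothetical):
#
#     log_api_call(
#         logger,
#         method="POST",
#         endpoint="/v1/campaigns/123/budget",
#         status_code=200,
#         duration_ms=142.7,
#         platform="facebook_ads",
#     )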
def log_automation_task(
logger: structlog.BoundLogger,
task_type: str,
task_name: str,
status: str,
time_saved_minutes: Optional[float] = None,
cost_saved: Optional[float] = None,
**extra_fields
):
"""Log automation task with ROI metrics"""
log_data = {
'automation': {
'task_type': task_type,
'task_name': task_name,
'status': status
}
}
    if time_saved_minutes is not None:
        log_data['automation']['time_saved_minutes'] = round(time_saved_minutes, 2)
    if cost_saved is not None:
        log_data['automation']['cost_saved'] = round(cost_saved, 2)
log_data.update(extra_fields)
logger.info("Automation task completed", **log_data)
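# Example usage (illustrative sketch; task names and savings figures are made up):
#
#     log_automation_task(
#         logger,
#         task_type="report_generation",
#         task_name="weekly_performance_report",
#         status="completed",
#         time_saved_minutes=45,
#         cost_saved=37.50,
#     )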
def log_security_event(
logger: structlog.BoundLogger,
event_type: str,
severity: str,
details: Dict[str, Any],
**extra_fields
):
"""Log security-related events"""
log_data = {
'security': {
'event_type': event_type,
'severity': severity,
'details': details
}
}
log_data.update(extra_fields)
if severity in ['critical', 'high']:
logger.error("Security event detected", **log_data)
else:
logger.warning("Security event detected", **log_data)
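# Example usage (illustrative sketch; the event type and detail fields are hypothetical):
#
#     log_security_event(
#         logger,
#         event_type="invalid_api_key",
#         severity="high",
#         details={"platform": "google_ads", "source_ip": "203.0.113.7"},
#     )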
def get_performance_metrics(category: Optional[str] = None) -> Dict[str, Any]:
    """Return summary statistics for collected performance metrics, optionally filtered by category"""
    if category is not None:
        metrics = {category: _performance_metrics.get(category, [])}
    else:
        metrics = _performance_metrics
    # Calculate per-category statistics
    stats = {}
    for cat, measurements in metrics.items():
        if not measurements:
            continue
        durations = [m['duration_ms'] for m in measurements]
        success_count = sum(1 for m in measurements if m.get('success', True))
        stats[cat] = {
            'count': len(measurements),
            'success_rate': success_count / len(measurements),
            'avg_duration_ms': sum(durations) / len(durations),
            'min_duration_ms': min(durations),
            'max_duration_ms': max(durations),
            'total_duration_ms': sum(durations)
        }
    return stats
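# Example of the returned shape (illustrative sketch; figures are made up):
#
#     get_performance_metrics("api_sync")
#     # -> {"api_sync": {"count": 3, "success_rate": 1.0, "avg_duration_ms": 120.4, ...}}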
def clear_performance_metrics():
"""Clear collected performance metrics"""
global _performance_metrics
_performance_metrics = {}
def setup_performance_tracking():
"""Setup automatic performance tracking"""
# This could be extended to send metrics to monitoring systems
pass
# Convenience functions for common log patterns
def log_campaign_optimization(
logger: structlog.BoundLogger,
campaign_id: str,
optimization_type: str,
before_metrics: Dict[str, float],
after_metrics: Dict[str, float],
confidence: float
):
"""Log campaign optimization with before/after metrics"""
improvements = {}
for metric, after_value in after_metrics.items():
if metric in before_metrics:
before_value = before_metrics[metric]
if before_value > 0:
improvement = ((after_value - before_value) / before_value) * 100
improvements[f"{metric}_improvement"] = round(improvement, 2)
logger.info(
"Campaign optimization completed",
campaign_id=campaign_id,
optimization_type=optimization_type,
confidence=round(confidence, 2),
improvements=improvements,
metrics={
'before': before_metrics,
'after': after_metrics
}
)
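# Example usage (illustrative sketch; metric names and values are made up):
#
#     log_campaign_optimization(
#         logger,
#         campaign_id="cmp_42",
#         optimization_type="budget_reallocation",
#         before_metrics={"ctr": 1.8, "cpa": 24.0},
#         after_metrics={"ctr": 2.1, "cpa": 21.5},
#         confidence=0.87,
#     )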
def log_roi_metrics(
logger: structlog.BoundLogger,
period: str,
hours_saved: float,
cost_saved: float,
roi_percentage: float,
campaign_improvements: Dict[str, float]
):
"""Log ROI metrics with campaign improvements"""
logger.info(
"ROI metrics calculated",
period=period,
roi={
'hours_saved': round(hours_saved, 2),
'cost_saved': round(cost_saved, 2),
'roi_percentage': round(roi_percentage, 2),
'efficiency_gain': '75% reduction in optimization time',
'campaign_roi_improvement': 'average 23% improvement'
},
campaign_improvements=campaign_improvements
)
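# Example usage (illustrative sketch; all figures are made up):
#
#     log_roi_metrics(
#         logger,
#         period="2024-Q3",
#         hours_saved=120.0,
#         cost_saved=9600.0,
#         roi_percentage=310.0,
#         campaign_improvements={"ctr": 12.5, "cpa": -8.3},
#     )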
# Initialize logging with default settings on import; call setup_logging() again to reconfigure
setup_logging()
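# Minimal smoke test when the module is run directly (illustrative sketch only;
# the demo operation, category, and logger names are placeholders):
if __name__ == "__main__":
    demo_logger = get_logger("logging_config_demo")
    with log_performance("demo_operation", logger=demo_logger, category="demo"):
        time.sleep(0.05)  # stand-in for real work being timed
    demo_logger.info("demo_metrics", metrics=get_performance_metrics("demo"))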