"""
Unified logging system.

Provides structured log recording, performance monitoring, and error tracking.
"""
import logging
import sys
import time
import traceback
from typing import Optional, Dict, Any, Union
from pathlib import Path
from enum import Enum
import json
from datetime import datetime
class LogLevel(Enum):
    """Severity levels accepted by the loggers in this module.

    Each value is the upper-case name of the matching level constant in the
    standard ``logging`` module, so it can be resolved with
    ``getattr(logging, level.value)``.
    """

    # Listed from least to most severe.
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class LoggerCategory(Enum):
    """Categories used to tag a log record with the area it belongs to.

    The member value is the lower-case string that ends up in the
    serialized record.
    """

    # General application events.
    SYSTEM = "system"
    SECURITY = "security"
    PERFORMANCE = "performance"
    USER_ACTION = "user_action"

    # Subsystem-specific events.
    CACHE = "cache"
    FILE_OPERATION = "file_operation"
    TEMPLATE = "template"
    NETWORK = "network"
class StructuredLogger:
    """Structured logger that writes JSON or plain-text records.

    Wraps a standard ``logging.Logger``: every record is tagged with a
    ``LoggerCategory`` and carries the caller's keyword arguments as
    ``metadata``. Records always go to stdout and, optionally, to a file.
    """

    # Metadata keys that are additionally copied onto the log record so the
    # JSON formatter emits them as top-level fields. They stay in ``metadata``
    # as well, so existing consumers keep finding them there.
    _PROMOTED_FIELDS = ("user_id", "request_id", "duration_ms")

    def __init__(
        self,
        name: str,
        level: LogLevel = LogLevel.INFO,
        log_file: Optional[str] = None,
        format_type: str = "json"
    ):
        """
        Initialize the structured logger.

        Any handlers already attached to the named logger are removed and
        closed before the new ones are installed.

        Args:
            name: Logger name, passed to ``logging.getLogger``.
            level: Minimum level to record.
            log_file: Path of the log file; no file handler is added when
                this is ``None`` or empty.
            format_type: ``"json"`` for JSON lines; any other value selects
                the plain-text format.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(getattr(logging, level.value))

        # Remove the handlers of a previous configuration and close them;
        # merely clearing the list would leave their log files open.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        if format_type == "json":
            formatter = self._create_json_formatter()
        else:
            formatter = self._create_text_formatter()

        # Console output is always enabled.
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        if log_file:
            # The JSON formatter emits non-ASCII text verbatim
            # (ensure_ascii=False), so the file must not depend on the
            # platform's default encoding.
            file_handler = logging.FileHandler(log_file, encoding="utf-8")
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def _create_json_formatter(self) -> logging.Formatter:
        """Create the formatter that renders each record as one JSON object."""

        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    'timestamp': datetime.fromtimestamp(record.created).isoformat(),
                    'level': record.levelname,
                    'logger': record.name,
                    'message': record.getMessage(),
                    'module': record.module,
                    'function': record.funcName,
                    'line': record.lineno
                }

                # Structured fields supplied through ``extra``. The category
                # is normally a LoggerCategory, but a plain value is accepted
                # too so records from other callers do not break formatting.
                category = getattr(record, 'category', None)
                if category is not None:
                    log_data['category'] = getattr(category, 'value', category)
                for field in ('user_id', 'request_id', 'duration_ms', 'metadata'):
                    if hasattr(record, field):
                        log_data[field] = getattr(record, field)

                # Exception details. ``exc_info`` can be (None, None, None)
                # when logging was asked for exception info outside an
                # ``except`` block, hence the extra check.
                if record.exc_info and record.exc_info[0] is not None:
                    exc_type, exc_value, _ = record.exc_info
                    log_data['exception'] = {
                        'type': exc_type.__name__,
                        'message': str(exc_value),
                        'traceback': traceback.format_exception(*record.exc_info)
                    }

                # default=str: a metadata value that JSON cannot encode is
                # stringified instead of raising and losing the whole record.
                return json.dumps(log_data, ensure_ascii=False, default=str)

        return JSONFormatter()

    def _create_text_formatter(self) -> logging.Formatter:
        """Create the plain-text formatter (timestamp, name, level, message)."""
        return logging.Formatter(
            fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def debug(
        self,
        message: str,
        category: LoggerCategory = LoggerCategory.SYSTEM,
        **kwargs
    ):
        """Log a debug message; extra keyword arguments become metadata."""
        self._log(LogLevel.DEBUG, message, category, **kwargs)

    def info(
        self,
        message: str,
        category: LoggerCategory = LoggerCategory.SYSTEM,
        **kwargs
    ):
        """Log an informational message; extra keyword arguments become metadata."""
        self._log(LogLevel.INFO, message, category, **kwargs)

    def warning(
        self,
        message: str,
        category: LoggerCategory = LoggerCategory.SYSTEM,
        **kwargs
    ):
        """Log a warning; extra keyword arguments become metadata."""
        self._log(LogLevel.WARNING, message, category, **kwargs)

    def error(
        self,
        message: str,
        category: LoggerCategory = LoggerCategory.SYSTEM,
        exception: Optional[Exception] = None,
        **kwargs
    ):
        """Log an error, optionally attaching ``exception`` and its traceback."""
        self._log(LogLevel.ERROR, message, category, exception=exception, **kwargs)

    def critical(
        self,
        message: str,
        category: LoggerCategory = LoggerCategory.SYSTEM,
        exception: Optional[Exception] = None,
        **kwargs
    ):
        """Log a critical error, optionally attaching ``exception`` and its traceback."""
        self._log(LogLevel.CRITICAL, message, category, exception=exception, **kwargs)

    def _log(
        self,
        level: LogLevel,
        message: str,
        category: LoggerCategory,
        exception: Optional[Exception] = None,
        **kwargs
    ):
        """Build the structured ``extra`` payload and emit the record.

        Args:
            level: Severity of the record.
            message: Log message; ``": <exception>"`` is appended when an
                exception is supplied.
            category: Category stored on the record.
            exception: Exception whose type, message and traceback are
                attached to the record.
            **kwargs: Arbitrary metadata stored under ``metadata``.
        """
        extra = {
            'category': category,
            'metadata': kwargs
        }
        # Expose well-known metadata keys as record attributes, which is
        # where the JSON formatter looks for them.
        for field in self._PROMOTED_FIELDS:
            if field in kwargs:
                extra[field] = kwargs[field]

        exc_info = None
        if exception is not None:
            message = f"{message}: {str(exception)}"
            exc_info = (type(exception), exception, exception.__traceback__)

        self.logger.log(
            getattr(logging, level.value),
            message,
            exc_info=exc_info,
            extra=extra,
            # Skip this method and the public wrapper (debug/info/...) so
            # module/function/line describe the real call site.
            stacklevel=3
        )
class PerformanceLogger:
    """Timing helper that reports durations through a StructuredLogger."""

    def __init__(self, logger: StructuredLogger):
        """
        Args:
            logger: Logger that receives the timing records.
        """
        self.logger = logger
        # operation_id -> time.perf_counter() reading taken by start_timer().
        self.start_times: Dict[str, float] = {}

    def start_timer(self, operation_id: str):
        """Start (or restart) the timer for ``operation_id``."""
        # perf_counter() is monotonic, so the measured duration is not
        # affected by adjustments of the system clock.
        self.start_times[operation_id] = time.perf_counter()
        self.logger.debug(
            f"开始操作: {operation_id}",
            category=LoggerCategory.PERFORMANCE,
            operation_id=operation_id
        )

    def end_timer(self, operation_id: str, metadata: Optional[Dict[str, Any]] = None):
        """Stop the timer for ``operation_id`` and log the elapsed time.

        Logs a warning and returns when the operation was never started.

        Args:
            operation_id: Identifier previously passed to ``start_timer``.
            metadata: Optional extra data logged with the duration; ignored
                when empty.
        """
        try:
            started_at = self.start_times.pop(operation_id)
        except KeyError:
            self.logger.warning(
                f"操作 {operation_id} 未开始计时",
                category=LoggerCategory.PERFORMANCE
            )
            return

        duration = time.perf_counter() - started_at
        log_data = {
            'operation_id': operation_id,
            'duration_ms': round(duration * 1000, 2)
        }
        if metadata:
            log_data['metadata'] = metadata
        self.logger.info(
            f"操作完成: {operation_id}",
            category=LoggerCategory.PERFORMANCE,
            **log_data
        )

    def log_slow_query(self, query: str, duration: float, threshold: float = 1.0):
        """Log a warning when a query took longer than ``threshold``.

        Args:
            query: Query text, included in the message and the metadata.
            duration: Elapsed time in seconds.
            threshold: Limit in seconds; only durations strictly above it
                are logged.
        """
        if duration > threshold:
            self.logger.warning(
                f"慢查询检测: {query}",
                category=LoggerCategory.PERFORMANCE,
                query=query,
                duration_ms=round(duration * 1000, 2),
                threshold_ms=round(threshold * 1000, 2)
            )
class SecurityLogger:
    """Helper that records security-related events through a StructuredLogger."""

    # Keys that cannot be forwarded as keyword arguments from ``details``:
    # they match parameter names of the StructuredLogger logging methods (or
    # the ``event_type`` keyword passed explicitly) and would raise
    # "got multiple values for argument".
    _RESERVED_DETAIL_KEYS = frozenset(
        {"self", "message", "category", "level", "event_type"}
    )

    def __init__(self, logger: StructuredLogger):
        """
        Args:
            logger: Logger that receives the security records.
        """
        self.logger = logger

    def log_access_attempt(self, user_id: str, resource: str, success: bool):
        """Log an access attempt and whether it was granted.

        Args:
            user_id: Identifier of the user making the attempt.
            resource: Resource being accessed.
            success: Logged as ``access_granted``.
        """
        self.logger.info(
            f"访问尝试: {user_id} -> {resource}",
            category=LoggerCategory.SECURITY,
            user_id=user_id,
            resource=resource,
            access_granted=success
        )

    def log_security_event(self, event_type: str, details: Dict[str, Any]):
        """Log a security event as a warning.

        Args:
            event_type: Name of the event.
            details: Extra data logged with the event. Keys that clash with
                the logger's own parameter names are logged with a
                ``detail_`` prefix instead of raising ``TypeError``.
        """
        self.logger.warning(
            f"安全事件: {event_type}",
            category=LoggerCategory.SECURITY,
            event_type=event_type,
            **self._safe_details(details)
        )

    def log_permission_denied(self, user_id: str, resource: str, reason: str):
        """Log a denied access as a warning.

        Args:
            user_id: Identifier of the user who was denied.
            resource: Resource the user tried to access.
            reason: Why access was denied.
        """
        self.logger.warning(
            f"权限拒绝: {user_id} 无法访问 {resource}",
            category=LoggerCategory.SECURITY,
            user_id=user_id,
            resource=resource,
            reason=reason
        )

    @classmethod
    def _safe_details(cls, details: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return a copy of ``details`` that is safe to expand as keyword arguments."""
        safe: Dict[str, Any] = {}
        for key, value in (details or {}).items():
            clashes = key in cls._RESERVED_DETAIL_KEYS or (
                # ``exception`` is treated by the logger as an exception
                # object; any other value would fail when its traceback is
                # read, so it is logged as plain data.
                key == "exception" and not isinstance(value, BaseException)
            )
            safe[f"detail_{key}" if clashes else key] = value
        return safe
# Factory for building loggers; the shared module-level instances are created below.
def create_logger(
    name: str,
    level: LogLevel = LogLevel.INFO,
    log_file: Optional[str] = None,
    format_type: str = "json"
) -> StructuredLogger:
    """Create a StructuredLogger that also writes to a log file.

    Args:
        name: Logger name; also used for the default file name.
        level: Minimum level to record.
        log_file: Path of the log file. Defaults to ``logs/<name>.log``
            relative to the current working directory. An empty string is
            passed through unchanged.
        format_type: Log format forwarded to ``StructuredLogger``
            (``"json"`` or text).

    Returns:
        The configured logger.
    """
    # Fall back to the default location when no file was requested.
    if log_file is None:
        log_file = str(Path("logs") / f"{name}.log")

    if log_file:
        # Make sure the target directory exists for explicit paths and for
        # names that contain a path separator, not only for plain "logs".
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    return StructuredLogger(name, level, log_file, format_type)
# Default loggers shared by the rest of the application. They are created at
# import time; because no log file is passed, create_logger() uses
# logs/folder_docs_mcp.log relative to the current working directory.
default_logger = create_logger("folder_docs_mcp")
# Both helpers report through default_logger, so it must be created first.
performance_logger = PerformanceLogger(default_logger)
security_logger = SecurityLogger(default_logger)