import logging.config
import traceback
from datetime import datetime
from logging import LogRecord
from types import TracebackType
from typing import Any
from lib import jsonutils
from lib.enums import MigrationName
from lib.jsonutils import json_serial
def json_dumps(obj: Any) -> str:
return jsonutils.dumps(obj, default=json_serial)
class CHANNELS:
TASKS = 'TASKS'
SLACK = 'SLACK'
# analytics apis
API_SERVICE_ROUTER = 'API_SERVICE_ROUTER'
V0_API_ROUTER = 'V0_API_ROUTER'
V1_API_ROUTER = 'V1_API_ROUTER'
V2_PARTNER_API_ROUTER = 'V2_PARTNER_API_ROUTER'
PROMO_API_ROUTER = 'PROMO_API_ROUTER'
# analytics api v3
API_MIDDLEWARE = 'API_MIDDLEWARE'
API_V3_SERVICE_ROUTER = 'API_V3_SERVICE_ROUTER'
# external project apis
BM_ANALYTICS_API = 'BM_ANALYTICS_API'
BM_PROMO_API = 'BM_PROMO_API'
HYPER_API = 'HYPER_API'
BOOONKER_API = 'BOOONKER_API'
class LogContext:
channel: str = ''
job: str = ''
request_id: str = ''
project_uid: str = ''
project_name: str = ''
server_name: str = ''
campaign_uid: str | int = ''
migration: MigrationName | str = ''
@classmethod
def set(cls, clear_others: bool = False, **kw: Any) -> None:
if clear_others:
cls.clear()
for key, value in kw.items():
if key == 'job':
value = str(value)
if hasattr(cls, key):
setattr(cls, key, value)
@classmethod
def get(cls, name: str) -> Any:
return getattr(cls, name, None)
@classmethod
def clear(cls) -> None:
cls.channel = ''
cls.job = ''
cls.request_id = ''
cls.project_uid = ''
cls.project_name = ''
cls.server_name = ''
cls.campaign_uid = ''
cls.migration = ''
class BaseFormat(logging.Formatter):
MAX_LOG_SIZE: int = 0
@staticmethod
def find(*args: Any, **kwargs: Any) -> int:
return -1
def formatException(
self,
exc_info: (
tuple[type[BaseException], BaseException, TracebackType | None]
| tuple[None, None, None]
),
) -> str:
s = super().formatException(exc_info)
rows = s.split('\n')
rows[-1] = ' ' + rows[-1]
return '\n'.join(rows)
def format(self, record: LogRecord) -> str:
extra = record.__dict__
log_extra_data = str(extra.get('extra_data', ''))
if record.exc_info:
# Ensure exc_info is tuple with 3 elements, not tuple[None, None, None]
if record.exc_info != (None, None, None):
exc_info_typed: tuple[
type[BaseException], BaseException, TracebackType | None
] = record.exc_info # type: ignore[assignment]
log_extra_data = self._format_tb(exc_info_typed, log_extra_data)
data = {
'log_message': record.getMessage(),
'log_data': str(extra.get('data', '')),
'log_extra_data': log_extra_data,
'log_level': record.levelname,
'number': extra.get('number', 0),
'log_channel': extra.get('channel') or LogContext.channel or '',
'job': LogContext.job or '',
'request_id': LogContext.request_id or '',
'project_uid': LogContext.project_uid or '',
'project_name': LogContext.project_name or '',
'server_name': LogContext.server_name or '',
'campaign_uid': LogContext.campaign_uid or '',
'migration': LogContext.migration or '',
'name': record.name,
'pathname': record.pathname,
'lineno': record.lineno,
'log_ts': record.created,
'log_date': datetime.strftime(
datetime.fromtimestamp(record.created), self.datefmt or '%Y-%m-%d %H:%M:%S'
),
}
return self._format(data)
@classmethod
def _format(cls, data: dict[str, Any]) -> str:
raise NotImplementedError
@classmethod
def _format_tb(
cls,
exc_info: tuple[type[BaseException], BaseException, TracebackType | None],
log_extra_data: str,
) -> str:
tb_formatted_str: str
try:
_, _, tb = exc_info
tb_formatted_list = traceback.format_tb(tb)
tb_formatted_str = '\n'.join(tb_formatted_list)
except Exception:
tb_formatted_str = ''
if log_extra_data and tb_formatted_str:
return f'{log_extra_data}\n{tb_formatted_str}'
return log_extra_data or tb_formatted_str
@classmethod
def _truncate_log(cls, data: dict[str, Any]) -> dict[str, Any]:
if len(json_dumps(data)) < cls.MAX_LOG_SIZE:
return data
max_field_size = int(cls.MAX_LOG_SIZE / 3) - 1000
for key in ('log_data', 'log_message', 'log_extra_data'):
if not isinstance(data[key], str):
data[key] = json_dumps(data[key])
if len(data[key]) > max_field_size:
data[key] = data[key][:max_field_size]
return data
class JsonFormatter(BaseFormat):
@classmethod
def _format(cls, data: dict[str, Any]) -> str:
truncated = cls._truncate_log(data)
json_str = json_dumps(truncated)
return f'{json_str}\n'
class TextFormatter(BaseFormat):
@classmethod
def _get_context_msg(cls) -> str:
context_msg = ''
for k in dir(LogContext):
if k.startswith('__'):
continue
value = LogContext.get(k)
if k in ('get', 'set', 'clear'):
continue
if not value:
continue
context_msg += '\n{}={}'.format(k, value)
return context_msg
@classmethod
def _format(cls, data: dict[str, Any]) -> str:
get_context_msg = cls._get_context_msg()
template = """{}: {} - {}\n{}\n{}\ncontext:{}\n{}{}
"""
data = cls._truncate_log(data)
return template.format(
data['log_date'],
data['log_level'],
data['log_channel'],
data['log_message'],
data['log_data'],
get_context_msg,
data['log_extra_data'],
'\nnumber:%s' % str(data['number']) if data.get('number') else '',
)
class JsonStreamHandler(logging.StreamHandler[Any]):
def __init__(self, stream: Any = None) -> None:
super().__init__(stream)
formatter = JsonFormatter(datefmt='%Y-%m-%dT%H:%M:%SZ', style='{')
self.setFormatter(formatter)
class TextStreamHandler(logging.StreamHandler[Any]):
def __init__(self, stream: Any = None) -> None:
super().__init__(stream)
formatter = TextFormatter(datefmt='%Y-%m-%dT%H:%M:%SZ', style='{')
self.setFormatter(formatter)
class WarehouseLogger(logging.Logger):
def __init__(self, name: str, level: int = logging.WARNING) -> None:
super().__init__(name, level)
self.set_context()
def set_context(self, clear_others: bool = False, **kwargs: Any) -> None:
LogContext.set(clear_others, **kwargs)
def info( # type: ignore[override]
self,
msg: str,
*args: Any,
data: Any = None,
extra_data: Any = None,
number: float | None = None,
**kwargs: Any,
) -> None:
self._log_with_additional_fields(
logging.INFO, msg, args, data=data, extra_data=extra_data, number=number, **kwargs
)
def warning( # type: ignore[override]
self,
msg: str,
*args: Any,
data: Any = None,
extra_data: Any = None,
number: float | None = None,
sentry_skip: bool = False,
**kwargs: Any,
) -> None:
self._log_with_additional_fields(
logging.WARNING,
msg,
args,
data=data,
extra_data=extra_data,
number=number,
sentry_skip=sentry_skip,
**kwargs,
)
def error( # type: ignore[override]
self,
msg: str,
*args: Any,
data: Any = None,
extra_data: Any = None,
number: float | None = None,
sentry_skip: bool = False,
**kwargs: Any,
) -> None:
self._log_with_additional_fields(
logging.ERROR,
msg,
args,
data=data,
extra_data=extra_data,
number=number,
sentry_skip=sentry_skip,
**kwargs,
)
def debug( # type: ignore[override]
self,
msg: str,
*args: Any,
data: Any = None,
extra_data: Any = None,
number: float | None = None,
**kwargs: Any,
) -> None:
self._log_with_additional_fields(
logging.DEBUG, msg, args, data=data, extra_data=extra_data, number=number, **kwargs
)
def _log_with_additional_fields(
self,
level: int,
msg: str,
args: tuple[Any, ...],
*,
data: Any = None,
extra_data: Any = None,
number: float | None = None,
sentry_skip: bool = False,
exc_info: Any = None,
stack_info: Any = False,
**kwargs: Any,
) -> None:
extra = {
'data': data,
'extra_data': extra_data,
'number': number,
'sentry_skip': sentry_skip,
}
extra.update(kwargs)
super()._log(
level,
msg,
args,
exc_info=exc_info,
extra=extra,
stack_info=stack_info,
)
def getLogger(name: str) -> WarehouseLogger:
logging.setLoggerClass(WarehouseLogger)
logger = logging.getLogger(name)
assert isinstance(logger, WarehouseLogger)
return logger