"""分析軸に基づく集計・比較分析モジュール."""
import logging
from typing import Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
class AxisAnalyzer:
"""分析軸に基づく集計・比較分析を行うクラス.
指定された分析軸(部署、年代、役職など)でキーワード分析を行い、
軸間の差異や異常値を検出します。
"""
def __init__(self):
"""初期化."""
pass
def analyze_by_axis(self, df: pd.DataFrame, keywords: list[dict], axis_column: str, comment_column: str) -> dict:
"""指定軸でキーワード分析.
Args:
df: DataFrame
keywords: キーワードリスト
axis_column: 分析軸のカラム名
comment_column: コメント列名
Returns:
dict: 軸別分析結果
- axis_name: 軸名
- categories: カテゴリ別データ
- summary: サマリー統計
"""
if axis_column not in df.columns:
raise ValueError(f"分析軸が見つかりません: {axis_column}")
# 軸でグルーピング
grouped = df.groupby(axis_column)
categories = {}
for category, group_df in grouped:
# カテゴリのコメントを結合
comments = group_df[comment_column].dropna().astype(str).tolist()
combined_text = " ".join(comments)
# カテゴリのキーワード出現回数を集計
keyword_counts = {}
for kw in keywords:
keyword = kw["keyword"]
count = combined_text.count(keyword)
if count > 0:
keyword_counts[keyword] = count
categories[str(category)] = {
"count": len(group_df),
"keyword_counts": keyword_counts,
"top_keywords": sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:10],
}
# サマリー統計
summary = {"total_categories": len(categories), "category_stats": self._calculate_stats(categories)}
logger.info(f"軸別分析完了: {axis_column} ({len(categories)}カテゴリ)")
return {"axis_name": axis_column, "categories": categories, "summary": summary}
def compare_axes(self, df: pd.DataFrame, keywords: list[dict], axes: list[str], comment_column: str) -> dict:
"""複数軸の比較分析.
Args:
df: DataFrame
keywords: キーワードリスト
axes: 分析軸のカラム名リスト
comment_column: コメント列名
Returns:
dict: 複数軸の分析結果
- axes_results: 各軸の分析結果
- comparison: 軸間比較データ
"""
axes_results = {}
for axis in axes:
if axis in df.columns:
try:
axes_results[axis] = self.analyze_by_axis(df, keywords, axis, comment_column)
except Exception as e:
logger.warning(f"軸 '{axis}' の分析に失敗: {e}")
continue
# 軸間比較
comparison = self._compare_axes_results(axes_results)
logger.info(f"複数軸比較完了: {len(axes_results)}軸")
return {"axes_results": axes_results, "comparison": comparison}
def detect_outliers(self, analysis_result: dict, threshold: float = 2.0) -> list[dict]:
"""異常値・外れ値検出.
Args:
analysis_result: 軸別分析結果
threshold: 標準偏差の倍数(デフォルト: 2.0σ)
Returns:
list[dict]: 異常値リスト
- category: カテゴリ名
- value: 値
- mean: 平均値
- std: 標準偏差
- z_score: Zスコア
"""
categories = analysis_result.get("categories", {})
if not categories:
return []
# 各カテゴリのレコード数を抽出
counts = [cat_data["count"] for cat_data in categories.values()]
mean_count = np.mean(counts)
std_count = np.std(counts)
if std_count == 0:
return []
outliers = []
for category, cat_data in categories.items():
count = cat_data["count"]
z_score = (count - mean_count) / std_count
if abs(z_score) > threshold:
outliers.append(
{"category": category, "value": count, "mean": mean_count, "std": std_count, "z_score": z_score}
)
logger.info(f"異常値検出完了: {len(outliers)}件")
return sorted(outliers, key=lambda x: abs(x["z_score"]), reverse=True)
def _calculate_stats(self, categories: dict) -> dict:
"""カテゴリ統計を計算.
Args:
categories: カテゴリ別データ
Returns:
dict: 統計データ
"""
counts = [cat_data["count"] for cat_data in categories.values()]
if not counts:
return {"mean": 0, "median": 0, "std": 0, "min": 0, "max": 0}
return {
"mean": float(np.mean(counts)),
"median": float(np.median(counts)),
"std": float(np.std(counts)),
"min": int(np.min(counts)),
"max": int(np.max(counts)),
}
def _compare_axes_results(self, axes_results: dict) -> dict:
"""軸間比較データを生成.
Args:
axes_results: 各軸の分析結果
Returns:
dict: 軸間比較データ
"""
comparison = {"axis_count": len(axes_results), "axes_overview": {}}
for axis_name, result in axes_results.items():
comparison["axes_overview"][axis_name] = {
"categories_count": result["summary"]["total_categories"],
"stats": result["summary"]["category_stats"],
}
return comparison