"""LLM APIを使った課題分析・改善提案モジュール."""
import logging
import os
from pathlib import Path
from typing import Optional
from anthropic import AsyncAnthropic
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
class AIAnalyzer:
"""LLM APIを使った課題分析・改善提案を行うクラス."""
def __init__(
self, provider: Optional[str] = None, api_key: Optional[str] = None, model: Optional[str] = None
):
"""初期化.
Args:
provider: LLMプロバイダー名(省略時は環境変数LLM_PROVIDERを使用)
api_key: APIキー(省略時は環境変数LLM_API_KEYを使用)
model: モデル名(省略時は環境変数LLM_MODELまたはデフォルト)
"""
# .envファイルから環境変数を読み込み
env_path = Path.cwd() / ".env"
if env_path.exists():
load_dotenv(env_path)
logger.info(f".envファイルを読み込みました: {env_path}")
else:
logger.warning(f".envファイルが見つかりません: {env_path}")
# プロバイダーの取得(デフォルト: anthropic)
self.provider = provider or os.getenv("LLM_PROVIDER", "anthropic")
# APIキーの取得(後方互換性のためANTHROPIC_API_KEYもチェック)
self.api_key = api_key or os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
# モデル名の取得(後方互換性のためCLAUDE_MODELもチェック)
default_model = "claude-3-5-sonnet-20241022" if self.provider == "anthropic" else "gemini-2.0-flash-exp"
self.model = model or os.getenv("LLM_MODEL") or os.getenv("CLAUDE_MODEL", default_model)
# LLM APIクライアント初期化
self.client = None
if self.api_key:
try:
if self.provider == "anthropic":
self.client = AsyncAnthropic(api_key=self.api_key)
logger.info(f"Anthropic Claude APIクライアントを初期化しました (model: {self.model})")
elif self.provider == "google":
try:
import google.generativeai as genai
genai.configure(api_key=self.api_key)
self.client = genai.GenerativeModel(self.model)
logger.info(f"Google Gemini APIクライアントを初期化しました (model: {self.model})")
except ImportError:
logger.error("Google Gemini APIを使用するには、google-generativeaiパッケージが必要です")
logger.error("インストール: uv pip install google-generativeai")
self.client = None
else:
logger.error(f"未対応のLLMプロバイダー: {self.provider}")
logger.error("利用可能なプロバイダー: anthropic, google")
except Exception as e:
logger.error(f"LLM APIクライアント初期化エラー: {e}")
self.client = None
else:
logger.warning("LLM_API_KEYが設定されていません。AI分析機能は無効化されます。")
logger.warning("AI分析を有効にするには、.envファイルにLLM_PROVIDER=anthropic, LLM_API_KEY=your_api_keyを設定してください。")
async def analyze_issues(
self, keywords: list[dict], axis_analysis: Optional[dict] = None, comments_sample: list[str] = None
) -> dict:
"""課題を分析.
Args:
keywords: キーワードリスト
axis_analysis: 分析軸データ(オプション)
comments_sample: コメントサンプル(オプション)
Returns:
dict: 課題分析結果
- issues: 課題リスト
- priority_issues: 優先度高の課題
- summary: サマリー
- warning: 警告メッセージ(オプション)
"""
if not self.client:
logger.warning("AI分析がスキップされました(APIキー未設定)")
return {
"issues": [],
"priority_issues": [],
"summary": "AI分析は無効化されています",
"warning": "LLM_API_KEYが設定されていません。AI課題分析を有効にするには、プロジェクトルートに.envファイルを作成し、LLM_PROVIDER=anthropic, LLM_API_KEY=your_api_keyを設定してください。",
}
# プロンプト構築
prompt = self._build_analysis_prompt(keywords, axis_analysis, comments_sample)
try:
logger.info(f"{self.provider.upper()} API ({self.model}) で課題分析を実行中...")
# プロバイダーに応じてAPI呼び出し
if self.provider == "anthropic":
response = await self.client.messages.create(
model=self.model, max_tokens=2000, messages=[{"role": "user", "content": prompt}]
)
content = response.content[0].text
elif self.provider == "google":
# Geminiは同期APIなので、asyncio.to_threadでブロッキング呼び出しを非同期化
import asyncio
response = await asyncio.to_thread(self.client.generate_content, prompt)
# レスポンスのチェック
if not response.candidates:
logger.error("Gemini API: レスポンスにcandidatesがありません")
logger.error(f"レスポンス: {response}")
raise ValueError("Gemini APIからの応答が空です")
# テキストを取得
try:
content = response.text
except Exception as e:
logger.error(f"Gemini API: response.textの取得に失敗: {e}")
# 代替方法でテキストを取得
if response.candidates and response.candidates[0].content.parts:
content = response.candidates[0].content.parts[0].text
else:
raise ValueError(f"Gemini APIからテキストを取得できません: {e}")
else:
raise ValueError(f"未対応のLLMプロバイダー: {self.provider}")
# 結果をパース(簡易的な実装)
issues = self._parse_issues(content)
logger.info(f"課題分析完了: {len(issues)}件の課題を検出")
return {
"issues": issues,
"priority_issues": [issue for issue in issues if issue.get("priority") == "高"],
"summary": content,
}
except Exception as e:
logger.error(f"課題分析に失敗: {e}")
error_msg = str(e)
# エラーの種類に応じて適切な警告メッセージを生成
if "authentication" in error_msg.lower() or "invalid" in error_msg.lower() or "api_key" in error_msg.lower():
warning_msg = f"{self.provider.upper()}_API_KEYが無効です。正しいAPIキーを.envファイルに設定してください。"
elif "rate_limit" in error_msg.lower():
warning_msg = "APIリクエストのレート制限に達しました。しばらく時間をおいてから再実行してください。"
elif "quota" in error_msg.lower() or "billing" in error_msg.lower():
warning_msg = "APIクォータまたは請求の問題が発生しています。アカウントの請求設定を確認してください。"
else:
warning_msg = f"AI課題分析でエラーが発生しました: {error_msg}"
return {
"issues": [],
"priority_issues": [],
"summary": f"エラー: {error_msg}",
"warning": warning_msg,
}
async def generate_solutions(self, issues: list[dict]) -> list[dict]:
"""改善提案を生成.
Args:
issues: 課題リスト
Returns:
list[dict]: 改善提案リスト
- issue: 課題
- solution: 改善策
- priority: 優先度
- difficulty: 実施難易度
- expected_effect: 期待効果
"""
if not self.client or not issues:
logger.warning("改善提案生成がスキップされました")
return []
# プロンプト構築
prompt = self._build_solution_prompt(issues)
try:
logger.info(f"{self.provider.upper()} API ({self.model}) で改善提案を生成中...")
# プロバイダーに応じてAPI呼び出し
if self.provider == "anthropic":
response = await self.client.messages.create(
model=self.model, max_tokens=3000, messages=[{"role": "user", "content": prompt}]
)
content = response.content[0].text
elif self.provider == "google":
# Geminiは同期APIなので、asyncio.to_threadでブロッキング呼び出しを非同期化
import asyncio
response = await asyncio.to_thread(self.client.generate_content, prompt)
# レスポンスのチェック
if not response.candidates:
logger.error("Gemini API: レスポンスにcandidatesがありません")
logger.error(f"レスポンス: {response}")
raise ValueError("Gemini APIからの応答が空です")
# テキストを取得
try:
content = response.text
except Exception as e:
logger.error(f"Gemini API: response.textの取得に失敗: {e}")
# 代替方法でテキストを取得
if response.candidates and response.candidates[0].content.parts:
content = response.candidates[0].content.parts[0].text
else:
raise ValueError(f"Gemini APIからテキストを取得できません: {e}")
else:
raise ValueError(f"未対応のLLMプロバイダー: {self.provider}")
# 結果をパース
solutions = self._parse_solutions(content)
logger.info(f"改善提案生成完了: {len(solutions)}件")
return solutions
except Exception as e:
logger.error(f"改善提案生成に失敗: {e}")
logger.warning("改善提案の生成をスキップしました。課題分析は表示されますが、改善提案は表示されません。")
return []
def _build_analysis_prompt(
self, keywords: list[dict], axis_analysis: Optional[dict], comments_sample: Optional[list[str]]
) -> str:
"""分析用プロンプト構築.
Args:
keywords: キーワードリスト
axis_analysis: 分析軸データ
comments_sample: コメントサンプル
Returns:
str: プロンプト
"""
prompt = """あなたは組織診断の専門家です。以下の社内アンケートデータから、組織の課題を分析してください。
# キーワード分析結果
"""
# キーワードトップ20を追加
for i, kw in enumerate(keywords[:20], 1):
prompt += f"{i}. {kw['keyword']}: {kw['count']}回 (スコア: {kw['score']:.2f})\n"
# 分析軸データがあれば追加
if axis_analysis:
prompt += "\n# 分析軸別データ\n"
for axis_name, result in axis_analysis.get("axes_results", {}).items():
prompt += f"\n## {axis_name}\n"
categories = result.get("categories", {})
for category, data in list(categories.items())[:5]:
prompt += f"- {category}: {data['count']}件\n"
# コメントサンプルがあれば追加
if comments_sample:
prompt += "\n# コメントサンプル(抜粋)\n"
for i, comment in enumerate(comments_sample[:10], 1):
prompt += f"{i}. {comment[:100]}...\n"
prompt += """
# 分析タスク
上記データから、以下の観点で課題を抽出してください:
1. **ネガティブキーワード**: 不満や問題を示すキーワードから課題を特定
2. **軸間格差**: 分析軸間の差異から潜在的な問題を発見
3. **パターン認識**: コメントの傾向から組織文化の課題を抽出
# 出力形式
各課題について、以下の形式で出力してください:
課題1: [課題のタイトル]
- 詳細: [具体的な説明]
- 根拠: [データに基づく根拠]
- 優先度: [高/中/低]
- 影響範囲: [全社/特定部署/個人レベル]
課題2: ...
"""
return prompt
def _build_solution_prompt(self, issues: list[dict]) -> str:
"""改善提案用プロンプト構築.
Args:
issues: 課題リスト
Returns:
str: プロンプト
"""
prompt = """あなたは組織開発のコンサルタントです。以下の課題に対して、具体的かつ実行可能な改善策を提案してください。
# 検出された課題
"""
for i, issue in enumerate(issues[:10], 1):
prompt += f"\n課題{i}: {issue.get('title', '不明')}\n"
prompt += f"- 詳細: {issue.get('detail', '不明')}\n"
prompt += f"- 優先度: {issue.get('priority', '中')}\n"
prompt += """
# 改善提案の要件
各課題に対して、以下の要素を含む改善策を提案してください:
1. **具体的アクションプラン**: 実施可能な施策(3-5個)
2. **優先順位**: どの施策から着手すべきか
3. **実施難易度**: [高/中/低]
4. **期待効果**: 定量的・定性的な効果
5. **実施期間**: 短期(1-3ヶ月)/ 中期(3-6ヶ月)/ 長期(6ヶ月以上)
# 出力形式
課題1への改善提案:
【アクションプラン】
1. [施策1]
2. [施策2]
3. [施策3]
【優先順位】
[優先度の理由]
【実施難易度】
[難易度の説明]
【期待効果】
[具体的な効果]
【実施期間】
[期間の説明]
---
課題2への改善提案:
...
"""
return prompt
def _parse_issues(self, content: str) -> list[dict]:
"""課題をパース(簡易実装).
Args:
content: LLM APIのレスポンス
Returns:
list[dict]: 課題リスト
"""
issues = []
lines = content.split("\n")
current_issue = {}
for line in lines:
line = line.strip()
if line.startswith("課題"):
if current_issue:
issues.append(current_issue)
# 課題タイトル抽出
title = line.split(":", 1)[1].strip() if ":" in line else line
current_issue = {"title": title}
elif line.startswith("- 詳細:"):
current_issue["detail"] = line.split(":", 1)[1].strip()
elif line.startswith("- 優先度:"):
current_issue["priority"] = line.split(":", 1)[1].strip()
elif line.startswith("- 影響範囲:"):
current_issue["impact"] = line.split(":", 1)[1].strip()
if current_issue:
issues.append(current_issue)
return issues
def _parse_solutions(self, content: str) -> list[dict]:
"""改善提案をパース(簡易実装).
Args:
content: LLM APIのレスポンス
Returns:
list[dict]: 改善提案リスト
"""
solutions = []
sections = content.split("---")
for section in sections:
if "への改善提案" in section:
solution = {"actions": [], "priority": "", "difficulty": "", "expected_effect": "", "period": ""}
# アクションプラン抽出
if "【アクションプラン】" in section:
action_section = section.split("【アクションプラン】")[1].split("【")[0]
for line in action_section.split("\n"):
if line.strip() and (line.strip()[0].isdigit() or line.strip().startswith("-")):
solution["actions"].append(line.strip())
# 他の要素も同様に抽出
if "【優先順位】" in section:
solution["priority"] = section.split("【優先順位】")[1].split("【")[0].strip()
if "【実施難易度】" in section:
solution["difficulty"] = section.split("【実施難易度】")[1].split("【")[0].strip()
if "【期待効果】" in section:
solution["expected_effect"] = section.split("【期待効果】")[1].split("【")[0].strip()
if "【実施期間】" in section:
solution["period"] = section.split("【実施期間】")[1].strip()
solutions.append(solution)
return solutions