"""形態素解析とキーワード抽出モジュール."""
import logging
from collections import Counter
from pathlib import Path
from typing import Optional
from janome.tokenizer import Tokenizer
from langdetect import LangDetectException, detect
logger = logging.getLogger(__name__)
class TextAnalyzer:
"""形態素解析とキーワード抽出を行うクラス.
日本語(janome)と英語(spacy)に対応した形態素解析を行い、
名詞・動詞・形容詞を抽出してキーワードランキングを生成します。
"""
def __init__(self, stopwords_path: Optional[str] = None, language: Optional[str] = None):
"""初期化.
Args:
stopwords_path: ストップワードファイルのパス(省略時はデフォルト使用)
language: 言語コード ("ja" or "en"、省略時は自動判別)
"""
try:
            self.language = language  # Manually specified language (auto-detected when None)
self.tokenizer_ja = None
self.nlp_en = None
            # Initialize the Japanese tokenizer (janome)
logger.info("Initializing Japanese Tokenizer (janome)...")
self.tokenizer_ja = Tokenizer()
logger.info(f"Japanese Tokenizer initialized: {type(self.tokenizer_ja)}")
            # Verify that the tokenize method exists
if not hasattr(self.tokenizer_ja, "tokenize"):
raise AttributeError("Tokenizer does not have 'tokenize' method")
logger.info("Loading stopwords...")
self.stopwords_ja = self._load_stopwords_ja(stopwords_path)
self.stopwords_en = self._load_stopwords_en()
logger.info(f"Stopwords loaded: JA={len(self.stopwords_ja)}, EN={len(self.stopwords_en)}")
self.target_pos_ja = ["名詞", "動詞", "形容詞"]
self.target_pos_en = ["NOUN", "VERB", "ADJ"]
logger.info("TextAnalyzer initialization complete")
except Exception as e:
logger.error(f"TextAnalyzer initialization failed: {e}", exc_info=True)
raise
def detect_language(self, text: str) -> str:
"""テキストから言語を自動判別.
Args:
text: 判別対象のテキスト
Returns:
str: 言語コード ("ja" or "en")
"""
        if self.language:  # A manually specified language takes precedence
return self.language
try:
lang = detect(text)
detected = "en" if lang == "en" else "ja"
logger.info(f"Language detected: {detected} (raw: {lang})")
return detected
except LangDetectException as e:
logger.warning(f"Language detection failed: {e}, defaulting to Japanese")
return "ja" # デフォルトは日本語
def _ensure_english_nlp(self):
"""英語NLPモデルの遅延初期化."""
if self.nlp_en is None:
try:
import spacy
logger.info("Loading English NLP model (spacy en_core_web_sm)...")
self.nlp_en = spacy.load("en_core_web_sm")
logger.info("English NLP model loaded successfully")
            except OSError:
                logger.error(
                    "English NLP model (en_core_web_sm) not found. "
                    "It should be installed automatically as a dependency. "
                    "Please reinstall the package or contact support."
                )
raise
except ImportError:
logger.error("spacy is not installed. This should not happen in normal installation.")
raise
def analyze(self, text: str, language: Optional[str] = None) -> dict:
"""テキストを形態素解析.
Args:
text: 解析対象のテキスト
language: 言語コード ("ja" or "en"、省略時は自動判別)
Returns:
dict: 分析結果
- tokens: トークンリスト
- keywords: キーワードとその頻度
- word_count: 総単語数
- language: 使用した言語
"""
if not text or not text.strip():
return {"tokens": [], "keywords": {}, "word_count": 0, "language": "ja"}
        # Detect the language
if language is None:
language = self.detect_language(text)
        # Analyze according to the detected language
if language == "ja":
return self._analyze_ja(text)
else:
return self._analyze_en(text)
def _analyze_ja(self, text: str) -> dict:
"""日本語テキストを形態素解析(janome).
Args:
text: 解析対象のテキスト
Returns:
dict: 分析結果
"""
tokens = []
try:
            # Call janome's tokenize()
for token in self.tokenizer_ja.tokenize(text):
                # Safely read the Token object's attributes
if hasattr(token, 'surface') and hasattr(token, 'part_of_speech'):
                    # Newer janome (0.4.x and later)
surface = token.surface
part_of_speech = token.part_of_speech
elif hasattr(token, '__str__'):
                    # Older janome (token represented as a tab-separated string)
token_str = str(token)
parts = token_str.split("\t")
if len(parts) < 2:
continue
surface = parts[0]
part_of_speech = parts[1]
else:
                    # Unknown token format
logger.warning(f"Unknown token format: {type(token)}")
continue
                # Parse the part-of-speech features
features = part_of_speech.split(",")
pos = features[0] if features else ""
                # Filter by part of speech
if self._filter_pos_ja(pos, surface):
tokens.append({"surface": surface, "pos": pos, "features": features})
except Exception as e:
logger.error(f"形態素解析エラー: {e}", exc_info=True)
return {"tokens": [], "keywords": {}, "word_count": 0, "language": "ja"}
        # Aggregate keyword frequencies
word_counter = Counter([token["surface"] for token in tokens])
keywords = dict(word_counter.most_common())
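        # dict() preserves insertion order (Python 3.7+), so keywords stays sorted by descending frequency.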
logger.info(f"日本語形態素解析完了: {len(tokens)}トークン抽出")
return {"tokens": tokens, "keywords": keywords, "word_count": len(tokens), "language": "ja"}
def _analyze_en(self, text: str) -> dict:
"""英語テキストを形態素解析(spacy).
Args:
text: 解析対象のテキスト
Returns:
dict: 分析結果
"""
self._ensure_english_nlp()
tokens = []
try:
doc = self.nlp_en(text)
for token in doc:
                # Filter by part of speech
if self._filter_pos_en(token.pos_, token.text):
tokens.append({"surface": token.text.lower(), "pos": token.pos_, "lemma": token.lemma_})
except Exception as e:
logger.error(f"English morphological analysis error: {e}", exc_info=True)
return {"tokens": [], "keywords": {}, "word_count": 0, "language": "en"}
        # Aggregate keyword frequencies (using lemmas)
word_counter = Counter([token["lemma"] for token in tokens])
keywords = dict(word_counter.most_common())
logger.info(f"English morphological analysis completed: {len(tokens)} tokens extracted")
return {"tokens": tokens, "keywords": keywords, "word_count": len(tokens), "language": "en"}
def extract_keywords(self, text: str, top_n: int = 20, language: Optional[str] = None) -> list[dict]:
"""キーワードを抽出してランキング.
Args:
text: 解析対象のテキスト
top_n: 上位N件を返す
language: 言語コード ("ja" or "en"、省略時は自動判別)
Returns:
list[dict]: キーワードランキング
- keyword: キーワード
- count: 出現回数
- score: スコア(0-1で正規化)
"""
analysis = self.analyze(text, language=language)
keywords = analysis["keywords"]
if not keywords:
return []
        # Take the top N keywords
top_keywords = list(keywords.items())[:top_n]
        # Normalize scores (the maximum count becomes 1)
max_count = top_keywords[0][1] if top_keywords else 1
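        # Example: raw counts of [10, 5, 2] yield scores of [1.0, 0.5, 0.2].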
results = []
for keyword, count in top_keywords:
score = count / max_count
results.append({"keyword": keyword, "count": count, "score": score})
logger.info(f"キーワード抽出完了: 上位{len(results)}件 (言語: {analysis.get('language', 'unknown')})")
return results
def extract_compound_keywords(
self,
text: str,
min_frequency: int = 2,
max_compound_length: int = 3,
compound_ratio: float = 0.7,
language: Optional[str] = None,
) -> dict[str, int]:
"""複合名詞と単語のハイブリッド抽出でWordCloud用の頻度辞書を生成.
注: 現在は日本語のみ対応。英語の場合は通常のキーワード抽出を実行。
Args:
text: 解析対象のテキスト
min_frequency: 最小出現回数(この回数未満は除外)
max_compound_length: 複合語の最大長(2-3推奨)
compound_ratio: 複合語の比率(0.6-0.8推奨)
language: 言語コード ("ja" or "en"、省略時は自動判別)
Returns:
dict[str, int]: 複合語と単語の頻度辞書
"""
if not text or not text.strip():
return {}
        # Detect the language
if language is None:
language = self.detect_language(text)
        # For English, fall back to plain keyword extraction (no compound nouns)
if language == "en":
analysis = self._analyze_en(text)
keywords = analysis["keywords"]
            # Filter by minimum frequency
filtered = {k: v for k, v in keywords.items() if v >= min_frequency}
logger.info(f"English keyword extraction: {len(filtered)} words")
return filtered
        # For Japanese, extract compound nouns
        # Morphological analysis
tokens = []
try:
for token in self.tokenizer_ja.tokenize(text):
if hasattr(token, "surface") and hasattr(token, "part_of_speech"):
surface = token.surface
part_of_speech = token.part_of_speech
elif hasattr(token, "__str__"):
token_str = str(token)
parts = token_str.split("\t")
if len(parts) < 2:
continue
surface = parts[0]
part_of_speech = parts[1]
else:
continue
features = part_of_speech.split(",")
pos = features[0] if features else ""
tokens.append({"surface": surface, "pos": pos})
except Exception as e:
logger.error(f"形態素解析エラー: {e}", exc_info=True)
return {}
        # 1. Extract compound nouns
compound_nouns = []
i = 0
while i < len(tokens):
if tokens[i]["pos"] == "名詞":
compound = [tokens[i]["surface"]]
j = i + 1
                # Join consecutive nouns (up to max_compound_length tokens)
while j < len(tokens) and tokens[j]["pos"] == "名詞" and len(compound) < max_compound_length:
compound.append(tokens[j]["surface"])
j += 1
                # Keep only compound nouns made of two or more tokens
if len(compound) >= 2:
compound_noun = "".join(compound)
                    # Stopword check
if not any(sw in compound_noun for sw in self.stopwords_ja):
compound_nouns.append(compound_noun)
i = j
else:
i += 1
else:
i += 1
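        # Example: consecutive noun tokens such as 自然 / 言語 / 処理 are joined into the compound noun 自然言語処理.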
        # 2. Extract single words (same filtering as regular keyword extraction)
single_words = [
token["surface"] for token in tokens if self._filter_pos_ja(token["pos"], token["surface"])
]
        # 3. Count frequencies
compound_counter = Counter(compound_nouns)
word_counter = Counter(single_words)
        # 4. Filter by frequency (drop entries below min_frequency)
compound_filtered = {k: v for k, v in compound_counter.items() if v >= min_frequency}
word_filtered = {k: v for k, v in word_counter.items() if v >= min_frequency}
        # 5. Scoring (weight compound nouns more heavily)
compound_scored = {k: int(v * 1.5) for k, v in compound_filtered.items()}
word_scored = word_filtered.copy()
        # 6. Drop single words that are contained in a compound noun (avoid duplication)
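        # e.g. if "機械学習" survives as a compound noun, the standalone "機械" and "学習" are removed.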
for compound in compound_scored.keys():
for word in list(word_scored.keys()):
if word in compound:
del word_scored[word]
        # 7. Compute total scores
total_compound_score = sum(compound_scored.values())
total_word_score = sum(word_scored.values())
total_score = total_compound_score + total_word_score
if total_score == 0:
return {}
        # 8. Rescale so the score mass splits as compound_ratio : (1 - compound_ratio)
target_compound_score = total_score * compound_ratio
target_word_score = total_score * (1 - compound_ratio)
        # Compute scaling factors
compound_scale = target_compound_score / total_compound_score if total_compound_score > 0 else 1
word_scale = target_word_score / total_word_score if total_word_score > 0 else 1
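        # Illustrative arithmetic: with compound scores summing to 30 and word scores to 70,
        # compound_ratio=0.7 gives compound_scale = 70 / 30 ≈ 2.33 and word_scale = 30 / 70 ≈ 0.43,
        # so compound nouns end up dominating the WordCloud frequencies as intended.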
        # Build the final frequency dictionary
result = {}
for k, v in compound_scored.items():
result[k] = max(1, int(v * compound_scale))
for k, v in word_scored.items():
result[k] = max(1, int(v * word_scale))
logger.info(f"複合キーワード抽出完了: 複合語{len(compound_scored)}語, 単語{len(word_scored)}語")
return result
def _load_stopwords_ja(self, stopwords_path: Optional[str] = None) -> set:
"""日本語ストップワード読み込み.
Args:
stopwords_path: ストップワードファイルのパス
Returns:
set: 日本語ストップワードのセット
"""
        # Default Japanese stopwords
default_stopwords = {
"する",
"ある",
"いる",
"なる",
"れる",
"できる",
"くる",
"やる",
"つく",
"思う",
"みる",
"いく",
"こと",
"もの",
"よう",
"そう",
"です",
"ます",
"ません",
"ました",
"です",
"だ",
"である",
"ある",
"これ",
"それ",
"あれ",
"この",
"その",
"あの",
"ここ",
"そこ",
"あそこ",
"など",
"的",
"さん",
"ちゃん",
"くん",
}
if stopwords_path and Path(stopwords_path).exists():
try:
with open(stopwords_path, "r", encoding="utf-8") as f:
custom_stopwords = {line.strip() for line in f if line.strip()}
default_stopwords.update(custom_stopwords)
logger.info(f"カスタム日本語ストップワード読み込み: {len(custom_stopwords)}語")
except Exception as e:
logger.warning(f"ストップワードファイルの読み込みに失敗: {e}")
return default_stopwords
def _load_stopwords_en(self) -> set:
"""英語ストップワード読み込み.
Returns:
set: 英語ストップワードのセット
"""
        # Common English stopwords
stopwords = {
"i",
"me",
"my",
"myself",
"we",
"our",
"ours",
"ourselves",
"you",
"your",
"yours",
"yourself",
"yourselves",
"he",
"him",
"his",
"himself",
"she",
"her",
"hers",
"herself",
"it",
"its",
"itself",
"they",
"them",
"their",
"theirs",
"themselves",
"what",
"which",
"who",
"whom",
"this",
"that",
"these",
"those",
"am",
"is",
"are",
"was",
"were",
"be",
"been",
"being",
"have",
"has",
"had",
"having",
"do",
"does",
"did",
"doing",
"a",
"an",
"the",
"and",
"but",
"if",
"or",
"because",
"as",
"until",
"while",
"of",
"at",
"by",
"for",
"with",
"about",
"against",
"between",
"into",
"through",
"during",
"before",
"after",
"above",
"below",
"to",
"from",
"up",
"down",
"in",
"out",
"on",
"off",
"over",
"under",
"again",
"further",
"then",
"once",
}
return stopwords
def _filter_pos_ja(self, pos: str, surface: str) -> bool:
"""日本語品詞フィルタリング.
Args:
pos: 品詞
surface: 表層形
Returns:
bool: フィルタを通過する場合True
"""
        # Part of speech is not targeted
if pos not in self.target_pos_ja:
return False
        # Token is a stopword
if surface in self.stopwords_ja:
return False
        # Single character only (symbols, particles, etc.)
if len(surface) <= 1:
return False
        # Digits only
if surface.isdigit():
return False
return True
def _filter_pos_en(self, pos: str, surface: str) -> bool:
"""英語品詞フィルタリング.
Args:
pos: 品詞
surface: 表層形(小文字化されたもの)
Returns:
bool: フィルタを通過する場合True
"""
        # Part of speech is not targeted
if pos not in self.target_pos_en:
return False
        # Token is a stopword
if surface.lower() in self.stopwords_en:
return False
        # Fewer than 3 characters (too short)
if len(surface) < 3:
return False
        # Digits only
if surface.isdigit():
return False
        # Contains non-alphabetic characters (symbols, etc.)
if not surface.isalpha():
return False
return True
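

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): builds an analyzer with the default
    # stopwords and prints the top keywords for a short Japanese sample. Assumes
    # janome and langdetect are installed; English input additionally requires
    # spacy's en_core_web_sm model.
    logging.basicConfig(level=logging.INFO)
    analyzer = TextAnalyzer()
    sample = "自然言語処理はテキストを解析する技術です。自然言語処理は検索にも使われます。"
    for entry in analyzer.extract_keywords(sample, top_n=5):
        print(f"{entry['keyword']}: count={entry['count']}, score={entry['score']:.2f}")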