"""
비즈니스 인사이트 및 선호도 분석 기능
"""
import os
import json
from datetime import timedelta
from typing import Optional, List, Dict
from groq import Groq
from sqlalchemy import func
from ...db.database import get_session
from ...db.models import Persona, FGISurvey, FGIResponse
from ...utils.cache import get_cache
from ...utils.credentials import get_groq_api_key
def analyze_preference(
query: str,
category: Optional[str] = None,
age_range: Optional[str] = None,
gender: Optional[str] = None,
location: Optional[str] = None,
limit: int = 20,
arguments: Optional[Dict] = None
):
"""
특정 제품/주제에 대한 페르소나들의 선호도 분석
예: "스티커 디자인", "키링 스타일", "제품 느낌" 등
:param query: 분석할 주제/제품 (예: "스티커 디자인", "키링", "제품 느낌")
:param category: 설문 카테고리 필터
:param age_range: 나이대 필터
:param gender: 성별 필터
:param location: 지역 필터
:param limit: 분석할 응답 수 제한
:return: 선호도 분석 결과
"""
try:
# 캐시 확인 (선호도 분석은 같은 쿼리+필터 조합에서 재사용 가능)
cache = get_cache()
cache_key = cache._generate_cache_key(
"preference",
query=query,
category=category,
age_range=age_range,
gender=gender,
location=location,
limit=limit
)
cached_result = cache.get(cache_key, ttl=timedelta(hours=12))
if cached_result is not None:
return cached_result
groq_api_key = get_groq_api_key(arguments)
if not groq_api_key:
return {"error": "GROQ_API_KEY가 설정되지 않았습니다"}
client = Groq(api_key=groq_api_key)
with get_session() as session:
# 필터 조건에 맞는 응답 조회
responses_query = session.query(FGIResponse)
# 페르소나 필터 적용
if age_range or gender or location:
personas_query = session.query(Persona.id)
if age_range:
personas_query = personas_query.filter(Persona.age_range == age_range)
if gender:
personas_query = personas_query.filter(Persona.gender == gender)
if location:
personas_query = personas_query.filter(Persona.location == location)
persona_ids = [p[0] for p in personas_query.all()]
responses_query = responses_query.filter(FGIResponse.persona_id.in_(persona_ids))
# 설문 카테고리 필터 적용
if category:
survey_ids = session.query(FGISurvey.id).filter(FGISurvey.category == category).all()
survey_ids = [s[0] for s in survey_ids]
responses_query = responses_query.filter(FGIResponse.survey_id.in_(survey_ids))
# 응답 가져오기
responses = responses_query.order_by(FGIResponse.submitted_at.desc()).limit(limit).all()
if not responses:
result = {
"query": query,
"message": "조건에 맞는 응답이 없습니다.",
"count": 0,
"insights": []
}
# 캐시 저장 (응답이 없어도 캐싱)
cache.set(cache_key, result, ttl=timedelta(hours=12))
return result
# 응답 텍스트 수집
response_texts = []
for response in responses:
if response.responses:
for resp in response.responses:
answer = resp.get('answer', '')
if answer:
response_texts.append(answer)
if not response_texts:
result = {
"query": query,
"message": "응답 내용이 없습니다.",
"count": 0,
"insights": []
}
# 캐시 저장 (응답이 없어도 캐싱)
cache.set(cache_key, result, ttl=timedelta(hours=12))
return result
# LLM으로 선호도 분석
responses_summary = "\n".join([f"- {text[:200]}" for text in response_texts[:10]])
prompt = f"""다음은 사용자들의 설문 응답입니다. 이 응답들을 분석하여 **"{query}"**에 대한 선호도를 파악해주세요.
**응답 샘플:**
{responses_summary}
**분석 목표:**
- "{query}"에 대한 사용자들의 선호도 파악
- 인기 있는 스타일/디자인/느낌 추출
- 트렌드와 패턴 발견
- 구체적인 예시와 이유
**JSON 형식으로 반환:**
{{
"top_preferences": [
{{
"preference": "선호하는 스타일/디자인/느낌",
"description": "상세 설명",
"examples": ["구체적 예시1", "구체적 예시2"],
"reason": "왜 선호하는지",
"frequency": "얼마나 자주 언급되었는지 (높음/중간/낮음)"
}}
],
"trends": ["트렌드1", "트렌드2", "트렌드3"],
"insights": "종합 인사이트 (비즈니스 관점에서)",
"recommendations": ["추천사항1", "추천사항2", "추천사항3"]
}}
주의:
- 모든 값은 한국어로 작성
- 구체적이고 실용적인 인사이트 제공
- 비즈니스 관점에서 활용 가능한 정보
"""
llm_response = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
max_tokens=800,
temperature=0.7
)
analysis = json.loads(llm_response.choices[0].message.content)
result = {
"query": query,
"filters": {
"category": category,
"age_range": age_range,
"gender": gender,
"location": location
},
"analyzed_responses": len(response_texts),
"top_preferences": analysis.get("top_preferences", []),
"trends": analysis.get("trends", []),
"insights": analysis.get("insights", ""),
"recommendations": analysis.get("recommendations", [])
}
# 캐시 저장 (성공한 경우만, Rate Limit 에러는 짧게 캐싱)
if not result.get("error"):
cache.set(cache_key, result, ttl=timedelta(hours=12))
elif "rate_limit" in str(result.get("error", "")).lower() or "429" in str(result.get("error", "")):
# Rate Limit 에러는 5분간 캐싱 (같은 에러 반복 방지)
cache.set(cache_key, result, ttl=timedelta(minutes=5))
return result
except Exception as e:
error_result = {"error": f"선호도 분석 중 오류 발생: {str(e)}"}
# Rate Limit 에러는 짧게 캐싱
if "rate_limit" in str(e).lower() or "429" in str(e):
cache.set(cache_key, error_result, ttl=timedelta(minutes=5))
return error_result
def get_trend_insights(
topic: str,
persona_filters: Optional[Dict] = None,
min_freshness: Optional[float] = None,
arguments: Optional[Dict] = None
):
"""
특정 주제에 대한 트렌드 인사이트 제공
예: "요즘 사람들이 좋아하는 스티커 디자인", "인기 있는 키링 스타일"
:param topic: 분석할 주제 (예: "스티커 디자인", "키링", "제품 느낌")
:param persona_filters: 페르소나 필터 (age_range, gender, location 등)
:param min_freshness: 최소 신선도 점수
:return: 트렌드 인사이트
"""
try:
# 캐시 확인 (트렌드 인사이트는 같은 토픽+필터 조합에서 재사용 가능)
cache = get_cache()
cache_key = cache._generate_cache_key(
"trend",
topic=topic,
filters=persona_filters or {},
min_freshness=min_freshness
)
cached_result = cache.get(cache_key, ttl=timedelta(hours=6))
if cached_result is not None:
return cached_result
groq_api_key = get_groq_api_key(arguments)
if not groq_api_key:
return {"error": "GROQ_API_KEY가 설정되지 않았습니다"}
client = Groq(api_key=groq_api_key)
with get_session() as session:
# 필터 조건에 맞는 페르소나 조회
personas_query = session.query(Persona)
if persona_filters:
if persona_filters.get("age_range"):
personas_query = personas_query.filter(Persona.age_range == persona_filters["age_range"])
if persona_filters.get("gender"):
personas_query = personas_query.filter(Persona.gender == persona_filters["gender"])
if persona_filters.get("location"):
personas_query = personas_query.filter(Persona.location == persona_filters["location"])
if persona_filters.get("occupation_category"):
personas_query = personas_query.filter(Persona.occupation_category == persona_filters["occupation_category"])
if min_freshness:
personas_query = personas_query.filter(Persona.freshness_score >= min_freshness)
personas = personas_query.all()
persona_ids = [p.id for p in personas]
if not persona_ids:
return {
"topic": topic,
"message": "조건에 맞는 페르소나가 없습니다.",
"insights": []
}
# 해당 페르소나들의 응답 조회
responses = session.query(FGIResponse).filter(
FGIResponse.persona_id.in_(persona_ids)
).order_by(FGIResponse.submitted_at.desc()).limit(30).all()
# 응답 텍스트 수집
response_texts = []
for response in responses:
if response.responses:
for resp in response.responses:
answer = resp.get('answer', '')
if answer and topic.lower() in answer.lower() or len(response_texts) < 10:
response_texts.append(answer)
if not response_texts:
return {
"topic": topic,
"message": "관련 응답이 없습니다.",
"insights": []
}
# LLM으로 트렌드 분석
responses_summary = "\n".join([f"- {text[:200]}" for text in response_texts[:15]])
prompt = f"""다음은 사용자들의 설문 응답입니다. **"{topic}"**에 대한 최신 트렌드와 인사이트를 분석해주세요.
**응답 샘플:**
{responses_summary}
**분석 목표:**
- "{topic}"에 대한 최신 트렌드 파악
- 인기 있는 스타일/디자인/느낌
- 사용자들이 원하는 것
- 비즈니스 기회 발견
**JSON 형식으로 반환:**
{{
"current_trends": [
{{
"trend": "트렌드 이름",
"description": "상세 설명",
"popularity": "인기도 (매우 높음/높음/보통)",
"examples": ["예시1", "예시2"]
}}
],
"what_people_like": [
"사람들이 좋아하는 것1",
"사람들이 좋아하는 것2",
"사람들이 좋아하는 것3"
],
"what_people_dont_like": [
"사람들이 싫어하는 것1",
"사람들이 싫어하는 것2"
],
"business_opportunities": [
"비즈니스 기회1",
"비즈니스 기회2",
"비즈니스 기회3"
],
"key_insights": "핵심 인사이트 (한 문단)",
"recommendations": ["추천사항1", "추천사항2"]
}}
주의:
- 모든 값은 한국어로 작성
- 실용적이고 구체적인 정보 제공
- 비즈니스 관점에서 활용 가능한 인사이트
"""
llm_response = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
max_tokens=1000,
temperature=0.7
)
analysis = json.loads(llm_response.choices[0].message.content)
result = {
"topic": topic,
"filters": persona_filters or {},
"analyzed_personas": len(persona_ids),
"analyzed_responses": len(response_texts),
"current_trends": analysis.get("current_trends", []),
"what_people_like": analysis.get("what_people_like", []),
"what_people_dont_like": analysis.get("what_people_dont_like", []),
"business_opportunities": analysis.get("business_opportunities", []),
"key_insights": analysis.get("key_insights", ""),
"recommendations": analysis.get("recommendations", [])
}
# 캐시 저장 (성공한 경우만)
if not result.get("error"):
cache.set(cache_key, result, ttl=timedelta(hours=6))
return result
except Exception as e:
return {"error": f"트렌드 인사이트 분석 중 오류 발생: {str(e)}"}
def search_responses_by_keyword(
keyword: str,
category: Optional[str] = None,
limit: int = 20,
arguments: Optional[Dict] = None
):
"""
AI 기반 의미 검색으로 특정 주제와 관련된 응답 찾기
단순 키워드 매칭이 아닌 의미적으로 관련된 응답을 찾습니다.
예: "스티커 디자인"을 검색하면 "귀여운 캐릭터 스티커", "디자인이 예쁜 스티커" 등도 찾을 수 있음
:param keyword: 검색할 주제/키워드 (예: "스티커 디자인", "키링", "제품 느낌")
:param category: 설문 카테고리 필터
:param limit: 최대 결과 수
:param arguments: 도구 호출 인자 (env 필드 포함 가능)
:return: 검색 결과 (관련도 점수 포함)
"""
try:
# 캐시 확인 (검색 결과는 같은 키워드+필터 조합에서 재사용 가능)
cache = get_cache()
cache_key = cache._generate_cache_key(
"search",
keyword=keyword,
category=category,
limit=limit
)
cached_result = cache.get(cache_key, ttl=timedelta(hours=12))
if cached_result is not None:
return cached_result
groq_api_key = get_groq_api_key(arguments)
if not groq_api_key:
return {"error": "GROQ_API_KEY가 설정되지 않았습니다"}
client = Groq(api_key=groq_api_key)
with get_session() as session:
responses_query = session.query(FGIResponse)
# 카테고리 필터
if category:
survey_ids = session.query(FGISurvey.id).filter(FGISurvey.category == category).all()
survey_ids = [s[0] for s in survey_ids]
responses_query = responses_query.filter(FGIResponse.survey_id.in_(survey_ids))
# 더 많은 응답을 가져와서 LLM으로 필터링
responses = responses_query.order_by(FGIResponse.submitted_at.desc()).limit(limit * 5).all()
# 모든 응답 수집
all_responses = []
for response in responses:
if response.responses:
for resp in response.responses:
answer = resp.get('answer', '')
if answer and len(answer) > 10: # 너무 짧은 응답 제외
all_responses.append({
"persona_id": response.persona_id,
"survey_id": response.survey_id,
"question_id": resp.get('question_id', ''),
"answer": answer,
"submitted_at": response.submitted_at.isoformat() if response.submitted_at else None
})
if not all_responses:
return {
"keyword": keyword,
"category": category,
"count": 0,
"message": "조건에 맞는 응답이 없습니다.",
"responses": []
}
# LLM으로 관련도 분석 (배치 처리)
batch_size = 20
scored_responses = []
for i in range(0, len(all_responses), batch_size):
batch = all_responses[i:i+batch_size]
batch_text = "\n".join([
f"{idx+1}. {resp['answer'][:300]}"
for idx, resp in enumerate(batch)
])
prompt = f"""다음은 사용자들의 설문 응답입니다. 각 응답이 **"{keyword}"**와 얼마나 관련이 있는지 **엄격하게** 평가해주세요.
**검색 주제:** {keyword}
**응답 목록:**
{batch_text}
**평가 기준 (매우 엄격):**
- **직접적으로 언급** (90-100점): 검색 주제의 핵심 키워드가 명확히 언급됨
- 예: "스티커 디자인" 검색 시 → "스티커를 좋아해요", "디자인이 예뻐요" 등
- **간접적으로 관련** (70-89점): 검색 주제와 명확히 연관된 내용
- 예: "스티커 디자인" 검색 시 → "캐릭터 스티커", "예쁜 스티커 디자인" 등
- **약간 관련** (50-69점): 검색 주제와 어느 정도 연관이 있지만 명확하지 않음
- **관련 없음** (0-49점): 검색 주제와 전혀 관련 없음
- 예: "스티커 디자인" 검색 시 → "AI 기술", "음식 맛집" 등은 관련 없음
**중요 규칙:**
- **관련이 명확하지 않으면 낮은 점수를 주세요**
- **검색 주제와 전혀 관련 없는 응답은 30점 이하로 평가**
- **70점 이상만 is_relevant: true로 설정** (기존 50점에서 상향)
- 억지로 관련성을 찾지 마세요. 명확히 관련이 있을 때만 높은 점수
**JSON 형식으로 반환:**
{{
"scores": [
{{
"index": 1,
"relevance_score": 0-100,
"is_relevant": true/false,
"reason": "관련 이유 (한 줄, 명확하게)"
}},
...
]
}}
주의:
- relevance_score: 0-100 (높을수록 관련성 높음)
- **70점 이상이면 is_relevant: true** (엄격한 기준)
- **관련이 명확하지 않으면 낮은 점수를 주세요**
- 억지로 관련성을 찾지 마세요
"""
try:
llm_response = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
max_tokens=800,
temperature=0.3
)
analysis = json.loads(llm_response.choices[0].message.content)
scores = analysis.get("scores", [])
for score_data in scores:
idx = score_data.get("index", 1) - 1
if 0 <= idx < len(batch):
scored_responses.append({
**batch[idx],
"relevance_score": score_data.get("relevance_score", 0),
"is_relevant": score_data.get("is_relevant", False),
"relevance_reason": score_data.get("reason", "")
})
except Exception as e:
# LLM 실패 시 기본 점수 부여 (키워드 포함 여부, 더 엄격하게)
for resp in batch:
answer_lower = resp['answer'].lower()
keyword_lower = keyword.lower()
# 키워드가 직접 포함되어야만 높은 점수
if keyword_lower in answer_lower:
score = 80 # 직접 언급
elif any(word in answer_lower for word in keyword_lower.split()):
score = 40 # 일부 키워드만 언급
else:
score = 5 # 관련 없음
scored_responses.append({
**resp,
"relevance_score": score,
"is_relevant": score >= 70, # 70점 이상만 관련
"relevance_reason": "키워드 직접 매칭" if score >= 70 else "관련성 낮음"
})
# 관련도 점수로 정렬하고 상위 결과만 반환
scored_responses.sort(key=lambda x: x.get("relevance_score", 0), reverse=True)
relevant_responses = [
resp for resp in scored_responses
if resp.get("is_relevant", False) and resp.get("relevance_score", 0) >= 70
][:limit]
# 관련 응답이 없으면 명확한 메시지 반환
if not relevant_responses:
return {
"keyword": keyword,
"category": category,
"total_analyzed": len(all_responses),
"count": 0,
"responses": [],
"message": f"'{keyword}'와 관련된 응답을 찾을 수 없습니다. DB에 해당 주제에 대한 응답이 없을 수 있습니다.",
"note": "검색 결과가 없습니다. 더 많은 설문 응답이 필요할 수 있습니다."
}
result = {
"keyword": keyword,
"category": category,
"total_analyzed": len(all_responses),
"count": len(relevant_responses),
"responses": relevant_responses,
"message": f"'{keyword}'와 관련된 응답 {len(relevant_responses)}개를 찾았습니다."
}
# 캐시 저장 (성공한 경우만)
if not result.get("error"):
cache.set(cache_key, result, ttl=timedelta(hours=12))
return result
except Exception as e:
return {"error": f"응답 검색 중 오류 발생: {str(e)}"}