"""Rumor detection system for identifying and analyzing fake news and misinformation."""
import re
import asyncio
import logging
import json
import hashlib
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional, Tuple, Set, Union
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
import statistics
from collections import defaultdict, Counter
import difflib
class RumorType(Enum):
"""Types of rumors detected."""
MISINFORMATION = "misinformation"
DISINFORMATION = "disinformation"
CONSPIRACY = "conspiracy"
MARKET_MANIPULATION = "market_manipulation"
DEFAMATION = "defamation"
HOAX = "hoax"
SATIRE = "satire"
CLICKBAIT = "clickbait"
class RumorError(Exception):
"""Rumor detection specific error."""
pass
@dataclass
class SourceReliability:
"""Source reliability assessment."""
source_name: str
reliability_score: float # 0.0 to 1.0
reputation_score: float
verification_status: str = "unknown"
historical_accuracy: float = 0.5
bias_score: float = 0.5
transparency_score: float = 0.5
last_updated: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
@dataclass
class ContentPattern:
"""Content pattern detected in text."""
pattern_type: str
confidence: float
evidence: List[str]
text_span: Tuple[int, int] = (0, 0)
pattern_strength: float = 0.0
@dataclass
class PropagationMetrics:
"""Metrics for rumor propagation analysis."""
viral_coefficient: float
    spread_velocity: float  # shares per hour
engagement_ratio: float
bot_activity_score: float
network_density: float = 0.0
amplification_factor: float = 1.0
geographic_spread: Dict[str, int] = field(default_factory=dict)
@dataclass
class RumorScore:
"""Detailed rumor scoring breakdown."""
overall_score: float
source_score: float
content_score: float
propagation_score: float
verification_score: float
temporal_score: float = 0.0
social_score: float = 0.0
@dataclass
class RumorResult:
"""Result of rumor analysis."""
news_id: str
rumor_score: float
rumor_type: str
confidence: float
evidence: List[str]
source_reliability: SourceReliability
content_patterns: List[ContentPattern] = field(default_factory=list)
propagation_metrics: Optional[PropagationMetrics] = None
score_breakdown: Optional[RumorScore] = None
verification_status: str = "unverified"
risk_level: str = "medium"
recommendations: List[str] = field(default_factory=list)
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
class RumorDetector:
"""Advanced rumor detection and analysis system."""
def __init__(self):
"""Initialize rumor detector."""
self.logger = logging.getLogger("rumor_detector")
# Pattern definitions
self.rumor_patterns = self._load_rumor_patterns()
# Source reliability database
self.source_reliability_db = self._load_source_reliability_db()
# Tracking storage
self.active_rumors = {}
self.rumor_clusters = defaultdict(list)
# Statistics
self.stats = {
"total_analyzed": 0,
"rumors_detected": 0,
"high_risk_count": 0,
"false_positives": 0
}
# Configuration
self.config = {
"rumor_threshold": 0.6,
"high_risk_threshold": 0.8,
"confidence_threshold": 0.7,
"clustering_similarity": 0.8
}
def _load_rumor_patterns(self) -> Dict[str, List[Dict[str, Any]]]:
"""Load rumor detection patterns."""
patterns = {
"anonymous_source": [
{
"pattern": r"익명의?\s*소식통|내부\s*관계자|한\s*관계자|소식통에?\s*의?하면|anonymous\s*source|according\s*to\s*sources",
"weight": 0.8,
"description": "Anonymous source references"
},
{
"pattern": r"이름을?\s*밝히지?\s*않은|신원\s*미상|정체\s*불명|unnamed\s*source|undisclosed\s*source",
"weight": 0.7,
"description": "Unnamed source indicators"
}
],
"unverified_claim": [
{
"pattern": r"확인되지?\s*않은|검증되지?\s*않은|추측에?\s*불과|루머로?\s*알려져|공식.*확인되지.*않았습니다",
"weight": 0.9,
"description": "Unverified claim indicators"
},
{
"pattern": r"라는?\s*말이?\s*있다|할\s*가능성|일지도?\s*모른다",
"weight": 0.6,
"description": "Speculative language"
}
],
"sensational_language": [
{
"pattern": r"충격적|놀라운|대박|긴급|속보|!!!|대단한|엄청난|shocking|amazing|breaking",
"weight": 0.7,
"description": "Sensational language markers"
},
{
"pattern": r"믿을\s*수\s*없는|상상도\s*못한|전례\s*없는|역사상\s*최|unbelievable|incredible|unprecedented",
"weight": 0.6,
"description": "Hyperbolic expressions"
}
],
"vague_language": [
{
"pattern": r"어떤\s*사람들은?\s*말한다|일부에서는?\s*추측|라는?\s*말이?\s*있다|가능성이?\s*있다고?\s*한다",
"weight": 0.5,
"description": "Vague attribution"
},
{
"pattern": r"알려진\s*바에?\s*따르면|전해지는\s*바|소문에?\s*따르면|들리는\s*바",
"weight": 0.6,
"description": "Indirect reporting"
}
],
"conspiracy_markers": [
{
"pattern": r"음모|숨겨진\s*진실|조작|은폐|정부\s*개입",
"weight": 0.8,
"description": "Conspiracy theory indicators"
},
{
"pattern": r"진짜\s*이유|실제\s*목적|뒤에\s*숨은|배후|흑막",
"weight": 0.7,
"description": "Hidden agenda implications"
}
],
"market_manipulation": [
{
"pattern": r"주가\s*조작|시세\s*조작|작전\s*세력|세력\s*개입",
"weight": 0.9,
"description": "Market manipulation terms"
},
{
"pattern": r"물량\s*공급|급등\s*예상|폭락\s*전망|확실한\s*수익",
"weight": 0.7,
"description": "Investment manipulation"
}
]
}
return patterns
def _load_source_reliability_db(self) -> Dict[str, SourceReliability]:
"""Load source reliability database."""
# Mock database - in real implementation would load from actual DB
return {
"samsung_official": SourceReliability(
source_name="samsung_official",
reliability_score=0.95,
reputation_score=0.9,
verification_status="verified",
historical_accuracy=0.95
),
"reuters": SourceReliability(
source_name="reuters",
reliability_score=0.9,
reputation_score=0.85,
verification_status="verified",
historical_accuracy=0.9
),
"unknown_blog": SourceReliability(
source_name="unknown_blog",
reliability_score=0.2,
reputation_score=0.1,
verification_status="unverified",
historical_accuracy=0.3
),
"tech_blog": SourceReliability(
source_name="tech_blog",
reliability_score=0.6,
reputation_score=0.5,
verification_status="pending",
historical_accuracy=0.6
)
}
async def analyze_rumor(self, news_item: Dict[str, Any]) -> RumorResult:
"""Analyze news item for rumor characteristics."""
if news_item is None:
raise RumorError("News item cannot be None")
try:
news_id = news_item.get("id", "unknown")
# Check source reliability
source_reliability = await self.check_source_reliability(
news_item.get("source", "unknown")
)
# Detect content patterns
content = news_item.get("content", "") + " " + news_item.get("title", "")
content_patterns = await self.detect_patterns(content)
# Calculate rumor score
rumor_score = await self.calculate_rumor_score(news_item)
# Classify rumor type
rumor_type = await self._classify_rumor_type(content, content_patterns)
# Calculate confidence
confidence = await self.calculate_confidence({
"content": content,
"source_reliability": source_reliability.reliability_score,
"pattern_matches": len(content_patterns)
})
# Extract evidence
evidence = [pattern.pattern_type for pattern in content_patterns]
# Determine risk level
risk_level = self._determine_risk_level(rumor_score, confidence)
# Generate recommendations
recommendations = await self._generate_recommendations(rumor_score, rumor_type, risk_level)
# Update statistics
self.stats["total_analyzed"] += 1
if rumor_score > self.config["rumor_threshold"]:
self.stats["rumors_detected"] += 1
if rumor_score > self.config["high_risk_threshold"]:
self.stats["high_risk_count"] += 1
return RumorResult(
news_id=news_id,
rumor_score=rumor_score,
rumor_type=rumor_type,
confidence=confidence,
evidence=evidence,
source_reliability=source_reliability,
content_patterns=content_patterns,
risk_level=risk_level,
recommendations=recommendations
)
except Exception as e:
self.logger.error(f"Error analyzing rumor: {e}")
# Return default result for graceful handling
return RumorResult(
news_id=news_item.get("id", "error"),
rumor_score=0.5,
rumor_type=RumorType.MISINFORMATION.value,
confidence=0.0,
evidence=["analysis_error"],
source_reliability=SourceReliability("unknown", 0.5, 0.5)
)
async def check_source_reliability(self, source_name: str) -> SourceReliability:
"""Check reliability of news source."""
if source_name in self.source_reliability_db:
return self.source_reliability_db[source_name]
# Default reliability for unknown sources
return SourceReliability(
source_name=source_name,
reliability_score=0.4, # Default to low-medium reliability
reputation_score=0.3,
verification_status="unknown"
)
async def detect_patterns(self, text: str) -> List[ContentPattern]:
"""Detect rumor patterns in text content."""
if not text:
return []
detected_patterns = []
for pattern_type, patterns in self.rumor_patterns.items():
for pattern_info in patterns:
pattern = pattern_info["pattern"]
weight = pattern_info["weight"]
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
detected_patterns.append(ContentPattern(
pattern_type=pattern_type,
confidence=weight,
evidence=[match.group()],
text_span=(match.start(), match.end()),
pattern_strength=weight
))
return detected_patterns
async def calculate_rumor_score(self, news_item: Dict[str, Any]) -> float:
"""Calculate overall rumor score."""
score_components = []
# Source reliability component (0.3 weight)
source_name = news_item.get("source", "unknown")
source_reliability = await self.check_source_reliability(source_name)
source_score = 1.0 - source_reliability.reliability_score
score_components.append(source_score * 0.3)
# Content pattern component (0.4 weight)
content = news_item.get("content", "") + " " + news_item.get("title", "")
patterns = await self.detect_patterns(content)
content_score = min(sum(p.confidence for p in patterns) / 5.0, 1.0)
score_components.append(content_score * 0.4)
# Author credibility component (0.1 weight)
author = news_item.get("author", "")
author_score = 0.8 if author == "anonymous" or not author else 0.2
score_components.append(author_score * 0.1)
# Social propagation component (0.2 weight)
social_shares = news_item.get("social_shares", 0)
view_count = news_item.get("view_count", 0)
# High shares with low credible sources might indicate viral misinformation
        if social_shares > 1000 and source_reliability.reliability_score < 0.5:
            social_score = min(social_shares / 5000.0, 1.0)  # scale shares against a 5,000-share ceiling
else:
social_score = 0.0
score_components.append(social_score * 0.2)
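        # Illustrative arithmetic (hypothetical item): an "unknown_blog" source (reliability 0.2)
        # with two strong pattern matches (weights 0.9 and 0.8), no named author, and 3,000 shares
        # scores roughly 0.8*0.3 + 0.34*0.4 + 0.8*0.1 + 0.6*0.2 = 0.576.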
return min(sum(score_components), 1.0)
async def _classify_rumor_type(self, content: str, patterns: List[ContentPattern]) -> str:
"""Classify the type of rumor based on content and patterns."""
pattern_counts = Counter(p.pattern_type for p in patterns)
# Market manipulation indicators
if (pattern_counts.get("market_manipulation", 0) > 0 or
re.search(r"주가|투자|수익|매수|매도", content, re.IGNORECASE)):
return RumorType.MARKET_MANIPULATION.value
# Conspiracy theory indicators
if (pattern_counts.get("conspiracy_markers", 0) > 0 or
pattern_counts.get("anonymous_source", 0) > 1):
return RumorType.CONSPIRACY.value
# Defamation indicators
if re.search(r"비리|스캔들|부정|횡령|사기", content, re.IGNORECASE):
return RumorType.DEFAMATION.value
# Default to misinformation
return RumorType.MISINFORMATION.value
async def calculate_confidence(self, analysis_data: Dict[str, Any]) -> float:
"""Calculate confidence in rumor detection."""
confidence_factors = []
# Pattern clarity
pattern_matches = analysis_data.get("pattern_matches", 0)
pattern_confidence = min(pattern_matches / 3.0, 1.0)
confidence_factors.append(pattern_confidence * 0.4)
# Source reliability influence
source_reliability = analysis_data.get("source_reliability", 0.5)
# High or low reliability both increase confidence in assessment
reliability_confidence = abs(source_reliability - 0.5) * 2
confidence_factors.append(reliability_confidence * 0.3)
# Content length and detail
content_length = len(analysis_data.get("content", ""))
length_confidence = min(content_length / 500.0, 1.0)
confidence_factors.append(length_confidence * 0.2)
# Base confidence
confidence_factors.append(0.5 * 0.1)
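        # Factor weights sum to 1.0 (0.4 + 0.3 + 0.2 + 0.1); the constant base term
        # contributes 0.05, so the confidence never starts from zero.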
return min(sum(confidence_factors), 1.0)
def _determine_risk_level(self, rumor_score: float, confidence: float) -> str:
"""Determine risk level based on score and confidence."""
if rumor_score >= 0.8 and confidence >= 0.7:
return "critical"
elif rumor_score >= 0.6 and confidence >= 0.5:
return "high"
elif rumor_score >= 0.4:
return "medium"
else:
return "low"
async def _generate_recommendations(self, rumor_score: float, rumor_type: str,
risk_level: str) -> List[str]:
"""Generate recommendations based on analysis."""
recommendations = []
if risk_level in ["critical", "high"]:
recommendations.append("Immediate fact-checking required")
recommendations.append("Consider issuing official statement")
if rumor_type == RumorType.MARKET_MANIPULATION.value:
recommendations.append("Notify financial regulators")
recommendations.append("Monitor stock price movements")
if rumor_score > 0.7:
recommendations.append("Track viral spread patterns")
recommendations.append("Identify original source")
recommendations.append("Continue monitoring for updates")
return recommendations
async def analyze_propagation(self, news_item: Dict[str, Any]) -> PropagationMetrics:
"""Analyze rumor propagation patterns."""
social_shares = news_item.get("social_shares", 0)
view_count = news_item.get("view_count", 0)
comments_count = news_item.get("comments_count", 0)
# Calculate metrics
viral_coefficient = social_shares / max(view_count, 1) * 100
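        # viral_coefficient is shares per 100 views; values above 5 (one share per 20 views)
        # feed the simplified bot-activity estimate below.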
engagement_ratio = comments_count / max(social_shares, 1)
        # Estimate spread velocity in shares per hour (simplified)
        published_at = news_item.get("published_at", datetime.now(timezone.utc))
        if published_at.tzinfo is None:
            # Treat naive datetimes as UTC to avoid aware/naive subtraction errors
            published_at = published_at.replace(tzinfo=timezone.utc)
        time_since_published = (datetime.now(timezone.utc) - published_at).total_seconds() / 3600
        spread_velocity = social_shares / max(time_since_published, 1)
# Bot activity estimation (simplified)
bot_activity_score = min(viral_coefficient / 10.0, 1.0) if viral_coefficient > 5 else 0.0
return PropagationMetrics(
viral_coefficient=viral_coefficient,
spread_velocity=spread_velocity,
engagement_ratio=engagement_ratio,
bot_activity_score=bot_activity_score
)
async def cross_reference_verification(self, news_item: Dict[str, Any]) -> Dict[str, Any]:
"""Cross-reference with reliable sources for verification."""
# Mock implementation - would integrate with real fact-checking APIs
entities = news_item.get("entities", {}).get("companies", [])
verification_score = 0.5 # Default neutral
matching_sources = []
contradiction_sources = []
# Simulate cross-referencing
if "삼성전자" in entities:
# Check if similar news exists in reliable sources
matching_sources = ["reuters", "bloomberg"]
verification_score = 0.7
return {
"verification_score": verification_score,
"matching_sources": matching_sources,
"contradiction_sources": contradiction_sources
}
async def check_temporal_consistency(self, news_timeline: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Check temporal consistency of related news items."""
if len(news_timeline) < 2:
return {"consistency_score": 1.0, "anomalies": [], "trend_analysis": "insufficient_data"}
# Sort by timestamp
timeline = sorted(news_timeline, key=lambda x: x["timestamp"])
# Check for sudden sentiment changes
sentiments = [item.get("sentiment", "neutral") for item in timeline]
anomalies = []
for i in range(1, len(sentiments)):
if sentiments[i-1] == "neutral" and sentiments[i] == "speculative":
anomalies.append(f"Sudden shift to speculation at index {i}")
# Calculate consistency score
consistency_score = max(0.0, 1.0 - len(anomalies) / len(timeline))
return {
"consistency_score": consistency_score,
"anomalies": anomalies,
"trend_analysis": "escalating" if len(anomalies) > 0 else "stable"
}
async def analyze_linguistic_features(self, text: str) -> Dict[str, float]:
"""Analyze linguistic features for rumor detection."""
# Certainty indicators
certainty_words = ["확실", "분명", "틀림없이", "반드시"]
uncertainty_words = ["아마", "~같다", "~듯", "~일지도"]
certainty_count = sum(1 for word in certainty_words if word in text)
uncertainty_count = sum(1 for word in uncertainty_words if word in text)
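        # The +1 in the denominator avoids division by zero and dampens scores when only a few
        # markers are present (the same smoothing is used for the formality score below).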
certainty_score = certainty_count / (certainty_count + uncertainty_count + 1)
# Formality indicators
formal_words = ["발표", "공식", "보도자료", "성명"]
informal_words = ["얘기", "소문", "찌라시", "~래"]
formal_count = sum(1 for word in formal_words if word in text)
informal_count = sum(1 for word in informal_words if word in text)
formality_score = formal_count / (formal_count + informal_count + 1)
# Objectivity score (simplified)
subjective_words = ["놀라운", "충격적", "대단한", "믿을수없는"]
subjective_count = sum(1 for word in subjective_words if word in text)
objectivity_score = max(0.0, 1.0 - subjective_count / 10.0)
return {
"certainty_score": certainty_score,
"formality_score": formality_score,
"objectivity_score": objectivity_score
}
async def analyze_social_signals(self, social_data: Dict[str, Any]) -> Dict[str, float]:
"""Analyze social media signals for rumor detection."""
shares = social_data.get("shares", 0)
likes = social_data.get("likes", 0)
comments = social_data.get("comments", 0)
# Viral score
total_engagement = shares + likes + comments
viral_score = min(total_engagement / 10000.0, 1.0)
# Authenticity score based on user types
user_types = social_data.get("user_types", {})
verified_users = user_types.get("verified", 0)
suspicious_users = user_types.get("suspicious", 0)
total_users = sum(user_types.values())
authenticity_score = verified_users / max(total_users, 1)
bot_probability = suspicious_users / max(total_users, 1)
# Engagement quality
if shares > 0:
engagement_quality = (likes + comments) / shares
else:
engagement_quality = 0.0
engagement_quality = min(engagement_quality / 10.0, 1.0)
return {
"viral_score": viral_score,
"authenticity_score": authenticity_score,
"bot_probability": bot_probability,
"engagement_quality": engagement_quality
}
async def check_facts(self, claim: str) -> Dict[str, Any]:
"""Check facts using external fact-checking services."""
# Mock implementation
return await self._call_fact_check_api(claim)
async def _call_fact_check_api(self, claim: str) -> Dict[str, Any]:
"""Mock fact-checking API call."""
# Simulate API response
await asyncio.sleep(0.1) # Simulate API delay
return {
"fact_check_result": "unverified",
"confidence": 0.6,
"sources": ["fact_check_service"],
"explanation": "No reliable sources found to verify this claim"
}
async def detect_rumor_clusters(self, rumors: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
"""Detect clusters of similar rumors."""
if len(rumors) < 2:
return [rumors] if rumors else []
clusters = []
processed = set()
for i, rumor1 in enumerate(rumors):
if i in processed:
continue
cluster = [rumor1]
processed.add(i)
for j, rumor2 in enumerate(rumors[i+1:], i+1):
if j in processed:
continue
# Calculate similarity (simplified)
content1 = rumor1.get("content", "")
content2 = rumor2.get("content", "")
similarity = self._calculate_text_similarity(content1, content2)
if similarity > self.config["clustering_similarity"]:
cluster.append(rumor2)
processed.add(j)
clusters.append(cluster)
return clusters
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""Calculate similarity between two texts."""
# Enhanced similarity calculation
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
# Jaccard similarity
intersection = words1.intersection(words2)
union = words1.union(words2)
jaccard = len(intersection) / len(union)
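        # e.g. two texts sharing 3 of 10 distinct words give jaccard = 3/10 = 0.3 (hypothetical counts)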
# Key term similarity (give more weight to important terms)
key_terms = {"삼성전자", "CEO", "교체", "변경", "리더십", "최고경영자"}
key_intersection = words1.intersection(words2).intersection(key_terms)
key_boost = len(key_intersection) * 0.3
return min(jaccard + key_boost, 1.0)
async def start_rumor_tracking(self, rumor_id: str, initial_data: Dict[str, Any]):
"""Start tracking a rumor's evolution."""
self.active_rumors[rumor_id] = {
"initial_data": initial_data,
"timeline": [initial_data],
"started_at": datetime.now(timezone.utc)
}
async def update_rumor_tracking(self, rumor_id: str, updated_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update rumor tracking with new data."""
if rumor_id not in self.active_rumors:
return {"error": "Rumor not being tracked"}
tracking_data = self.active_rumors[rumor_id]
tracking_data["timeline"].append(updated_data)
# Calculate spread rate
initial_shares = tracking_data["initial_data"].get("social_shares", 0)
current_shares = updated_data.get("social_shares", initial_shares)
time_elapsed = (datetime.now(timezone.utc) - tracking_data["started_at"]).total_seconds() / 3600
spread_rate = (current_shares - initial_shares) / max(time_elapsed, 1)
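        # spread_rate is additional shares per hour since tracking began,
        # e.g. 500 new shares over 2 hours -> 250 shares/hour -> "high" escalation below.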
# Determine escalation level
if spread_rate > 100:
escalation_level = "high"
elif spread_rate > 50:
escalation_level = "medium"
else:
escalation_level = "low"
return {
"spread_rate": spread_rate,
"escalation_level": escalation_level,
"total_timeline_events": len(tracking_data["timeline"])
}
async def generate_mitigation_recommendations(self, rumor_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate recommendations for rumor mitigation."""
recommendations = []
rumor_score = rumor_data.get("rumor_score", 0.0)
rumor_type = rumor_data.get("rumor_type", "")
viral_coefficient = rumor_data.get("viral_coefficient", 0.0)
# High priority actions for critical rumors
if rumor_score > 0.8:
recommendations.append({
"action": "Issue immediate official statement",
"priority": "critical",
"rationale": "High rumor score requires immediate response"
})
if viral_coefficient > 2.0:
recommendations.append({
"action": "Implement social media monitoring",
"priority": "high",
"rationale": "High viral coefficient indicates rapid spread"
})
if rumor_type == RumorType.MARKET_MANIPULATION.value:
recommendations.append({
"action": "Contact financial authorities",
"priority": "high",
"rationale": "Market manipulation requires regulatory notification"
})
# Standard recommendations
recommendations.append({
"action": "Monitor rumor evolution",
"priority": "medium",
"rationale": "Continuous monitoring helps track impact"
})
return recommendations
async def analyze_historical_patterns(self, historical_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze historical rumor patterns."""
if not historical_data:
return {"error": "No historical data provided"}
# Analyze common patterns
rumor_types = [item.get("rumor_type") for item in historical_data]
type_frequency = Counter(rumor_types)
# Analyze target entities
target_entities = []
for item in historical_data:
target_entities.extend(item.get("target_entity", []))
entity_frequency = Counter(target_entities)
# Calculate average spread time
spread_times = [item.get("spread_time", 0) for item in historical_data if item.get("spread_time")]
avg_spread_time = statistics.mean(spread_times) if spread_times else 0
return {
"common_patterns": dict(type_frequency.most_common(5)),
"seasonal_trends": "No clear seasonal pattern detected",
"target_analysis": dict(entity_frequency.most_common(5)),
"effectiveness_metrics": {
"average_spread_time": avg_spread_time,
"total_analyzed": len(historical_data)
}
}
async def run_verification_workflow(self, news_item: Dict[str, Any]) -> Dict[str, Any]:
"""Run complete rumor verification workflow."""
workflow_result = {}
# Step 1: Basic rumor analysis
rumor_analysis = await self.analyze_rumor(news_item)
workflow_result["rumor_analysis"] = asdict(rumor_analysis)
# Step 2: Source reliability check
source_check = await self.check_source_reliability(news_item.get("source", ""))
workflow_result["source_check"] = asdict(source_check)
# Step 3: Fact verification
content = news_item.get("content", "")
fact_verification = await self.check_facts(content[:200]) # First 200 chars
workflow_result["fact_verification"] = fact_verification
# Step 4: Final assessment
final_score = (rumor_analysis.rumor_score +
(1 - source_check.reliability_score) +
(1 - fact_verification.get("confidence", 0.5))) / 3
workflow_result["final_assessment"] = {
"overall_rumor_probability": final_score,
"recommendation": "investigate" if final_score > 0.6 else "monitor"
}
# Step 5: Generate recommendations
recommendations = await self.generate_mitigation_recommendations({
"rumor_score": final_score,
"rumor_type": rumor_analysis.rumor_type
})
workflow_result["recommendations"] = recommendations
return workflow_result
async def assess_rumor_impact(self, rumor_data: Dict[str, Any]) -> Dict[str, Any]:
"""Assess potential impact of rumor."""
rumor_type = rumor_data.get("rumor_type", "")
target_entities = rumor_data.get("target_entities", [])
reach = rumor_data.get("reach", 0)
credibility = rumor_data.get("credibility", 0.5)
# Market impact assessment
if rumor_type == RumorType.MARKET_MANIPULATION.value:
market_impact_score = min(reach / 100000.0 * credibility, 1.0)
else:
market_impact_score = 0.1
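        # For market-manipulation rumors the impact saturates at a reach of 100,000;
        # e.g. reach 50,000 with credibility 0.8 gives 0.4 (hypothetical figures).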
# Reputation damage assessment
reputation_damage_score = credibility * min(reach / 50000.0, 1.0)
# Investor sentiment effect
if any("전자" in entity for entity in target_entities):
investor_sentiment_effect = market_impact_score * 0.8
else:
investor_sentiment_effect = market_impact_score * 0.3
# Estimated financial impact (in millions)
estimated_financial_impact = market_impact_score * 100
return {
"market_impact_score": market_impact_score,
"reputation_damage_score": reputation_damage_score,
"investor_sentiment_effect": investor_sentiment_effect,
"estimated_financial_impact": estimated_financial_impact
}
async def generate_automated_response(self, rumor_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate automated response for detected rumors."""
rumor_score = rumor_data.get("rumor_score", 0.0)
confidence = rumor_data.get("confidence", 0.0)
rumor_type = rumor_data.get("rumor_type", "")
viral_coefficient = rumor_data.get("viral_coefficient", 0.0)
# Determine response type
if rumor_score > 0.9 and confidence > 0.8:
response_type = "immediate_intervention"
urgency_level = "critical"
elif rumor_score > 0.7:
response_type = "active_monitoring"
urgency_level = "high"
else:
response_type = "passive_monitoring"
urgency_level = "medium"
# Generate suggested actions
suggested_actions = []
if response_type == "immediate_intervention":
suggested_actions.extend([
"Deploy fact-checking response",
"Issue official statement",
"Contact platform administrators"
])
if viral_coefficient > 2.0:
suggested_actions.append("Implement counter-narrative strategy")
# Notification targets
notification_targets = ["content_moderators"]
if rumor_type == RumorType.MARKET_MANIPULATION.value:
notification_targets.append("financial_regulators")
if urgency_level == "critical":
notification_targets.append("executive_team")
return {
"response_type": response_type,
"urgency_level": urgency_level,
"suggested_actions": suggested_actions,
"notification_targets": notification_targets
}
async def analyze_propagation_network(self, propagation_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze rumor propagation network structure."""
propagation_path = propagation_data.get("propagation_path", [])
amplifiers = propagation_data.get("amplifiers", [])
# Analyze network structure
total_reach = sum(user.get("followers", 0) for user in propagation_path)
key_amplifiers = sorted(propagation_path, key=lambda x: x.get("followers", 0), reverse=True)[:3]
# Detect suspicious patterns
suspicious_patterns = []
if len(amplifiers) > len(propagation_path) * 0.3:
suspicious_patterns.append("High bot amplification detected")
rapid_spread = any(
(propagation_path[i]["timestamp"] - propagation_path[i-1]["timestamp"]).total_seconds() < 300
for i in range(1, len(propagation_path))
)
if rapid_spread:
suspicious_patterns.append("Unnaturally rapid spread detected")
# Identify intervention points
intervention_points = [user["user"] for user in key_amplifiers]
return {
"network_structure": {
"total_nodes": len(propagation_path),
"total_reach": total_reach,
"amplification_ratio": len(amplifiers) / max(len(propagation_path), 1)
},
"key_amplifiers": [user["user"] for user in key_amplifiers],
"suspicious_patterns": suspicious_patterns,
"intervention_points": intervention_points
}
async def track_rumor_evolution(self, rumor_versions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Track how rumors evolve over time."""
if len(rumor_versions) < 2:
return {"error": "Need at least 2 versions to track evolution"}
# Sort by timestamp
versions = sorted(rumor_versions, key=lambda x: x["timestamp"])
# Analyze content drift
content_changes = []
for i in range(1, len(versions)):
prev_content = versions[i-1]["content"]
curr_content = versions[i]["content"]
similarity = self._calculate_text_similarity(prev_content, curr_content)
content_changes.append(1 - similarity) # Higher value = more change
# Detect escalation pattern
if all(change > 0.3 for change in content_changes):
escalation_pattern = "increasing_dramatization"
elif content_changes[-1] > content_changes[0]:
escalation_pattern = "escalating"
else:
escalation_pattern = "stable"
# Calculate mutation points
mutation_points = [
i for i, change in enumerate(content_changes)
if change > 0.5
]
return {
"escalation_pattern": escalation_pattern,
"content_drift": statistics.mean(content_changes),
"amplification_factor": len(versions) / max((versions[-1]["timestamp"] - versions[0]["timestamp"]).total_seconds() / 3600, 1),
"mutation_points": mutation_points
}
async def analyze_batch_rumors(self, news_batch: List[Dict[str, Any]]) -> List[RumorResult]:
"""Analyze batch of news items for rumors efficiently."""
# Process in parallel batches
batch_size = 10
results = []
for i in range(0, len(news_batch), batch_size):
batch = news_batch[i:i + batch_size]
# Process batch concurrently
batch_tasks = [self.analyze_rumor(item) for item in batch]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# Handle exceptions
for result in batch_results:
if isinstance(result, Exception):
self.logger.error(f"Batch processing error: {result}")
results.append(RumorResult(
news_id="error",
rumor_score=0.0,
rumor_type=RumorType.MISINFORMATION.value,
confidence=0.0,
evidence=["processing_error"],
source_reliability=SourceReliability("unknown", 0.5, 0.5)
))
else:
results.append(result)
return results
async def analyze_with_sentiment_integration(self, news_item: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze rumors with sentiment analysis integration."""
# Mock integration - would import and use real sentiment analyzer
mock_sentiment = {
"sentiment": "negative",
"confidence": 0.8,
"score": 0.2
}
rumor_result = await self.analyze_rumor(news_item)
        # Gap between the rumor score and the inverted sentiment score
        # (a simple divergence proxy, not a statistical correlation)
        sentiment_score = mock_sentiment["score"]
        sentiment_correlation = abs(rumor_result.rumor_score - (1 - sentiment_score))
# Integrated score considering both analyses
integrated_score = (rumor_result.rumor_score +
mock_sentiment["confidence"] *
(1 if mock_sentiment["sentiment"] == "negative" else 0)) / 2
return {
"rumor_analysis": asdict(rumor_result),
"sentiment_analysis": mock_sentiment,
"sentiment_correlation": sentiment_correlation,
"integrated_score": integrated_score
}
async def save_rumor_detection(self, rumor_data: Dict[str, Any]) -> bool:
"""Save rumor detection to database."""
# Mock implementation - would save to actual database
try:
# Simulate database save
await asyncio.sleep(0.01)
return True
except Exception as e:
self.logger.error(f"Error saving rumor detection: {e}")
return False
async def get_rumor_detection(self, news_id: str) -> Dict[str, Any]:
"""Retrieve rumor detection from database."""
# Mock implementation - would retrieve from actual database
return {
"news_id": news_id,
"rumor_score": 0.7,
"detected_at": datetime.now(timezone.utc),
"status": "under_investigation"
}
async def check_alert_conditions(self, rumor_data: Dict[str, Any]) -> bool:
"""Check if rumor should trigger alerts."""
rumor_score = rumor_data.get("rumor_score", 0.0)
viral_coefficient = rumor_data.get("viral_coefficient", 0.0)
rumor_type = rumor_data.get("rumor_type", "")
# High risk conditions
if rumor_score >= self.config["high_risk_threshold"]:
return True
if viral_coefficient > 3.0:
return True
if rumor_type == RumorType.MARKET_MANIPULATION.value and rumor_score > 0.6:
return True
return False
async def generate_rumor_alert(self, rumor_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate alert for high-risk rumor detection."""
rumor_score = rumor_data.get("rumor_score", 0.0)
rumor_type = rumor_data.get("rumor_type", "")
# Determine alert level
if rumor_score >= 0.9:
alert_level = "critical"
elif rumor_score >= 0.7:
alert_level = "high"
else:
alert_level = "medium"
# Generate message
message = f"High-risk {rumor_type} detected with score {rumor_score:.2f}"
# Recommended actions
recommended_actions = [
"Verify information with reliable sources",
"Monitor social media spread",
"Prepare official response if necessary"
]
if rumor_type == RumorType.MARKET_MANIPULATION.value:
recommended_actions.append("Notify financial authorities")
return {
"alert_level": alert_level,
"message": message,
"recommended_actions": recommended_actions,
"timestamp": datetime.now(timezone.utc)
}
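

if __name__ == "__main__":
    # Minimal usage sketch (not part of the production pipeline): build a detector,
    # analyze one hypothetical news item, and print a summary. Field names in the
    # sample payload mirror the .get() keys used by analyze_rumor() above.
    logging.basicConfig(level=logging.INFO)

    async def _demo() -> None:
        detector = RumorDetector()
        sample_item = {
            "id": "demo-001",
            "source": "unknown_blog",
            "title": "충격적 속보",
            "content": "익명의 소식통에 따르면 확인되지 않은 내용이 전해지고 있다.",
            "author": "anonymous",
            "social_shares": 3000,
            "view_count": 50000,
        }
        result = await detector.analyze_rumor(sample_item)
        print(json.dumps(
            {
                "news_id": result.news_id,
                "rumor_score": round(result.rumor_score, 3),
                "rumor_type": result.rumor_type,
                "risk_level": result.risk_level,
                "evidence": result.evidence,
            },
            ensure_ascii=False,
            indent=2,
        ))

    asyncio.run(_demo())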