"""
Code Security Analyzer Module
Provides proactive security suggestions for code
"""
import base64
import hashlib
import json
import logging
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class CodeSecurityAnalyzer:
    """Code security analyzer that produces proactive security advice.

    The analyzer scans source text line by line with regular expressions for
    a fixed set of issue categories (hard-coded secrets, SQL injection, XSS,
    insecure randomness, weak cryptography). It also extracts IP addresses,
    domains and URLs from the text and asks ``threat_aggregator`` about each.

    ``threat_aggregator`` must expose an awaitable
    ``analyze_indicator(indicator)`` whose result supports
    ``.get("overall_reputation")``.
    """

    # Indicators that are never sent to the threat aggregator.
    _IGNORED_IPS = frozenset({"127.0.0.1", "0.0.0.0", "255.255.255.255"})
    _IGNORED_DOMAINS = frozenset({"localhost", "example.com", "test.com"})

    # File-name suffixes that the domain regex would otherwise report.
    _NON_DOMAIN_SUFFIXES = (".py", ".js", ".java", ".cpp", ".h")

    # Points added to the risk score per finding, keyed by severity.
    _SEVERITY_WEIGHTS = {"critical": 10, "high": 7, "medium": 4, "low": 1}

    # Localized description templates for threat-intelligence findings.
    _THREAT_DESCRIPTIONS = {
        "malicious_ip": {
            "zh": "代码中包含恶意IP地址: {indicator}",
            "en": "Malicious IP address found in code: {indicator}",
        },
        "malicious_domain": {
            "zh": "代码中包含恶意域名: {indicator}",
            "en": "Malicious domain found in code: {indicator}",
        },
        "malicious_url": {
            "zh": "代码中包含恶意URL: {indicator}",
            "en": "Malicious URL found in code: {indicator}",
        },
    }

    def __init__(self, threat_aggregator):
        """Store the aggregator and build the pattern/recommendation tables.

        Args:
            threat_aggregator: Object providing an awaitable
                ``analyze_indicator(indicator)`` method.
        """
        self.threat_aggregator = threat_aggregator
        # Library of regex patterns, grouped by issue category.
        self.security_patterns = {
            "hardcoded_secrets": [
                r'password\s*=\s*["\'][^"\']+["\']',
                r'api_key\s*=\s*["\'][^"\']+["\']',
                r'secret\s*=\s*["\'][^"\']+["\']',
                r'token\s*=\s*["\'][^"\']+["\']',
                r'aws_access_key_id\s*=\s*["\'][^"\']+["\']',
                r'private_key\s*=\s*["\'][^"\']+["\']',
            ],
            "sql_injection": [
                r"SELECT\s+.*\s+FROM\s+.*\s+WHERE\s+.*\+",
                r"INSERT\s+INTO\s+.*\s+VALUES\s*\([^)]*\+",
                r"UPDATE\s+.*\s+SET\s+.*\+",
                r"DELETE\s+FROM\s+.*\s+WHERE\s+.*\+",
            ],
            "xss_vulnerabilities": [
                r"innerHTML\s*=\s*.*\+",
                r"document\.write\s*\([^)]*\+",
                r"eval\s*\([^)]*\+",
                r'setTimeout\s*\([^)]*\+.*["\'][^"\']*\+',
                r'setInterval\s*\([^)]*\+.*["\'][^"\']*\+',
            ],
            "insecure_random": [
                r"Math\.random\(\)",
                r"random\.random\(\)",
                r"rand\(\)",
                r"srand\(",
            ],
            "weak_crypto": [
                r"MD5\(",
                r"SHA1\(",
                r"DES\(",
                r"RC4\(",
                r"md5\(",
                r"sha1\(",
            ],
        }
        # Recommendation templates, keyed by issue category then locale.
        self.security_recommendations = {
            "hardcoded_secrets": {
                "zh": "检测到硬编码的敏感信息。建议使用环境变量或密钥管理服务。",
                "en": "Hardcoded sensitive information detected. Use environment variables or key management services.",
                "fix_example": {
                    "zh": "使用 os.getenv('API_KEY') 替代硬编码",
                    "en": "Use os.getenv('API_KEY') instead of hardcoding",
                },
            },
            "sql_injection": {
                "zh": "可能存在SQL注入漏洞。使用参数化查询或ORM。",
                "en": "Potential SQL injection vulnerability. Use parameterized queries or ORM.",
                "fix_example": {
                    "zh": "使用 cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))",
                    "en": "Use cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))",
                },
            },
            "xss_vulnerabilities": {
                "zh": "可能存在XSS漏洞。对用户输入进行适当的转义和验证。",
                "en": "Potential XSS vulnerability. Properly escape and validate user input.",
                "fix_example": {
                    "zh": "使用 DOMPurify.sanitize() 或类似的库",
                    "en": "Use DOMPurify.sanitize() or similar libraries",
                },
            },
            "insecure_random": {
                "zh": "使用了不安全的随机数生成器。对于安全用途,请使用加密安全的随机数。",
                "en": "Insecure random number generator used. Use cryptographically secure random for security purposes.",
                "fix_example": {
                    "zh": "使用 secrets.SystemRandom() 或 os.urandom()",
                    "en": "Use secrets.SystemRandom() or os.urandom()",
                },
            },
            "weak_crypto": {
                "zh": "使用了弱加密算法。建议使用SHA-256或更强的算法。",
                "en": "Weak cryptographic algorithm used. Use SHA-256 or stronger algorithms.",
                "fix_example": {
                    "zh": "使用 hashlib.sha256() 替代 hashlib.md5()",
                    "en": "Use hashlib.sha256() instead of hashlib.md5()",
                },
            },
        }

    async def analyze_code_security(
        self, code_content: str, language: str = "auto", locale: str = "zh"
    ) -> Dict[str, Any]:
        """Analyze ``code_content`` and return findings plus advice.

        Args:
            code_content: Source text to scan.
            language: Language name, or ``"auto"`` to detect it heuristically.
            locale: Locale key (``"zh"`` or ``"en"``) used for descriptions,
                recommendations and fix examples.

        Returns:
            A dict with the keys ``analysis_type``, ``timestamp``,
            ``language``, ``locale``, ``vulnerabilities``,
            ``recommendations``, ``risk_score`` and ``secure_alternatives``.
        """
        timestamp = datetime.now().isoformat()

        if language == "auto":
            language = self._detect_language(code_content)

        # Pattern-based findings, one category at a time.
        vulnerabilities: List[Dict[str, Any]] = []
        for pattern_type, patterns in self.security_patterns.items():
            vulnerabilities.extend(
                self._find_security_issues(
                    code_content, patterns, pattern_type, locale
                )
            )

        # Threat-intelligence findings for hard-coded network indicators.
        # The indicator dict always has three keys, so test its values.
        network_indicators = self._extract_network_indicators(code_content)
        if any(network_indicators.values()):
            vulnerabilities.extend(
                await self._analyze_network_indicators(network_indicators, locale)
            )

        return {
            "analysis_type": "code_security",
            "timestamp": timestamp,
            "language": language,
            "locale": locale,
            "vulnerabilities": vulnerabilities,
            "recommendations": self._generate_recommendations(
                vulnerabilities, locale
            ),
            "risk_score": self._calculate_code_risk_score(vulnerabilities),
            "secure_alternatives": self._generate_secure_alternatives(
                code_content, vulnerabilities, locale
            ),
        }

    def _detect_language(self, code_content: str) -> str:
        """Guess the programming language of ``code_content``.

        Each language is scored by the number of case-insensitive matches of
        its indicator patterns. The highest score wins; ties go to the
        language listed first.

        Returns:
            The language name, or ``"unknown"`` when no indicator matches.
        """
        language_indicators = {
            "python": [
                r"import\s+\w+",
                r"def\s+\w+\(",
                r"class\s+\w+:",
                r'if\s+__name__\s*==\s*["\']__main__["\']',
            ],
            "javascript": [
                r"function\s+\w+\(",
                r"var\s+\w+\s*=",
                r"const\s+\w+\s*=",
                r"let\s+\w+\s*=",
                r"console\.log",
            ],
            "java": [
                r"public\s+class\s+\w+",
                r"public\s+static\s+void\s+main",
                r"import\s+java\.",
            ],
            "csharp": [
                r"using\s+System",
                r"public\s+class\s+\w+",
                r"static\s+void\s+Main",
            ],
            "php": [r"<\?php", r"\$\w+\s*=", r"function\s+\w+\("],
            "go": [r"package\s+\w+", r"func\s+\w+\(", r'import\s+["\']'],
            "rust": [r"fn\s+\w+\(", r"let\s+\w+\s*=", r"use\s+\w+"],
            "cpp": [r"#include\s*<", r"int\s+main\(", r"std::"],
            "sql": [r"SELECT\s+", r"INSERT\s+INTO", r"UPDATE\s+", r"DELETE\s+FROM"],
        }
        scores = {
            lang: sum(
                len(re.findall(pattern, code_content, re.IGNORECASE))
                for pattern in patterns
            )
            for lang, patterns in language_indicators.items()
        }
        best = max(scores, key=scores.get)
        # A zero score means nothing matched; do not report the first language.
        return best if scores[best] > 0 else "unknown"

    def _find_security_issues(
        self,
        code_content: str,
        patterns: List[str],
        issue_type: str,
        locale: str = "zh",
    ) -> List[Dict[str, Any]]:
        """Find every match of ``patterns`` in ``code_content``, line by line.

        Args:
            code_content: Source text to scan.
            patterns: Regular expressions, applied case-insensitively.
            issue_type: Category name recorded on each finding.
            locale: Locale key used for the finding description.

        Returns:
            One dict per match, ordered by pattern and then by line, with a
            1-based ``line`` and a 0-based ``column``.
        """
        issues: List[Dict[str, Any]] = []
        lines = code_content.split("\n")
        # These depend only on the category, so look them up once.
        severity = self._get_severity(issue_type)
        description = self._get_description(issue_type, locale)
        for pattern in patterns:
            regex = re.compile(pattern, re.IGNORECASE)
            for line_num, line in enumerate(lines, 1):
                for match in regex.finditer(line):
                    issues.append(
                        {
                            "type": issue_type,
                            "line": line_num,
                            "column": match.start(),
                            "matched_text": match.group(),
                            "severity": severity,
                            "description": description,
                            "line_content": line.strip(),
                        }
                    )
        return issues

    def _extract_network_indicators(self, code_content: str) -> Dict[str, List[str]]:
        """Extract IPv4 addresses, domains and URLs from ``code_content``.

        Each list is de-duplicated in first-seen order, so an indicator that
        appears several times is looked up and reported only once.

        Returns:
            A dict with the keys ``ips``, ``domains`` and ``urls``.
        """
        ip_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
        domain_pattern = (
            r"\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b"
        )
        url_pattern = r'https?://[^\s<>"\']+|ftp://[^\s<>"\']+|ftps://[^\s<>"\']+|sftp://[^\s<>"\']+'

        # The regex accepts any 1-3 digit group; drop octets above 255.
        ips = [
            ip
            for ip in re.findall(ip_pattern, code_content)
            if all(int(octet) <= 255 for octet in ip.split("."))
        ]
        # Filter out common file names that look like domains.
        domains = [
            domain
            for domain in re.findall(domain_pattern, code_content)
            if not domain.endswith(self._NON_DOMAIN_SUFFIXES)
        ]
        urls = re.findall(url_pattern, code_content)

        return {
            "ips": list(dict.fromkeys(ips)),
            "domains": list(dict.fromkeys(domains)),
            "urls": list(dict.fromkeys(urls)),
        }

    async def _analyze_network_indicators(
        self, indicators: Dict[str, List[str]], locale: str = "zh"
    ) -> List[Dict[str, Any]]:
        """Query the threat aggregator for each extracted indicator.

        Args:
            indicators: Dict with ``ips``, ``domains`` and ``urls`` lists.
            locale: Locale key used for the finding description.

        Returns:
            One finding per indicator whose ``overall_reputation`` is
            ``"malicious"``. Lookup failures are logged and skipped.
        """
        # (indicator, finding type, label used in the log message)
        candidates = [
            (ip, "malicious_ip", "IP")
            for ip in indicators["ips"]
            if ip not in self._IGNORED_IPS  # skip local/broadcast addresses
        ]
        candidates += [
            (domain, "malicious_domain", "domain")
            for domain in indicators["domains"]
            if domain not in self._IGNORED_DOMAINS  # skip test domains
        ]
        candidates += [(url, "malicious_url", "URL") for url in indicators["urls"]]

        threats = []
        for indicator, threat_type, label in candidates:
            threat = await self._check_indicator(indicator, threat_type, label, locale)
            if threat is not None:
                threats.append(threat)
        return threats

    async def _check_indicator(
        self, indicator: str, threat_type: str, label: str, locale: str
    ) -> Optional[Dict[str, Any]]:
        """Return a finding for ``indicator`` if it is malicious, else None."""
        try:
            result = await self.threat_aggregator.analyze_indicator(indicator)
            is_malicious = result.get("overall_reputation") == "malicious"
        except Exception as e:
            # Best effort: one failed lookup must not abort the analysis.
            logger.error("Error analyzing %s %s: %s", label, indicator, e)
            return None
        if not is_malicious:
            return None
        templates = self._THREAT_DESCRIPTIONS[threat_type]
        template = templates.get(locale, templates["en"])
        return {
            "type": threat_type,
            "indicator": indicator,
            "severity": self._get_severity(threat_type),
            "description": template.format(indicator=indicator),
            "threat_details": result,
        }

    def _generate_recommendations(
        self, vulnerabilities: List[Dict], locale: str
    ) -> List[str]:
        """Build the recommendation list for the given findings.

        There is one recommendation per distinct finding type that has a
        template, in first-seen order, followed by four general ones when
        any finding exists. Unknown locales fall back to English.
        """
        recommendations = []
        # dict.fromkeys de-duplicates while keeping a stable order.
        for vuln_type in dict.fromkeys(vuln["type"] for vuln in vulnerabilities):
            rec = self.security_recommendations.get(vuln_type)
            if rec is not None:
                recommendations.append(rec.get(locale, rec.get("en", "")))

        # General advice, added only when something was found.
        if vulnerabilities:
            if locale == "zh":
                recommendations.extend(
                    [
                        "定期进行代码安全审查",
                        "使用静态代码分析工具",
                        "实施安全编码培训",
                        "建立安全开发生命周期(SDLC)",
                    ]
                )
            else:
                recommendations.extend(
                    [
                        "Conduct regular code security reviews",
                        "Use static code analysis tools",
                        "Implement secure coding training",
                        "Establish Secure Development Lifecycle (SDLC)",
                    ]
                )
        return recommendations

    def _generate_secure_alternatives(
        self, code_content: str, vulnerabilities: List[Dict], locale: str
    ) -> List[Dict[str, str]]:
        """Suggest a fix example for each finding that has a template.

        ``code_content`` is accepted for interface compatibility; the
        offending line is taken from each finding's ``line_content``.
        """
        alternatives = []
        for vuln in vulnerabilities:
            rec = self.security_recommendations.get(vuln["type"])
            if rec is None:
                continue
            fix_example = rec.get("fix_example", {})
            alternatives.append(
                {
                    "vulnerability_type": vuln["type"],
                    "line": vuln.get("line", 0),
                    "original_code": vuln.get("line_content", ""),
                    "secure_alternative": fix_example.get(
                        locale, fix_example.get("en", "")
                    ),
                    "explanation": rec.get(locale, rec.get("en", "")),
                }
            )
        return alternatives

    def _get_severity(self, issue_type: str) -> str:
        """Return the severity for ``issue_type`` (``"low"`` if unknown)."""
        severity_map = {
            "hardcoded_secrets": "critical",
            "sql_injection": "high",
            "xss_vulnerabilities": "high",
            "insecure_random": "medium",
            "weak_crypto": "medium",
            "malicious_ip": "high",
            "malicious_domain": "high",
            "malicious_url": "critical",
        }
        return severity_map.get(issue_type, "low")

    def _get_description(self, issue_type: str, locale: str) -> str:
        """Return the localized description for ``issue_type``.

        Falls back to English for unknown locales, and to a generic message
        for issue types without a recommendation template.
        """
        rec = self.security_recommendations.get(issue_type)
        if rec is not None:
            return rec.get(locale, rec.get("en", ""))
        return f"Security issue of type: {issue_type}"

    def _calculate_code_risk_score(self, vulnerabilities: List[Dict]) -> int:
        """Sum the severity weights of all findings, capped at 100."""
        weights = self._SEVERITY_WEIGHTS
        score = sum(
            weights.get(vuln.get("severity", "low"), 1) for vuln in vulnerabilities
        )
        return min(score, 100)