Skip to main content
Glama
security_enhancer.py22 kB
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 安全强化工具 用于扫描和修复项目中的安全漏洞 """ import json import subprocess import sys from pathlib import Path from typing import Dict, List, Set, Tuple import re from datetime import datetime class SecurityEnhancer: """安全强化器""" def __init__(self, project_root: str): """初始化安全强化器 Args: project_root: 项目根目录路径 """ self.project_root = Path(project_root) self.security_config = self.project_root / "config" / "security.json" def scan_vulnerabilities(self) -> Dict: """扫描安全漏洞 Returns: 漏洞扫描结果 """ print("🔒 开始安全漏洞扫描...") results = { "timestamp": datetime.now().isoformat(), "dependency_vulnerabilities": self._scan_dependency_vulnerabilities(), "code_vulnerabilities": self._scan_code_vulnerabilities(), "configuration_issues": self._scan_configuration_issues(), "file_permissions": self._check_file_permissions() } return results def _scan_dependency_vulnerabilities(self) -> List[Dict]: """扫描依赖漏洞 Returns: 依赖漏洞列表 """ vulnerabilities = [] try: # 使用safety扫描 result = subprocess.run( ['safety', 'check', '--json', '--full-report'], capture_output=True, text=True, timeout=60 ) if result.stdout: try: safety_data = json.loads(result.stdout) for vuln in safety_data: vulnerabilities.append({ 'type': 'dependency', 'package': vuln.get('package'), 'version': vuln.get('installed_version'), 'vulnerability_id': vuln.get('vulnerability_id'), 'advisory': vuln.get('advisory'), 'cve': vuln.get('cve'), 'severity': self._get_severity_from_cve(vuln.get('cve')), 'fix_suggestion': f"升级到安全版本: {vuln.get('spec', 'latest')}" }) except json.JSONDecodeError: pass except (subprocess.TimeoutExpired, FileNotFoundError): print("⚠️ 无法运行safety扫描,请安装safety包") # 使用bandit扫描Python代码 try: result = subprocess.run( ['bandit', '-r', str(self.project_root), '-f', 'json'], capture_output=True, text=True, timeout=60 ) if result.stdout: try: bandit_data = json.loads(result.stdout) for issue in bandit_data.get('results', []): vulnerabilities.append({ 'type': 'code', 'file': issue.get('filename'), 'line': issue.get('line_number'), 'test_id': issue.get('test_id'), 'test_name': issue.get('test_name'), 'issue_severity': issue.get('issue_severity'), 'issue_confidence': issue.get('issue_confidence'), 'issue_text': issue.get('issue_text'), 'fix_suggestion': self._get_bandit_fix_suggestion(issue.get('test_id')) }) except json.JSONDecodeError: pass except (subprocess.TimeoutExpired, FileNotFoundError): print("⚠️ 无法运行bandit扫描,请安装bandit包") return vulnerabilities def _scan_code_vulnerabilities(self) -> List[Dict]: """扫描代码漏洞 Returns: 代码漏洞列表 """ vulnerabilities = [] # 扫描常见的不安全模式 patterns = { 'hardcoded_password': r'password\s*=\s*["\'][^"\'\']+["\']', 'hardcoded_secret': r'secret\s*=\s*["\'][^"\'\']+["\']', 'hardcoded_api_key': r'api_key\s*=\s*["\'][^"\'\']+["\']', 'sql_injection': r'execute\s*\(\s*["\'].*%.*["\']', 'eval_usage': r'eval\s*\(', 'exec_usage': r'exec\s*\(', 'pickle_load': r'pickle\.loads?\s*\(', 'yaml_unsafe_load': r'yaml\.load\s*\(', 'shell_injection': r'os\.system\s*\(|subprocess\.call\s*\(', } for py_file in self.project_root.rglob('*.py'): try: with open(py_file, 'r', encoding='utf-8') as f: content = f.read() lines = content.split('\n') for pattern_name, pattern in patterns.items(): matches = re.finditer(pattern, content, re.IGNORECASE) for match in matches: line_num = content[:match.start()].count('\n') + 1 vulnerabilities.append({ 'type': 'code_pattern', 'file': str(py_file.relative_to(self.project_root)), 'line': line_num, 'pattern': pattern_name, 'matched_text': match.group(), 'severity': self._get_pattern_severity(pattern_name), 'fix_suggestion': self._get_pattern_fix_suggestion(pattern_name) }) except Exception: continue return vulnerabilities def _scan_configuration_issues(self) -> List[Dict]: """扫描配置问题 Returns: 配置问题列表 """ issues = [] # 检查敏感文件权限 sensitive_files = [ 'config/security.json', 'config/system.json', '.env', 'secrets.json' ] for file_path in sensitive_files: full_path = self.project_root / file_path if full_path.exists(): # 检查文件权限 import stat file_stat = full_path.stat() permissions = oct(file_stat.st_mode)[-3:] if permissions != '600': # 应该只有所有者可读写 issues.append({ 'type': 'file_permission', 'file': file_path, 'current_permission': permissions, 'recommended_permission': '600', 'severity': 'medium', 'fix_suggestion': f'chmod 600 {file_path}' }) # 检查配置文件中的敏感信息 config_files = list(self.project_root.rglob('*.json')) + list(self.project_root.rglob('*.yaml')) + list(self.project_root.rglob('*.yml')) for config_file in config_files: try: with open(config_file, 'r', encoding='utf-8') as f: content = f.read() # 检查是否包含敏感信息 sensitive_patterns = { 'password': r'"password"\s*:\s*"[^"]+"', 'secret': r'"secret"\s*:\s*"[^"]+"', 'api_key': r'"api_key"\s*:\s*"[^"]+"', 'token': r'"token"\s*:\s*"[^"]+"' } for pattern_name, pattern in sensitive_patterns.items(): if re.search(pattern, content, re.IGNORECASE): issues.append({ 'type': 'sensitive_config', 'file': str(config_file.relative_to(self.project_root)), 'pattern': pattern_name, 'severity': 'high', 'fix_suggestion': f'将{pattern_name}移动到环境变量或安全的密钥管理系统' }) except Exception: continue return issues def _check_file_permissions(self) -> List[Dict]: """检查文件权限 Returns: 文件权限问题列表 """ issues = [] # 检查脚本文件权限 script_files = list(self.project_root.rglob('*.sh')) + list(self.project_root.rglob('*.py')) for script_file in script_files: try: import stat file_stat = script_file.stat() # 检查是否有执行权限但不应该有 if script_file.suffix == '.py' and file_stat.st_mode & stat.S_IXUSR: # Python文件通常不需要执行权限 if not script_file.name.startswith('start_') and not script_file.name.startswith('run_'): issues.append({ 'type': 'unnecessary_execute_permission', 'file': str(script_file.relative_to(self.project_root)), 'severity': 'low', 'fix_suggestion': f'chmod -x {script_file.relative_to(self.project_root)}' }) # 检查shell脚本是否有执行权限 if script_file.suffix == '.sh' and not (file_stat.st_mode & stat.S_IXUSR): issues.append({ 'type': 'missing_execute_permission', 'file': str(script_file.relative_to(self.project_root)), 'severity': 'medium', 'fix_suggestion': f'chmod +x {script_file.relative_to(self.project_root)}' }) except Exception: continue return issues def _get_severity_from_cve(self, cve: str) -> str: """从CVE获取严重程度 Args: cve: CVE编号 Returns: 严重程度 """ # 这里可以集成CVE数据库查询 # 简化版本,返回默认值 return 'medium' def _get_bandit_fix_suggestion(self, test_id: str) -> str: """获取bandit测试的修复建议 Args: test_id: bandit测试ID Returns: 修复建议 """ suggestions = { 'B101': '不要使用assert语句进行安全检查', 'B102': '避免使用exec_used', 'B103': '设置适当的文件权限', 'B104': '绑定到所有接口可能不安全', 'B105': '硬编码密码字符串', 'B106': '硬编码密码函数参数', 'B107': '硬编码密码默认参数', 'B108': '临时文件创建不安全', 'B110': 'try/except/pass可能隐藏错误', 'B112': 'try/except/continue可能隐藏错误', 'B201': 'Flask应用调试模式', 'B301': 'pickle和相关模块不安全', 'B302': 'marshal.loads不安全', 'B303': 'MD5哈希算法不安全', 'B304': '不安全的密码学算法', 'B305': '不安全的密码学算法', 'B306': 'mktemp不安全', 'B307': 'eval使用不安全', 'B308': 'mark_safe可能不安全', 'B309': 'HTTPSConnection不验证证书', 'B310': 'urllib.urlopen不安全', 'B311': '随机数生成器不安全', 'B312': 'telnetlib使用不安全', 'B313': 'XML解析器不安全', 'B314': 'XML解析器不安全', 'B315': 'XML解析器不安全', 'B316': 'XML解析器不安全', 'B317': 'XML解析器不安全', 'B318': 'XML解析器不安全', 'B319': 'XML解析器不安全', 'B320': 'XML解析器不安全', 'B321': 'FTP相关安全问题', 'B322': 'input()函数使用', 'B323': '不安全的随机数生成', 'B324': 'hashlib.new不安全参数', 'B325': 'tempfile.mktemp不安全', 'B401': 'import telnetlib', 'B402': 'import ftplib', 'B403': 'import pickle', 'B404': 'import subprocess', 'B405': 'import xml.etree', 'B406': 'import xml.sax', 'B407': 'import xml.dom', 'B408': 'import xml.minidom', 'B409': 'import xml.pulldom', 'B410': 'import lxml', 'B411': 'import xmlrpclib', 'B412': 'import httpoxy', 'B413': 'import pycrypto', 'B501': 'SSL证书验证禁用', 'B502': 'SSL证书验证禁用', 'B503': 'SSL证书验证禁用', 'B504': 'SSL证书验证禁用', 'B505': 'SSL证书验证禁用', 'B506': 'YAML不安全加载', 'B507': 'SSH主机密钥验证禁用', 'B601': 'shell注入风险', 'B602': 'subprocess shell注入', 'B603': 'subprocess不安全', 'B604': 'shell调用不安全', 'B605': 'shell调用不安全', 'B606': 'shell调用不安全', 'B607': 'shell调用不安全', 'B608': 'SQL注入风险', 'B609': 'Linux命令通配符注入', 'B610': 'Django SQL注入', 'B611': 'Django SQL注入', 'B701': 'jinja2自动转义禁用', 'B702': 'Mako模板自动转义禁用', 'B703': 'Django标记安全', } return suggestions.get(test_id, '请查阅bandit文档获取详细修复建议') def _get_pattern_severity(self, pattern_name: str) -> str: """获取模式的严重程度 Args: pattern_name: 模式名称 Returns: 严重程度 """ severity_map = { 'hardcoded_password': 'high', 'hardcoded_secret': 'high', 'hardcoded_api_key': 'high', 'sql_injection': 'high', 'eval_usage': 'high', 'exec_usage': 'high', 'pickle_load': 'medium', 'yaml_unsafe_load': 'medium', 'shell_injection': 'high' } return severity_map.get(pattern_name, 'medium') def _get_pattern_fix_suggestion(self, pattern_name: str) -> str: """获取模式的修复建议 Args: pattern_name: 模式名称 Returns: 修复建议 """ suggestions = { 'hardcoded_password': '使用环境变量或安全的密钥管理系统存储密码', 'hardcoded_secret': '使用环境变量或安全的密钥管理系统存储密钥', 'hardcoded_api_key': '使用环境变量或安全的密钥管理系统存储API密钥', 'sql_injection': '使用参数化查询或ORM防止SQL注入', 'eval_usage': '避免使用eval(),考虑使用ast.literal_eval()或其他安全替代方案', 'exec_usage': '避免使用exec(),重新设计代码逻辑', 'pickle_load': '避免反序列化不可信数据,考虑使用JSON或其他安全格式', 'yaml_unsafe_load': '使用yaml.safe_load()代替yaml.load()', 'shell_injection': '使用subprocess的列表参数形式,避免shell=True' } return suggestions.get(pattern_name, '请查阅安全编码最佳实践') def generate_security_report(self, scan_results: Dict) -> str: """生成安全报告 Args: scan_results: 扫描结果 Returns: 安全报告内容 """ report = f""" # 安全扫描报告 生成时间: {scan_results['timestamp']} ## 概览 - 依赖漏洞: {len(scan_results['dependency_vulnerabilities'])} 个 - 代码漏洞: {len(scan_results['code_vulnerabilities'])} 个 - 配置问题: {len(scan_results['configuration_issues'])} 个 - 文件权限问题: {len(scan_results['file_permissions'])} 个 ## 依赖漏洞详情 """ for vuln in scan_results['dependency_vulnerabilities']: if vuln.get('type') == 'dependency': report += f""" ### {vuln['package']} {vuln['version']} - **漏洞ID**: {vuln['vulnerability_id']} - **CVE**: {vuln.get('cve', 'N/A')} - **严重程度**: {vuln.get('severity', 'unknown')} - **描述**: {vuln['advisory']} - **修复建议**: {vuln['fix_suggestion']} """ report += "\n## 代码安全问题\n" for vuln in scan_results['code_vulnerabilities']: report += f""" ### {vuln.get('file', 'unknown')}:{vuln.get('line', 'unknown')} - **问题类型**: {vuln.get('pattern', vuln.get('test_name', 'unknown'))} - **严重程度**: {vuln.get('severity', vuln.get('issue_severity', 'unknown'))} - **描述**: {vuln.get('matched_text', vuln.get('issue_text', 'N/A'))} - **修复建议**: {vuln['fix_suggestion']} """ report += "\n## 配置安全问题\n" for issue in scan_results['configuration_issues']: report += f""" ### {issue['file']} - **问题类型**: {issue['type']} - **严重程度**: {issue['severity']} - **修复建议**: {issue['fix_suggestion']} """ report += "\n## 文件权限问题\n" for issue in scan_results['file_permissions']: report += f""" ### {issue['file']} - **问题类型**: {issue['type']} - **严重程度**: {issue['severity']} - **修复建议**: {issue['fix_suggestion']} """ return report def apply_security_fixes(self, scan_results: Dict) -> Dict: """应用安全修复 Args: scan_results: 扫描结果 Returns: 修复结果 """ print("🔧 开始应用安全修复...") fixes_applied = { 'file_permissions': [], 'configuration_updates': [], 'dependency_updates': [] } # 修复文件权限问题 for issue in scan_results['file_permissions']: if issue['type'] == 'missing_execute_permission': try: file_path = self.project_root / issue['file'] file_path.chmod(file_path.stat().st_mode | 0o111) fixes_applied['file_permissions'].append(f"添加执行权限: {issue['file']}") except Exception as e: print(f"⚠️ 无法修复权限 {issue['file']}: {e}") # 修复配置问题 for issue in scan_results['configuration_issues']: if issue['type'] == 'file_permission': try: file_path = self.project_root / issue['file'] file_path.chmod(0o600) fixes_applied['file_permissions'].append(f"设置安全权限: {issue['file']}") except Exception as e: print(f"⚠️ 无法修复权限 {issue['file']}: {e}") return fixes_applied def save_security_report(self, scan_results: Dict, output_file: str = None): """保存安全报告 Args: scan_results: 扫描结果 output_file: 输出文件路径 """ if output_file is None: output_file = self.project_root / "security_scan_report.md" report = self.generate_security_report(scan_results) with open(output_file, 'w', encoding='utf-8') as f: f.write(report) print(f"🔒 安全报告已保存到: {output_file}") # 同时保存JSON格式的详细数据 json_file = str(output_file).replace('.md', '.json') with open(json_file, 'w', encoding='utf-8') as f: json.dump(scan_results, f, indent=2, ensure_ascii=False) print(f"🔒 详细数据已保存到: {json_file}") def main(): """主函数""" project_root = Path.cwd() print("🚀 开始安全扫描...") enhancer = SecurityEnhancer(str(project_root)) # 执行扫描 scan_results = enhancer.scan_vulnerabilities() # 生成并保存报告 enhancer.save_security_report(scan_results) # 应用自动修复 fixes = enhancer.apply_security_fixes(scan_results) # 打印摘要 print("\n🔒 安全扫描摘要:") print(f"- 依赖漏洞: {len(scan_results['dependency_vulnerabilities'])} 个") print(f"- 代码漏洞: {len(scan_results['code_vulnerabilities'])} 个") print(f"- 配置问题: {len(scan_results['configuration_issues'])} 个") print(f"- 文件权限问题: {len(scan_results['file_permissions'])} 个") print("\n🔧 自动修复摘要:") print(f"- 权限修复: {len(fixes['file_permissions'])} 个") print(f"- 配置更新: {len(fixes['configuration_updates'])} 个") print(f"- 依赖更新: {len(fixes['dependency_updates'])} 个") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/chatExcel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server