import ast
import builtins
import os
import re
from pathlib import Path
from typing import List, Dict, Tuple
class QualityIssueFixer:
    """Rewrite risky exception handlers in the Python files of a project.

    Two textual, line-based rewrites are applied:

    * bare ``except:`` clauses become ``except (<types>) as e:`` followed by
      a logging call, with the types guessed from the ``try`` body;
    * ``except Exception as name:`` clauses are narrowed to exception types
      guessed from the file path.

    Both guesses are keyword heuristics, so rewritten files should be
    reviewed before they are committed.

    Attributes:
        project_root: Directory scanned by :meth:`fix_project`.
        fixes_applied: Running total of handlers rewritten by :meth:`fix_file`.
        files_fixed: Running total of files rewritten by :meth:`fix_file`.
    """

    # Directory names that are never descended into while walking the project.
    SKIP_DIRS = frozenset({'__pycache__', '.git', 'venv', 'mcp-env'})

    # ``except:`` alone on a line, optionally followed by a trailing comment.
    _BARE_EXCEPT_RE = re.compile(r'^(\s*)except\s*:\s*(#.*)?$')
    # ``except Exception as name:``; only the captured prefix/suffix are kept.
    _BROAD_EXCEPT_RE = re.compile(r'^(\s*except\s+)Exception(\s+as\s+\w+\s*:.*)$')
    _TRY_RE = re.compile(r'^(\s*)try\s*:')
    # Top-level ``import logging`` that actually binds the name ``logging``.
    _LOGGING_IMPORT_RE = re.compile(
        r'^import[ \t]+logging\b(?![ \t]+as\b)', re.MULTILINE
    )

    # Ordered (keywords, exception list) rules; the first matching rule wins.
    _TRY_BODY_RULES = (
        (('open(', 'file', 'read', 'write'), 'IOError, OSError'),
        (('int(', 'float(', 'parse'), 'ValueError, TypeError'),
        (('[', 'dict(', 'list('), 'IndexError, KeyError, TypeError'),
        (('path', 'os.', 'file'), 'FileNotFoundError, PermissionError'),
    )

    def __init__(self, project_root: str):
        """Remember the project directory and reset the running counters."""
        self.project_root = Path(project_root)
        self.fixes_applied = 0
        self.files_fixed = 0

    def fix_bare_except(self, content: str, filepath: str) -> Tuple[str, int]:
        """Replace every bare ``except:`` clause in *content*.

        Each clause becomes ``except (<types>) as e:`` and a
        ``logging.getLogger(__name__).warning(...)`` call is inserted as the
        first statement of the handler. The call goes through ``logging``
        directly because the target file is not guaranteed to define a
        ``logger`` name; :meth:`add_logging_import` supplies the import.

        Args:
            content: Full source text of the file.
            filepath: Path of the file (accepted for interface symmetry;
                not used by the heuristic).

        Returns:
            ``(new_content, number_of_clauses_rewritten)``.
        """
        lines = content.split('\n')
        rewritten: List[str] = []
        changes = 0

        for index, line in enumerate(lines):
            match = self._BARE_EXCEPT_RE.match(line)
            if match is None:
                rewritten.append(line)
                continue

            # Reuse the original whitespace verbatim so tab-indented files
            # keep their tabs.
            indent = match.group(1)
            trailing_comment = match.group(2)

            try_body = self._collect_try_body(lines, index, indent)
            suggested_exceptions = self._infer_exception_types(try_body)

            header = f'{indent}except ({suggested_exceptions}) as e:'
            if trailing_comment:
                header += '  ' + trailing_comment
            rewritten.append(header)

            body_indent = self._handler_body_indent(lines, index, indent)
            rewritten.append(
                f'{body_indent}logging.getLogger(__name__).warning(f"操作失败: {{e}}")'
            )
            changes += 1

        return '\n'.join(rewritten), changes

    def _collect_try_body(self, lines: List[str], except_index: int,
                          indent: str) -> List[str]:
        """Return the lines between the matching ``try:`` and the ``except``.

        The matching ``try:`` is the nearest preceding one with exactly the
        same indentation as the ``except`` clause, so nested ``try`` blocks
        do not cut the search short. When none is found, every line before
        the clause is returned.
        """
        for j in range(except_index - 1, -1, -1):
            try_match = self._TRY_RE.match(lines[j])
            if try_match is not None and try_match.group(1) == indent:
                return lines[j + 1:except_index]
        return lines[:except_index]

    @staticmethod
    def _handler_body_indent(lines: List[str], except_index: int,
                             indent: str) -> str:
        """Return the indentation used by the body of the handler.

        Blank and comment-only lines are skipped; the first real statement
        decides. If it is not indented deeper than the clause, one extra
        level (a tab for tab-indented clauses, else four spaces) is used.
        """
        for candidate in lines[except_index + 1:]:
            stripped = candidate.strip()
            if not stripped or stripped.startswith('#'):
                continue
            leading = candidate[:len(candidate) - len(candidate.lstrip())]
            if leading.startswith(indent) and len(leading) > len(indent):
                return leading
            break
        return indent + ('\t' if '\t' in indent else '    ')

    def fix_broad_exception(self, content: str, filepath: str) -> Tuple[str, int]:
        """Narrow ``except Exception as name:`` clauses in *content*.

        The replacement is always parenthesised, because
        ``except A, B as e:`` is a syntax error. Only the exception type of
        the clause itself is replaced; other identifiers on the line are
        left untouched. Suggested names that are neither builtins nor
        mentioned anywhere in *content* are dropped so the rewritten handler
        cannot raise ``NameError``.

        Args:
            content: Full source text of the file.
            filepath: Path of the file, used to pick the exception types.

        Returns:
            ``(new_content, number_of_clauses_rewritten)``.
        """
        rewritten: List[str] = []
        changes = 0

        for line in content.split('\n'):
            match = self._BROAD_EXCEPT_RE.match(line)
            if match is None:
                rewritten.append(line)
                continue

            suggested = self._infer_specific_exceptions(line, filepath)
            usable = self._resolvable_exception_names(suggested, content)
            if not usable:
                # Nothing safe to substitute: keep the clause as it is.
                rewritten.append(line)
                continue

            rewritten.append(
                match.group(1) + '(' + ', '.join(usable) + ')' + match.group(2)
            )
            changes += 1

        return '\n'.join(rewritten), changes

    @staticmethod
    def _resolvable_exception_names(suggested: str, content: str) -> List[str]:
        """Keep the comma-separated names that are builtins or appear in *content*."""
        names = [name.strip() for name in suggested.split(',') if name.strip()]
        return [
            name for name in names
            if hasattr(builtins, name)
            or re.search(r'\b' + re.escape(name) + r'\b', content)
        ]

    def _infer_exception_types(self, try_content: List[str]) -> str:
        """Guess exception types from keywords found in a ``try`` body.

        Args:
            try_content: Source lines of the ``try`` body.

        Returns:
            A comma-separated list of exception names; ``'RuntimeError'``
            when no keyword rule matches.
        """
        haystack = '\n'.join(try_content).lower()
        for keywords, exception_types in self._TRY_BODY_RULES:
            if any(keyword in haystack for keyword in keywords):
                return exception_types
        return 'RuntimeError'

    def _infer_specific_exceptions(self, line: str, filepath: str) -> str:
        """Guess exception types from substrings of the file path.

        Args:
            line: The ``except`` line being rewritten (currently unused).
            filepath: Path of the file containing the line.

        Returns:
            A comma-separated list of exception names. ``CacheError`` and
            ``SecurityError`` are not builtins; callers must check that the
            target file can resolve them.
        """
        if 'cache' in filepath:
            return 'CacheError, IOError'
        if 'security' in filepath:
            return 'SecurityError, ValueError'
        if 'file' in filepath or 'path' in filepath:
            return 'FileNotFoundError, PermissionError, OSError'
        return 'RuntimeError, ValueError'

    def add_logging_import(self, content: str) -> str:
        """Add a top-level ``import logging`` when the source needs one.

        The import is added when *content* mentions ``logger`` or
        ``logging.`` and has no top-level ``import logging`` yet. It is
        placed after the first complete top-level import statement (so
        multi-line imports are not split), otherwise after the module
        docstring, otherwise after any leading comment lines.

        Args:
            content: Full source text of the file.

        Returns:
            The source text, with the import inserted when required.
        """
        uses_logging = 'logger' in content or 'logging.' in content
        if not uses_logging or self._LOGGING_IMPORT_RE.search(content):
            return content

        lines = content.split('\n')
        lines.insert(self._import_insert_position(content, lines), 'import logging')
        return '\n'.join(lines)

    @staticmethod
    def _import_insert_position(content: str, lines: List[str]) -> int:
        """Return the 0-based line index at which an import can be inserted."""
        try:
            tree = ast.parse(content)
        except (SyntaxError, ValueError):
            tree = None

        if tree is None:
            # Unparseable source: fall back to a plain line scan.
            for index, line in enumerate(lines):
                if line.strip().startswith(('import ', 'from ')):
                    return index + 1
            return 0

        for node in tree.body:
            if isinstance(node, (ast.Import, ast.ImportFrom)):
                # end_lineno is 1-based, i.e. the index just past the statement.
                return getattr(node, 'end_lineno', None) or node.lineno

        if tree.body and ast.get_docstring(tree, clean=False) is not None:
            docstring_node = tree.body[0]
            return getattr(docstring_node, 'end_lineno', None) or docstring_node.lineno

        # No imports and no docstring: stay below a shebang/comment header.
        position = 0
        while position < len(lines) and lines[position].startswith('#'):
            position += 1
        return position

    def fix_file(self, filepath: Path) -> Tuple[int, int]:
        """Apply all rewrites to one file and save it if anything changed.

        Args:
            filepath: File to rewrite in place.

        Returns:
            ``(number_of_fixes, 1)`` when the file was rewritten, otherwise
            ``(0, 0)``; unreadable files are reported on stdout and skipped.

        Raises:
            OSError: If the rewritten file cannot be written back.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                original_content = f.read()
        except (OSError, UnicodeDecodeError):
            print(f"无法读取文件: {filepath}")
            return 0, 0

        content, bare_changes = self.fix_bare_except(original_content, str(filepath))
        content, broad_changes = self.fix_broad_exception(content, str(filepath))
        total_changes = bare_changes + broad_changes
        if total_changes == 0:
            return 0, 0

        content = self.add_logging_import(content)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        self.fixes_applied += total_changes
        self.files_fixed += 1
        return total_changes, 1

    def fix_project(self) -> Dict:
        """Rewrite every ``.py`` file below ``project_root``.

        Directories listed in ``SKIP_DIRS`` are not visited. Directories and
        files are processed in sorted order so the result is reproducible.

        Returns:
            A dict with the keys ``files_processed``, ``files_fixed``,
            ``total_fixes`` and ``files_with_fixes`` (paths relative to
            ``project_root``).
        """
        results = {
            'files_processed': 0,
            'files_fixed': 0,
            'total_fixes': 0,
            'files_with_fixes': [],
        }

        for root, dirs, files in os.walk(self.project_root):
            # In-place assignment is what makes os.walk prune these subtrees.
            dirs[:] = sorted(d for d in dirs if d not in self.SKIP_DIRS)

            for file in sorted(files):
                if not file.endswith('.py'):
                    continue
                filepath = Path(root) / file
                results['files_processed'] += 1

                fixes, file_fixed = self.fix_file(filepath)
                results['total_fixes'] += fixes
                results['files_fixed'] += file_fixed
                if fixes > 0:
                    results['files_with_fixes'].append(
                        str(filepath.relative_to(self.project_root))
                    )

        return results

    def generate_fix_report(self, results: Dict) -> str:
        """Render the dict returned by :meth:`fix_project` as a text report.

        Args:
            results: Mapping with the keys ``files_processed``,
                ``files_fixed``, ``total_fixes`` and ``files_with_fixes``.

        Returns:
            The report as a single newline-joined string.
        """
        report = [
            "=" * 60,
            "代码质量修复报告",
            "=" * 60,
            f"处理文件数: {results['files_processed']}",
            f"修复文件数: {results['files_fixed']}",
            f"总修复数: {results['total_fixes']}",
            "",
        ]

        fixed_files = results['files_with_fixes']
        if fixed_files:
            report.append("已修复的文件:")
            report.extend(f" - {name}" for name in fixed_files)
            report.append("")

        report.extend([
            "修复内容:",
            " 1. 将裸except语句替换为具体异常类型",
            " 2. 将Exception替换为更具体的异常",
            " 3. 添加适当的日志记录",
            " 4. 添加必要的导入语句",
        ])
        return "\n".join(report)
if __name__ == "__main__":
    # Script entry point: fix everything under ./src, print the summary,
    # then persist the same summary to fix_report.txt.
    project_fixer = QualityIssueFixer("src")
    summary = project_fixer.generate_fix_report(project_fixer.fix_project())
    print(summary)
    Path("fix_report.txt").write_text(summary, encoding="utf-8")