"""
版本管理模块
提供版本控制、迁移和兼容性检查功能
"""
import json
import os
import re
from typing import Dict, Any, List, Optional, Callable, Union
from dataclasses import dataclass, asdict
from enum import Enum
import semver
from pathlib import Path
import hashlib
import datetime
class VersionType(Enum):
    """Names the component of a version that a bump operation targets."""

    # Core numeric components.
    MAJOR = "major"
    MINOR = "minor"
    PATCH = "patch"
    # Optional suffix components.
    PRERELEASE = "prerelease"
    BUILD = "build"
@dataclass
class Version:
    """A ``major.minor.patch[-prerelease][+build]`` version number.

    Ordering is provided by :meth:`compare_to`. Note that, unlike strict
    SemVer precedence, the build part takes part in the comparison here.
    """

    major: int
    minor: int
    patch: int
    prerelease: str = ""
    build: str = ""

    def __str__(self) -> str:
        """Render as ``major.minor.patch``, plus ``-prerelease`` / ``+build`` when set."""
        text = f"{self.major}.{self.minor}.{self.patch}"
        if self.prerelease:
            text += f"-{self.prerelease}"
        if self.build:
            text += f"+{self.build}"
        return text

    @classmethod
    def parse(cls, version_str: str) -> 'Version':
        """Build a ``Version`` from its string form.

        Args:
            version_str: Text such as ``"1.2.3"``, ``"1.2.3-alpha.1"`` or
                ``"1.2.3-rc.1+42"``.

        Returns:
            The parsed version.

        Raises:
            ValueError: If the part before the first ``-`` / ``+`` is not
                three dot-separated integers.
        """
        # Build metadata is split off first so a '-' inside it is never
        # mistaken for the prerelease separator.
        version_part, _, build_part = version_str.partition('+')
        main_part, _, prerelease_part = version_part.partition('-')
        match = re.match(r'^(\d+)\.(\d+)\.(\d+)$', main_part)
        if not match:
            raise ValueError(f"无效的版本格式: {version_str}")
        major, minor, patch = map(int, match.groups())
        return cls(
            major=major,
            minor=minor,
            patch=patch,
            prerelease=prerelease_part,
            build=build_part,
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the dataclass fields as a plain dictionary."""
        return asdict(self)

    def compare_to(self, other: 'Version') -> int:
        """Three-way comparison against ``other``.

        Returns:
            A negative number if ``self`` sorts before ``other``, zero if they
            are equal, a positive number otherwise. Only the sign is
            meaningful.
        """
        if self.major != other.major:
            return self.major - other.major
        if self.minor != other.minor:
            return self.minor - other.minor
        if self.patch != other.patch:
            return self.patch - other.patch

        # A version with a prerelease tag sorts before the same version
        # without one.
        if self.prerelease and not other.prerelease:
            return -1
        if not self.prerelease and other.prerelease:
            return 1
        if self.prerelease and other.prerelease:
            diff = self._compare_prerelease(self.prerelease, other.prerelease)
            # Only stop here on a real difference; equal prerelease tags must
            # still fall through to the build comparison, exactly as versions
            # without a prerelease do.
            if diff != 0:
                return diff

        # A version with a build part sorts after the same version without one.
        if self.build and not other.build:
            return 1
        if not self.build and other.build:
            return -1
        if self.build and other.build:
            return self._compare_build(self.build, other.build)
        return 0

    @staticmethod
    def _compare_identifiers(a: str, b: str) -> int:
        """Compare two dot-separated identifier strings field by field.

        Fields that are both decimal numbers compare numerically; any other
        pair compares as lower-cased strings. When all shared fields are
        equal, the string with fewer fields sorts first.
        """
        a_parts = a.split('.')
        b_parts = b.split('.')
        for a_part, b_part in zip(a_parts, b_parts):
            # isdecimal() (not isdigit()) so every accepted field is one
            # int() can actually convert.
            if a_part.isdecimal() and b_part.isdecimal():
                diff = int(a_part) - int(b_part)
            else:
                a_key, b_key = a_part.lower(), b_part.lower()
                diff = (a_key > b_key) - (a_key < b_key)
            if diff != 0:
                return diff
        return (len(a_parts) > len(b_parts)) - (len(a_parts) < len(b_parts))

    def _compare_prerelease(self, a: str, b: str) -> int:
        """Three-way comparison of two prerelease strings."""
        return self._compare_identifiers(a, b)

    def _compare_build(self, a: str, b: str) -> int:
        """Three-way comparison of two build strings."""
        return self._compare_identifiers(a, b)

    def bump(self, version_type: 'VersionType') -> 'Version':
        """Return a new version with the requested component advanced.

        MAJOR / MINOR / PATCH reset the lower numeric components to zero and
        drop prerelease and build. PRERELEASE keeps the numbers and drops the
        build. BUILD keeps everything else. An unrecognised ``version_type``
        returns ``self`` unchanged.
        """
        if version_type == VersionType.MAJOR:
            return Version(self.major + 1, 0, 0)
        elif version_type == VersionType.MINOR:
            return Version(self.major, self.minor + 1, 0)
        elif version_type == VersionType.PATCH:
            return Version(self.major, self.minor, self.patch + 1)
        elif version_type == VersionType.PRERELEASE:
            prerelease = self._increment_prerelease()
            return Version(self.major, self.minor, self.patch, prerelease)
        elif version_type == VersionType.BUILD:
            build = self._increment_build()
            return Version(self.major, self.minor, self.patch, self.prerelease, build)
        return self

    def _increment_prerelease(self) -> str:
        """Return the prerelease tag that follows the current one.

        No tag yields ``"alpha.1"``. Tags starting with ``alpha``, ``beta``,
        ``rc`` or ``prerelease`` keep their label and advance the counter.
        Any other tag is replaced by ``"prerelease.1"``.
        """
        if not self.prerelease:
            return "alpha.1"
        # "prerelease" is included so that the fallback tag produced below is
        # itself advanced on the next bump instead of being returned unchanged.
        for label in ("alpha", "beta", "rc", "prerelease"):
            if self.prerelease.startswith(label):
                return self._increment_version_part(self.prerelease, label)
        return "prerelease.1"

    def _increment_build(self) -> str:
        """Return the next build string.

        The first run of digits in the current build is incremented and
        returned on its own (any surrounding text is dropped). Without a
        build, or without digits in it, the result is ``"1"``.
        """
        if not self.build:
            return "1"
        match = re.search(r'(\d+)', self.build)
        if match:
            return f"{int(match.group(1)) + 1}"
        return "1"

    def _increment_version_part(self, current: str, prefix: str) -> str:
        """Return ``"<prefix>.<n+1>"`` if ``current`` contains ``"<prefix>.<n>"``, else ``"<prefix>.1"``."""
        match = re.search(f'{re.escape(prefix)}\\.(\\d+)', current)
        if match:
            return f"{prefix}.{int(match.group(1)) + 1}"
        return f"{prefix}.1"
@dataclass
class Migration:
    """One registered data migration between two versions.

    ``migration_func`` receives the data dictionary and returns the migrated
    dictionary. ``timestamp`` is filled with the current local time in ISO
    format when it is left empty.
    """

    from_version: Version
    to_version: Version
    migration_func: Callable[[Dict[str, Any]], Dict[str, Any]]
    description: str = ""
    timestamp: str = ""

    def __post_init__(self):
        """Stamp the record with the creation time unless one was supplied."""
        if self.timestamp:
            return
        self.timestamp = datetime.datetime.now().isoformat()
@dataclass
class VersionInfo:
    """Release metadata recorded for a single version."""

    version: Version
    release_date: str
    description: str
    changes: List[str]
    migration_required: bool = False
    breaking_changes: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary form of this record, with the version rendered as a string."""
        payload: Dict[str, Any] = {"version": str(self.version)}
        # Remaining fields are copied as-is, in declaration order.
        for field_name in (
            "release_date",
            "description",
            "changes",
            "migration_required",
            "breaking_changes",
        ):
            payload[field_name] = getattr(self, field_name)
        return payload
class VersionManager:
    """Track the current version, its release history and data migrations.

    The current version and the history are persisted as JSON in
    ``version_file``. Registered migrations live in memory only.
    """

    def __init__(self, version_file: str = "version.json"):
        """Load (or create) ``version_file`` and register the built-in migrations."""
        self.version_file = Path(version_file)
        self.current_version: Optional[Version] = None
        self.version_history: List[VersionInfo] = []
        self.migrations: List[Migration] = []
        self._load_version_info()
        self._register_default_migrations()

    def get_current_version(self) -> Optional['Version']:
        """Return the current version, or ``None`` if none is set."""
        return self.current_version

    def set_current_version(self, version: Union['Version', str]) -> None:
        """Set the current version (parsing it if given as a string) and persist it.

        Raises:
            ValueError: If a string is given that ``Version.parse`` rejects.
        """
        if isinstance(version, str):
            version = Version.parse(version)
        self.current_version = version
        self._save_version_info()

    def bump_version(self, version_type: 'VersionType') -> 'Version':
        """Advance the current version by ``version_type``, persist and return it.

        Raises:
            ValueError: If there is no current version.
        """
        if not self.current_version:
            raise ValueError("没有当前版本信息")
        new_version = self.current_version.bump(version_type)
        self.set_current_version(new_version)
        return new_version

    def add_version_info(self, version_info: 'VersionInfo') -> None:
        """Insert ``version_info`` into the history (ascending order) and persist.

        An entry whose version compares equal to an existing one replaces it.
        """
        for i, existing in enumerate(self.version_history):
            if existing.version.compare_to(version_info.version) == 0:
                self.version_history[i] = version_info
                # Replacements must be written out too, otherwise the update
                # is lost on the next load.
                self._save_version_info()
                return

        for i, existing in enumerate(self.version_history):
            if existing.version.compare_to(version_info.version) > 0:
                self.version_history.insert(i, version_info)
                break
        else:
            self.version_history.append(version_info)
        self._save_version_info()

    def get_version_info(self, version: Union['Version', str]) -> Optional['VersionInfo']:
        """Return the history entry for ``version``, or ``None`` if there is none."""
        if isinstance(version, str):
            version = Version.parse(version)
        for info in self.version_history:
            if info.version.compare_to(version) == 0:
                return info
        return None

    def list_versions(self) -> List['VersionInfo']:
        """Return a shallow copy of the version history."""
        return self.version_history.copy()

    def check_compatibility(self, version: Union['Version', str]) -> bool:
        """Return whether ``version`` has the same major number as the current version.

        Always ``True`` when no current version is set.
        """
        if not self.current_version:
            return True
        if isinstance(version, str):
            version = Version.parse(version)
        return self.current_version.major == version.major

    def _applicable_migrations(self, from_version: 'Version', to_version: 'Version') -> List['Migration']:
        """Return the registered migrations whose range lies inside ``[from_version, to_version]``."""
        return [
            migration
            for migration in self.migrations
            if from_version.compare_to(migration.from_version) <= 0
            and to_version.compare_to(migration.to_version) >= 0
        ]

    def needs_migration(self, from_version: Union['Version', str], to_version: Union['Version', str]) -> bool:
        """Return whether ``migrate`` would apply at least one migration for this range."""
        if isinstance(from_version, str):
            from_version = Version.parse(from_version)
        if isinstance(to_version, str):
            to_version = Version.parse(to_version)
        # Uses the same selection as migrate() so the two can never disagree.
        return bool(self._applicable_migrations(from_version, to_version))

    def migrate(self, data: Dict[str, Any], from_version: Union['Version', str], to_version: Union['Version', str]) -> Dict[str, Any]:
        """Run every applicable migration over a shallow copy of ``data``.

        A migration applies when its ``from_version``..``to_version`` range
        lies inside the requested range. Migrations run in registration
        order, which ``register_migration`` keeps sorted by ``from_version``.

        Returns:
            The migrated dictionary (a shallow copy of ``data`` when nothing
            applies).
        """
        if isinstance(from_version, str):
            from_version = Version.parse(from_version)
        if isinstance(to_version, str):
            to_version = Version.parse(to_version)
        result = data.copy()
        for migration in self._applicable_migrations(from_version, to_version):
            result = migration.migration_func(result)
        return result

    def register_migration(self, migration: 'Migration') -> None:
        """Register ``migration``, keeping the list ordered by ``from_version``.

        Migrations with an equal ``from_version`` keep their registration order.
        """
        # Sorted insertion via compare_to: dictionaries (the previous sort
        # key) are not orderable, so sorting raised TypeError as soon as a
        # second migration was registered.
        position = len(self.migrations)
        for i, existing in enumerate(self.migrations):
            if existing.from_version.compare_to(migration.from_version) > 0:
                position = i
                break
        self.migrations.insert(position, migration)

    def generate_changelog(self, from_version: Optional[Union['Version', str]] = None, to_version: Optional[Union['Version', str]] = None) -> str:
        """Render the history between two versions as Markdown, newest entry first.

        Args:
            from_version: Lower bound (inclusive); defaults to ``0.0.0``.
            to_version: Upper bound (inclusive); defaults to the highest
                version in the history.
        """
        from_version = self._resolve_version_param(from_version, default=Version(0, 0, 0))
        to_version = self._resolve_version_param(to_version)
        entries = [
            self._format_version_entry(info)
            for info in reversed(self.version_history)
            if self._should_include_version(info, from_version, to_version)
        ]
        return "# 更新日志\n\n" + "".join(entries)

    def _resolve_version_param(self, version_param: Optional[Union['Version', str]], default: Optional['Version'] = None) -> Optional['Version']:
        """Normalise an optional version argument to a ``Version``.

        ``None`` resolves to ``default`` when given, otherwise to the highest
        version in the history, or ``None`` when the history is empty.
        """
        if version_param is None:
            if default is not None:
                return default
            if not self.version_history:
                return None
            # Pick the highest version explicitly rather than trusting a list
            # position: index 0 is the oldest entry of an ascending history.
            latest = self.version_history[0].version
            for info in self.version_history[1:]:
                if info.version.compare_to(latest) > 0:
                    latest = info.version
            return latest
        if isinstance(version_param, str):
            return Version.parse(version_param)
        return version_param

    def _should_include_version(self, version_info, from_version: 'Version', to_version: Optional['Version']) -> bool:
        """Return whether ``version_info`` lies within ``[from_version, to_version]``.

        A ``to_version`` of ``None`` means no upper bound.
        """
        if version_info.version.compare_to(from_version) < 0:
            return False
        if to_version and version_info.version.compare_to(to_version) > 0:
            return False
        return True

    def _format_version_entry(self, version_info) -> str:
        """Format one history entry as a Markdown section."""
        entry = f"## {version_info.version} ({version_info.release_date})\n\n"
        entry += f"{version_info.description}\n\n"
        if version_info.breaking_changes:
            entry += "### ⚠️ 破坏性变更\n\n"
        entry += "### 变更\n\n"
        entry += "".join(f"- {change}\n" for change in version_info.changes)
        entry += "\n"
        return entry

    def _load_version_info(self) -> None:
        """Load the current version and history from ``version_file``.

        A missing file is created with version ``2.0.0``. An unreadable or
        malformed file is reported on stdout and the current version falls
        back to ``2.0.0``; no partially parsed history is kept.
        """
        if not self.version_file.exists():
            self.current_version = Version(2, 0, 0)
            self._save_version_info()
            return
        try:
            with open(self.version_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            current = self.current_version
            if "current_version" in data:
                current = Version.parse(data["current_version"])
            history = []
            if "version_history" in data:
                for info_data in data["version_history"]:
                    history.append(VersionInfo(
                        version=Version.parse(info_data["version"]),
                        release_date=info_data["release_date"],
                        description=info_data["description"],
                        changes=info_data["changes"],
                        migration_required=info_data.get("migration_required", False),
                        breaking_changes=info_data.get("breaking_changes", False),
                    ))
        except (OSError, RuntimeError, AttributeError, KeyError, TypeError, ValueError) as e:
            # Covers I/O failures, invalid JSON (a ValueError subclass) and
            # entries with missing keys or wrong types.
            print(f"加载版本信息失败: {e}")
            self.current_version = Version(2, 0, 0)
        else:
            # Commit only after the whole file parsed successfully.
            self.current_version = current
            self.version_history.extend(history)

    def _save_version_info(self) -> None:
        """Write the current version and history to ``version_file``.

        Best effort: failures are reported on stdout and not raised.
        """
        try:
            self.version_file.parent.mkdir(parents=True, exist_ok=True)
            data = {
                "current_version": str(self.current_version) if self.current_version else "0.0.0",
                "version_history": [info.to_dict() for info in self.version_history],
            }
            with open(self.version_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except (OSError, RuntimeError, TypeError, ValueError) as e:
            # OSError: directory/file cannot be written; TypeError/ValueError:
            # the history holds something json cannot serialise.
            print(f"保存版本信息失败: {e}")

    def _register_default_migrations(self) -> None:
        """Register the built-in v1.0.0 -> v2.0.0 migration."""
        migration_v1_to_v2 = Migration(
            from_version=Version(1, 0, 0),
            to_version=Version(2, 0, 0),
            migration_func=self._migrate_v1_to_v2,
            description="从v1.0.0迁移到v2.0.0",
        )
        self.register_migration(migration_v1_to_v2)

    def _migrate_v1_to_v2(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a v1 data dictionary to the v2 layout.

        Inside ``data["config"]`` this renames ``outputPath`` to ``output_dir``
        and ``templatePath`` to ``template_dir``, and adds
        ``cache_enabled=True`` and ``interaction_level="normal"`` when absent.
        The input dictionary and its ``config`` are left untouched; a
        ``config`` that is not a dictionary is passed through unchanged.
        """
        result = data.copy()
        config = result.get("config")
        if isinstance(config, dict):
            # Work on a copy: the outer copy above is shallow, so editing the
            # nested dict in place would also modify the caller's data.
            config = config.copy()
            if "outputPath" in config:
                config["output_dir"] = config.pop("outputPath")
            if "templatePath" in config:
                config["template_dir"] = config.pop("templatePath")
            config.setdefault("cache_enabled", True)
            config.setdefault("interaction_level", "normal")
            result["config"] = config
        return result
# Module-level VersionManager instance used by the convenience functions in this module.
# NOTE: it is constructed at import time with the default "version.json" path, so
# importing this module loads that file, or creates it when it does not exist.
global_version_manager = VersionManager()
def get_current_version() -> str:
    """Return the global manager's current version as a string.

    Falls back to ``"0.0.0"`` when the manager has no current version.
    """
    current = global_version_manager.get_current_version()
    if current:
        return str(current)
    return "0.0.0"
def bump_version(version_type: VersionType) -> str:
    """Bump the global manager's version by ``version_type`` and return it as a string."""
    return str(global_version_manager.bump_version(version_type))
def check_compatibility(version: str) -> bool:
    """Return the global manager's compatibility verdict for ``version``."""
    is_compatible = global_version_manager.check_compatibility(version)
    return is_compatible
def generate_changelog(from_version: Optional[str] = None, to_version: Optional[str] = None) -> str:
    """Return the changelog produced by the global manager for the given bounds.

    Both bounds are optional and are passed through unchanged.
    """
    manager = global_version_manager
    return manager.generate_changelog(from_version, to_version)