"""文件锁模块 - 提供跨平台的文件锁功能以支持并发访问"""
import os
import time
import sys
from pathlib import Path
from typing import Optional
from contextlib import contextmanager
# 根据操作系统导入不同的锁实现
if sys.platform == "win32":
import msvcrt
else:
import fcntl
class FileLock:
"""
跨平台文件锁实现
支持 Windows (msvcrt) 和 Unix/Linux (fcntl)
提供独占锁和共享锁两种模式
"""
def __init__(self, file_path: Path, timeout: float = 30.0, poll_interval: float = 0.1):
"""
初始化文件锁
Args:
file_path: 要锁定的文件路径
timeout: 获取锁的超时时间(秒)
poll_interval: 轮询间隔时间(秒)
"""
self.file_path = Path(file_path)
self.lock_file_path = Path(f"{file_path}.lock")
self.timeout = timeout
self.poll_interval = poll_interval
self.lock_file: Optional[object] = None
self.is_locked = False
def acquire(self, exclusive: bool = True) -> bool:
"""
获取文件锁
Args:
exclusive: True 为独占锁(写锁), False 为共享锁(读锁)
Returns:
成功返回 True, 超时返回 False
"""
if self.is_locked:
return True
# 确保锁文件的目录存在
self.lock_file_path.parent.mkdir(parents=True, exist_ok=True)
start_time = time.time()
while True:
try:
# 打开或创建锁文件
self.lock_file = open(self.lock_file_path, 'a+')
# 尝试获取锁
if sys.platform == "win32":
self._acquire_windows(exclusive)
else:
self._acquire_unix(exclusive)
self.is_locked = True
return True
except (IOError, OSError) as e:
# 关闭文件句柄
if self.lock_file:
try:
self.lock_file.close()
except:
pass
self.lock_file = None
# 检查是否超时
if time.time() - start_time >= self.timeout:
raise TimeoutError(
f"无法在 {self.timeout} 秒内获取文件锁: {self.lock_file_path}"
)
# 等待后重试
time.sleep(self.poll_interval)
def _acquire_windows(self, exclusive: bool):
"""Windows 平台获取锁"""
# 移动到文件开头
self.lock_file.seek(0)
# Windows 下使用 msvcrt.locking
# LK_NBLCK: 非阻塞独占锁
# LK_NBRLCK: 非阻塞共享锁(Windows 不支持,使用独占锁代替)
lock_mode = msvcrt.LK_NBLCK
try:
msvcrt.locking(self.lock_file.fileno(), lock_mode, 1)
except OSError as e:
raise IOError(f"获取 Windows 文件锁失败: {e}")
def _acquire_unix(self, exclusive: bool):
"""Unix/Linux 平台获取锁"""
# fcntl.LOCK_EX: 独占锁(写锁)
# fcntl.LOCK_SH: 共享锁(读锁)
# fcntl.LOCK_NB: 非阻塞模式
lock_mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
lock_mode |= fcntl.LOCK_NB
try:
fcntl.flock(self.lock_file.fileno(), lock_mode)
except (IOError, OSError) as e:
raise IOError(f"获取 Unix 文件锁失败: {e}")
def release(self):
"""释放文件锁"""
if not self.is_locked or not self.lock_file:
return
try:
if sys.platform == "win32":
self._release_windows()
else:
self._release_unix()
finally:
# 关闭文件
if self.lock_file:
try:
self.lock_file.close()
except:
pass
self.lock_file = None
self.is_locked = False
# 尝试删除锁文件(可能失败,不影响功能)
try:
if self.lock_file_path.exists():
self.lock_file_path.unlink()
except:
pass
def _release_windows(self):
"""Windows 平台释放锁"""
try:
self.lock_file.seek(0)
msvcrt.locking(self.lock_file.fileno(), msvcrt.LK_UNLCK, 1)
except:
pass
def _release_unix(self):
"""Unix/Linux 平台释放锁"""
try:
fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN)
except:
pass
def __enter__(self):
"""上下文管理器入口"""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器退出"""
self.release()
return False
def __del__(self):
"""析构函数,确保释放锁"""
self.release()
@contextmanager
def file_lock(file_path: Path, exclusive: bool = True, timeout: float = 30.0):
"""
文件锁上下文管理器(便捷函数)
用法:
with file_lock(path, exclusive=True) as lock:
# 执行需要锁保护的操作
...
Args:
file_path: 要锁定的文件路径
exclusive: True 为独占锁(写), False 为共享锁(读)
timeout: 获取锁的超时时间(秒)
"""
lock = FileLock(file_path, timeout=timeout)
try:
lock.acquire(exclusive=exclusive)
yield lock
finally:
lock.release()