"""Conch - Simple lock file for voice conversation coordination.
The Conch provides a lock file mechanism to indicate when a voice conversation
is active. This allows other processes (like sound effect hooks) to check
whether to suppress their audio output.
Lock file location: ~/.voicemode/conch
Usage:
# As context manager (recommended)
with Conch(agent_name="cora"):
# ... voice conversation logic ...
# Manual acquire/release
conch = Conch()
conch.acquire(agent_name="cora")
try:
# ... voice conversation logic ...
finally:
conch.release()
# Check if converse is active (for external scripts)
if Conch.is_active():
print("Someone is in a voice conversation")
"""
import fcntl
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
# Import config for lock expiry - deferred to avoid circular import
def _get_lock_expiry() -> float:
"""Get lock expiry from config, with fallback."""
try:
from voice_mode.config import CONCH_LOCK_EXPIRY
return CONCH_LOCK_EXPIRY
except ImportError:
return 120.0 # Default 2 minutes
class Conch:
"""Simple lock file for voice conversation coordination.
Creates a lock file at ~/.voicemode/conch when a voice conversation
is active. The lock file contains:
- pid: Process ID of the lock holder (for stale lock detection)
- agent: Name of the agent holding the lock
- acquired: ISO timestamp when lock was acquired
- expires: Optional expiry time (reserved for future use)
"""
LOCK_FILE = Path.home() / ".voicemode" / "conch"
def __init__(self, agent_name: Optional[str] = None):
"""Initialize Conch with optional agent name.
Args:
agent_name: Name of the agent (e.g., "cora"). Used for debugging/logging.
"""
self.agent_name = agent_name
self._acquired = False
self._fd = None # File descriptor for flock
self._acquire_time = None # Track when acquired
def acquire(self, agent_name: Optional[str] = None) -> bool:
"""Create the lock file.
Args:
agent_name: Override the agent name set in __init__
Returns:
True if lock was acquired successfully
"""
agent = agent_name or self.agent_name or "unknown"
# Ensure parent directory exists
self.LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
data = {
"pid": os.getpid(),
"agent": agent,
"acquired": datetime.now().isoformat(),
"expires": None
}
self.LOCK_FILE.write_text(json.dumps(data, indent=2))
self._acquired = True
return True
def try_acquire(self, agent_name: Optional[str] = None) -> bool:
"""Atomically try to acquire the conch.
Uses fcntl.flock() for true atomic locking across processes.
Also handles stale locks: if a lock is older than CONCH_LOCK_EXPIRY
seconds, it will be forcibly released and re-acquired.
Args:
agent_name: Name of the agent acquiring the lock
Returns:
True if lock acquired, False if already held by another process
"""
if self._acquired:
return True # Already holding it
agent = agent_name or self.agent_name or "unknown"
self.LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
# First check: is there a stale lock we can forcibly clear?
self._check_and_clear_stale_lock()
try:
# Open file for read/write, create if doesn't exist
self._fd = os.open(str(self.LOCK_FILE), os.O_CREAT | os.O_RDWR, 0o644)
# Try to get exclusive lock (non-blocking)
fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Got lock - write our info
self._acquire_time = datetime.now()
data = {
"pid": os.getpid(),
"agent": agent,
"acquired": self._acquire_time.isoformat(),
"expires": None
}
os.ftruncate(self._fd, 0)
os.lseek(self._fd, 0, os.SEEK_SET)
os.write(self._fd, json.dumps(data, indent=2).encode())
os.fsync(self._fd) # Ensure data is written
self._acquired = True
return True
except (BlockingIOError, OSError) as e:
# Lock held by another process, or other OS error
if self._fd is not None:
try:
os.close(self._fd)
except OSError:
pass
self._fd = None
return False
def _check_and_clear_stale_lock(self) -> None:
"""Check for and clear stale locks based on timestamp.
If a lock file exists and its timestamp exceeds CONCH_LOCK_EXPIRY,
forcibly remove it to allow new acquisitions. This handles the case
where a process is alive but stuck and won't release the lock.
Note: This deletes the file, creating a new inode. The stuck process
still holds its flock on the old inode, but we can now create a fresh
lock file.
"""
lock_expiry = _get_lock_expiry()
if lock_expiry <= 0:
return # Stale lock detection disabled
if not self.LOCK_FILE.exists():
return
try:
data = json.loads(self.LOCK_FILE.read_text())
acquired_str = data.get("acquired")
if not acquired_str:
return
acquired_time = datetime.fromisoformat(acquired_str)
age_seconds = (datetime.now() - acquired_time).total_seconds()
if age_seconds > lock_expiry:
# Lock is stale - forcibly remove it
stale_agent = data.get("agent", "unknown")
stale_pid = data.get("pid", "unknown")
try:
self.LOCK_FILE.unlink()
# Log would be nice here, but avoid import complexity
except OSError:
pass
except (json.JSONDecodeError, ValueError, OSError):
# Can't read or parse - ignore
pass
def release(self) -> float:
"""Release the lock and return seconds held.
Returns:
Seconds the lock was held, or 0.0 if not acquired
"""
held_seconds = 0.0
if self._acquire_time:
held_seconds = (datetime.now() - self._acquire_time).total_seconds()
if self._fd is not None:
try:
fcntl.flock(self._fd, fcntl.LOCK_UN)
os.close(self._fd)
except OSError:
pass
self._fd = None
# Remove the lock file
if self.LOCK_FILE.exists():
try:
self.LOCK_FILE.unlink()
except OSError:
pass
self._acquired = False
self._acquire_time = None
return held_seconds
@classmethod
def is_active(cls) -> bool:
"""Check if a voice conversation is currently active.
A conversation is considered active if:
1. The lock file exists
2. The PID in the file corresponds to a running process
3. The lock is not stale (acquired within CONCH_LOCK_EXPIRY seconds)
Returns:
True if converse is active, False otherwise
"""
if not cls.LOCK_FILE.exists():
return False
try:
data = json.loads(cls.LOCK_FILE.read_text())
pid = data.get("pid")
if pid is None:
return False
# Check if process is alive (signal 0 doesn't actually send a signal)
os.kill(pid, 0)
# Check if lock is stale based on timestamp
lock_expiry = _get_lock_expiry()
if lock_expiry > 0:
acquired_str = data.get("acquired")
if acquired_str:
acquired_time = datetime.fromisoformat(acquired_str)
age_seconds = (datetime.now() - acquired_time).total_seconds()
if age_seconds > lock_expiry:
# Lock is stale - consider it inactive
return False
return True
except (json.JSONDecodeError, ProcessLookupError, PermissionError, OSError, ValueError):
# JSON invalid, process dead, no permission to signal, or invalid timestamp
return False
@classmethod
def get_holder(cls) -> Optional[dict]:
"""Get information about the current lock holder.
Returns:
Dict with lock info if active, None otherwise
"""
if not cls.is_active():
return None
try:
return json.loads(cls.LOCK_FILE.read_text())
except (json.JSONDecodeError, OSError):
return None
def __enter__(self):
"""Context manager entry - acquire the lock."""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - release the lock."""
self.release()
return False # Don't suppress exceptions