"""
Dependency management and smart waiting logic.
This module provides intelligent dependency resolution with multiple
waiting strategies: TCP port checks, HTTP endpoint validation, and log pattern matching.
"""
import re
import socket
import time
from typing import Dict, Any, List, Optional
import docker
from docker.errors import DockerException
from .docker_client import get_docker_client, get_container_by_name
class DependencyManager:
"""Manages service dependencies with smart waiting capabilities."""
def __init__(self):
self._dependencies: Dict[str, Dict[str, Any]] = {}
def define_dependency(self, service_name: str, depends_on: str, wait_condition: Dict[str, Any]) -> Dict[str, Any]:
"""
Define a dependency with smart waiting condition.
Args:
service_name: Name of the service that depends on another
depends_on: Name of the service to wait for
wait_condition: Dictionary with type and details
Examples:
{"type": "tcp", "host": "db", "port": 5432}
{"type": "http", "url": "http://api:8080/health"}
{"type": "log", "pattern": "ready to accept connections"}
Returns:
Dict with dependency definition result
"""
self._dependencies[service_name] = {
"depends_on": depends_on,
"wait_condition": wait_condition,
"created_at": time.time()
}
return {
"service": service_name,
"depends_on": depends_on,
"condition_type": wait_condition.get("type"),
"status": "defined"
}
def get_dependency_status(self, service_name: str) -> Dict[str, Any]:
"""
Get dependency status and wait information.
Args:
service_name: Service name to check
Returns:
Dict with dependency status information
"""
if service_name not in self._dependencies:
return {"error": f"No dependencies defined for {service_name}"}
dep_info = self._dependencies[service_name]
client = get_docker_client()
container = get_container_by_name(client, service_name)
return {
"service": service_name,
"depends_on": dep_info["depends_on"],
"condition": dep_info["wait_condition"],
"container_running": container is not None,
"created_at": dep_info["created_at"]
}
def wait_for_condition(self, condition: Dict[str, Any], timeout: int = 60) -> bool:
"""
Wait for a condition to be met (TCP, HTTP, or log pattern).
Args:
condition: Condition dictionary with type and parameters
timeout: Maximum time to wait in seconds
Returns:
bool: True if condition was met, False otherwise
"""
start_time = time.time()
while time.time() - start_time < timeout:
cond_type = condition.get("type")
if cond_type == "tcp":
host = condition.get("host")
port = condition.get("port")
if self._check_tcp_port(host, port):
return True
elif cond_type == "http":
url = condition.get("url")
if self._check_http_endpoint(url):
return True
elif cond_type == "log":
pattern = condition.get("pattern")
container_id = condition.get("container_id")
if self._check_log_pattern(container_id, pattern):
return True
time.sleep(2)
return False
def sort_services_by_dependencies(self, services: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Sort services based on their dependencies.
Args:
services: List of service definitions
Returns:
List of services sorted in dependency order
Raises:
RuntimeError: If circular dependency or missing dependency detected
"""
sorted_services = []
remaining = services.copy()
deployed = set()
while remaining:
for i, service in enumerate(remaining):
deps = service.get("depends_on")
if not deps or deps in deployed:
sorted_services.append(remaining.pop(i))
deployed.add(service["name"])
break
else:
# Circular dependency or missing dependency
raise RuntimeError(
f"Circular dependency or missing dependency in: "
f"{[s['name'] for s in remaining]}"
)
return sorted_services
def _check_tcp_port(self, host: str, port: int) -> bool:
"""Check if TCP port is open."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except:
return False
def _check_http_endpoint(self, url: str) -> bool:
"""Check if HTTP endpoint responds."""
try:
import urllib.request
import urllib.error
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=5) as response:
return response.status < 400
except:
return False
def _check_log_pattern(self, container_id: str, pattern: str) -> bool:
"""Check if log pattern appears in container logs."""
try:
client = get_docker_client()
container = client.containers.get(container_id)
logs = container.logs(tail=50).decode("utf-8", errors="replace")
return bool(re.search(pattern, logs, re.IGNORECASE))
except:
return False
# Global dependency manager instance
dependency_manager = DependencyManager()