smoke.py (7.71 kB)
#!/usr/bin/env python3
# /// script
# dependencies = [
#     "httpx",
#     "structlog",
# ]
# ///
"""Smoke tests for Kodit API endpoints."""

import os
import socket
import subprocess
import sys
import tempfile
import time
from collections.abc import Callable
from pathlib import Path
from typing import Any

import httpx
import structlog

BASE_HOST = "127.0.0.1"
BASE_PORT = 8080
BASE_URL = f"http://{BASE_HOST}:{BASE_PORT}"
TARGET_URI = "https://gist.github.com/7aa38185e20433c04c533f2b28f4e217.git"

log = structlog.get_logger(__name__)


def is_port_available(host: str, port: int) -> bool:
    """Check if a port is available by trying to bind to it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False  # Port is in use
        else:
            return True


def retry_with_timeout(
    func: Callable[..., bool], timeout: float = 600, retry_delay: float = 1
) -> Any:
    """Keep trying a function until it succeeds or timeout is reached."""
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            if func():
                return
        except Exception as e:
            if time.time() - start_time >= timeout:
                raise TimeoutError(f"Failed after {timeout} seconds: {e}") from e
        time.sleep(retry_delay)  # Wait before retrying
    raise TimeoutError(f"Timed out after {timeout} seconds")


def main() -> None:  # noqa: PLR0915
    """Run smoke tests."""
    # Check there is nothing running on port 8080 already:
    if not is_port_available(BASE_HOST, BASE_PORT):
        log.error("Port is already in use", host=BASE_HOST, port=BASE_PORT)
        sys.exit(1)

    # Create a temporary file for the environment file so it doesn't pull any settings
    # from the local .env file
    with tempfile.NamedTemporaryFile(delete=False) as f:
        tmpfile = Path(f.name)

    env = os.environ.copy()
    env.update({"DISABLE_TELEMETRY": "true", "DB_URL": "sqlite+aiosqlite:///:memory:"})
    if "SMOKE_DB_URL" in env:
        env.update({"DB_URL": env["SMOKE_DB_URL"]})

    prefix = [] if os.getenv("CI") else ["uv", "run"]
    cmd = [
        *prefix,
        "kodit",
        "--env-file",
        str(tmpfile),
        "serve",
        "--host",
        "127.0.0.1",
        "--port",
        "8080",
    ]
    process = subprocess.Popen(cmd, env=env)  # noqa: S603

    try:
        with httpx.Client(timeout=30.0) as client:
            log.info("Waiting for server to start listening", port=BASE_PORT)
            while is_port_available(BASE_HOST, BASE_PORT):
                time.sleep(1)

            client.get(f"{BASE_URL}/healthz").raise_for_status()  # cspell: disable-line
            log.info("Server health check passed")

            log.info("Testing repository lifecycle")
            resp = client.get(f"{BASE_URL}/api/v1/repositories")
            repos = resp.json()
            log.info("Listed existing repositories", count=len(repos["data"]))

            payload = {
                "data": {"type": "repository", "attributes": {"remote_uri": TARGET_URI}}
            }
            client.post(
                f"{BASE_URL}/api/v1/repositories", json=payload
            ).raise_for_status()
            log.info("Created repository", uri=TARGET_URI)

            resp = client.get(f"{BASE_URL}/api/v1/repositories")
            repos = resp.json()
            repo_id = repos["data"][0]["id"]
            log.info("Retrieved repository ID", repo_id=repo_id)

            log.info("Testing repository endpoints", repo_id=repo_id)
            client.get(f"{BASE_URL}/api/v1/repositories/{repo_id}").raise_for_status()
            client.get(
                f"{BASE_URL}/api/v1/repositories/{repo_id}/status"
            ).raise_for_status()

            log.info("Waiting for indexing to complete", repo_id=repo_id)

            def indexing_finished() -> bool:
                response = client.get(
                    f"{BASE_URL}/api/v1/repositories/{repo_id}/status"
                )
                status = response.json()
                log.info("Indexing status", status=status)
                return (
                    all(
                        task["attributes"]["state"] == "completed"
                        or task["attributes"]["state"] == "skipped"
                        for task in status["data"]
                    )
                    and len(status["data"]) > 5
                )

            retry_with_timeout(indexing_finished)
            log.info("Indexing completed", repo_id=repo_id)

            log.info("Testing tags", repo_id=repo_id)
            resp = client.get(f"{BASE_URL}/api/v1/repositories/{repo_id}/tags")
            tags = resp.json()
            log.info("Retrieved tags", count=len(tags["data"]))
            if tags["data"]:
                tag_id = tags["data"][0]["id"]
                tag_url = f"{BASE_URL}/api/v1/repositories/{repo_id}/tags/{tag_id}"
                client.get(tag_url).raise_for_status()
                log.info("Retrieved tag details", tag_id=tags["data"][0]["id"])

            log.info("Testing commits", repo_id=repo_id)
            resp = client.get(f"{BASE_URL}/api/v1/repositories/{repo_id}/commits")
            commits = resp.json()
            log.info("Retrieved commits", count=len(commits["data"]))
            if commits["data"]:
                commit_sha = commits["data"][0]["attributes"]["commit_sha"]
                commit_url = (
                    f"{BASE_URL}/api/v1/repositories/{repo_id}/commits/{commit_sha}"
                )
                client.get(commit_url).raise_for_status()
                log.info("Retrieved commit details", commit_sha=commit_sha)

                resp = client.get(f"{commit_url}/files")
                files = resp.json()
                log.info("Retrieved commit files", count=len(files["data"]))
                if files["data"]:
                    blob_sha = files["data"][0]["attributes"]["blob_sha"]
                    client.get(f"{commit_url}/files/{blob_sha}").raise_for_status()
                    log.info("Retrieved file content", blob_sha=blob_sha)

                assert client.get(f"{commit_url}/snippets").is_redirect
                log.info("Retrieved snippets", commit_sha=commit_sha)

                client.get(f"{commit_url}/enrichments").raise_for_status()
                log.info("Retrieved enrichments", commit_sha=commit_sha)

                client.get(f"{commit_url}/embeddings?full=false").raise_for_status()
                log.info("Retrieved embeddings", commit_sha=commit_sha)

            log.info("Testing search API")
            payload = {
                "data": {
                    "type": "search",
                    "attributes": {"keywords": ["test"], "code": "def", "limit": 5},
                }
            }
            client.post(f"{BASE_URL}/api/v1/search", json=payload).raise_for_status()
            log.info("Search completed successfully")

            log.info("Testing queue API")
            client.get(f"{BASE_URL}/api/v1/queue").raise_for_status()
            log.info("Queue API responded successfully")

            log.info("Testing repository deletion", repo_id=repo_id)
            client.delete(
                f"{BASE_URL}/api/v1/repositories/{repo_id}"
            ).raise_for_status()
            log.info("Repository deleted successfully", repo_id=repo_id)

            log.info("All smoke tests passed successfully")
    finally:
        process.terminate()
        process.wait(timeout=10)
        tmpfile.unlink(missing_ok=True)


if __name__ == "__main__":
    main()
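For reference, the repository-creation flow exercised by smoke.py can also be driven interactively once a Kodit server is already running. The following is a minimal sketch, not part of smoke.py: it assumes the server is listening on 127.0.0.1:8080 and reuses the endpoint paths and JSON:API-style payload shape shown above; the repository URL is a hypothetical placeholder.

import httpx

BASE_URL = "http://127.0.0.1:8080"

with httpx.Client(timeout=30.0) as client:
    # Same health-check endpoint smoke.py probes first.
    client.get(f"{BASE_URL}/healthz").raise_for_status()

    # JSON:API-style payload, mirroring the shape smoke.py posts.
    payload = {
        "data": {
            "type": "repository",
            # Hypothetical repository URI; substitute a real clone URL.
            "attributes": {"remote_uri": "https://github.com/example/repo.git"},
        }
    }
    client.post(f"{BASE_URL}/api/v1/repositories", json=payload).raise_for_status()

    # List repositories and print their IDs.
    repos = client.get(f"{BASE_URL}/api/v1/repositories").json()
    for repo in repos["data"]:
        print(repo["id"])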
