Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
monitor_package.pyโ€ข11.7 kB
#!/usr/bin/env python3 """ MockLoop MCP Package Monitoring Script This script monitors PyPI statistics and package health for mockloop-mcp. It tracks download statistics, version information, and package availability. """ import argparse from dataclasses import asdict, dataclass from datetime import datetime import json from pathlib import Path import sys import time from typing import Any import requests @dataclass class PackageStats: """Package statistics data structure.""" package_name: str current_version: str latest_version: str total_downloads: int recent_downloads: int last_updated: str health_status: str check_timestamp: str versions: list[str] dependencies: list[str] maintainers: list[str] description: str homepage: str repository: str license: str python_versions: list[str] classifiers: list[str] class PyPIMonitor: """Monitor PyPI package statistics and health.""" def __init__(self, package_name: str = "mockloop-mcp"): self.package_name = package_name self.base_url = "https://pypi.org/pypi" self.stats_url = "https://pypistats.org/api" self.session = requests.Session() self.session.headers.update({"User-Agent": "MockLoop-MCP-Monitor/1.0"}) def get_package_info(self) -> dict[str, Any] | None: """Get package information from PyPI.""" try: response = self.session.get(f"{self.base_url}/{self.package_name}/json") response.raise_for_status() return response.json() except requests.RequestException: return None def get_download_stats(self, period: str = "month") -> dict[str, Any] | None: """Get download statistics from pypistats.org.""" try: response = self.session.get( f"{self.stats_url}/packages/{self.package_name}/recent", params={"period": period}, ) response.raise_for_status() return response.json() except requests.RequestException: return None def get_version_downloads(self) -> dict[str, Any] | None: """Get download statistics by version.""" try: response = self.session.get( f"{self.stats_url}/packages/{self.package_name}/major" ) response.raise_for_status() return response.json() except requests.RequestException: return None def check_package_health(self) -> str: """Check overall package health status.""" package_info = self.get_package_info() if not package_info: return "UNAVAILABLE" info = package_info.get("info", {}) # Check if package is actively maintained last_release = info.get("upload_time_iso_8601", "") if last_release: try: release_date = datetime.fromisoformat( last_release.replace("Z", "+00:00") ) days_since_release = (datetime.now().astimezone() - release_date).days if days_since_release > 365: return "STALE" elif days_since_release > 180: return "AGING" else: return "HEALTHY" except ValueError: return "UNKNOWN" return "UNKNOWN" def collect_stats(self) -> PackageStats | None: """Collect comprehensive package statistics.""" package_info = self.get_package_info() if not package_info: return None info = package_info.get("info", {}) releases = package_info.get("releases", {}) # Get download statistics download_stats = self.get_download_stats() total_downloads = 0 recent_downloads = 0 if download_stats and download_stats.get("data"): recent_downloads = download_stats["data"].get("last_month", 0) # Note: Total downloads require different API endpoint # Extract version information versions = list(releases.keys()) versions.sort( key=lambda x: [int(i) for i in x.split(".") if i.isdigit()], reverse=True ) current_version = info.get("version", "") latest_version = versions[0] if versions else "" # Extract maintainer information maintainers = [] if info.get("maintainer"): maintainers.append(info["maintainer"]) if info.get("author") and info["author"] not in maintainers: maintainers.append(info["author"]) # Extract Python version support python_versions = [] classifiers = info.get("classifiers", []) for classifier in classifiers: if classifier.startswith("Programming Language :: Python ::"): version = classifier.split("::")[-1].strip() if version and version not in ["Implementation", "Only"]: python_versions.append(version) # Extract dependencies dependencies = [] requires_dist = info.get("requires_dist", []) if requires_dist: for req in requires_dist: # Extract package name (before any version specifiers) dep_name = req.split()[0].split(">=")[0].split("==")[0].split("~=")[0] if dep_name not in dependencies: dependencies.append(dep_name) return PackageStats( package_name=self.package_name, current_version=current_version, latest_version=latest_version, total_downloads=total_downloads, recent_downloads=recent_downloads, last_updated=info.get("upload_time_iso_8601", ""), health_status=self.check_package_health(), check_timestamp=datetime.now().isoformat(), versions=versions[:10], # Last 10 versions dependencies=dependencies, maintainers=maintainers, description=info.get("summary", ""), homepage=info.get("home_page", ""), repository=info.get("project_url", ""), license=info.get("license", ""), python_versions=python_versions, classifiers=classifiers, ) def save_stats(self, stats: PackageStats, output_file: Path | None = None) -> None: """Save statistics to JSON file.""" if output_file is None: output_file = Path("package_stats.json") # Load existing stats if file exists existing_stats = [] if output_file.exists(): try: with open(output_file) as f: existing_stats = json.load(f) except (OSError, json.JSONDecodeError): existing_stats = [] # Add new stats existing_stats.append(asdict(stats)) # Keep only last 100 entries if len(existing_stats) > 100: existing_stats = existing_stats[-100:] # Save updated stats with open(output_file, "w") as f: json.dump(existing_stats, f, indent=2, default=str) def print_stats(self, stats: PackageStats) -> None: """Print statistics in a readable format.""" for _version in stats.python_versions: pass for _dep in stats.dependencies[:10]: # Show first 10 pass if len(stats.dependencies) > 10: pass for _version in stats.versions[:5]: # Show last 5 versions pass for _maintainer in stats.maintainers: pass def generate_health_report(self, stats: PackageStats) -> dict[str, Any]: """Generate a health report with recommendations.""" report = { "overall_health": stats.health_status, "timestamp": stats.check_timestamp, "issues": [], "recommendations": [], "metrics": { "version_freshness": "good" if stats.health_status == "HEALTHY" else "needs_attention", "download_trend": "unknown", # Would need historical data "dependency_count": len(stats.dependencies), "python_version_support": len(stats.python_versions), }, } # Check for issues if stats.health_status == "STALE": report["issues"].append("Package hasn't been updated in over a year") report["recommendations"].append("Consider releasing a maintenance update") if stats.health_status == "AGING": report["issues"].append("Package hasn't been updated in over 6 months") report["recommendations"].append( "Review for potential updates or bug fixes" ) if stats.recent_downloads < 100: report["issues"].append("Low download count in recent period") report["recommendations"].append( "Consider improving documentation or marketing" ) if len(stats.python_versions) < 3: report["issues"].append("Limited Python version support") report["recommendations"].append("Consider supporting more Python versions") if not stats.description: report["issues"].append("Missing package description") report["recommendations"].append("Add a comprehensive package description") return report def main(): """Main function for command-line usage.""" parser = argparse.ArgumentParser( description="Monitor MockLoop MCP package statistics" ) parser.add_argument( "--package", default="mockloop-mcp", help="Package name to monitor (default: mockloop-mcp)", ) parser.add_argument( "--output", type=Path, help="Output file for statistics (default: package_stats.json)", ) parser.add_argument( "--format", choices=["json", "text", "both"], default="both", help="Output format (default: both)", ) parser.add_argument( "--health-report", action="store_true", help="Generate health report with recommendations", ) parser.add_argument( "--continuous", type=int, metavar="MINUTES", help="Run continuously, checking every N minutes", ) args = parser.parse_args() monitor = PyPIMonitor(args.package) def run_check(): """Run a single monitoring check.""" stats = monitor.collect_stats() if not stats: return False if args.format in ["text", "both"]: monitor.print_stats(stats) if args.format in ["json", "both"]: monitor.save_stats(stats, args.output) if args.health_report: report = monitor.generate_health_report(stats) if report["issues"]: for _issue in report["issues"]: pass if report["recommendations"]: for _rec in report["recommendations"]: pass for _key, _value in report["metrics"].items(): pass return True if args.continuous: try: while True: success = run_check() if success: pass else: pass time.sleep(args.continuous * 60) except KeyboardInterrupt: pass else: success = run_check() sys.exit(0 if success else 1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server