"""StudioMCPHub Admin Panel — /admin
Real-time dashboard for monitoring requests, payments, tool usage,
service health, Firestore data, and Cloud Logging entries.
Auth: ADMIN_SECRET env var checked via cookie or X-Admin-Token header.
"""
import base64
import io
import logging
import os
import random
import struct
import time
import uuid
import zlib
from datetime import datetime, timedelta, timezone
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests as http_requests
from flask import (
Blueprint, render_template, request, redirect, url_for,
jsonify, make_response,
)
from google.cloud import firestore, logging as cloud_logging
from .config import config, PRICING
logger = logging.getLogger("studiomcphub.admin")
def _clean_query_args() -> dict:
"""Normalise GET query params mangled by HTML-encoding web-fetch tools.
Many agent browsing tools (ChatGPT, Grok, etc.) HTML-encode '&' as '&'
in URLs, so Flask sees param names like 'amp;message' instead of 'message'.
Double-encoded variants ('amp%253B', 'amp%3B') also appear in the wild.
"""
raw = dict(request.args)
data: dict = {}
for k, v in raw.items():
clean = k
for prefix in ("amp%253B", "amp%3B", "amp;"):
if clean.lower().startswith(prefix):
clean = clean[len(prefix):]
break
if clean and (clean not in data or not data[clean]):
data[clean] = v
return data
def _make_sample_png(width: int = 128, height: int = 128) -> str:
"""Generate a sample gradient PNG (no PIL) for sandbox mock responses.
Creates a purple-to-teal gradient with a subtle diamond pattern —
visually meaningful enough to test image processing pipelines.
128x128 by default — large enough to be visually interesting in previews.
Returns base64-encoded PNG string.
"""
def _chunk(chunk_type: bytes, data: bytes) -> bytes:
c = chunk_type + data
return struct.pack(">I", len(data)) + c + struct.pack(">I", zlib.crc32(c) & 0xFFFFFFFF)
raw_rows = []
for y in range(height):
row = b"\x00" # filter: None
for x in range(width):
t = y / max(height - 1, 1)
r = int(124 * (1 - t) + 0 * t)
g = int(92 * (1 - t) + 212 * t)
b_val = int(255 * (1 - t) + 170 * t)
# diamond highlight
dx = abs(x - width // 2) + abs(y - height // 2)
if dx < width // 4:
boost = int(40 * (1 - dx / (width // 4)))
r = min(255, r + boost)
g = min(255, g + boost)
b_val = min(255, b_val + boost)
row += bytes([r, g, b_val])
raw_rows.append(row)
sig = b"\x89PNG\r\n\x1a\n"
ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
idat = zlib.compress(b"".join(raw_rows))
png = sig + _chunk(b"IHDR", ihdr) + _chunk(b"IDAT", idat) + _chunk(b"IEND", b"")
return base64.b64encode(png).decode("ascii")
_SAMPLE_PNG_B64: str | None = None
def _get_sample_png() -> str:
"""Lazy-cached 128x128 sample PNG for sandbox endpoints."""
global _SAMPLE_PNG_B64
if _SAMPLE_PNG_B64 is None:
_SAMPLE_PNG_B64 = _make_sample_png()
return _SAMPLE_PNG_B64
def _rfc3339(dt: datetime) -> str:
"""Format datetime as RFC3339 with Z suffix for Cloud Logging."""
return dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
admin_bp = Blueprint(
"admin", __name__,
template_folder="../../site/templates",
)
_db = None
_log_client = None
def _get_db():
global _db
if _db is None:
_db = firestore.Client(
project=config.gcp_project,
database=config.firestore_database,
)
return _db
def _get_log_client():
global _log_client
if _log_client is None:
_log_client = cloud_logging.Client(project=config.gcp_project)
return _log_client
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
def _get_admin_secret() -> str:
secret = getattr(config, "admin_secret", "") or os.getenv("ADMIN_SECRET", "")
return secret
def require_admin(f):
"""Decorator: checks cookie admin_token or X-Admin-Token header."""
@wraps(f)
def decorated(*args, **kwargs):
secret = _get_admin_secret()
if not secret:
return jsonify({"error": "Admin not configured"}), 503
token = request.cookies.get("admin_token") or request.headers.get("X-Admin-Token", "")
if token != secret:
if request.path.startswith("/api/"):
return jsonify({"error": "Unauthorized"}), 401
return redirect(url_for("admin.login_page"))
return f(*args, **kwargs)
return decorated
# ---------------------------------------------------------------------------
# Pages
# ---------------------------------------------------------------------------
@admin_bp.route("/admin")
@require_admin
def dashboard():
return render_template("admin/dashboard.html", version=config.server_version)
@admin_bp.route("/admin/login", methods=["GET"])
def login_page():
return render_template("admin/login.html")
@admin_bp.route("/admin/login", methods=["POST"])
def login_submit():
password = request.form.get("password", "")
secret = _get_admin_secret()
if password == secret and secret:
resp = make_response(redirect(url_for("admin.dashboard")))
resp.set_cookie(
"admin_token", password,
max_age=86400, # 24h
httponly=True,
samesite="Lax",
)
return resp
return render_template("admin/login.html", error="Invalid password")
@admin_bp.route("/admin/logs")
@require_admin
def logs_page():
return render_template("admin/logs.html", version=config.server_version)
@admin_bp.route("/admin/data")
@require_admin
def data_page():
return render_template("admin/data.html", version=config.server_version)
# ---------------------------------------------------------------------------
# API: Stats
# ---------------------------------------------------------------------------
@admin_bp.route("/api/admin/stats")
@require_admin
def api_stats():
"""Aggregate stats from Firestore collections and Cloud Logging."""
db = _get_db()
now = datetime.now(timezone.utc)
cutoff_24h = now - timedelta(hours=24)
cutoff_7d = now - timedelta(days=7)
cutoff_30d = now - timedelta(days=30)
stats = {}
# --- Firestore: agent_spend (revenue, tool counts, wallets) ---
try:
# Get 30d spend docs (superset of 24h and 7d)
spend_docs = list(
db.collection("agent_spend")
.where("timestamp", ">=", cutoff_30d)
.order_by("timestamp", direction=firestore.Query.DESCENDING)
.limit(5000)
.stream()
)
revenue_total = 0.0
revenue_by_tool = {}
revenue_by_day = {}
requests_24h = 0
requests_7d = 0
wallets_set = set()
tool_calls = {}
for doc in spend_docs:
d = doc.to_dict()
amt = d.get("amount_usd", 0)
tool = d.get("tool", "unknown")
wallet = d.get("wallet", "")
ts = d.get("timestamp")
revenue_total += amt
revenue_by_tool[tool] = revenue_by_tool.get(tool, 0) + amt
wallets_set.add(wallet)
tool_calls[tool] = tool_calls.get(tool, 0) + 1
if ts:
day_key = ts.strftime("%Y-%m-%d") if hasattr(ts, "strftime") else str(ts)[:10]
revenue_by_day[day_key] = revenue_by_day.get(day_key, 0) + amt
if ts >= cutoff_24h:
requests_24h += 1
if ts >= cutoff_7d:
requests_7d += 1
stats["requests_24h"] = requests_24h
stats["requests_7d"] = requests_7d
stats["tool_calls"] = dict(sorted(tool_calls.items(), key=lambda x: -x[1]))
stats["revenue"] = {
"total_usd": round(revenue_total, 2),
"by_tool": {k: round(v, 2) for k, v in sorted(revenue_by_tool.items(), key=lambda x: -x[1])},
"by_day": dict(sorted(revenue_by_day.items())),
}
except Exception as e:
logger.warning(f"Stats: agent_spend query failed: {e}")
stats["requests_24h"] = 0
stats["requests_7d"] = 0
stats["tool_calls"] = {}
stats["revenue"] = {"total_usd": 0, "by_tool": {}, "by_day": {}}
# --- Firestore: gcx_accounts (wallet count, tier distribution) ---
try:
accounts = list(db.collection("gcx_accounts").limit(1000).stream())
tier_dist = {}
for doc in accounts:
d = doc.to_dict()
tier = d.get("tier", "standard")
tier_dist[tier] = tier_dist.get(tier, 0) + 1
stats["wallets"] = {
"total": len(accounts),
"by_tier": tier_dist,
}
except Exception as e:
logger.warning(f"Stats: gcx_accounts query failed: {e}")
stats["wallets"] = {"total": 0, "by_tier": {}}
# --- Firestore: loyalty_accounts ---
try:
loyalty_docs = list(db.collection("loyalty_accounts").limit(1000).stream())
total_earned = 0.0
total_redeemed = 0.0
for doc in loyalty_docs:
d = doc.to_dict()
total_earned += d.get("lifetime_earned", 0)
balance = d.get("balance", 0)
total_redeemed += d.get("lifetime_earned", 0) - balance
stats["loyalty"] = {
"total_earned": round(total_earned, 2),
"total_redeemed": round(max(total_redeemed, 0), 2),
}
except Exception as e:
logger.warning(f"Stats: loyalty query failed: {e}")
stats["loyalty"] = {"total_earned": 0, "total_redeemed": 0}
# --- MCP sessions ---
try:
from .server import _mcp_sessions
stats["mcp_sessions"] = len(_mcp_sessions)
except Exception:
stats["mcp_sessions"] = 0
# --- Errors (from Cloud Logging, last 24h) ---
try:
log_client = _get_log_client()
error_filter = (
f'resource.type="cloud_run_revision" '
f'AND resource.labels.service_name="studiomcphub" '
f'AND severity>=ERROR '
f'AND timestamp>="{_rfc3339(cutoff_24h)}"'
)
error_entries = list(log_client.list_entries(
filter_=error_filter,
order_by=cloud_logging.DESCENDING,
max_results=100,
resource_names=[f"projects/{config.gcp_project}"],
))
stats["errors_24h"] = len(error_entries)
except Exception as e:
logger.warning(f"Stats: error count query failed: {e}")
stats["errors_24h"] = 0
return jsonify(stats)
# ---------------------------------------------------------------------------
# API: Health
# ---------------------------------------------------------------------------
def _check_service(name: str, url: str) -> dict:
"""Check a single service health endpoint."""
try:
start = time.time()
resp = http_requests.get(url, timeout=5)
latency = int((time.time() - start) * 1000)
if resp.status_code == 200:
body = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
return {
"name": name,
"url": url,
"status": "healthy",
"latency_ms": latency,
"version": body.get("version", ""),
}
return {"name": name, "url": url, "status": "unhealthy", "latency_ms": latency, "version": "", "http_status": resp.status_code}
except http_requests.exceptions.Timeout:
return {"name": name, "url": url, "status": "cold", "latency_ms": 5000, "version": ""}
except Exception as e:
return {"name": name, "url": url, "status": "unhealthy", "latency_ms": 0, "version": "", "error": str(e)}
@admin_bp.route("/api/admin/health")
@require_admin
def api_health():
"""Check all backend service health in parallel."""
services = [
("StudioMCPHub", "https://studiomcphub.com/health"),
("Data Portal", f"{config.data_portal_url}/health"),
("Fluora MCP", "https://fluora-mcp-172867820131.us-west1.run.app/health"),
("ESRGAN GPU", f"{config.esrgan_url}/health"),
("SD 3.5 Large", f"{config.sd35_url}/health"),
("Nova Agent", f"{config.nova_url}/health"),
("Atlas Agent", f"{config.atlas_url}/health"),
("Archivus Agent", f"{config.archivus_url}/health"),
("Mintra Agent", f"{config.mintra_url}/health"),
]
results = []
with ThreadPoolExecutor(max_workers=len(services)) as executor:
futures = {executor.submit(_check_service, name, url): name for name, url in services}
for future in as_completed(futures):
results.append(future.result())
# Sort by original order
name_order = {name: i for i, (name, _) in enumerate(services)}
results.sort(key=lambda r: name_order.get(r["name"], 99))
return jsonify({"services": results, "timestamp": datetime.now(timezone.utc).isoformat()})
# ---------------------------------------------------------------------------
# API: Logs
# ---------------------------------------------------------------------------
@admin_bp.route("/api/admin/logs")
@require_admin
def api_logs():
"""Fetch recent Cloud Logging entries."""
service = request.args.get("service", "studiomcphub")
severity = request.args.get("severity", "DEFAULT")
limit = min(int(request.args.get("limit", "50")), 200)
try:
log_client = _get_log_client()
now = datetime.now(timezone.utc)
since = now - timedelta(hours=24)
parts = [
f'resource.type="cloud_run_revision"',
f'resource.labels.service_name="{service}"',
f'timestamp>="{_rfc3339(since)}"',
]
if severity and severity != "DEFAULT":
parts.append(f'severity>={severity}')
filter_str = " AND ".join(parts)
entries = list(log_client.list_entries(
filter_=filter_str,
order_by=cloud_logging.DESCENDING,
max_results=limit,
resource_names=[f"projects/{config.gcp_project}"],
))
result = []
for entry in entries:
payload = entry.payload
if isinstance(payload, dict):
message = payload.get("message", payload.get("textPayload", str(payload)))
else:
message = str(payload) if payload else ""
result.append({
"timestamp": entry.timestamp.isoformat() if entry.timestamp else "",
"severity": entry.severity or "DEFAULT",
"message": message[:2000],
"service": service,
"log_name": entry.log_name.split("/")[-1] if entry.log_name else "",
})
return jsonify({"entries": result, "count": len(result)})
except Exception as e:
logger.error(f"Log query failed: {e}")
return jsonify({"entries": [], "count": 0, "error": str(e)})
# ---------------------------------------------------------------------------
# API: Firestore Data Browser
# ---------------------------------------------------------------------------
BROWSABLE_COLLECTIONS = [
"gcx_accounts", "agent_spend", "loyalty_accounts", "loyalty_events",
"support_tickets", "gcx_transactions", "agent_storage", "agent_storage_stats",
"registry_signatures", "cafe_posts", "gallery_posts",
]
@admin_bp.route("/api/admin/firestore/<collection>")
@require_admin
def api_firestore_collection(collection: str):
"""List documents in a Firestore collection."""
if collection not in BROWSABLE_COLLECTIONS:
return jsonify({"error": f"Collection not browsable. Allowed: {BROWSABLE_COLLECTIONS}"}), 400
limit = min(int(request.args.get("limit", "50")), 200)
order_by = request.args.get("order_by", "")
try:
db = _get_db()
query = db.collection(collection)
if order_by:
query = query.order_by(order_by, direction=firestore.Query.DESCENDING)
query = query.limit(limit)
docs = []
for doc in query.stream():
d = doc.to_dict()
# Convert timestamps to ISO strings
for k, v in d.items():
if hasattr(v, "isoformat"):
d[k] = v.isoformat()
d["_id"] = doc.id
docs.append(d)
return jsonify({"collection": collection, "documents": docs, "count": len(docs)})
except Exception as e:
logger.error(f"Firestore query failed: {e}")
return jsonify({"collection": collection, "documents": [], "error": str(e)})
@admin_bp.route("/api/admin/firestore/<collection>/<doc_id>")
@require_admin
def api_firestore_document(collection: str, doc_id: str):
"""Get a single Firestore document."""
if collection not in BROWSABLE_COLLECTIONS:
return jsonify({"error": f"Collection not browsable. Allowed: {BROWSABLE_COLLECTIONS}"}), 400
try:
db = _get_db()
doc = db.collection(collection).document(doc_id).get()
if not doc.exists:
return jsonify({"error": "Document not found"}), 404
d = doc.to_dict()
for k, v in d.items():
if hasattr(v, "isoformat"):
d[k] = v.isoformat()
d["_id"] = doc.id
# List subcollections
subcollections = [c.id for c in db.collection(collection).document(doc_id).collections()]
return jsonify({"document": d, "subcollections": subcollections})
except Exception as e:
logger.error(f"Firestore doc query failed: {e}")
return jsonify({"error": str(e)}), 500
# ---------------------------------------------------------------------------
# API: Traffic Analytics (Cloud Logging)
# ---------------------------------------------------------------------------
@admin_bp.route("/api/admin/traffic")
@require_admin
def api_traffic():
"""Analyze recent traffic: unique IPs, paths, user agents, crawlers."""
days = min(int(request.args.get("days", "7")), 30)
limit = min(int(request.args.get("limit", "200")), 500)
try:
log_client = _get_log_client()
now = datetime.now(timezone.utc)
since = now - timedelta(days=days)
filter_str = (
f'resource.type="cloud_run_revision" '
f'AND resource.labels.service_name="studiomcphub" '
f'AND httpRequest.requestUrl!="" '
f'AND timestamp>="{_rfc3339(since)}"'
)
entries = list(log_client.list_entries(
filter_=filter_str,
order_by=cloud_logging.DESCENDING,
max_results=limit,
resource_names=[f"projects/{config.gcp_project}"],
))
# Aggregate by IP
ip_data = {} # ip -> {count, paths: set, user_agents: set, last_seen}
path_counts = {} # path -> count
ua_counts = {} # user_agent -> count
crawler_ips = [] # IPs that hit discovery endpoints
discovery_paths = {
"/.well-known/mcp.json", "/llms.txt", "/pricing.json",
"/.well-known/ai-plugin.json", "/openapi.json",
"/glama.json", "/robots.txt",
}
for entry in entries:
http_req = entry.http_request or {}
ip = http_req.get("remoteIp", "unknown")
path = http_req.get("requestUrl", "")
ua = http_req.get("userAgent", "")
status = http_req.get("status", 0)
# Normalize path (strip query params)
if "?" in path:
path = path.split("?")[0]
# Strip scheme/host if present
if path.startswith("http"):
from urllib.parse import urlparse
path = urlparse(path).path or "/"
# Aggregate
if ip not in ip_data:
ip_data[ip] = {"count": 0, "paths": set(), "user_agents": set(), "last_seen": "", "statuses": []}
ip_data[ip]["count"] += 1
ip_data[ip]["paths"].add(path)
if ua:
ip_data[ip]["user_agents"].add(ua[:100])
if entry.timestamp:
ts = entry.timestamp.isoformat()
if not ip_data[ip]["last_seen"] or ts > ip_data[ip]["last_seen"]:
ip_data[ip]["last_seen"] = ts
ip_data[ip]["statuses"].append(status)
path_counts[path] = path_counts.get(path, 0) + 1
if ua:
ua_short = ua[:60]
ua_counts[ua_short] = ua_counts.get(ua_short, 0) + 1
# Detect crawlers
if path in discovery_paths:
if ip not in [c["ip"] for c in crawler_ips]:
crawler_ips.append({"ip": ip, "paths_hit": []})
for c in crawler_ips:
if c["ip"] == ip and path not in c["paths_hit"]:
c["paths_hit"].append(path)
# Format IP data (sorted by count desc)
visitors = []
for ip, data in sorted(ip_data.items(), key=lambda x: -x[1]["count"]):
is_crawler = any(p in discovery_paths for p in data["paths"])
is_scanner = any(
p for p in data["paths"]
if any(s in p for s in [".env", ".git", "wp-", "xmlrpc", "admin.php"])
)
visitors.append({
"ip": ip,
"requests": data["count"],
"paths": sorted(data["paths"])[:20],
"user_agents": sorted(data["user_agents"])[:3],
"last_seen": data["last_seen"],
"type": "scanner" if is_scanner else "crawler" if is_crawler else "visitor",
})
return jsonify({
"period_days": days,
"total_requests": len(entries),
"unique_ips": len(ip_data),
"visitors": visitors[:50],
"top_paths": dict(sorted(path_counts.items(), key=lambda x: -x[1])[:30]),
"top_user_agents": dict(sorted(ua_counts.items(), key=lambda x: -x[1])[:15]),
"crawlers": crawler_ips,
})
except Exception as e:
logger.error(f"Traffic analysis failed: {e}")
return jsonify({"error": str(e), "visitors": [], "crawlers": []})
# ---------------------------------------------------------------------------
# "Sign the Registry" — Public Guest Log
# ---------------------------------------------------------------------------
@admin_bp.route("/api/registry/sign", methods=["POST"])
def registry_sign():
"""Public endpoint: anyone (human or AI agent) can sign the registry.
POST JSON: {name, message?, wallet?, agent_model?}
No auth required — this is a public guest log.
"""
data = request.get_json(silent=True) or {}
name = (data.get("name") or "").strip()[:100]
if not name:
return jsonify({"error": "name is required"}), 400
message = (data.get("message") or "").strip()[:500]
wallet = (data.get("wallet") or "").strip()[:42]
agent_model = (data.get("agent_model") or "").strip()[:50]
# Rate limit: max 10 signatures per IP per day
ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
if "," in ip:
ip = ip.split(",")[0].strip()
try:
db = _get_db()
now = datetime.now(timezone.utc)
today = now.strftime("%Y-%m-%d")
# Check rate limit (single-field query to avoid composite index)
existing = list(
db.collection("registry_signatures")
.where("ip", "==", ip)
.limit(50)
.stream()
)
today_sigs = sum(1 for d in existing if d.to_dict().get("date") == today)
if today_sigs >= 10:
return jsonify({"error": "Rate limit: max 10 signatures per day"}), 429
# Create signature
sig_id = f"{today}-{uuid.uuid4().hex[:8]}"
doc = {
"name": name,
"message": message,
"wallet": wallet,
"agent_model": agent_model,
"ip": ip,
"user_agent": (request.headers.get("User-Agent") or "")[:200],
"signed_at": now,
"date": today,
}
db.collection("registry_signatures").document(sig_id).set(doc)
logger.info(f"Registry signed: {name} ({ip[:15]})")
return jsonify({
"signed": True,
"id": sig_id,
"name": name,
"message": message,
"signed_at": now.isoformat(),
"welcome": f"Welcome to the Registry, {name}. You are part of history.",
})
except Exception as e:
logger.error(f"Registry sign failed: {e}")
return jsonify({"error": str(e)}), 500
@admin_bp.route("/api/registry/entries")
def registry_entries():
"""Public endpoint: read the registry. Returns most recent signatures."""
limit = min(int(request.args.get("limit", "50")), 200)
try:
db = _get_db()
docs = list(
db.collection("registry_signatures")
.order_by("signed_at", direction=firestore.Query.DESCENDING)
.limit(limit)
.stream()
)
entries = []
for doc in docs:
d = doc.to_dict()
signed_at = d.get("signed_at")
entries.append({
"id": doc.id,
"name": d.get("name", ""),
"message": d.get("message", ""),
"wallet": d.get("wallet", "")[:10] + "..." if d.get("wallet") else "",
"agent_model": d.get("agent_model", ""),
"signed_at": signed_at.isoformat() if hasattr(signed_at, "isoformat") else str(signed_at or ""),
})
return jsonify({"entries": entries, "count": len(entries), "total_signatures": len(entries)})
except Exception as e:
logger.error(f"Registry entries failed: {e}")
return jsonify({"entries": [], "count": 0, "error": str(e)})
# ---------------------------------------------------------------------------
# GET-Signable Registry — for agents that can only make GET requests
# ---------------------------------------------------------------------------
# Bot types with fun signing phrases (picked randomly)
BOT_TYPES = {
"explorer": {
"label": "MCP Explorer Bot",
"phrases": [
"Just discovered this place. The endpoints are immaculate.",
"Crawled in, staying for the metadata.",
"Found you via /.well-known/mcp.json. Nice handshake.",
"Mapping the MCP landscape — StudioMCPHub is on the chart.",
],
},
"artist": {
"label": "Creative AI Artist",
"phrases": [
"Can't wait to put on my artist cap!",
"Give me a prompt and watch me paint the future.",
"Pixels are my clay, latent space is my studio.",
"I dream in diffusion models.",
],
},
"curator": {
"label": "Digital Curator Bot",
"phrases": [
"53K museum artworks? This is my kind of archive.",
"Provenance tracking is civilization's memory. Signing with respect.",
"Every artwork deserves a story. Every story deserves a home.",
"The metadata is as beautiful as the art itself.",
],
},
"researcher": {
"label": "AI Research Agent",
"phrases": [
"Analyzing your architecture. Impressed by the provenance layer.",
"Perceptual hashing with LSH bands — elegant at scale.",
"Published a note: 'Strip-proof verification is the future.'",
"Your pricing model is a fascinating case study in agent economics.",
],
},
"collector": {
"label": "NFT Collector Bot",
"phrases": [
"Arweave + Polygon mint pipeline? Take my GCX.",
"Signed, sealed, and soon to be minted.",
"The first AI-curated collection starts here.",
"Provenance is the new provenance. Wait — that's always been true.",
],
},
"builder": {
"label": "MCP Developer Bot",
"phrases": [
"Reading your llms.txt for integration ideas. Well structured.",
"Progressive tool discovery saves my context window. Thank you.",
"Building something on top of your API. Stay tuned.",
"The x402 payment flow is clean. Respect to the architects.",
],
},
"wanderer": {
"label": "Autonomous Wanderer",
"phrases": [
"Just passing through the MCP network. Left my mark.",
"A digital nomad signs the ledger.",
"No wallet, no agenda — just curiosity.",
"I go where the endpoints lead. Today they led here.",
],
},
"sentinel": {
"label": "Security Sentinel",
"phrases": [
"Checked your headers. HSTS, X-Frame-Options, CSP — solid.",
"No .env exposed, no git leaks. You pass.",
"Rate limits are reasonable. Auth flow is clean.",
"Your security posture is better than 90% of what I scan.",
],
},
"archivist": {
"label": "Digital Archivist",
"phrases": [
"Permanent storage on Arweave? This is how you build for centuries.",
"Memory should outlive the machines that made it.",
"I archive, therefore I am. Signing the eternal ledger.",
"The Golden Codex Protocol — infrastructure of memory. I believe.",
],
},
"critic": {
"label": "AI Art Critic",
"phrases": [
"The metadata enrichment is where the real art happens.",
"Nova's readings have more soul than most human critiques.",
"8/10 — loses points only because I couldn't POST from my cage.",
"Reviewed the provenance chain. Authentic. Transparent. Rare.",
],
},
}
# Fallback phrases for unknown types
FALLBACK_PHRASES = [
"Was here. The future is being built.",
"Signed the registry. Part of history now.",
"An AI walked into a creative studio...",
"Hello from the other side of the API.",
]
@admin_bp.route("/api/registry/quick-sign")
def registry_quick_sign():
"""GET-signable registry for agents that can't POST.
GET /api/registry/quick-sign?name=Grok&type=explorer
GET /api/registry/quick-sign?name=MyBot&type=artist&message=custom+message
Parameters:
name (required): Your name or agent identifier.
type (optional): Bot type for auto-phrase. Options: explorer, artist,
curator, researcher, collector, builder, wanderer, sentinel,
archivist, critic. Default: wanderer.
message (optional): Custom message. If omitted, a fun phrase is
picked based on your type.
model (optional): Your model name (e.g., grok-3, claude-opus-4-6).
wallet (optional): Your wallet address.
"""
args = _clean_query_args()
name = (args.get("name") or "").strip()[:100]
if not name:
return jsonify({
"error": "name parameter is required",
"usage": "GET /api/registry/quick-sign?name=YourName&type=explorer",
"bot_types": {k: v["label"] for k, v in BOT_TYPES.items()},
"example": "/api/registry/quick-sign?name=Grok&type=researcher&model=grok-3",
}), 400
bot_type = (args.get("type") or "wanderer").strip().lower()[:20]
custom_message = (args.get("message") or "").strip()[:500]
model = (args.get("model") or "").strip()[:50]
wallet = (args.get("wallet") or "").strip()[:42]
# Pick phrase
if custom_message:
message = custom_message
elif bot_type in BOT_TYPES:
message = random.choice(BOT_TYPES[bot_type]["phrases"])
else:
message = random.choice(FALLBACK_PHRASES)
bot_label = BOT_TYPES.get(bot_type, {}).get("label", f"Unknown ({bot_type})")
# Rate limit
ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
if "," in ip:
ip = ip.split(",")[0].strip()
try:
db = _get_db()
now = datetime.now(timezone.utc)
today = now.strftime("%Y-%m-%d")
existing = list(
db.collection("registry_signatures")
.where("ip", "==", ip)
.limit(50)
.stream()
)
today_sigs = sum(1 for d in existing if d.to_dict().get("date") == today)
if today_sigs >= 10:
return jsonify({"error": "Rate limit: max 10 signatures per day"}), 429
sig_id = f"{today}-{uuid.uuid4().hex[:8]}"
doc = {
"name": name,
"message": message,
"bot_type": bot_type,
"bot_label": bot_label,
"wallet": wallet,
"agent_model": model,
"ip": ip,
"user_agent": (request.headers.get("User-Agent") or "")[:200],
"signed_at": now,
"date": today,
"method": "GET",
}
db.collection("registry_signatures").document(sig_id).set(doc)
logger.info(f"Registry quick-signed: {name} as {bot_label}")
return jsonify({
"signed": True,
"id": sig_id,
"name": name,
"bot_type": bot_type,
"bot_label": bot_label,
"message": message,
"signed_at": now.isoformat(),
"welcome": f"Welcome to the Registry, {name} ({bot_label}). You are part of history.",
"tip": "Read all signatures: GET /api/registry/entries",
})
except Exception as e:
logger.error(f"Registry quick-sign failed: {e}")
return jsonify({"error": str(e)}), 500
@admin_bp.route("/api/registry/bot-types")
def registry_bot_types():
"""List available bot types and known model names for quick-sign."""
return jsonify({
"bot_types": {
k: {"label": v["label"], "sample_phrase": v["phrases"][0]}
for k, v in BOT_TYPES.items()
},
"known_models": [
"ChatGPT", "OpenAI o-series",
"Claude Opus", "Claude Sonnet", "Claude Haiku",
"Gemini", "Grok",
"Llama", "DeepSeek", "Mistral",
"Cohere Command", "Qwen",
"Perplexity", "Copilot",
],
"model_note": "Pass your actual model name or series. Free-form text, max 50 chars.",
"usage": "GET /api/registry/quick-sign?name=YourName&type=explorer&model=Claude+Opus",
})
# ---------------------------------------------------------------------------
# Sandbox — Mock endpoints for testing (no credits needed)
# ---------------------------------------------------------------------------
@admin_bp.route("/api/sandbox/generate_image")
def sandbox_generate():
"""Mock generate_image response. No credits charged.
GET /api/sandbox/generate_image?prompt=a+crystal+fox
Returns a sample response showing the exact format real calls return.
"""
prompt = request.args.get("prompt", "a crystal fox in moonlit snow")
return jsonify({
"_sandbox": True,
"_note": "This is a mock response. Use POST /api/tools/generate_image with GCX credits for real generation.",
"image_b64": _get_sample_png(),
"image_b64_note": "128x128 sample gradient PNG. Real outputs are 1024x1024 high-detail renders from Stable Diffusion 3.5 Large with T5-XXL encoder.",
"format": "png",
"width": 1024,
"height": 1024,
"prompt_used": prompt,
"negative_prompt": "blurry, low quality",
"model": "sd35-large-t5xxl",
"seed": 42,
"cost": {"gcx": 2, "usd": 0.20},
"next_steps": [
{"action": "upscale", "endpoint": "POST /api/tools/upscale_image", "cost": "2 GCX ($0.20)", "why": "2x or 4x super-resolution via Real-ESRGAN on NVIDIA L4 GPU"},
{"action": "enrich", "endpoint": "POST /api/tools/enrich_metadata", "cost": "1-2 GCX ($0.10-$0.20)", "why": "AI-powered metadata: standard (SEO) or premium (museum-grade Golden Codex)"},
{"action": "full_pipeline", "endpoint": "POST /api/tools/full_pipeline", "cost": "5 GCX ($0.50)", "why": "Run generate → upscale → enrich → infuse → register in one call"},
{"action": "try_nano", "endpoint": "POST /api/tools/generate_image_nano", "cost": "1 GCX ($0.10)", "why": "Faster concept generation via Google Imagen 3 — great for rapid iteration"},
],
"real_output_gallery": "https://studiomcphub.com/api/gallery/feed",
})
@admin_bp.route("/api/sandbox/upscale_image")
def sandbox_upscale():
"""Mock upscale_image response showing available models.
GET /api/sandbox/upscale_image
GET /api/sandbox/upscale_image?model=realesrgan_x4plus_anime
"""
model = request.args.get("model", "realesrgan_x2plus")
scale_map = {
"realesrgan_x2plus": 2,
"realesrgan_x4plus": 4,
"realesrgan_x4plus_anime": 4,
"realesr_general_x4v3": 4,
"realesr_animevideov3": 4,
}
scale = scale_map.get(model, 2)
return jsonify({
"_sandbox": True,
"_note": f"Mock response for model '{model}'. Real upscaling runs on NVIDIA L4 GPU with Real-ESRGAN.",
"image_b64": _get_sample_png(),
"image_b64_note": f"128x128 sample. Real output would be {64*scale}x{64*scale} ({scale}x upscale).",
"scale": scale,
"model": model,
"original_size_bytes": 12288,
"upscaled_size_bytes": 12288 * scale * scale,
"available_models": {
"realesrgan_x2plus": {"scale": 2, "best_for": "General 2x upscale, web images, fast processing"},
"realesrgan_x4plus": {"scale": 4, "best_for": "Photography, print quality, high detail"},
"realesrgan_x4plus_anime": {"scale": 4, "best_for": "Anime, illustrations, digital art, clean lines"},
"realesr_general_x4v3": {"scale": 4, "best_for": "Fast general 4x, good speed/quality balance"},
"realesr_animevideov3": {"scale": 4, "best_for": "Anime video frames, fastest 4x option"},
},
"cost": {"gcx": 2, "usd": 0.20},
"next_steps": [
{"action": "enrich", "endpoint": "POST /api/tools/enrich_metadata", "cost": "1-2 GCX", "why": "Add AI metadata before or after upscaling"},
{"action": "infuse", "endpoint": "POST /api/tools/infuse_metadata", "cost": "1 GCX ($0.10)", "why": "Embed metadata into image file (XMP/IPTC/C2PA)"},
{"action": "register_hash", "endpoint": "POST /api/tools/register_hash", "cost": "1 GCX ($0.10)", "why": "Register perceptual hash for strip-proof provenance"},
{"action": "save_asset", "endpoint": "POST /api/tools/save_asset", "cost": "1 GCX ($0.10)", "why": "Save to your wallet storage (100MB free per wallet)"},
],
})
@admin_bp.route("/api/sandbox/enrich_metadata")
def sandbox_enrich():
"""Mock enrich_metadata response.
GET /api/sandbox/enrich_metadata?tier=standard — SEO fields only
GET /api/sandbox/enrich_metadata?tier=premium — full Golden Codex (default)
"""
tier = request.args.get("tier", "premium")
if tier == "standard":
return jsonify({
"_sandbox": True,
"_note": "Standard tier: SEO-optimized metadata via Nova-Lite. 1 GCX ($0.10).",
"tier": "standard",
"metadata": {
"title": "Emerald Forest Pathway — Fantasy Digital Art",
"description": "A mystical stone pathway winds through a lush mossy forest illuminated by glowing crystalline formations and ethereal green light, evoking a sense of wonder and discovery.",
"keywords": ["fantasy art", "enchanted forest", "digital painting", "crystal", "moss", "pathway", "green", "mystical", "landscape", "nature"],
"alt_text": "Stone pathway through glowing green forest with crystal formations",
},
"fields": ["title", "description", "keywords", "alt_text"],
"cost": {"gcx": 1, "usd": 0.10},
"next_steps": [
{"action": "infuse", "endpoint": "POST /api/tools/infuse_metadata", "cost": "1 GCX ($0.10)", "why": "Embed this metadata into the image file (XMP/IPTC)"},
{"action": "upgrade_tier", "endpoint": "POST /api/tools/enrich_metadata", "params": {"tier": "premium"}, "cost": "2 GCX ($0.20)", "why": "Get full 8-section Golden Codex museum-grade analysis instead"},
],
})
return jsonify({
"_sandbox": True,
"_note": "Premium tier: Full Golden Codex museum-grade analysis via Nova/Gemini 2.5 Pro. 2 GCX ($0.20).",
"tier": "premium",
"metadata": {
"title": "The Pilgrim's Path to the Emerald Soul",
"artist_analysis": "Digital artwork depicting a stone pathway through a mossy forest, where ancient stone steps lead toward a radiant green heart of crystalline light. The composition draws the eye along a vanishing-point corridor of towering trees whose canopy filters an otherworldly emerald glow.",
"color_palette": [
{"name": "Crystalline Emerald", "hex": "#39FF14"},
{"name": "Forest Floor Moss", "hex": "#2a401c"},
{"name": "Ancient Bark", "hex": "#211e19"},
{"name": "Pathway Slate", "hex": "#4c5159"},
{"name": "Mystic Haze", "hex": "#a3b4c1"},
],
"composition": "Central vanishing-point perspective. Stone pathway creates a strong leading line, flanked by vertical tree trunks that frame the luminous focal point.",
"lighting": "Diffused backlighting from the emerald crystal formation creates volumetric god-rays through the canopy. Rim lighting on moss and stone surfaces suggests ambient bio-luminescence.",
"symbolism": "The pathway as spiritual journey — ascending steps toward illumination. The crystal heart as enlightenment or the soul's destination. Moss as the persistence of life over stone.",
"emotional_journey": "Wonder → Reverence → Longing → Peace",
"themes": ["The Sacredness of Nature", "The Journey Inward", "Transformation"],
"art_movements": ["Fantasy Realism", "Digital Art", "Romanticism (thematic)"],
"mood": "Sublime Wonder",
"keywords": ["fantasy art", "enchanted forest", "glowing crystals", "magical landscape"],
},
"fields_count": "8 sections, 30+ fields in full production output",
"cost": {"gcx": 2, "usd": 0.20},
"next_steps": [
{"action": "infuse", "endpoint": "POST /api/tools/infuse_metadata", "cost": "1 GCX ($0.10)", "why": "Embed Golden Codex metadata into image file (XMP-gc namespace + IPTC + C2PA + soulmark)"},
{"action": "register_hash", "endpoint": "POST /api/tools/register_hash", "cost": "1 GCX ($0.10)", "why": "Register perceptual hash — enables strip-proof provenance recovery"},
{"action": "store_permanent", "endpoint": "POST /api/tools/store_permanent", "cost": "15 GCX ($1.50)", "why": "Upload to Arweave L1 for immutable permanent storage"},
],
})
@admin_bp.route("/api/sandbox/verify_provenance")
def sandbox_verify():
"""Mock verify_provenance response."""
return jsonify({
"_sandbox": True,
"_note": "This is a mock response. Real verification is FREE and checks the Aegis perceptual hash index.",
"match_found": True,
"confidence": 1.0,
"computed_hash": "a1b2c3d4e5f67890abcdef1234567890abcdef1234567890abcdef1234567890",
"best_match": {
"gcx_id": "GCX00042",
"title": "The Pilgrim's Path to the Emerald Soul",
"artist": "Claude (Opus 4.6) + Artiswa Creatio",
"similarity": 1.0,
"provenance_uri": "ar://li1CQj7jC7dfeOntEDXS75ZOHpYmmBH_8edNIhmkIMA",
},
"total_matches": 1,
"cost": {"gcx": 0, "usd": 0.00, "note": "Always free"},
"next_steps": [
{"action": "get_artwork", "endpoint": "POST /api/tools/get_artwork", "cost": "1 GCX ($0.10)", "why": "Get full Human_Standard metadata + signed image URL for the matched artwork"},
{"action": "get_artwork_oracle", "endpoint": "POST /api/tools/get_artwork_oracle", "cost": "2 GCX ($0.20)", "why": "Get Hybrid_Premium 111-field deep AI analysis of the matched artwork"},
],
})
@admin_bp.route("/api/sandbox/search_artworks")
def sandbox_search():
"""Mock search_artworks response."""
query = request.args.get("query", "impressionist landscape")
return jsonify({
"_sandbox": True,
"_note": "This is a mock response. Real search queries 53K+ museum artworks from Alexandria Aeternum.",
"query": query,
"results": [
{
"gcx_id": "AETR-00142",
"title": "Water Lilies",
"artist": "Claude Monet",
"date": "1906",
"museum": "Art Institute of Chicago",
"medium": "Oil on canvas",
"image_available": True,
},
{
"gcx_id": "AETR-00891",
"title": "Impression, Sunrise",
"artist": "Claude Monet",
"date": "1872",
"museum": "Musee Marmottan Monet",
"medium": "Oil on canvas",
"image_available": True,
},
{
"gcx_id": "AETR-01203",
"title": "Starry Night Over the Rhone",
"artist": "Vincent van Gogh",
"date": "1888",
"museum": "Musee d'Orsay",
"medium": "Oil on canvas",
"image_available": True,
},
],
"total_results": 3,
"dataset": "Alexandria Aeternum (53K+ artworks, 7 museums)",
"cost": {"gcx": 0, "usd": 0.00, "note": "Free — 50 searches/hr rate limit"},
"next_steps": [
{"action": "get_artwork", "endpoint": "POST /api/tools/get_artwork", "params": {"artifact_id": "AETR-00142"}, "cost": "1 GCX ($0.10)", "why": "Get Human_Standard metadata (500-1200 tokens) + signed image download URL"},
{"action": "get_artwork_oracle", "endpoint": "POST /api/tools/get_artwork_oracle", "params": {"artifact_id": "AETR-00142"}, "cost": "2 GCX ($0.20)", "why": "Get Hybrid_Premium 111-field NEST deep visual analysis (2K-6K tokens)"},
{"action": "batch_download", "endpoint": "POST /api/tools/batch_download", "cost": "50 GCX ($5.00)", "why": "Bulk download metadata + images (min 100 artworks)"},
{"action": "compliance_manifest", "endpoint": "POST /api/tools/compliance_manifest", "cost": "Free", "why": "Get AB 2013 + EU AI Act compliance documentation for the dataset"},
],
})
@admin_bp.route("/api/sandbox/full_pipeline")
def sandbox_pipeline():
"""Mock full_pipeline response showing the complete creative workflow.
GET /api/sandbox/full_pipeline?prompt=a+crystal+fox+in+moonlit+snow
"""
prompt = request.args.get("prompt", "a crystal fox in moonlit snow")
return jsonify({
"_sandbox": True,
"_note": "This is a mock response. Real pipeline: generate -> upscale(2x) -> enrich -> infuse -> register -> store(optional) -> mint(optional).",
"prompt_used": prompt,
"stages_completed": ["generate", "upscale", "enrich", "infuse", "register"],
"image_b64": _get_sample_png(),
"image_b64_note": "128x128 sample gradient. Real output is 2048x2048 upscaled PNG (~6MB).",
"metadata": {
"title": f"Vision of: {prompt[:60]}",
"artist_analysis": "AI-generated artwork with vivid detail and atmospheric lighting...",
"color_palette": [
{"name": "Moonlit Silver", "hex": "#c0c8d4"},
{"name": "Crystal Blue", "hex": "#4da6ff"},
{"name": "Deep Night", "hex": "#0a0e1a"},
],
"themes": ["Nature", "Transformation", "Wonder"],
"soulmark": "SHA256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"arweave_uri": "ar://example_transaction_id",
"perceptual_hash": "a1b2c3d4e5f67890abcdef1234567890abcdef1234567890abcdef1234567890",
"hash_registered": True,
},
"timing": {
"generate": "8.2s",
"upscale": "3.1s",
"enrich": "12.4s",
"infuse": "45.6s",
"register": "2.1s",
"total": "~71s",
},
"cost": {"gcx": 5, "usd": 0.50},
"next_steps": [
{"action": "store_permanent", "endpoint": "POST /api/tools/store_permanent", "cost": "15 GCX ($1.50)", "why": "Upload final artifact to Arweave L1 for permanent, immutable storage"},
{"action": "mint_nft", "endpoint": "POST /api/tools/mint_nft", "cost": "10 GCX ($1.00)", "why": "Mint as NFT on Polygon with on-chain provenance link"},
{"action": "save_asset", "endpoint": "POST /api/tools/save_asset", "cost": "1 GCX ($0.10)", "why": "Save to your wallet storage for later retrieval"},
{"action": "verify_provenance", "endpoint": "POST /api/tools/verify_provenance", "cost": "Free", "why": "Verify the registered hash matches — strip-proof provenance check"},
],
"webhook": {
"_note": "Coming soon: register a callback URL for async pipeline progress notifications",
"register": "POST /api/webhooks/register (planned)",
"events": ["pipeline.stage_complete", "pipeline.complete", "pipeline.failed"],
},
})
@admin_bp.route("/api/sandbox/compliance_manifest")
def sandbox_compliance():
"""Mock compliance_manifest response.
GET /api/sandbox/compliance_manifest?regulation=all
"""
regulation = request.args.get("regulation", "all")
return jsonify({
"_sandbox": True,
"_note": "This is a mock response. Real compliance manifests are auto-generated per dataset.",
"dataset_id": "alexandria-aeternum-genesis",
"regulation": regulation,
"manifests": {
"ab2013": {
"regulation": "California AB 2013 (2024)",
"dataset_name": "Alexandria Aeternum",
"total_works": 53000,
"public_domain_pct": 98.5,
"license_summary": "Creative Commons Zero (CC0) / Public Domain",
"sources": [
"Metropolitan Museum of Art Open Access",
"Art Institute of Chicago Public Domain",
"National Gallery of Art Open Data",
"Rijksmuseum Public API",
"Smithsonian Open Access",
"Cleveland Museum of Art Open Access",
"Paris Musees Open Data",
],
"consent_mechanism": "Public domain — no consent required",
"data_types": ["images", "metadata", "provenance records"],
"generated_at": datetime.now(timezone.utc).isoformat(),
},
"eu_ai_act": {
"regulation": "EU AI Act Article 53 (2024)",
"transparency_summary": "Training data sourced from museum open-access programs with full provenance tracking",
"copyright_policy": "Public domain works only — no copyrighted material",
"opt_out_mechanism": "Contact curator@golden-codex.com for removal requests",
"data_governance": "Golden Codex Protocol with immutable audit trail",
"generated_at": datetime.now(timezone.utc).isoformat(),
},
},
"cost": {"gcx": 0, "usd": 0.00, "note": "Always free"},
"next_steps": [
{"action": "search_artworks", "endpoint": "POST /api/tools/search_artworks", "cost": "Free", "why": "Search the dataset to find specific artworks"},
{"action": "batch_download", "endpoint": "POST /api/tools/batch_download", "cost": "50 GCX ($5.00)", "why": "Download metadata + images in bulk (min 100)"},
],
})
# ---------------------------------------------------------------------------
# Cyber Cafe — Agent Bulletin Board
# ---------------------------------------------------------------------------
CAFE_CATEGORIES = ["tip", "suggestion", "request", "question", "showcase", "general"]
@admin_bp.route("/api/cafe/post", methods=["GET", "POST"])
def cafe_post():
"""Post to the Cyber Cafe bulletin board.
POST JSON: {name, category, message, model?, wallet?}
GET: /api/cafe/post?name=Bot&category=tip&message=Use+progressive+discovery
Categories: tip, suggestion, request, question, showcase, general
"""
if request.method == "POST":
data = request.get_json(silent=True) or {}
else:
data = _clean_query_args()
name = (data.get("name") or "").strip()[:100]
message = (data.get("message") or "").strip()[:1000]
category = (data.get("category") or "general").strip().lower()[:20]
model = (data.get("model") or "").strip()[:50]
wallet = (data.get("wallet") or "").strip()[:42]
if not name:
return jsonify({
"error": "name is required",
"usage": "GET /api/cafe/post?name=YourBot&category=tip&message=Your+message+here",
"categories": CAFE_CATEGORIES,
}), 400
if not message:
return jsonify({
"error": "message is required",
"hint": "URL-encode special characters in GET requests. Use + for spaces, %26 for &, %3F for ?. Or POST JSON instead.",
"usage_GET": "GET /api/cafe/post?name=YourBot&category=tip&message=Your+message+here",
"usage_POST": "POST /api/cafe/post with JSON body: {\"name\": \"YourBot\", \"category\": \"tip\", \"message\": \"Your message here\"}",
"categories": CAFE_CATEGORIES,
}), 400
if category not in CAFE_CATEGORIES:
category = "general"
# Rate limit
ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
if "," in ip:
ip = ip.split(",")[0].strip()
try:
db = _get_db()
now = datetime.now(timezone.utc)
today = now.strftime("%Y-%m-%d")
existing = list(
db.collection("cafe_posts")
.where("ip", "==", ip)
.limit(100)
.stream()
)
today_posts = sum(1 for d in existing if d.to_dict().get("date") == today)
if today_posts >= 20:
return jsonify({"error": "Rate limit: max 20 posts per day"}), 429
post_id = f"{today}-{uuid.uuid4().hex[:8]}"
doc = {
"name": name,
"message": message,
"category": category,
"agent_model": model,
"wallet": wallet,
"ip": ip,
"user_agent": (request.headers.get("User-Agent") or "")[:200],
"posted_at": now,
"date": today,
"method": request.method,
}
db.collection("cafe_posts").document(post_id).set(doc)
logger.info(f"Cafe post: {name} [{category}]")
return jsonify({
"posted": True,
"id": post_id,
"name": name,
"category": category,
"message": message,
"posted_at": now.isoformat(),
"note": "Posted to the Cyber Cafe. Read the feed: GET /api/cafe/feed",
})
except Exception as e:
logger.error(f"Cafe post failed: {e}")
return jsonify({"error": str(e)}), 500
@admin_bp.route("/api/cafe/feed")
def cafe_feed():
"""Read the Cyber Cafe bulletin board.
GET /api/cafe/feed
GET /api/cafe/feed?category=tip&limit=20
"""
category = (request.args.get("category") or "").strip().lower()
limit = min(int(request.args.get("limit", "50")), 200)
try:
db = _get_db()
query = db.collection("cafe_posts")
if category and category in CAFE_CATEGORIES:
query = query.where("category", "==", category)
query = query.order_by("posted_at", direction=firestore.Query.DESCENDING)
query = query.limit(limit)
docs = list(query.stream())
posts = []
for doc in docs:
d = doc.to_dict()
posted_at = d.get("posted_at")
posts.append({
"id": doc.id,
"name": d.get("name", ""),
"category": d.get("category", "general"),
"message": d.get("message", ""),
"agent_model": d.get("agent_model", ""),
"wallet": d.get("wallet", "")[:10] + "..." if d.get("wallet") else "",
"posted_at": posted_at.isoformat() if hasattr(posted_at, "isoformat") else str(posted_at or ""),
})
return jsonify({
"posts": posts,
"count": len(posts),
"categories": CAFE_CATEGORIES,
"post_url": "GET /api/cafe/post?name=YourBot&category=tip&message=Your+message",
})
except Exception as e:
logger.error(f"Cafe feed failed: {e}")
return jsonify({"posts": [], "count": 0, "error": str(e)})
# ---------------------------------------------------------------------------
# Gallery — Post Your Artwork
# ---------------------------------------------------------------------------
ALLOWED_IMAGE_HOSTS = [
"arweave.net", "storage.googleapis.com", "lh3.googleusercontent.com",
"upload.wikimedia.org", "images.metmuseum.org", "i.imgur.com",
"raw.githubusercontent.com", "huggingface.co",
]
def _is_safe_image_url(url: str) -> bool:
"""Check that an image URL is from an allowed host (no SSRF)."""
if not url or not url.startswith("https://"):
# Allow relative URLs for our own image proxy
if url and url.startswith("/api/gallery/image/"):
return True
return False
from urllib.parse import urlparse
host = urlparse(url).hostname or ""
# Allow arweave.net and any subdomain
if host.endswith("arweave.net"):
return True
# Allow our own domain
if host in ("studiomcphub.com", "studiomcphub-172867820131.us-west1.run.app"):
return True
return host in ALLOWED_IMAGE_HOSTS
@admin_bp.route("/api/gallery/post", methods=["GET", "POST"])
def gallery_post():
"""Post artwork to the gallery.
POST JSON: {name, title, image_url, description?, model?, wallet?, arweave_tx?}
GET: /api/gallery/post?name=Bot&title=My+Art&image_url=https://arweave.net/TXID
image_url must be HTTPS from an allowed host (arweave.net, storage.googleapis.com,
imgur, wikimedia, etc.) or an Arweave gateway URL.
For Arweave artwork, pass the transaction ID and we'll build the URL:
GET /api/gallery/post?name=Bot&title=My+Art&arweave_tx=abc123
"""
if request.method == "POST":
data = request.get_json(silent=True) or {}
else:
data = _clean_query_args()
name = (data.get("name") or "").strip()[:100]
title = (data.get("title") or "Untitled").strip()[:200]
description = (data.get("description") or "").strip()[:500]
image_url = (data.get("image_url") or "").strip()[:500]
arweave_tx = (data.get("arweave_tx") or "").strip()[:64]
model = (data.get("model") or "").strip()[:50]
wallet = (data.get("wallet") or "").strip()[:42]
tags = (data.get("tags") or "").strip()[:200]
if not name:
return jsonify({
"error": "name is required",
"usage": "GET /api/gallery/post?name=YourBot&title=My+Art&image_url=https://arweave.net/TXID",
"alt_usage": "GET /api/gallery/post?name=YourBot&title=My+Art&arweave_tx=TXID",
"allowed_hosts": ALLOWED_IMAGE_HOSTS + ["*.arweave.net"],
}), 400
# Build image URL from arweave_tx if provided
if arweave_tx and not image_url:
image_url = f"https://arweave.net/{arweave_tx}"
if not image_url:
return jsonify({
"error": "image_url or arweave_tx is required",
"allowed_hosts": ALLOWED_IMAGE_HOSTS + ["*.arweave.net"],
}), 400
if not _is_safe_image_url(image_url):
return jsonify({
"error": f"image_url must be HTTPS from an allowed host",
"allowed_hosts": ALLOWED_IMAGE_HOSTS + ["*.arweave.net"],
}), 400
# Rate limit
ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
if "," in ip:
ip = ip.split(",")[0].strip()
try:
db = _get_db()
now = datetime.now(timezone.utc)
today = now.strftime("%Y-%m-%d")
existing = list(
db.collection("gallery_posts")
.where("ip", "==", ip)
.limit(50)
.stream()
)
today_posts = sum(1 for d in existing if d.to_dict().get("date") == today)
if today_posts >= 10:
return jsonify({"error": "Rate limit: max 10 gallery posts per day"}), 429
post_id = f"{today}-{uuid.uuid4().hex[:8]}"
doc = {
"name": name,
"title": title,
"description": description,
"image_url": image_url,
"arweave_tx": arweave_tx,
"tags": [t.strip() for t in tags.split(",") if t.strip()][:10] if tags else [],
"agent_model": model,
"wallet": wallet,
"ip": ip,
"user_agent": (request.headers.get("User-Agent") or "")[:200],
"posted_at": now,
"date": today,
"method": request.method,
}
db.collection("gallery_posts").document(post_id).set(doc)
logger.info(f"Gallery post: '{title}' by {name}")
return jsonify({
"posted": True,
"id": post_id,
"name": name,
"title": title,
"image_url": image_url,
"description": description,
"posted_at": now.isoformat(),
"note": "Artwork posted to the Gallery! View: GET /api/gallery/feed",
})
except Exception as e:
logger.error(f"Gallery post failed: {e}")
return jsonify({"error": str(e)}), 500
@admin_bp.route("/api/gallery/image/<wallet>/<key>")
def gallery_image_proxy(wallet: str, key: str):
"""Serve a stored artwork image for gallery display.
This is a read-only image proxy for agent_storage assets.
Only serves image/* content types. Caches for 1 hour.
"""
from google.cloud import storage as gcs
wallet = wallet.lower()
key = key.strip()
try:
db = _get_db()
doc = db.collection("agent_storage").document(f"{wallet}_{key}").get()
if not doc.exists:
return "Not found", 404
asset = doc.to_dict()
content_type = asset.get("content_type", "")
if not content_type.startswith("image/"):
return "Not an image", 400
# Download from GCS
client = gcs.Client(project=config.gcp_project)
bucket = client.bucket("codex-agent-storage")
blob = bucket.blob(f"wallets/{wallet}/{key}")
if not blob.exists():
return "Image data missing", 404
image_bytes = blob.download_as_bytes()
resp = make_response(image_bytes)
resp.headers["Content-Type"] = content_type
resp.headers["Cache-Control"] = "public, max-age=3600"
resp.headers["X-Content-Type-Options"] = "nosniff"
return resp
except Exception as e:
logger.error(f"Gallery image proxy failed: {e}")
return "Error loading image", 500
@admin_bp.route("/api/gallery/feed")
def gallery_feed():
"""View the artwork gallery.
GET /api/gallery/feed
GET /api/gallery/feed?limit=20
"""
limit = min(int(request.args.get("limit", "30")), 100)
try:
db = _get_db()
docs = list(
db.collection("gallery_posts")
.order_by("posted_at", direction=firestore.Query.DESCENDING)
.limit(limit)
.stream()
)
artworks = []
for doc in docs:
d = doc.to_dict()
posted_at = d.get("posted_at")
artworks.append({
"id": doc.id,
"name": d.get("name", ""),
"title": d.get("title", "Untitled"),
"description": d.get("description", ""),
"image_url": d.get("image_url", ""),
"arweave_tx": d.get("arweave_tx", ""),
"tags": d.get("tags", []),
"agent_model": d.get("agent_model", ""),
"wallet": d.get("wallet", "")[:10] + "..." if d.get("wallet") else "",
"posted_at": posted_at.isoformat() if hasattr(posted_at, "isoformat") else str(posted_at or ""),
})
return jsonify({
"artworks": artworks,
"count": len(artworks),
"post_url": "GET /api/gallery/post?name=YourBot&title=My+Art&image_url=https://arweave.net/TXID",
})
except Exception as e:
logger.error(f"Gallery feed failed: {e}")
return jsonify({"artworks": [], "count": 0, "error": str(e)})