# --- Required imports and initialization (single instance, at top) ---
import os
from dotenv import load_dotenv
import logging
import yaml
from flask import Flask, request, jsonify
from google import genai
dotenv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
load_dotenv(dotenv_path)
# ——— Logging Setup ———
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# ——— Load config ———
cfg_path = os.path.join(os.path.dirname(__file__), "agent_config.yaml")
with open(cfg_path) as f:
cfg = yaml.safe_load(f)
logging.info("Loaded config: %s", cfg)
# ——— Instantiate Gemini client with explicit API key ———
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
logging.error("GEMINI_API_KEY is not set in the environment!")
raise RuntimeError("Missing GEMINI_API_KEY")
client = genai.Client(api_key=api_key)
logging.info("Initialized Gemini Client for Public MCP with provided API key")
app = Flask(__name__)
import os
from dotenv import load_dotenv
import logging
import yaml
from flask import Flask, request, jsonify
from google import genai
dotenv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
load_dotenv(dotenv_path)
# ——— Logging Setup ———
# --- Real-time stats tracking ---
import threading
import time
from datetime import datetime, date
stats_lock = threading.Lock()
stats_data = {
"active_sessions": 0,
"queries_processed": 0,
"total_response_time": 0.0,
"success_count": 0,
"failure_count": 0,
"todays_queries": 0,
"last_query_date": date.today(),
"start_time": time.time()
}
logging.info("Initialized Gemini Client for Public MCP with provided API key")
app = Flask(__name__)
# --- Real-time stats tracking ---
import threading
import time
from datetime import datetime, date
stats_lock = threading.Lock()
stats_data = {
"active_sessions": 0,
"queries_processed": 0,
"total_response_time": 0.0,
"success_count": 0,
"failure_count": 0,
"todays_queries": 0,
"last_query_date": date.today(),
"start_time": time.time()
}
def get_uptime():
return round((time.time() - stats_data["start_time"]) / 60, 2) # uptime in minutes
@app.route("/stats", methods=["GET"])
def stats():
with stats_lock:
# Reset today's queries if date changed
if stats_data["last_query_date"] != date.today():
stats_data["todays_queries"] = 0
stats_data["last_query_date"] = date.today()
queries = stats_data["queries_processed"]
avg_response = (stats_data["total_response_time"] / queries) if queries > 0 else 0.0
total = stats_data["success_count"] + stats_data["failure_count"]
success_rate = (stats_data["success_count"] / total * 100) if total > 0 else 100.0
return jsonify({
"active_sessions": stats_data["active_sessions"],
"queries_processed": queries,
"response_time": round(avg_response, 2),
"success_rate": round(success_rate, 2),
"todays_queries": stats_data["todays_queries"],
"uptime": get_uptime()
})
@app.route("/ask", methods=["POST"])
def ask_agent():
payload = request.json or {}
query = payload.get("query", "")
logging.info("POST /ask query: %r", query)
start_time = time.time()
success = False
try:
resp = client.models.generate_content(
model=cfg["model"],
contents=query
)
text = resp.text
success = True
logging.info("Gemini response: %r", text)
return jsonify({"response": text})
except Exception as e:
logging.exception("Gemini call failed")
return jsonify({"error": str(e)}), 500
finally:
elapsed = time.time() - start_time
with stats_lock:
stats_data["queries_processed"] += 1
stats_data["total_response_time"] += elapsed
if success:
stats_data["success_count"] += 1
else:
stats_data["failure_count"] += 1
# Track today's queries
if stats_data["last_query_date"] != date.today():
stats_data["todays_queries"] = 1
stats_data["last_query_date"] = date.today()
else:
stats_data["todays_queries"] += 1
if __name__ == "__main__":
logging.info("Starting Public MCP server on port 8001")
app.run(host="0.0.0.0", port=8001)