Skip to main content
Glama

OSM PostgreSQL Server

by wiseman
import json import logging import os import queue import socket import sys import threading import time import io import base64 from contextlib import redirect_stdout from unittest import mock _log = logging.getLogger('werkzeug') _log.setLevel(logging.WARNING) # Redirect all Flask/Werkzeug logging to stderr for handler in _log.handlers: handler.setStream(sys.stderr) from flask import ( Flask, Response, jsonify, render_template, request, send_from_directory, ) # Redirect all logging to stderr. logging.basicConfig(stream=sys.stderr) # Create a logger for this module logger = logging.getLogger(__name__) class FlaskServer: def __init__(self, host="127.0.0.1", port=5000): self.host = host self.port = port self.app = Flask(__name__, template_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "templates"), static_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")) # Capture and redirect Flask's initialization output to stderr with io.StringIO() as buf, redirect_stdout(buf): self.setup_routes() output = buf.getvalue() if output: logger.info(output.strip()) self.server_thread = None self.clients = {} # Maps client_id to message queue self.client_counter = 0 self.current_view = { "center": [0, 0], "zoom": 2, "bounds": [[-85, -180], [85, 180]] } self.sse_clients = {} # Changed from list to dict to store queues self.latest_screenshot = None # Store the latest screenshot # Add storage for geolocate requests and responses self.geolocate_requests = {} self.geolocate_responses = {} def setup_routes(self): @self.app.route("/") def index(): return render_template("index.html") @self.app.route("/static/<path:path>") def send_static(path): return send_from_directory("static", path) @self.app.route("/api/sse") def sse(): def event_stream(client_id): # Create a queue for this client client_queue = queue.Queue() self.sse_clients[client_id] = client_queue try: # Initial connection message yield 'data: {"type": "connected", "id": %d}\n\n' % client_id while True: try: # Try to get a message from the queue with a timeout message = client_queue.get(timeout=30) yield f"data: {message}\n\n" except queue.Empty: # No message received in timeout period, send a ping yield 'data: {"type": "ping"}\n\n' except GeneratorExit: # Client disconnected if client_id in self.sse_clients: del self.sse_clients[client_id] logger.info( f"Client {client_id} disconnected, {len(self.sse_clients)} clients remaining" ) # Generate a unique ID for this client client_id = int(time.time() * 1000) % 1000000 return Response(event_stream(client_id), mimetype="text/event-stream") @self.app.route("/api/viewChanged", methods=["POST"]) def view_changed(): data = request.json if data: if "center" in data: self.current_view["center"] = data["center"] if "zoom" in data: self.current_view["zoom"] = data["zoom"] if "bounds" in data: self.current_view["bounds"] = data["bounds"] return jsonify({"status": "success"}) @self.app.route("/api/screenshot", methods=["POST"]) def save_screenshot(): data = request.json if data and "image" in data: # Store the base64 image data self.latest_screenshot = data["image"] return jsonify({"status": "success"}) return jsonify({"status": "error", "message": "No image data provided"}), 400 @self.app.route("/api/geolocateResponse", methods=["POST"]) def geolocate_response(): data = request.json if data and "requestId" in data and "results" in data: request_id = data["requestId"] results = data["results"] # Store the response self.geolocate_responses[request_id] = results logger.info(f"Received geolocate response for request {request_id} with {len(results)} results") return jsonify({"status": "success"}) return jsonify({"status": "error", "message": "Invalid geolocate response data"}), 400 def is_port_in_use(self, port): """Check if a port is already in use""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex((self.host, port)) == 0 def start(self): """Start the Flask server in a separate thread""" # Try up to 10 ports, starting with self.port original_port = self.port max_attempts = 10 for attempt in range(max_attempts): if self.is_port_in_use(self.port): logger.info(f"Port {self.port} is already in use, trying port {self.port + 1}") self.port += 1 if attempt == max_attempts - 1: logger.error(f"Failed to find an available port after {max_attempts} attempts") # Reset port to original value self.port = original_port return False else: # Port is available, start the server def run_server(): # Redirect stdout to stderr while running Flask with redirect_stdout(sys.stderr): self.app.run( host=self.host, port=self.port, debug=False, use_reloader=False ) self.server_thread = threading.Thread(target=run_server) self.server_thread.daemon = True # Thread will exit when main thread exits self.server_thread.start() logger.info(f"Flask server started at http://{self.host}:{self.port}") return True return False def stop(self): """Stop the Flask server""" # Flask doesn't provide a clean way to stop the server from outside # In a production environment, you would use a more robust server like gunicorn # For this example, we'll rely on the daemon thread to exit when the main thread exits logger.info("Flask server stopping...") # Map control methods def send_map_command(self, command_type, data): """ Send a command to all connected SSE clients Args: command_type (str): Type of command (SHOW_POLYGON, SHOW_MARKER, SET_VIEW) data (dict): Data for the command """ command = {"type": command_type, "data": data} message = json.dumps(command) # Send the message to all connected clients clients_count = len(self.sse_clients) if clients_count == 0: logger.info("No connected clients to send message to") return logger.info(f"Sending {command_type} to {clients_count} clients") for client_id, client_queue in list(self.sse_clients.items()): try: client_queue.put(message) except Exception as e: logger.error(f"Error sending to client {client_id}: {e}") def show_polygon(self, coordinates, options=None): """ Display a polygon on the map Args: coordinates (list): List of [lat, lng] coordinates options (dict, optional): Styling options """ data = {"coordinates": coordinates, "options": options or {}} self.send_map_command("SHOW_POLYGON", data) def show_marker(self, coordinates, text=None, options=None): """ Display a marker on the map Args: coordinates (list): [lat, lng] coordinates text (str, optional): Popup text options (dict, optional): Styling options """ data = {"coordinates": coordinates, "text": text, "options": options or {}} self.send_map_command("SHOW_MARKER", data) def show_line(self, coordinates, options=None): """ Display a line (polyline) on the map Args: coordinates (list): List of [lat, lng] coordinates options (dict, optional): Styling options """ data = {"coordinates": coordinates, "options": options or {}} self.send_map_command("SHOW_LINE", data) def set_view(self, bounds=None, center=None, zoom=None): """ Set the map view Args: bounds (list, optional): [[south, west], [north, east]] center (list, optional): [lat, lng] center point zoom (int, optional): Zoom level """ data = {} if bounds: data["bounds"] = bounds if center: data["center"] = center if zoom: data["zoom"] = zoom self.send_map_command("SET_VIEW", data) def get_current_view(self): """ Get the current map view Returns: dict: Current view information """ return self.current_view def set_title(self, title, options=None): """ Set the map title displayed at the bottom right of the map Args: title (str): Title text to display options (dict, optional): Styling options like fontSize, color, etc. """ data = {"title": title, "options": options or {}} self.send_map_command("SET_TITLE", data) def capture_screenshot(self): """ Request a screenshot from the map and wait for it to be received Returns: str: Base64-encoded image data, or None if no screenshot is available """ # Send command to capture screenshot self.send_map_command("CAPTURE_SCREENSHOT", {}) # Wait for the screenshot to be received (with timeout) start_time = time.time() timeout = 5 # seconds while time.time() - start_time < timeout: if self.latest_screenshot: screenshot = self.latest_screenshot self.latest_screenshot = None # Clear after retrieving return screenshot time.sleep(0.1) logger.warning("Screenshot capture timed out") return None def geolocate(self, query): """ Send a geolocate request to the web client and wait for the response Args: query (str): The location name to search for Returns: list: Nominatim search results or None if the request times out """ # Generate a unique request ID request_id = str(int(time.time() * 1000)) # Send the geolocate command to the web client data = {"requestId": request_id, "query": query} self.send_map_command("GEOLOCATE", data) # Wait for the response (with timeout) start_time = time.time() timeout = 10 # seconds while time.time() - start_time < timeout: if request_id in self.geolocate_responses: results = self.geolocate_responses.pop(request_id) return results time.sleep(0.1) logger.warning(f"Geolocate request for '{query}' timed out") return None # For testing the Flask server directly if __name__ == "__main__": server = FlaskServer() server.start() # Keep the main thread running try: logger.info("Press Ctrl+C to stop the server") import time while True: time.sleep(1) except KeyboardInterrupt: server.stop() logger.info("Server stopped")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wiseman/osm-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server