OSM PostgreSQL Server
by wiseman
- osm-mcp
- mcp_osm
import json
import logging
import os
import queue
import socket
import sys
import threading
import time
import io
import base64
from contextlib import redirect_stdout
from unittest import mock
_log = logging.getLogger('werkzeug')
_log.setLevel(logging.WARNING)
# Redirect all Flask/Werkzeug logging to stderr
for handler in _log.handlers:
handler.setStream(sys.stderr)
from flask import (
Flask,
Response,
jsonify,
render_template,
request,
send_from_directory,
)
# Redirect all logging to stderr.
logging.basicConfig(stream=sys.stderr)
# Create a logger for this module
logger = logging.getLogger(__name__)
class FlaskServer:
def __init__(self, host="127.0.0.1", port=5000):
self.host = host
self.port = port
self.app = Flask(__name__,
template_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "templates"),
static_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "static"))
# Capture and redirect Flask's initialization output to stderr
with io.StringIO() as buf, redirect_stdout(buf):
self.setup_routes()
output = buf.getvalue()
if output:
logger.info(output.strip())
self.server_thread = None
self.clients = {} # Maps client_id to message queue
self.client_counter = 0
self.current_view = {
"center": [0, 0],
"zoom": 2,
"bounds": [[-85, -180], [85, 180]]
}
self.sse_clients = {} # Changed from list to dict to store queues
self.latest_screenshot = None # Store the latest screenshot
# Add storage for geolocate requests and responses
self.geolocate_requests = {}
self.geolocate_responses = {}
def setup_routes(self):
@self.app.route("/")
def index():
return render_template("index.html")
@self.app.route("/static/<path:path>")
def send_static(path):
return send_from_directory("static", path)
@self.app.route("/api/sse")
def sse():
def event_stream(client_id):
# Create a queue for this client
client_queue = queue.Queue()
self.sse_clients[client_id] = client_queue
try:
# Initial connection message
yield 'data: {"type": "connected", "id": %d}\n\n' % client_id
while True:
try:
# Try to get a message from the queue with a timeout
message = client_queue.get(timeout=30)
yield f"data: {message}\n\n"
except queue.Empty:
# No message received in timeout period, send a ping
yield 'data: {"type": "ping"}\n\n'
except GeneratorExit:
# Client disconnected
if client_id in self.sse_clients:
del self.sse_clients[client_id]
logger.info(
f"Client {client_id} disconnected, {len(self.sse_clients)} clients remaining"
)
# Generate a unique ID for this client
client_id = int(time.time() * 1000) % 1000000
return Response(event_stream(client_id), mimetype="text/event-stream")
@self.app.route("/api/viewChanged", methods=["POST"])
def view_changed():
data = request.json
if data:
if "center" in data:
self.current_view["center"] = data["center"]
if "zoom" in data:
self.current_view["zoom"] = data["zoom"]
if "bounds" in data:
self.current_view["bounds"] = data["bounds"]
return jsonify({"status": "success"})
@self.app.route("/api/screenshot", methods=["POST"])
def save_screenshot():
data = request.json
if data and "image" in data:
# Store the base64 image data
self.latest_screenshot = data["image"]
return jsonify({"status": "success"})
return jsonify({"status": "error", "message": "No image data provided"}), 400
@self.app.route("/api/geolocateResponse", methods=["POST"])
def geolocate_response():
data = request.json
if data and "requestId" in data and "results" in data:
request_id = data["requestId"]
results = data["results"]
# Store the response
self.geolocate_responses[request_id] = results
logger.info(f"Received geolocate response for request {request_id} with {len(results)} results")
return jsonify({"status": "success"})
return jsonify({"status": "error", "message": "Invalid geolocate response data"}), 400
def is_port_in_use(self, port):
"""Check if a port is already in use"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex((self.host, port)) == 0
def start(self):
"""Start the Flask server in a separate thread"""
# Try up to 10 ports, starting with self.port
original_port = self.port
max_attempts = 10
for attempt in range(max_attempts):
if self.is_port_in_use(self.port):
logger.info(f"Port {self.port} is already in use, trying port {self.port + 1}")
self.port += 1
if attempt == max_attempts - 1:
logger.error(f"Failed to find an available port after {max_attempts} attempts")
# Reset port to original value
self.port = original_port
return False
else:
# Port is available, start the server
def run_server():
# Redirect stdout to stderr while running Flask
with redirect_stdout(sys.stderr):
self.app.run(
host=self.host, port=self.port, debug=False, use_reloader=False
)
self.server_thread = threading.Thread(target=run_server)
self.server_thread.daemon = True # Thread will exit when main thread exits
self.server_thread.start()
logger.info(f"Flask server started at http://{self.host}:{self.port}")
return True
return False
def stop(self):
"""Stop the Flask server"""
# Flask doesn't provide a clean way to stop the server from outside
# In a production environment, you would use a more robust server like gunicorn
# For this example, we'll rely on the daemon thread to exit when the main thread exits
logger.info("Flask server stopping...")
# Map control methods
def send_map_command(self, command_type, data):
"""
Send a command to all connected SSE clients
Args:
command_type (str): Type of command (SHOW_POLYGON, SHOW_MARKER, SET_VIEW)
data (dict): Data for the command
"""
command = {"type": command_type, "data": data}
message = json.dumps(command)
# Send the message to all connected clients
clients_count = len(self.sse_clients)
if clients_count == 0:
logger.info("No connected clients to send message to")
return
logger.info(f"Sending {command_type} to {clients_count} clients")
for client_id, client_queue in list(self.sse_clients.items()):
try:
client_queue.put(message)
except Exception as e:
logger.error(f"Error sending to client {client_id}: {e}")
def show_polygon(self, coordinates, options=None):
"""
Display a polygon on the map
Args:
coordinates (list): List of [lat, lng] coordinates
options (dict, optional): Styling options
"""
data = {"coordinates": coordinates, "options": options or {}}
self.send_map_command("SHOW_POLYGON", data)
def show_marker(self, coordinates, text=None, options=None):
"""
Display a marker on the map
Args:
coordinates (list): [lat, lng] coordinates
text (str, optional): Popup text
options (dict, optional): Styling options
"""
data = {"coordinates": coordinates, "text": text, "options": options or {}}
self.send_map_command("SHOW_MARKER", data)
def show_line(self, coordinates, options=None):
"""
Display a line (polyline) on the map
Args:
coordinates (list): List of [lat, lng] coordinates
options (dict, optional): Styling options
"""
data = {"coordinates": coordinates, "options": options or {}}
self.send_map_command("SHOW_LINE", data)
def set_view(self, bounds=None, center=None, zoom=None):
"""
Set the map view
Args:
bounds (list, optional): [[south, west], [north, east]]
center (list, optional): [lat, lng] center point
zoom (int, optional): Zoom level
"""
data = {}
if bounds:
data["bounds"] = bounds
if center:
data["center"] = center
if zoom:
data["zoom"] = zoom
self.send_map_command("SET_VIEW", data)
def get_current_view(self):
"""
Get the current map view
Returns:
dict: Current view information
"""
return self.current_view
def set_title(self, title, options=None):
"""
Set the map title displayed at the bottom right of the map
Args:
title (str): Title text to display
options (dict, optional): Styling options like fontSize, color, etc.
"""
data = {"title": title, "options": options or {}}
self.send_map_command("SET_TITLE", data)
def capture_screenshot(self):
"""
Request a screenshot from the map and wait for it to be received
Returns:
str: Base64-encoded image data, or None if no screenshot is available
"""
# Send command to capture screenshot
self.send_map_command("CAPTURE_SCREENSHOT", {})
# Wait for the screenshot to be received (with timeout)
start_time = time.time()
timeout = 5 # seconds
while time.time() - start_time < timeout:
if self.latest_screenshot:
screenshot = self.latest_screenshot
self.latest_screenshot = None # Clear after retrieving
return screenshot
time.sleep(0.1)
logger.warning("Screenshot capture timed out")
return None
def geolocate(self, query):
"""
Send a geolocate request to the web client and wait for the response
Args:
query (str): The location name to search for
Returns:
list: Nominatim search results or None if the request times out
"""
# Generate a unique request ID
request_id = str(int(time.time() * 1000))
# Send the geolocate command to the web client
data = {"requestId": request_id, "query": query}
self.send_map_command("GEOLOCATE", data)
# Wait for the response (with timeout)
start_time = time.time()
timeout = 10 # seconds
while time.time() - start_time < timeout:
if request_id in self.geolocate_responses:
results = self.geolocate_responses.pop(request_id)
return results
time.sleep(0.1)
logger.warning(f"Geolocate request for '{query}' timed out")
return None
# For testing the Flask server directly
if __name__ == "__main__":
server = FlaskServer()
server.start()
# Keep the main thread running
try:
logger.info("Press Ctrl+C to stop the server")
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
logger.info("Server stopped")