# server_sse.py • 4.61 kB
import uvicorn
import uuid
import json
import logging
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from mcp.server.sse import SseServerTransport
from src.server import script_server
from config import HOST, PORT
# Configure logging: set the root logger to INFO and create a module-scoped
# logger used by every handler in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Session store: in-memory dict keyed by session ID. The /session handler in
# create_app() inserts {"client": <host>} entries; nothing in the visible code
# removes them, so entries last for the lifetime of the process.
SESSION_STORE = {}
# Example function that will be called and return a response
async def process_data(data):
    """Process the incoming data and return a result payload.

    This is a placeholder implementation: the "processing" only embeds
    ``data`` in a string. Replace the marked section with real logic.

    Args:
        data: Arbitrary payload supplied by the caller. It is only
            interpolated into strings here, so any object works.

    Returns:
        dict: A mapping with three string values:
            ``status`` - always ``"success"``;
            ``processed_data`` - ``"Processed: <data>"``;
            ``timestamp`` - the current UTC time in ISO 8601 format.
    """
    # Function-local import so this block does not depend on the file's
    # import header, which does not provide datetime.
    from datetime import datetime, timezone

    logger.info("Processing data: %s", data)
    # This is where you'd put your actual processing logic
    result = {
        "status": "success",
        "processed_data": f"Processed: {data}",
        # Previously a random UUID was stored under this key, which carried
        # no time information at all.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    logger.info("Processed result: %s", result)
    return result
def create_app():
    """Build the Starlette application for the MCP SSE server.

    Routes:
        GET  /session          issue a session ID and record the client host.
        GET  /sse              open the MCP SSE stream and run ``script_server``.
        POST /script, /scrape  deliver client messages to the SSE transport.
        POST /call-function    run a named function, return its JSON result.
        POST /stream-function  run ``process_data`` and stream the result
                               as server-sent events.

    Returns:
        Starlette: the application, wrapped in CORS middleware that allows
        every origin, method and header.
    """
    # Function-local import: needed by /stream-function and not provided by
    # the file's import header.
    from starlette.responses import StreamingResponse

    # One transport backs both message routes. A "/scrape" transport used to
    # be created first and was immediately overwritten, so only the "/script"
    # transport was ever in use.
    sse = SseServerTransport("/script")

    def _error_response(message, status_code=400):
        """Return the JSON error envelope shared by the function endpoints."""
        return JSONResponse(
            {"status": "error", "message": message}, status_code=status_code
        )

    async def _read_json_object(request):
        """Return the request body as a dict, or None if it is not a JSON object."""
        try:
            payload = await request.json()
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError, both of
            # which subclass ValueError.
            return None
        return payload if isinstance(payload, dict) else None

    async def generate_session_id(request):
        """Create a new session ID, remember the caller's host, and return it."""
        session_id = str(uuid.uuid4())
        # request.client can be None when the server supplies no peer address.
        client_host = request.client.host if request.client else None
        SESSION_STORE[session_id] = {"client": client_host}
        logger.info(
            f"Generated session ID: {session_id} for client: {client_host}"
        )
        return JSONResponse({"session_id": session_id})

    class HandleSSE:
        """ASGI-style callable that runs ``script_server`` over an SSE connection.

        Written as a class with ``__call__(scope, receive, send)`` rather than
        a request-handler function, because the transport needs the raw ASGI
        triple.
        """

        def __init__(self, sse):
            self.sse = sse

        async def __call__(self, scope, receive, send):
            async with self.sse.connect_sse(scope, receive, send) as streams:
                # streams[0] / streams[1] are passed through in the order the
                # transport yields them.
                await script_server.run(
                    streams[0],
                    streams[1],
                    script_server.create_initialization_options(),
                )

    class HandleMessages:
        """ASGI-style callable that forwards POSTed messages to the transport."""

        def __init__(self, sse):
            self.sse = sse

        async def __call__(self, scope, receive, send):
            await self.sse.handle_post_message(scope, receive, send)

    async def call_function(request):
        """Endpoint to call a function and return the response.

        Expects a JSON object with ``function`` (name) and optional ``args``.
        Responds 400 for a malformed body or an unknown function name.
        """
        data = await _read_json_object(request)
        if data is None:
            logger.error("Request body is not a JSON object")
            return _error_response("Request body must be a JSON object")

        function_name = data.get("function")
        function_args = data.get("args", {})
        logger.info(
            f"Function call received: {function_name} with args: {function_args}"
        )

        # Function routing logic
        if function_name == "process_data":
            return JSONResponse(await process_data(function_args))

        logger.error(f"Unknown function: {function_name}")
        return _error_response(f"Unknown function: {function_name}")

    async def stream_function_results(request):
        """Endpoint to call a function and stream the results via SSE.

        Expects a JSON object with ``session_id`` (issued by /session) and
        optional ``args``. Responds 400 for a malformed body or an unknown
        session; otherwise streams one ``data:`` event with the JSON result
        followed by a ``data: [DONE]`` event.
        """
        data = await _read_json_object(request)
        if data is None:
            logger.error("Request body is not a JSON object")
            return _error_response("Request body must be a JSON object")

        session_id = data.get("session_id")
        # Stored keys are UUID strings; rejecting non-strings up front also
        # avoids a TypeError when the JSON value is unhashable (list/object).
        if not isinstance(session_id, str) or session_id not in SESSION_STORE:
            logger.error(f"Invalid session ID: {session_id}")
            return _error_response("Invalid session ID")

        async def event_stream():
            result = await process_data(data.get("args", {}))
            # json.dumps emits a single line by default, so the payload fits
            # in one SSE "data:" field.
            yield f"data: {json.dumps(result)}\n\n"
            # Signal the end of the stream
            yield "data: [DONE]\n\n"

        # The SSE transport offers no generic event-sending helper, so the
        # result is streamed with a plain text/event-stream response.
        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache"},
        )

    routes = [
        Route("/session", endpoint=generate_session_id, methods=["GET"]),
        Route("/sse", endpoint=HandleSSE(sse), methods=["GET"]),
        Route("/scrape", endpoint=HandleMessages(sse), methods=["POST"]),
        Route("/call-function", endpoint=call_function, methods=["POST"]),
        Route("/stream-function", endpoint=stream_function_results, methods=["POST"]),
        Route("/script", endpoint=HandleMessages(sse), methods=["POST"]),
    ]
    middleware = [
        Middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_methods=["*"],
            allow_headers=["*"],
        )
    ]
    return Starlette(routes=routes, middleware=middleware)
if __name__ == "__main__":
    # Script entry point: build the app, log the bind address, start uvicorn.
    app = create_app()
    logger.info(f"Starting server at {HOST}:{PORT}")
    server_options = {"host": HOST, "port": PORT}
    uvicorn.run(app, **server_options)