# sse.py (1.11 kB)
import json
import time
import uuid
from typing import Mapping
from dify_plugin import Endpoint
from werkzeug import Request, Response
def create_sse_message(event, data):
    """Format one Server-Sent Events frame.

    Args:
        event: Value written to the frame's ``event:`` field.
        data: Payload. A ``dict`` or ``list`` is serialized with
            ``json.dumps``; any other value is interpolated as text.

    Returns:
        The frame as a string: an ``event:`` line, one ``data:`` line per
        line of payload, and a terminating blank line.
    """
    payload = json.dumps(data) if isinstance(data, (dict, list)) else f"{data}"
    # A raw line break inside the payload would end the ``data:`` field early
    # (and an empty line would end the whole frame), so each payload line gets
    # its own ``data:`` prefix. CRLF and lone CR are normalized first because
    # the event-stream format treats them as line breaks as well.
    payload_lines = payload.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    data_fields = "".join(f"data: {line}\n" for line in payload_lines)
    return f"event: {event}\n{data_fields}\n"
class SSEEndpoint(Endpoint):
    """Endpoint that answers with a ``text/event-stream`` response."""

    def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
        """Open an event stream bound to a freshly generated session id.

        ``r``, ``values`` and ``settings`` are accepted but not read here.

        Returns:
            A 200 ``text/event-stream`` response whose body is a generator
            that never finishes on its own.
        """
        # 32 hex characters: a random UUID4 without its dashes.
        session_id = uuid.uuid4().hex

        def event_stream():
            # First frame carries the URL (with the session id) to the client.
            yield create_sse_message(
                "endpoint", f"messages/?session_id={session_id}"
            )

            while True:
                storage = self.session.storage
                if storage.exist(session_id):
                    # Read the value stored under the session key, remove it,
                    # then forward it to the client as a "message" event.
                    # NOTE(review): exist/get/delete are separate calls; a
                    # value written between get and delete would be dropped --
                    # confirm whether writers can overlap with this loop.
                    payload = storage.get(session_id).decode()
                    storage.delete(session_id)
                    yield create_sse_message("message", payload)
                # Pause between storage polls.
                time.sleep(0.5)

        return Response(
            event_stream(), status=200, content_type="text/event-stream"
        )