Skip to main content
Glama
sse.go (3.26 kB)
// Package sse streams line-oriented data and chat-completion output to HTTP
// clients, either as Server-Sent Events (SSE) or as WebSocket JSON messages.
package sse

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"github.com/sashabaranov/go-openai"
	"k8s.io/klog/v2"
)

// prepareSSE writes the response headers that mark the response as an SSE
// stream and commits a 200 status so events can be flushed immediately.
func prepareSSE(c *gin.Context) {
	header := c.Writer.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	c.Writer.WriteHeader(http.StatusOK)
}

// requestDone reports, without blocking, whether the request context has
// already been cancelled.
func requestDone(c *gin.Context) bool {
	select {
	case <-c.Request.Context().Done():
		return true
	default:
		return false
	}
}

// firstDelta returns the delta content of the first choice in response.
// The boolean is false when the chunk carries no choices, in which case
// indexing Choices[0] would panic.
func firstDelta(response openai.ChatCompletionStreamResponse) (string, bool) {
	if len(response.Choices) == 0 {
		return "", false
	}
	return response.Choices[0].Delta.Content, true
}

// WriteSSE reads stream line by line and forwards every line to the client
// as an SSE "message" event, flushing after each one. The line is forwarded
// as read, including its trailing newline.
//
// A final line that is not newline-terminated is still forwarded. A read
// error other than io.EOF is reported to the client as an "error" event and
// ends the stream. Streaming also stops once the request context is
// cancelled. The stream is always closed before WriteSSE returns.
func WriteSSE(c *gin.Context, stream io.ReadCloser) {
	defer func() {
		if err := stream.Close(); err != nil {
			// Closing is best-effort; only record the failure.
			klog.V(6).Infof("stream close error:%v", err)
		}
	}()

	prepareSSE(c)

	reader := bufio.NewReader(stream)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			if errors.Is(err, io.EOF) {
				// ReadString returns the data read so far together with
				// io.EOF; do not drop an unterminated last line.
				if line != "" {
					c.SSEvent("message", line)
					c.Writer.Flush()
				}
				return
			}
			// Tell the client why the stream ended early.
			c.SSEvent("error", fmt.Sprintf("Error reading stream: %v", err))
			c.Writer.Flush()
			return
		}

		c.SSEvent("message", line)
		c.Writer.Flush()

		// Stop reading once nobody is listening any more.
		if requestDone(c) {
			return
		}
	}
}

// WriteSSEWithChannel forwards every string received on logCh to the client
// as an SSE event and flushes after each one. The value ":heartbeat" is sent
// as an empty "heartbeat" event; anything else is sent as a "message" event.
//
// The function returns when logCh is closed, or when the request context is
// cancelled. In the latter case it closes done to signal the producer to
// stop; done is not closed when logCh is closed.
// NOTE(review): close(done) panics if done was already closed elsewhere —
// this assumes WriteSSEWithChannel is the only closer; confirm with callers.
func WriteSSEWithChannel(c *gin.Context, logCh <-chan string, done chan struct{}) {
	prepareSSE(c)

	for {
		select {
		case message, ok := <-logCh:
			if !ok {
				return
			}
			if message == ":heartbeat" {
				c.SSEvent("heartbeat", "")
			} else {
				c.SSEvent("message", message)
			}
			c.Writer.Flush()
		case <-c.Request.Context().Done():
			close(done) // tell the producer to stop
			return
		}
	}
}

// WriteWebSocketChatCompletionStream upgrades the request to a WebSocket
// connection and forwards each chat-completion delta to the client as a JSON
// object of the form {"data": "<content>"}.
//
// The loop ends when the stream reports io.EOF, when Recv fails with any
// other error (logged), or when writing to the client fails. Chunks without
// choices are skipped. The stream is closed before returning, including when
// the WebSocket upgrade fails.
func WriteWebSocketChatCompletionStream(c *gin.Context, stream *openai.ChatCompletionStream) {
	// Registered before the upgrade so the stream is released on every path.
	defer func() {
		if err := stream.Close(); err != nil {
			// Closing is best-effort; only record the failure.
			klog.V(6).Infof("stream close error:%v", err)
		}
		klog.V(6).Infof("stream close ")
	}()

	// NOTE(review): CheckOrigin accepts every origin, which disables the
	// upgrader's origin check. Confirm this is intended.
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	// Upgrade the HTTP connection to a WebSocket connection.
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		klog.Errorf("WebSocket Upgrade Error:%v", err)
		return
	}
	defer conn.Close()
	klog.V(6).Infof("ws Client connected")

	for {
		response, err := stream.Recv()
		if err != nil {
			// Retrying after a failed Recv would spin on the same error,
			// so any error ends the loop; only unexpected ones are logged.
			if !errors.Is(err, io.EOF) {
				klog.Errorf("chat completion stream recv error:%v", err)
			}
			return
		}

		content, ok := firstDelta(response)
		if !ok {
			continue
		}

		if err := conn.WriteJSON(gin.H{"data": content}); err != nil {
			// The client is gone or the connection is broken; stop pulling
			// from the upstream stream.
			klog.V(6).Infof("ws write error:%v", err)
			return
		}
	}
}

// WriteSSEChatCompletionStream forwards each chat-completion delta to the
// client as an SSE "message" event and flushes after each one.
//
// The loop ends when the stream reports io.EOF, when Recv fails with any
// other error (logged), or when the request context is cancelled. Chunks
// without choices are skipped. The stream is always closed before returning.
func WriteSSEChatCompletionStream(c *gin.Context, stream *openai.ChatCompletionStream) {
	defer func() {
		if err := stream.Close(); err != nil {
			// Closing is best-effort; only record the failure.
			klog.V(6).Infof("stream close error:%v", err)
		}
	}()

	prepareSSE(c)

	for {
		// Stop pulling from upstream once nobody is listening any more.
		if requestDone(c) {
			return
		}

		response, err := stream.Recv()
		if err != nil {
			// Retrying after a failed Recv would spin on the same error,
			// so any error ends the loop; only unexpected ones are logged.
			if !errors.Is(err, io.EOF) {
				klog.Errorf("chat completion stream recv error:%v", err)
			}
			return
		}

		content, ok := firstDelta(response)
		if !ok {
			continue
		}

		c.SSEvent("message", content)
		c.Writer.Flush()
	}
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.