Skip to main content
Glama

baidu-ai-search

Official
by baidubce
sse_util.py8.92 kB
# Copyright (c) 2023 Baidu, Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ SSE Client util """ from appbuilder.utils.logger_util import logger import logging import aiohttp class SSEClient: """ 一个简易的SSE Client,用于接收服务端发送的SSE事件。 """ def __init__(self, event_source, char_enc="utf-8"): """ 通过现有的事件源初始化 SSE 客户端。 事件源应为二进制流,并具有 close() 方法。 这通常是实现 io.BinaryIOBase 的东西,比如 httplib 或 urllib3HTTPResponse 对象。 """ logger.debug(f"Initialized SSE client from event source {event_source}") self._event_source = event_source self._char_enc = char_enc def _read(self): """ 读取传入的事件源流并生成事件块。 不幸的是,有些服务器可能会决定在响应中将事件分解为多个HTTP块。 因此,有必要正确地将连续的响应块缝合在一起,并找到SSE分隔符(空的新行),以生成完整、正确的事件块。 """ data = b"" for chunk in self._event_source: for line in chunk.splitlines(True): data += line if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")): yield data data = b"" if data: yield data def events(self): """ 从给定的输入流中读取 Server-Side-Event (SSE) 数据,并生成解析后的 Event 对象。 Args: 无 Returns: generator: 解析后的 Event 对象的生成器。 """ for chunk in self._read(): event = Event() # Split before decoding so splitlines() only uses \r and \n for line in chunk.splitlines(): # Decode the line. line = line.decode(self._char_enc) # Lines starting with a separator are comments and are to be # ignored. if not line.strip() or line.startswith(":"): continue logger.debug(f"raw line: {line}") data = line.split(":", 1) field = data[0] # Ignore unknown fields. if field not in event.__dict__: event.raw += line logger.debug( f"Saw invalid field {field} while parsing Server Side Event" ) continue if len(data) > 1: # From the spec: # "If value starts with a single U+0020 SPACE character, # remove it from value." if data[1].startswith(" "): value = data[1][1:] else: value = data[1] else: # If no value is present after the separator, # assume an empty value. value = "" # The data field may come over multiple lines and their values # are concatenated with each other. if field == "data": event.__dict__[field] += value + "\n" event.raw += value + "\n" else: event.__dict__[field] = value event.raw += value # Events with no data are not dispatched. if not event.data: if event.raw: # unknown error pass else: continue else: # If the data field ends with a newline, remove it. if event.data.endswith("\n"): event.data = event.data[0:-1] # Empty event names default to 'message' event.event = event.event or "message" # Dispatch the event if logger.getEffectiveLevel() == logging.DEBUG: logger.debug(f"Dispatching {event.debug_str}...") else: logger.debug(f"Dispatching {event}...") yield event def close(self): """ 手动关闭事件源流。 """ self._event_source.close() class AsyncSSEClient: """ 一个简易的SSE Client,用于接收服务端发送的SSE事件。 """ def __init__(self, response, char_enc='utf-8'): """ 通过现有的事件源response初始化 SSE 客户端。 response应为aiohttp.ClientResponse实例 """ self._response = response self._char_enc = char_enc async def _read(self): """ 读取传入的事件源流并生成事件块。 """ data = b'' async for chunk in self._response.content.iter_any(): for line in chunk.splitlines(True): data += line if data.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')): yield data data = b'' if data: yield data async def events(self): """ 从给定的输入流中读取 Server-Side-Event (SSE) 数据,并生成解析后的 Event 对象。 Returns: generator: 解析后的 Event 对象的生成器。 """ async for chunk in self._read(): event = Event() # Split before decoding so splitlines() only uses \r and \n for line in chunk.splitlines(): # Decode the line. line = line.decode(self._char_enc) # Lines starting with a separator are comments and are to be ignored. if not line.strip() or line.startswith(':'): continue data = line.split(':', 1) field = data[0] # Ignore unknown fields. if field not in event.__dict__: event.raw += line continue if len(data) > 1: # From the spec: # "If value starts with a single U+0020 SPACE character, # remove it from value." if data[1].startswith(' '): value = data[1][1:] else: value = data[1] else: # If no value is present after the separator, # assume an empty value. value = '' # The data field may come over multiple lines and their values are concatenated with each other. if field == 'data': event.__dict__[field] += value + '\n' event.raw += value + '\n' else: event.__dict__[field] = value event.raw += value # Events with no data are not dispatched. if not event.data: continue # If the data field ends with a newline, remove it. if event.data.endswith('\n'): event.data = event.data[0:-1] # Empty event names default to 'message' event.event = event.event or 'message' yield event class Event(object): """ 事件流中的事件。 """ def __init__(self, id=None, event="message", data="", retry=None): self.id = id self.event = event self.data = data self.retry = retry self.raw = "" def __str__(self): s = f"{self.event} event" if self.id: s += f" #{self.id}" if self.data: s += f", {len(self.data)} byte" else: s += ", no data" if self.retry: s += f", retry in {self.retry} ms" return s @property def debug_str(self): s = f"{self.event} event" if self.id: s += f" #{self.id}" if self.data: s += f", {len(self.data)} byte, DATA<<{self.data}>>" else: s += ", no data" if self.raw: s += f", RAW<<{self.raw}>>" else: s += ", no raw" if self.retry: s += f", retry in {self.retry} ms" return s

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/baidubce/app-builder'

If you have feedback or need assistance with the MCP directory API, please join our Discord server