Skip to main content
Glama
bus.py1.25 kB
import time from typing import List import can from .models import Frame def make_bus(interface: str, channel: str) -> can.BusABC: # ThreadSafeBus provides thread-safe send; for reading, regular interface is fine try: return can.ThreadSafeBus(interface=interface, channel=channel) except Exception: # Fallback to standard Bus if ThreadSafeBus not available or fails return can.interface.Bus(interface=interface, channel=channel) # type: ignore[arg-type] def read_frames(bus: can.BusABC, duration_s: float = 1.0) -> List[Frame]: end = time.time() + duration_s frames: List[Frame] = [] while time.time() < end: msg = bus.recv(timeout=0.1) if msg: frames.append( Frame( timestamp=msg.timestamp, arbitration_id=msg.arbitration_id, data=bytes(msg.data), ) ) return frames def shutdown_bus(bus: can.BusABC) -> None: """Attempt to cleanly shutdown a python-can bus; swallow errors for mock buses.""" shutdown_fn = getattr(bus, "shutdown", None) if callable(shutdown_fn): try: shutdown_fn() except Exception: pass

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/farzadnadiri/mcp-ecu'

If you have feedback or need assistance with the MCP directory API, please join our Discord server