Skip to main content
Glama
obd.py3 kB
from __future__ import annotations from typing import List, Optional, Tuple OBD_BROADCAST_ID = 0x7DF OBD_RESPONSE_BASE_ID = 0x7E8 # first ECU response ID def _single_frame(payload: List[int]) -> List[int]: """Build a single-frame ISO-TP message: [len] + payload, padded to 8 bytes.""" length = len(payload) data = [length & 0xFF] + payload while len(data) < 8: data.append(0x00) return data[:8] def build_request(service: int, pid: Optional[int] = None) -> Tuple[int, bytes]: payload: List[int] = [service] if pid is not None: payload.append(pid) data = _single_frame(payload) return (OBD_BROADCAST_ID, bytes(data)) def _supported_mask(pids: List[int]) -> Tuple[int, int, int, int]: """Return 4 bytes bitmask for PIDs 0x01-0x20.""" mask = [0, 0, 0, 0] for pid in pids: if 0x01 <= pid <= 0x20: idx = (pid - 1) // 8 bit = 7 - ((pid - 1) % 8) mask[idx] |= (1 << bit) return (mask[0], mask[1], mask[2], mask[3]) def simulate_response(service: int, pid: Optional[int]) -> Optional[List[int]]: """Return payload bytes (without length) for a given OBD-II request. We implement a small subset as single-frame responses. """ if service == 0x01: if pid == 0x00: a, b, c, d = _supported_mask([0x05, 0x0D, 0x2F, 0x51]) return [0x41, 0x00, a, b, c, d] if pid == 0x05: # Coolant temp = A-40 temp_c = 90 A = temp_c + 40 return [0x41, 0x05, A] if pid == 0x0D: # Speed km/h speed = 50 return [0x41, 0x0D, speed] if pid == 0x2F: # Fuel tank level input % = 100/255 * A level_pct = 50 A = int(round(level_pct * 255 / 100)) return [0x41, 0x2F, A] if pid == 0x51: # Fuel type (1 = gasoline) return [0x41, 0x51, 0x01] if service == 0x03: # DTCs # Return no DTCs return [0x43] if service == 0x09: if pid == 0x00: a, b, c, d = _supported_mask([0x02, 0x0A]) return [0x49, 0x00, a, b, c, d] if pid == 0x0A: # ECU name (ASCII), simple short name in single frame name = b"MCP-ECU" return [0x49, 0x0A] + list(name[:5]) # truncate to fit single-frame demo # VIN (0x02) is multi-frame typically; not supported in this minimal demo return None def parse_request(data: bytes) -> Tuple[int, Optional[int]]: """Parse a single-frame request and return (service, pid).""" if not data: return (0, None) length = data[0] payload = list(data[1:1 + length]) if not payload: return (0, None) service = payload[0] pid = payload[1] if len(payload) > 1 else None return (service, pid) def build_response_frame( payload: List[int], responder_id: int = OBD_RESPONSE_BASE_ID, ) -> Tuple[int, bytes]: data = _single_frame(payload) return (responder_id, bytes(data))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/farzadnadiri/mcp-ecu'

If you have feedback or need assistance with the MCP directory API, please join our Discord server