mcp-vcd
from typing import Dict, Optional
import asyncio
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
server = Server("mcp-vcd")
# Store signal mappings globally since they're constant for a given VCD file
signal_mappings: Dict[str, Dict[str, str]] = {} # file_name -> {signal_name -> char}
async def parse_signal_mappings(file_name: str) -> Dict[str, str]:
"""Parse VCD file to extract signal name to character mappings."""
mappings = {}
try:
with open(file_name, 'r') as f:
for line in f:
if line.startswith('$enddefinitions'):
break
if line.startswith('$var'):
parts = line.split()
if len(parts) >= 5:
char, signal = parts[3], parts[4]
mappings[signal] = char
return mappings
except Exception as e:
raise RuntimeError(f"Failed to parse signal mappings: {str(e)}")
async def get_signal_lines(file_name: str, char: str, start_time: Optional[int] = None, end_time: Optional[int] = None) -> str:
"""Get all lines containing the specified character within the timestamp range."""
try:
matches = []
current_time = 0
with open(file_name, 'r') as f:
for line_num, line in enumerate(f, 1):
line = line.rstrip()
# Update current timestamp if we see a timestamp marker
if line.startswith('#'):
try:
current_time = int(line[1:])
except ValueError:
continue
# Skip if we're before start_time
if start_time is not None and current_time < start_time:
continue
# Break if we're past end_time
if end_time is not None and current_time > end_time:
break
# If we're in the desired time range and line contains our character
if char in line:
matches.append(f"{line_num}:{line}")
return '\n'.join(matches)
except Exception as e:
raise RuntimeError(f"Failed to read file: {str(e)}")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools.
Each tool specifies its arguments using JSON Schema validation.
"""
return [
types.Tool(
name="get-signal",
description="Get all instances of a specified signal in a VCD file",
inputSchema={
"type": "object",
"properties": {
"file_name": {
"type": "string",
"description": "Name of the VCD file to analyze",
},
"signal_name": {
"type": "string",
"description": "Name of the signal to search for",
},
"start_time": {
"type": "integer",
"description": "Start timestamp (optional)",
},
"end_time": {
"type": "integer",
"description": "End timestamp (optional)",
},
},
"required": ["file_name", "signal_name"],
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Handle tool execution requests.
Tools can analyze VCD files and return signal information.
"""
if not arguments:
raise ValueError("Missing arguments")
if name == "get-signal":
file_name = arguments.get("file_name")
signal_name = arguments.get("signal_name")
start_time = arguments.get("start_time")
end_time = arguments.get("end_time")
if not file_name or not signal_name:
raise ValueError("Missing required parameters")
try:
# Get or update signal mappings for this file
if file_name not in signal_mappings:
signal_mappings[file_name] = await parse_signal_mappings(file_name)
# Look up the character for this signal
signal_char = signal_mappings[file_name].get(signal_name)
if not signal_char:
return [types.TextContent(
type="text",
text=f"Signal '{signal_name}' not found in VCD file"
)]
# Get all lines containing this character within time range
output = await get_signal_lines(file_name, signal_char, start_time, end_time)
return [types.TextContent(
type="text",
text=output if output else "No match"
)]
except Exception as e:
return [types.TextContent(
type="text",
text=str(e)
)]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
# Run the server using stdin/stdout streams
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="mcp-vcd",
server_version="0.1.1",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
asyncio.run(main())