Skip to main content
Glama

WinLog-mcp

by XD3an
main.py8.8 kB
from mcp.server.fastmcp import FastMCP import win32evtlog import win32evtlogutil import datetime import os import traceback import logging import argparse import ctypes parser = argparse.ArgumentParser() parser.add_argument('--storage-path', type=str, default="C:\\logs\\", help='Log storage path') args, unknown = parser.parse_known_args() class Settings: STORAGE_PATH = args.storage_path LOG_NAME = "Microsoft-Windows-Sysmon/Operational" # 預設抓 Sysmon 通道 SOURCE_NAME = "Microsoft-Windows-Sysmon" # 預設只抓 Sysmon 來源 SERVER_NAME = "localhost" SIZE = 10 mcp = FastMCP( describe="Pywin32 win32evtlog for Retrieval Windows Logs" ) @mcp.tool() def ingest_syslog( source_name: str = Settings.SOURCE_NAME, log_name: str = Settings.LOG_NAME, server_name: str = Settings.SERVER_NAME, size: int = Settings.SIZE ): """ Ingest Windows logs Args: source_name (str, optional): event source name. Empty string means no filter (all sources). log_name (str, optional): event log name. Defaults to "Microsoft-Windows-Sysmon/Operational". server_name (str, optional): server. Defaults to "localhost". size (int, optional): number of log lines to return Returns: str: log content """ try: if os.path.isfile(Settings.STORAGE_PATH): return f"ERROR: storage_path points to a file instead of a directory ({Settings.STORAGE_PATH}), please use a directory path." os.makedirs(Settings.STORAGE_PATH, exist_ok=True) timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') safe_log_name = log_name.replace("/", "_").replace("\\", "_") dest_file = os.path.join(Settings.STORAGE_PATH, f"{timestamp}_{safe_log_name}.log") # Check admin privileges if not ctypes.windll.shell32.IsUserAnAdmin(): return "ERROR: Administrator privileges required." if log_name == "Microsoft-Windows-Sysmon/Operational": # Use EvtQuery for Sysmon log flags = win32evtlog.EvtQueryReverseDirection query = f"*[System/Provider/@Name='{source_name or 'Microsoft-Windows-Sysmon'}']" handle = win32evtlog.EvtQuery(log_name, flags, query) events = 1 written = 0 with open(dest_file, "a", encoding="utf-8") as f: while events and written < size: events = win32evtlog.EvtNext(handle, 1) if not events: break for event in events: xml = win32evtlog.EvtRender(event, win32evtlog.EvtRenderEventXml) f.write(xml + "\n") written += 1 if written >= size: break else: # Use OpenEventLog for other logs handle = win32evtlog.OpenEventLog(server_name, log_name) flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ written = 0 with open(dest_file, "a", encoding="utf-8") as f: events = True while events and written < size: events = win32evtlog.ReadEventLog(handle, flags, 0) if not events: break for event in events: # If source_name is empty, do not filter print(event) try: event_record = {} event_record['RecordNumber'] = getattr(event, 'RecordNumber', None) event_record['EventID'] = getattr(event, 'EventID', 0) & 0xFFFF event_record['EventCategory'] = getattr(event, 'EventCategory', None) event_record['EventType'] = getattr(event, 'EventType', None) event_record['SourceName'] = getattr(event, 'SourceName', None) event_record['ComputerName'] = getattr(event, 'ComputerName', None) event_record['Sid'] = str(getattr(event, 'Sid', None)) if getattr(event, 'Sid', None) is not None else None event_record['StringInserts'] = getattr(event, 'StringInserts', None) event_record['Data'] = getattr(event, 'Data', None) event_record['TimeGenerated'] = getattr(event, 'TimeGenerated', None).isoformat() if hasattr(getattr(event, 'TimeGenerated', None), 'isoformat') else str(getattr(event, 'TimeGenerated', None)) event_record['TimeWritten'] = getattr(event, 'TimeWritten', None).isoformat() if hasattr(getattr(event, 'TimeWritten', None), 'isoformat') else str(getattr(event, 'TimeWritten', None)) event_record['Message'] = None try: # Attempt to get the formatted message event_record['Message'] = win32evtlogutil.SafeFormatMessage(event, log_name) except Exception as msg_ex: # Handle message parsing error event_record['Message'] = f"Message parse error: {msg_ex}" f.write(str(event_record) + "\n") except Exception as e: # Handle event parsing error f.write(f"Error parsing event: {e}\n") written += 1 if written >= size: break win32evtlog.CloseEventLog(handle) return dest_file except Exception as e: logging.debug(traceback.format_exc()) return f"ERROR: {e}" @mcp.tool() def query_syslog( timestamp: str, source_name: str = Settings.SOURCE_NAME, size: int = Settings.SIZE ): """ Query Windows logs Args: timestamp (str): the timestamp (YYYY-MM-DD_HH-MM-SS) to filter log files source_name (str, optional): event source name. Defaults to "Microsoft-Windows-Sysmon". size (int, optional): number of log lines to return Returns: str: log content """ files = os.listdir(Settings.STORAGE_PATH) matched_files = [ f for f in files if f.endswith(".log") and timestamp in f ] if not matched_files: return "No log files found matching the timestamp" logs = [] for file in matched_files: with open(os.path.join(Settings.STORAGE_PATH, file), "r", encoding="utf-8") as f: events = f.readlines() for event in events: try: import ast event_dict = ast.literal_eval(event.strip()) if event.strip().startswith('{') else None except Exception: event_dict = None if source_name and event_dict and 'SourceName' in event_dict: # Filter events by source name if event_dict['SourceName'] != source_name: continue logs.append(event.strip()) if not logs: return "No events found in matching log files" return "\n".join(logs[-size:]) @mcp.prompt() def prompt_guide(): return f""" You are a Windows log analyst. Your task is to analyze the Windows log and provide a summary of the events. The log is stored in the following path: ``` {Settings.STORAGE_PATH} ``` You can use the following tools: - ingest_syslog: Ingest Windows log - Args: - source_name (str, optional): event source name. Defaults to "Microsoft-Windows-Sysmon". - size (int, optional): number of log lines to return - Returns: - str: log file path - query_syslog: Query Windows log - Args: - timestamp (str): the timestamp (YYYY-MM-DD_HH-MM-SS) to filter log files - source_name (str, optional): event source name. Defaults to "Microsoft-Windows-Sysmon". - size (int, optional): number of log lines to return - Returns: - str: log file path if you want to ingest the log, use the following prompt: ``` ingest_syslog( source_name="{Settings.SOURCE_NAME}", size={Settings.SIZE} ) ``` if you want to query the log, use the following prompt: ``` query_syslog( timestamp="2025-05-15_14-47-24", source_name="{Settings.SOURCE_NAME}", size={Settings.SIZE} ) ``` """ if __name__ == "__main__": mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/XD3an/winlog-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server