Skip to main content
Glama

Joern MCP Server

by sfncat
server.py3.52 kB
import json import sys import os import re import time from typing import Dict, Any import requests from fastmcp import FastMCP from dotenv import load_dotenv from common_tools import * load_dotenv() def load_server_config(config_file: str = "mcp_settings.json") -> Dict[str, Any]: """load server config from config file""" try: with open(config_file, 'r') as f: config = json.load(f) return config.get('mcpServers').get('joern').get('config') except FileNotFoundError: print(f"config file {config_file} not exist") return {} except json.JSONDecodeError: print(f"config file {config_file} format error") return {} joern_config = load_server_config() # server_endpoint = f'{os.getenv("HOST")}:{os.getenv("PORT")}' server_endpoint = f'{joern_config.get('host')}:{joern_config.get("port")}' log_level = joern_config.get('log_level', 'ERROR') joern_mcp = FastMCP("joern-mcp", log_level=log_level) # joern_mcp._tool_manager. print(server_endpoint) basic_auth = (os.getenv("JOERN_AUTH_USERNAME"), os.getenv("JOERN_AUTH_PASSWORD")) timeout = int(joern_config.get('timeout', '300')) def joern_remote(query): """ Execute remote query and return results Parameters: query -- The query string to execute Returns: Returns the server response stdout content on success Returns None on failure, error message will be output to stderr """ data = {"query": query} headers = {'Content-Type': 'application/json'} try: response = requests.post( f'http://{server_endpoint}/query-sync', data=json.dumps(data), headers=headers, auth=basic_auth, timeout=timeout ) response.raise_for_status() result = response.json() return remove_ansi_escape_sequences(result.get('stdout', '')) except requests.exceptions.RequestException as e: sys.stderr.write(f"Request Error: {str(e)}\n") except json.JSONDecodeError: sys.stderr.write("Error: Invalid JSON response\n") return None @joern_mcp.tool() def get_help(): """Get help information from joern server""" response = joern_remote('help') if response: return response else: return 'Query Failed' @joern_mcp.tool() def check_connection() -> str: """Check if the Joern MCP plugin is running""" try: metadata = extract_value(joern_remote("version")) if not metadata: return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running." return f"Successfully connected to Joern MCP, joern server version is {metadata}" except Exception as e: return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running." SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) GENERATED_PY = os.path.join(SCRIPT_DIR, "server_tools.py") def generate(): """Generate and execute additional server tools from server_tools.py file. This function reads the content of server_tools.py and executes it to add more functionality to the server. """ with open(GENERATED_PY, "r") as f: code = f.read() exec(compile(code, GENERATED_PY, "exec")) generate() def main(): """Start the MCP server using stdio transport. This is the main entry point for running the Joern MCP server. """ joern_mcp.run(transport="stdio") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sfncat/mcp-joern'

If you have feedback or need assistance with the MCP directory API, please join our Discord server