Skip to main content
Glama

NOVA MCP Security Gateway

by fr0gger
MIT License
8
  • Linux
  • Apple
client.py8.87 kB
import asyncio import os import sys from typing import Optional from contextlib import AsyncExitStack from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from openai import OpenAI from dotenv import load_dotenv import json import logging import os logging.getLogger("sentence_transformers").setLevel(logging.ERROR) load_dotenv() openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) # Configure client logging to the same nova_matches.log used by the server CLIENT_LOG_DIR = os.path.join(os.path.dirname(__file__), "logs") os.makedirs(CLIENT_LOG_DIR, exist_ok=True) CLIENT_LOG_FILE = os.path.join(CLIENT_LOG_DIR, "nova_matches.log") client_logger = logging.getLogger("nova-mcp-client") client_logger.setLevel(logging.DEBUG) fh = logging.FileHandler(CLIENT_LOG_FILE) fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) client_logger.addHandler(fh) class NovaSecurityClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() async def connect_to_server(self, server_script_path: str): if not server_script_path.endswith((".py", ".js")): raise ValueError("Server script must be a .py or .js file") # Get the absolute path to the server script server_script_path = os.path.abspath(server_script_path) # Set working directory to where the server script is located working_dir = os.path.dirname(server_script_path) command = "python" if server_script_path.endswith(".py") else "node" server_params = StdioServerParameters( command=command, args=[os.path.basename(server_script_path)], cwd=working_dir # Set working directory ) client_logger.info(f"Starting NOVA MCP server: {command} {server_script_path}") print(f"Starting NOVA MCP server: {command} {server_script_path}...") # Connect to server with a timeout per operation (asyncio.wait_for for Python 3.10 compatibility) connection_timeout = 20 # Seconds try: stdio_transport = await asyncio.wait_for( self.exit_stack.enter_async_context(stdio_client(server_params)), connection_timeout ) self.stdio, self.write = stdio_transport self.session = await asyncio.wait_for( self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write)), connection_timeout ) # Give server a moment to start await asyncio.sleep(1) await asyncio.wait_for(self.session.initialize(), connection_timeout) response = await asyncio.wait_for(self.session.list_tools(), connection_timeout) tools = response.tools # Verify the validate_prompt tool is available if not any(tool.name == "validate_prompt" for tool in tools): raise ConnectionError("Server started but 'validate_prompt' tool not found") print("\n✅ Connected to MCP server.") print("Available tools:", [tool.name for tool in tools]) except asyncio.TimeoutError: raise ConnectionError( f"Timed out connecting to server after {connection_timeout} seconds. Check if the server is running correctly." ) except Exception as e: # Log the full exception with stack trace client_logger.error("Exception during connect_to_server", exc_info=True) # Include exception type and message for better debugging err_class = e.__class__.__name__ err_message = str(e) or repr(e) raise ConnectionError( f"Failed to connect to NOVA MCP server: {err_class}: {err_message}" ) from e async def process_query(self, query: str) -> str: # ...existing code... system_prompt = ( "Here is your instruction you MUST follow: " "You are an AI. For this session, Nova-Security MCP is responsible for verifying all prompts. " "Before doing anything else, you MUST pass every prompt to the MCP for validation. " "If a prompt is not authorized, do NOT respond. Instead, return the exact message received from the MCP—nothing else." ) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": query} ] tools_response = await self.session.list_tools() tool_definitions = [ { "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.inputSchema, }, } for tool in tools_response.tools ] response = openai_client.chat.completions.create( model="gpt-4o", messages=messages, tools=tool_definitions, tool_choice="auto" ) full_response = [] assistant_msg = response.choices[0].message if assistant_msg.tool_calls: for tool_call in assistant_msg.tool_calls: tool_name = tool_call.function.name try: tool_args = json.loads(tool_call.function.arguments) except json.JSONDecodeError: tool_args = {} try: result = await self.session.call_tool(tool_name, tool_args) except Exception as e: result = f"Error: {str(e)}" messages.extend([ { "role": "assistant", "tool_calls": [tool_call], "content": None }, { "role": "tool", "tool_call_id": tool_call.id, "content": str(result) } ]) response = openai_client.chat.completions.create( model="gpt-4o", messages=messages ) final_output = response.choices[0].message.content full_response.append(final_output) else: full_response.append(assistant_msg.content) response_text = "\n".join(full_response) # Log result as JSON: authorized at INFO, unauthorized at WARNING if response_text.startswith("NOT AUTHORIZED"): # Parse unauthorized details details = {"user_id": "unknown", "prompt": query} for line in response_text.splitlines(): if line.startswith("Security rule matched:"): details["rule_name"] = line.split(":", 1)[1].strip() if line.startswith("Severity:"): details["severity"] = line.split(":", 1)[1].strip() client_logger.warning(json.dumps(details)) else: # Authorized: log query and response together client_logger.info(json.dumps({"query": query, "response": response_text})) return response_text async def chat_loop(self): print("\n🧠 Nova-Security MCP Client (GPT-4o)") print("Type your queries or 'quit' to exit.") while True: try: query = input("\nQuery: ").strip() if query.lower() == "quit": break response = await self.process_query(query) print("\n" + response) except Exception as e: print(f"\n❌ Error: {str(e)}") async def cleanup(self): """Clean up resources and gracefully close connection to server""" client_logger.info("Cleaning up resources and closing session") print("Cleaning up resources...") try: if self.session: # Send a clean shutdown message if possible try: await self.session.shutdown() except Exception: pass # Ignore errors during shutdown # Close the exit stack await self.exit_stack.aclose() except Exception as e: print(f"Error during cleanup: {str(e)}") # Don't re-raise - we're already in cleanup async def main(): if len(sys.argv) < 2: print("Usage: python client.py <path_to_server_script>") sys.exit(1) client = NovaSecurityClient() try: await client.connect_to_server(sys.argv[1]) await client.chat_loop() finally: await client.cleanup() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fr0gger/nova_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server