Skip to main content
Glama
rules.py (14.5 kB)
"""YARA rules router for YaraFlux MCP Server. This module provides API routes for YARA rule management, including listing, adding, updating, and deleting rules. """ import logging from datetime import UTC, datetime from typing import List, Optional from fastapi import ( APIRouter, Body, Depends, File, Form, HTTPException, Request, Response, UploadFile, status, ) from yaraflux_mcp_server.auth import get_current_active_user, validate_admin from yaraflux_mcp_server.models import ErrorResponse, User, YaraRuleCreate, YaraRuleMetadata from yaraflux_mcp_server.yara_service import YaraError, yara_service # Configure logging logger = logging.getLogger(__name__) # Create router router = APIRouter( prefix="/rules", tags=["rules"], responses={ 401: {"description": "Unauthorized", "model": ErrorResponse}, 403: {"description": "Forbidden", "model": ErrorResponse}, 404: {"description": "Not Found", "model": ErrorResponse}, 422: {"description": "Validation Error", "model": ErrorResponse}, }, ) # Import MCP tools with safeguards try: from yaraflux_mcp_server.mcp_tools import import_threatflux_rules as import_rules_tool from yaraflux_mcp_server.mcp_tools import validate_yara_rule as validate_rule_tool except Exception as e: logger.error(f"Error importing MCP tools: {str(e)}") # Create fallback functions def validate_rule_tool(content: str): try: # Create a temporary rule name for validation temp_rule_name = f"validate_{int(datetime.now(UTC).timestamp())}.yar" # Validate via direct service call yara_service.add_rule(temp_rule_name, content) yara_service.delete_rule(temp_rule_name) return {"valid": True, "message": "Rule is valid"} except Exception as error: return {"valid": False, "message": str(error)} def import_rules_tool(url: Optional[str] = None): # Simple import implementation url_msg = f" from {url}" if url else "" return {"success": False, "message": f"MCP tools not available for import{url_msg}"} @router.get("/", response_model=List[YaraRuleMetadata]) async def 
list_rules(source: Optional[str] = None): """List all YARA rules. Args: source: Optional source filter ("custom" or "community") current_user: Current authenticated user Returns: List of YARA rule metadata """ try: rules = yara_service.list_rules(source) return rules except YaraError as error: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(error)) from error @router.get("/{rule_name}", response_model=dict) async def get_rule( rule_name: str, source: Optional[str] = "custom", ): """Get a YARA rule's content and metadata. Args: rule_name: Name of the rule source: Source of the rule ("custom" or "community") current_user: Current authenticated user Returns: Rule content and metadata Raises: HTTPException: If rule not found """ try: # Get rule content content = yara_service.get_rule(rule_name, source) # Find metadata in the list of rules metadata = None rules = yara_service.list_rules(source) for rule in rules: if rule.name == rule_name: metadata = rule break return { "name": rule_name, "source": source, "content": content, "metadata": metadata.model_dump() if metadata else {}, } except YaraError as error: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(error)) from error @router.get("/{rule_name}/raw") async def get_rule_raw( rule_name: str, source: Optional[str] = "custom", ): """Get a YARA rule's raw content as plain text. 
Args: rule_name: Name of the rule source: Source of the rule ("custom" or "community") current_user: Current authenticated user Returns: Plain text rule content Raises: HTTPException: If rule not found """ try: # Get rule content content = yara_service.get_rule(rule_name, source) # Return as plain text return Response(content=content, media_type="text/plain") except YaraError as error: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(error)) from error @router.post("/", response_model=YaraRuleMetadata) async def create_rule(rule: YaraRuleCreate, current_user: User = Depends(get_current_active_user)): """Create a new YARA rule. Args: rule: Rule to create current_user: Current authenticated user Returns: Metadata of the created rule Raises: HTTPException: If rule creation fails """ try: metadata = yara_service.add_rule(rule.name, rule.content) logger.info(f"Rule {rule.name} created by {current_user.username}") return metadata except YaraError as error: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(error)) from error @router.post("/upload", response_model=YaraRuleMetadata) async def upload_rule( rule_file: UploadFile = File(...), source: str = Form("custom"), current_user: User = Depends(get_current_active_user), ): """Upload a YARA rule file. 
Args: rule_file: YARA rule file to upload source: Source of the rule ("custom" or "community") current_user: Current authenticated user Returns: Metadata of the uploaded rule Raises: HTTPException: If file upload or rule creation fails """ try: # Read file content content = await rule_file.read() # Get rule name from filename rule_name = rule_file.filename if not rule_name: raise ValueError("Filename is required") # Add rule metadata = yara_service.add_rule(rule_name, content.decode("utf-8"), source) logger.info(f"Rule {rule_name} uploaded by {current_user.username}") return metadata except YaraError as err: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(err)) from err except Exception as error: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(error)) from error @router.put("/{rule_name}", response_model=YaraRuleMetadata) async def update_rule( rule_name: str, content: str = Body(...), source: str = "custom", current_user: User = Depends(get_current_active_user), ): """Update an existing YARA rule. 
Args: rule_name: Name of the rule content: Updated rule content source: Source of the rule ("custom" or "community") current_user: Current authenticated user Returns: Metadata of the updated rule Raises: HTTPException: If rule update fails """ try: metadata = yara_service.update_rule(rule_name, content, source) logger.info(f"Rule {rule_name} updated by {current_user.username}") return metadata except YaraError as error: if "Rule not found" in str(error): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(error)) from error raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(error)) from error @router.put("/{rule_name}/plain", response_model=YaraRuleMetadata) async def update_rule_plain( rule_name: str, source: str = "custom", content: str = Body(..., media_type="text/plain"), current_user: User = Depends(get_current_active_user), ): """Update an existing YARA rule using plain text. This endpoint accepts the YARA rule as plain text in the request body, making it easier to update YARA rules without having to escape special characters for JSON. Args: rule_name: Name of the rule source: Source of the rule ("custom" or "community") content: Updated YARA rule content as plain text current_user: Current authenticated user Returns: Metadata of the updated rule Raises: HTTPException: If rule update fails """ try: metadata = yara_service.update_rule(rule_name, content, source) logger.info(f"Rule {rule_name} updated by {current_user.username} via plain text endpoint") return metadata except YaraError as error: if "Rule not found" in str(error): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(error)) from error raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(error)) from error @router.delete("/{rule_name}") async def delete_rule(rule_name: str, source: str = "custom", current_user: User = Depends(get_current_active_user)): """Delete a YARA rule. 
Args: rule_name: Name of the rule source: Source of the rule ("custom" or "community") current_user: Current authenticated user Returns: Success message Raises: HTTPException: If rule deletion fails """ try: result = yara_service.delete_rule(rule_name, source) if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Rule {rule_name} not found in {source}", ) logger.info(f"Rule {rule_name} deleted by {current_user.username}") return {"message": f"Rule {rule_name} deleted"} except YaraError as error: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(error)) from error @router.post("/import") async def import_rules(url: Optional[str] = None, current_user: User = Depends(validate_admin)): """Import ThreatFlux YARA rules from GitHub. Args: url: URL to the GitHub repository current_user: Current authenticated admin user Returns: Import result Raises: HTTPException: If import fails """ try: result = import_rules_tool(url) if not result.get("success"): raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=result.get("message", "Import failed"), ) logger.info(f"Rules imported from {url or 'ThreatFlux repository'} by {current_user.username}") return result except Exception as error: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(error)) from error @router.post("/validate") async def validate_rule(request: Request): """Validate a YARA rule. This endpoint tries to handle both JSON and plain text inputs, with some format detection. For guaranteed reliability, use the /validate/plain endpoint for plain text YARA rules. 
Args: request: Request object containing the rule content current_user: Current authenticated user Returns: Validation result """ try: # Read content as text content = await request.body() content_str = content.decode("utf-8") # Basic heuristic to detect YARA vs JSON: # If it starts with a curly brace and has line breaks, it might be a YARA rule # If it doesn't look like valid JSON, treat it as a YARA rule if not content_str.strip().startswith("rule"): try: # Try to parse as JSON import json # pylint: disable=import-outside-toplevel json_content = json.loads(content_str) # If it parsed as JSON, check what kind of content it has if isinstance(json_content, str): # It was a JSON string, use that as the content content_str = json_content elif isinstance(json_content, dict) and "content" in json_content: # It was a JSON object with a content field content_str = json_content["content"] except json.JSONDecodeError: # It wasn't valid JSON, assume it's a YARA rule logger.error("Failed to decode JSON content from %s", content_str) # Use the validate_yara_rule MCP tool result = validate_rule_tool(content_str) return result except Exception as error: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(error)) from error @router.post("/validate/plain") async def validate_rule_plain( content: str = Body(..., media_type="text/plain"), ): """Validate a YARA rule submitted as plain text. This endpoint accepts the YARA rule as plain text without requiring JSON formatting. 
Args: content: YARA rule content to validate as plain text current_user: Current authenticated user Returns: Validation result """ try: # Use the validate_yara_rule MCP tool result = validate_rule_tool(content) return result except Exception as e: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e @router.post("/plain", response_model=YaraRuleMetadata) async def create_rule_plain( rule_name: str, source: str = "custom", content: str = Body(..., media_type="text/plain"), current_user: User = Depends(get_current_active_user), ): """Create a new YARA rule using plain text content. This endpoint accepts the YARA rule as plain text in the request body, making it easier to submit YARA rules without having to escape special characters for JSON. Args: rule_name: Name of the rule file (with or without .yar extension) source: Source of the rule ("custom" or "community") content: YARA rule content as plain text current_user: Current authenticated user Returns: Metadata of the created rule Raises: HTTPException: If rule creation fails """ try: metadata = yara_service.add_rule(rule_name, content, source) logger.info(f"Rule {rule_name} created by {current_user.username} via plain text endpoint") return metadata except YaraError as error: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(error)) from error

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.