Google Threat Intelligence MCP Server

by googleSandy

get_collection_rules

Retrieve community and curated threat detection rules from Google Threat Intelligence to analyze specific malware collections and identify security threats.

Instructions

Retrieve top N community rules and all curated hunting rules for a specific collection.

Note: The rule_types argument filters the types of rules returned. Available types are:

  • 'crowdsourced_ids'

  • 'crowdsourced_sigma'

  • 'crowdsourced_yara'

  • 'curated_yara_rule'

If rule_types is not provided, all types are returned.

Example:

  • rule_types=['crowdsourced_yara']: Only crowdsourced YARA rules.

  • rule_types=['crowdsourced_ids', 'curated_yara_rule']: Crowdsourced IDS and curated YARA rules.

Args:

  • collection_id (required): The ID of the collection.

  • top_n (optional): The number of top community rules to return from each category. Defaults to 4.

  • rule_types (optional): List of rule types to fetch.

Returns: A list of dictionaries, where each dictionary contains a rule and its metadata, or an error dictionary.
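As a client-side illustration of the arguments described above, the following minimal sketch assembles an argument dictionary and rejects rule types outside the four listed. The names build_arguments and KNOWN_RULE_TYPES are hypothetical and not part of the server, the collection ID is a placeholder, and the handler shown on this page does not perform this validation itself.

```python
from typing import Any, Dict, List, Optional

# The four rule types named in the tool description.
KNOWN_RULE_TYPES = (
    "crowdsourced_ids",
    "crowdsourced_sigma",
    "crowdsourced_yara",
    "curated_yara_rule",
)


def build_arguments(collection_id: str,
                    top_n: int = 4,
                    rule_types: Optional[List[str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: assemble arguments for get_collection_rules."""
    if not collection_id:
        raise ValueError("collection_id is required")
    if top_n < 0:
        raise ValueError("top_n must not be negative")
    args: Dict[str, Any] = {"collection_id": collection_id, "top_n": top_n}
    # Omitting rule_types asks for all types, matching the documented default.
    if rule_types is not None:
        unknown = [rt for rt in rule_types if rt not in KNOWN_RULE_TYPES]
        if unknown:
            raise ValueError(f"unknown rule types: {unknown}")
        args["rule_types"] = list(rule_types)
    return args


# Placeholder collection ID, for illustration only.
print(build_arguments("example-collection-id", rule_types=["crowdsourced_yara"]))
```

Checking the values before the call is a design choice made for this sketch: it surfaces a misspelled rule type as an error instead of a silently smaller result.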

Input Schema

Name           Required  Description  Default
collection_id  Yes
top_n          No
rule_types     No
api_key        No
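The JSON Schema view is not reproduced on this page. The sketch below is an approximation inferred from the handler's type hints (collection_id: str, top_n: int = 4, rule_types: List[str], api_key: str), not the schema the server actually emits. INPUT_SCHEMA, missing_required, and apply_defaults are hypothetical names.

```python
from typing import Any, Dict, List

# Approximate input schema, inferred from the handler signature (an assumption).
INPUT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "collection_id": {"type": "string"},
        "top_n": {"type": "integer", "default": 4},
        "rule_types": {"type": "array", "items": {"type": "string"}},
        "api_key": {"type": "string"},
    },
    "required": ["collection_id"],
}


def missing_required(arguments: Dict[str, Any]) -> List[str]:
    """Return the required parameter names absent from `arguments`."""
    return [name for name in INPUT_SCHEMA["required"] if name not in arguments]


def apply_defaults(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `arguments` with schema defaults filled in."""
    filled = dict(arguments)
    for name, spec in INPUT_SCHEMA["properties"].items():
        if name not in filled and "default" in spec:
            filled[name] = spec["default"]
    return filled
```

For example, missing_required({}) reports collection_id, the only parameter the table marks as required.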

Implementation Reference

  • Main handler function for the get_collection_rules tool. This function retrieves top N community rules and all curated hunting rules for a specific collection. It handles different rule types (crowdsourced_ids, crowdsourced_sigma, crowdsourced_yara, curated_yara_rule) and fetches detailed rule content for each type.
    @server.tool()
    async def get_collection_rules(collection_id: str, ctx: Context, top_n: int = 4, rule_types: typing.List[str] = None, api_key: str = None) -> typing.Union[typing.List[typing.Dict[str, typing.Any]], typing.Dict[str, str]]:
      """Retrieve top N community rules and all curated hunting rules for a specific collection.
    
      Note:
        The `rule_types` argument filters the types of rules returned. Available types are:
        - 'crowdsourced_ids'
        - 'crowdsourced_sigma'
        - 'crowdsourced_yara'
        - 'curated_yara_rule'
        If `rule_types` is not provided, all types are returned.
    
      Example:
        - `rule_types=['crowdsourced_yara']`: Only crowdsourced YARA rules.
        - `rule_types=['crowdsourced_ids', 'curated_yara_rule']`: Crowdsourced IDS and curated YARA rules.
    
      Args:
        collection_id (required): The ID of the collection.
        top_n (optional): The number of top community rules to return from each category. Defaults to 4.
        rule_types (optional): List of rule types to fetch.
    
      Returns:
        A list of dictionaries, where each dictionary contains a rule and its metadata, or an error dictionary.
      """
      crowsourced_rules = []
      if rule_types is None:
        rule_types = ["crowdsourced_ids", "crowdsourced_sigma", "crowdsourced_yara", "curated_yara_rule"]
    
      rule_keys_map = {
          "crowdsourced_yara_results": "crowdsourced_yara",
          "crowdsourced_sigma_results": "crowdsourced_sigma",
          "crowdsourced_ids_results": "crowdsourced_ids",
      }
      # Fetch community rules from aggregations if requested
      if any(rt in rule_types for rt in ["crowdsourced_ids", "crowdsourced_sigma", "crowdsourced_yara"]):
          try:
            async with vt_client(ctx, api_key=api_key) as client:
              data = await client.get_async(f"/collections/{collection_id}?attributes=aggregations")
              data = await data.json_async()
    
            files_aggregations = data.get("data", {}).get("attributes", {}).get("aggregations", {}).get("files", {})
    
            if files_aggregations:
              # Iterate through different community rule types
              for key, rule_type in rule_keys_map.items():
                rules = files_aggregations.get(key, [])
                # Skip rule types that were not requested or that have no results
                if rule_type not in rule_types or not rules:
                  continue
    
                # Sort rules by count and take the top N
                sorted_rules = sorted(rules, key=lambda x: x.get("count", 0), reverse=True)
                top_rules = sorted_rules[:top_n]
                # Fetch detailed rule content for each type
                for rule in top_rules:
                  if key == "crowdsourced_yara_results":
                    rule_details = await _get_yara_rule_details(ctx, rule, rule_type, api_key=api_key)
                    if "error" not in rule_details:
                      crowsourced_rules.append(rule_details)
                  elif key == "crowdsourced_sigma_results":
                    rule_details = await _get_sigma_rule_details(ctx, rule, rule_type, api_key=api_key)
                    if "error" not in rule_details:
                      crowsourced_rules.append(rule_details)
                  else: # IDS rules
                    rule_value = rule.get("value", {})
                    crowsourced_rules.append({
                        "rule_id": rule.get("id", ""),
                        "rule_name": rule_value.get("message", ""),
                        "rule_source": rule_value.get("url", ""),
                        "rule_content": rule_value.get("rule", ""),
                        "count" : rule.get("count", 0),
                        "rule_type": rule_type
                      })
          except Exception as e:
            logging.exception("Error fetching community rules aggregations: %s", e)
            # Continue execution to fetch other rule types
      crowsourced_rules = sorted(crowsourced_rules, key=lambda x: x.get("count", 0), reverse=True)
      curated_rules = []
      # Fetch curated hunting rulesets if requested
      if "curated_yara_rule" in rule_types:
        try:
          async with vt_client(ctx, api_key=api_key) as client:
            # 1. Get related hunting ruleset IDs from the collection
            related_rulesets_resp = await client.get_async(f"/collections/{collection_id}/hunting_rulesets")
            related_rulesets_data = await related_rulesets_resp.json_async()
            related_rulesets = related_rulesets_data.get("data", [])
    
          # Iterate through each related ruleset
          for ruleset in related_rulesets:
            ruleset_id = ruleset.get("id", None)
            if not ruleset_id:
              continue
            try:
              # 2. Get the full hunting ruleset object for each ID.
              async with vt_client(ctx, api_key=api_key) as client:
                ruleset_resp = await client.get_async(f"/intelligence/hunting_rulesets/{ruleset_id}")
                ruleset_data = await ruleset_resp.json_async()
            except Exception as e:
              logging.exception("Error processing rule: %s", e)
              continue
    
            attributes = ruleset_data.get("data", {}).get("attributes", {})
            rules = attributes.get("rules", "")
            rule_names = attributes.get("rule_names", [])
            n_rules = attributes.get("number_of_rules", 0)
    
            # Append each rule to the curated_rules list. `rules` is read as a
            # single string (the whole ruleset text), so indexing it would yield
            # characters; pair each rule name with the full ruleset text instead.
            for rule_name in rule_names[:n_rules]:
              curated_rules.append({
                  "rule_type": "curated_yara_rule",
                  "rule_name": rule_name,
                  "rule_content": rules,
              })
        except Exception as e:
          logging.exception("Error fetching curated rules: %s", e)
      all_rules = curated_rules + crowsourced_rules
      return utils.sanitize_response(all_rules)
  • Helper function _get_yara_rule_details that fetches details for a single YARA ruleset and formats the output. It retrieves the ruleset from the VirusTotal API and returns structured data including rule_id, rule_name, rule_source, and rule_content.
    async def _get_yara_rule_details(ctx: Context, rule: dict, rule_type: str, api_key: str = None) -> typing.Dict[str, typing.Any]:
      """Fetches details for a single YARA ruleset and formats the output."""
    
      ruleset_id = rule.get("value",{}).get("ruleset_id", None)
      if not ruleset_id:
        return {"error": "No ruleset_id found in rule"}
      
      try:
        async with vt_client(ctx, api_key=api_key) as client:
          ruleset_resp = await client.get_async(f"/yara_rulesets/{ruleset_id}")
          ruleset_data = await ruleset_resp.json_async()
          ruleset_data = ruleset_data.get("data", {})
          ruleset_attributes = ruleset_data.get("attributes", {})
          if ruleset_data and ruleset_attributes:
            return {
                "rule_id": ruleset_data.get("id"),
                "rule_name": ruleset_attributes.get("name", ""),
                "rule_source": ruleset_attributes.get("source", ""),
                "rule_content": ruleset_attributes.get("rules", ""),
                "count" : rule.get("count", 0),
                "rule_type": rule_type
            }
          return {"error": f"No data found for YARA ruleset {ruleset_id}"}
      except Exception as e:
        logging.exception("Error fetching YARA ruleset %s: %s", ruleset_id, e)
        return {"error": f"Error fetching YARA ruleset {ruleset_id}: {e}"}
  • Helper function _get_sigma_rule_details that fetches details for a single Sigma ruleset and formats the output. It retrieves the Sigma ruleset from the VirusTotal API and returns structured data including rule_id, rule_name, rule_source, and rule_content.
    async def _get_sigma_rule_details(ctx: Context, rule: dict, rule_type: str, api_key: str = None) -> typing.Dict[str, typing.Any]:
      """Fetches details for a single Sigma ruleset and formats the output."""
    
      ruleset_id = rule.get("value",{}).get("id", None)
      if not ruleset_id:
        return {"error": "No ruleset_id found in rule"}
      
      try:
        async with vt_client(ctx, api_key=api_key) as client:
          ruleset_resp = await client.get_async(f"/sigma_rules/{ruleset_id}")
          ruleset_data = await ruleset_resp.json_async()
          ruleset_data = ruleset_data.get("data", {})
          ruleset_attributes = ruleset_data.get("attributes", {})
          if ruleset_data and ruleset_attributes:
            return {
                "rule_id": ruleset_data.get("id", ""),
                "rule_name": rule.get("value", {}).get("title", ""),
                "rule_source": ruleset_attributes.get("source_url", ""),
                "rule_content": ruleset_attributes.get("rule", ""),
                "count" : rule.get("count", 0),
                "rule_type": rule_type
            }
          return {"error": f"No data found for Sigma ruleset {ruleset_id}"}
      except Exception as e:
        logging.exception("Error fetching Sigma ruleset %s: %s", ruleset_id, e)
        return {"error": f"Error fetching Sigma ruleset {ruleset_id}: {e}"}
  • Schema definition for the get_collection_rules tool through the @server.tool() decorator, function signature type hints, and docstring. Defines parameters (collection_id, ctx, top_n, rule_types, api_key) and return type documentation.
    @server.tool()
    async def get_collection_rules(collection_id: str, ctx: Context, top_n: int = 4, rule_types: typing.List[str] = None, api_key: str = None) -> typing.Union[typing.List[typing.Dict[str, typing.Any]], typing.Dict[str, str]]:
      """Retrieve top N community rules and all curated hunting rules for a specific collection.
    
      Note:
        The `rule_types` argument filters the types of rules returned. Available types are:
        - 'crowdsourced_ids'
        - 'crowdsourced_sigma'
        - 'crowdsourced_yara'
        - 'curated_yara_rule'
        If `rule_types` is not provided, all types are returned.
    
      Example:
        - `rule_types=['crowdsourced_yara']`: Only crowdsourced YARA rules.
        - `rule_types=['crowdsourced_ids', 'curated_yara_rule']`: Crowdsourced IDS and curated YARA rules.
    
      Args:
        collection_id (required): The ID of the collection.
        top_n (optional): The number of top community rules to return from each category. Defaults to 4.
        rule_types (optional): List of rule types to fetch.
    
      Returns:
        A list of dictionaries, where each dictionary contains a rule and its metadata, or an error dictionary.
      """
  • Server setup and tool registration. The FastMCP server instance is created at lines 67-70, and all tools (including get_collection_rules) are registered automatically by the import at line 73, which applies each @server.tool() decorator.
    server = FastMCP(
        "Google Threat Intelligence MCP server",
        dependencies=["vt-py"],
        stateless_http=stateless)
    
    # Load tools.
    from gti_mcp.tools import *
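The selection step in the main handler (sort aggregation entries by count, keep the first top_n) and the mixed list it returns can be tried in isolation. This is a minimal sketch, not the server's code: top_rules_by_count and group_by_rule_type are hypothetical helpers, and the sample entries are made up for illustration.

```python
from collections import defaultdict
from typing import Any, Dict, List


def top_rules_by_count(rules: List[Dict[str, Any]],
                       top_n: int = 4) -> List[Dict[str, Any]]:
    """Sort entries by "count" (descending) and keep the first top_n."""
    ranked = sorted(rules, key=lambda r: r.get("count", 0), reverse=True)
    return ranked[:top_n]


def group_by_rule_type(rules: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Bucket returned rule dictionaries by their "rule_type" field."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for rule in rules:
        grouped[rule.get("rule_type", "unknown")].append(rule)
    return dict(grouped)


# Made-up entries, for illustration only.
sample = [
    {"rule_id": "rule-a", "count": 3, "rule_type": "crowdsourced_yara"},
    {"rule_id": "rule-b", "count": 10, "rule_type": "crowdsourced_ids"},
    {"rule_id": "rule-c", "rule_type": "crowdsourced_sigma"},  # missing count counts as 0
    {"rule_id": "rule-d", "count": 7, "rule_type": "crowdsourced_yara"},
]

selected = top_rules_by_count(sample, top_n=2)
print([r["rule_id"] for r in selected])  # ['rule-b', 'rule-d']
print(sorted(group_by_rule_type(sample)))
```

Grouping by rule_type is one way a caller might separate curated YARA rules from community rules, since the handler returns both kinds in a single list.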

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/googleSandy/gti-mcp-standalone'
