get_collection_rules
Retrieve community and curated threat detection rules from Google Threat Intelligence to analyze specific malware collections and identify security threats.
Instructions
Retrieve top N community rules and all curated hunting rules for a specific collection.
Note:
The `rule_types` argument filters the types of rules returned. Available types are:
- 'crowdsourced_ids'
- 'crowdsourced_sigma'
- 'crowdsourced_yara'
- 'curated_yara_rule'

If `rule_types` is not provided, all types are returned.

Example:
- `rule_types=['crowdsourced_yara']`: Only crowdsourced YARA rules.
- `rule_types=['crowdsourced_ids', 'curated_yara_rule']`: Crowdsourced IDS and curated YARA rules.

Args:
- collection_id (required): The ID of the collection.
- top_n (optional): The number of top community rules to return from each category. Defaults to 4.
- rule_types (optional): List of rule types to fetch.

Returns: A list of dictionaries, where each dictionary contains a rule and its metadata, or an error dictionary.
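How `top_n` and `rule_types` interact can be sketched with plain data. This is a minimal illustration, not the tool's implementation: `select_top_rules` is a hypothetical helper, the sample entries are invented, and the input is keyed directly by rule type for brevity.

```python
from typing import Any, Dict, List, Optional

ALL_RULE_TYPES = [
    "crowdsourced_ids",
    "crowdsourced_sigma",
    "crowdsourced_yara",
    "curated_yara_rule",
]


def select_top_rules(
    aggregations: Dict[str, List[Dict[str, Any]]],
    top_n: int = 4,
    rule_types: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: keep the top_n highest-count rules per requested type."""
    if rule_types is None:
        rule_types = ALL_RULE_TYPES  # no filter means every type is returned
    selected = []
    for rule_type, rules in aggregations.items():
        if rule_type not in rule_types:
            continue  # this type was filtered out by the caller
        ranked = sorted(rules, key=lambda r: r.get("count", 0), reverse=True)
        for rule in ranked[:top_n]:
            selected.append({**rule, "rule_type": rule_type})
    return selected


# Invented sample data (assumption), keyed by rule type.
sample = {
    "crowdsourced_yara": [
        {"rule_name": "yara_a", "count": 3},
        {"rule_name": "yara_b", "count": 9},
        {"rule_name": "yara_c", "count": 5},
    ],
    "crowdsourced_ids": [{"rule_name": "ids_a", "count": 1}],
}

top = select_top_rules(sample, top_n=2, rule_types=["crowdsourced_yara"])
print([r["rule_name"] for r in top])  # ['yara_b', 'yara_c']
```

Note that `top_n` applies per category, so a call without `rule_types` can return up to `top_n` community rules for each crowdsourced type.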
Input Schema
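| Name | Required | Description | Default |
|---|---|---|---|
| collection_id | Yes | The ID of the collection. | |
| top_n | No | The number of top community rules to return from each category. | 4 |
| rule_types | No | List of rule types to fetch. If omitted, all types are returned. | None |
| api_key | No | API key passed through to the API client. | None |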
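As a rough sketch of how these inputs might be sent, the snippet below builds an MCP `tools/call` request body (JSON-RPC 2.0) for this tool. The `build_tool_call` helper is hypothetical, the collection ID is a placeholder, and the transport used to deliver the request depends on your MCP client.

```python
import json
from typing import Any, Dict, List, Optional


def build_tool_call(
    collection_id: str,
    top_n: int = 4,
    rule_types: Optional[List[str]] = None,
    request_id: int = 1,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble a JSON-RPC request for get_collection_rules."""
    arguments: Dict[str, Any] = {"collection_id": collection_id, "top_n": top_n}
    if rule_types is not None:
        # Omit the key entirely so the tool falls back to returning all types.
        arguments["rule_types"] = rule_types
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "get_collection_rules", "arguments": arguments},
    }


# "<collection-id>" is a placeholder, not a real collection.
request = build_tool_call("<collection-id>", top_n=2, rule_types=["crowdsourced_yara"])
print(json.dumps(request, indent=2))
```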
Implementation Reference
- gti_mcp/tools/collections.py:717-839 (handler) Main handler function for the get_collection_rules tool. It retrieves the top N community rules and all curated hunting rules for a specific collection, handles the four rule types (crowdsourced_ids, crowdsourced_sigma, crowdsourced_yara, curated_yara_rule), and fetches detailed rule content for each type.

```python
@server.tool()
async def get_collection_rules(
    collection_id: str,
    ctx: Context,
    top_n: int = 4,
    rule_types: typing.List[str] = None,
    api_key: str = None,
) -> typing.Union[typing.List[typing.Dict[str, typing.Any]], typing.Dict[str, str]]:
    """Retrieve top N community rules and all curated hunting rules for a specific collection.

    Note:
      The `rule_types` argument filters the types of rules returned. Available types are:
        - 'crowdsourced_ids'
        - 'crowdsourced_sigma'
        - 'crowdsourced_yara'
        - 'curated_yara_rule'
      If `rule_types` is not provided, all types are returned.

    Example:
      - `rule_types=['crowdsourced_yara']`: Only crowdsourced YARA rules.
      - `rule_types=['crowdsourced_ids', 'curated_yara_rule']`: Crowdsourced IDS and curated YARA rules.

    Args:
      collection_id (required): The ID of the collection.
      top_n (optional): The number of top community rules to return from each category. Defaults to 4.
      rule_types (optional): List of rule types to fetch.
    Returns:
      A list of dictionaries, where each dictionary contains a rule and its metadata, or an error dictionary.
    """
    crowsourced_rules = []
    if rule_types is None:
        rule_types = ["crowdsourced_ids", "crowdsourced_sigma", "crowdsourced_yara", "curated_yara_rule"]

    # Maps aggregation keys in the API response to rule type names.
    rule_keys_map = {
        "crowdsourced_yara_results": "crowdsourced_yara",
        "crowdsourced_sigma_results": "crowdsourced_sigma",
        "crowdsourced_ids_results": "crowdsourced_ids",
    }

    # Fetch community rules from aggregations if requested
    if any(rt in rule_types for rt in ["crowdsourced_ids", "crowdsourced_sigma", "crowdsourced_yara"]):
        try:
            async with vt_client(ctx, api_key=api_key) as client:
                data = await client.get_async(f"/collections/{collection_id}?attributes=aggregations")
                data = await data.json_async()

            files_aggregations = data.get("data", {}).get("attributes", {}).get("aggregations", {}).get("files", {})

            if files_aggregations:
                # Iterate through different community rule types
                for key, rule_type in rule_keys_map.items():
                    rules = files_aggregations.get(key, [])
                    # Skip types that were not requested or have no rules
                    if rule_type not in rule_types or not rules:
                        continue

                    # Sort rules by count and take the top N
                    sorted_rules = sorted(rules, key=lambda x: x.get("count", 0), reverse=True)
                    top_rules = sorted_rules[:top_n]

                    # Fetch detailed rule content for each type
                    for rule in top_rules:
                        if key == "crowdsourced_yara_results":
                            rule_details = await _get_yara_rule_details(ctx, rule, rule_type, api_key=api_key)
                            if "error" not in rule_details:
                                crowsourced_rules.append(rule_details)
                        elif key == "crowdsourced_sigma_results":
                            rule_details = await _get_sigma_rule_details(ctx, rule, rule_type, api_key=api_key)
                            if "error" not in rule_details:
                                crowsourced_rules.append(rule_details)
                        else:  # IDS rules
                            rule_value = rule.get("value", {})
                            crowsourced_rules.append({
                                "rule_id": rule.get("id", ""),
                                "rule_name": rule_value.get("message", ""),
                                "rule_source": rule_value.get("url", ""),
                                "rule_content": rule_value.get("rule", ""),
                                "count": rule.get("count", 0),
                                "rule_type": rule_type,
                            })
        except Exception as e:
            logging.exception("Error fetching community rules aggregations: %s", e)
            # Continue execution to fetch other rule types

    crowsourced_rules = sorted(crowsourced_rules, key=lambda x: x.get("count", 0), reverse=True)

    curated_rules = []
    # Fetch curated hunting rulesets if requested
    if "curated_yara_rule" in rule_types:
        try:
            async with vt_client(ctx, api_key=api_key) as client:
                # 1. Get related hunting ruleset IDs from the collection
                related_rulesets_resp = await client.get_async(f"/collections/{collection_id}/hunting_rulesets")
                related_rulesets_data = await related_rulesets_resp.json_async()
            related_rulesets = related_rulesets_data.get("data", [])

            # Iterate through each related ruleset
            for ruleset in related_rulesets:
                ruleset_id = ruleset.get("id", None)
                if not ruleset_id:
                    continue
                try:
                    # 2. Get the full hunting ruleset object for each ID.
                    async with vt_client(ctx, api_key=api_key) as client:
                        ruleset_resp = await client.get_async(f"/intelligence/hunting_rulesets/{ruleset_id}")
                        ruleset_data = await ruleset_resp.json_async()
                except Exception as e:
                    logging.exception("Error processing rule: %s", e)
                    continue

                attributes = ruleset_data.get("data", {}).get("attributes", {})
                rules = attributes.get("rules", "")
                rule_names = attributes.get("rule_names", [])
                n_rules = attributes.get("number_of_rules", 0)

                # Append each rule to the curated_rules list
                if n_rules == 1:
                    curated_rules.append({
                        "rule_type": "curated_yara_rule",
                        "rule_name": rule_names[0],
                        "rule_content": rules,
                    })
                else:
                    for i in range(n_rules):
                        curated_rules.append({
                            "rule_type": "curated_yara_rule",
                            "rule_name": rule_names[i],
                            "rule_content": rules[i],
                        })
        except Exception as e:
            logging.exception("Error fetching curated rules: %s", e)

    all_rules = curated_rules + crowsourced_rules
    return utils.sanitize_response(all_rules)
```

- gti_mcp/tools/collections.py:663-688 (helper) Helper function _get_yara_rule_details that fetches details for a single YARA ruleset and formats the output. It retrieves the ruleset from the VirusTotal API and returns structured data including rule_id, rule_name, rule_source, and rule_content.

```python
async def _get_yara_rule_details(
    ctx: Context, rule: dict, rule_type: str, api_key: str = None
) -> typing.Dict[str, typing.Any]:
    """Fetches details for a single YARA ruleset and formats the output."""
    ruleset_id = rule.get("value", {}).get("ruleset_id", None)
    if not ruleset_id:
        return {"error": f"No ruleset_id found in rule"}

    try:
        async with vt_client(ctx, api_key=api_key) as client:
            ruleset_resp = await client.get_async(f"/yara_rulesets/{ruleset_id}")
            ruleset_data = await ruleset_resp.json_async()
        ruleset_data = ruleset_data.get("data", {})
        ruleset_attributes = ruleset_data.get("attributes", {})
        if ruleset_data and ruleset_attributes:
            return {
                "rule_id": ruleset_data.get("id"),
                "rule_name": ruleset_attributes.get("name", ""),
                "rule_source": ruleset_attributes.get("source", ""),
                "rule_content": ruleset_attributes.get("rules", ""),
                "count": rule.get("count", 0),
                "rule_type": rule_type,
            }
        return {"error": f"No data found for YARA ruleset {ruleset_id}"}
    except Exception as e:
        logging.exception("Error fetching YARA ruleset %s: %s", ruleset_id, e)
        return {"error": f"Error fetching YARA ruleset {ruleset_id}: {e}"}
```

- gti_mcp/tools/collections.py:690-715 (helper) Helper function _get_sigma_rule_details that fetches details for a single Sigma ruleset and formats the output. It retrieves the Sigma ruleset from the VirusTotal API and returns structured data including rule_id, rule_name, rule_source, and rule_content.

```python
async def _get_sigma_rule_details(
    ctx: Context, rule: dict, rule_type: str, api_key: str = None
) -> typing.Dict[str, typing.Any]:
    """Fetches details for a single Sigma ruleset and formats the output."""
    ruleset_id = rule.get("value", {}).get("id", None)
    if not ruleset_id:
        return {"error": f"No ruleset_id found in rule"}

    try:
        async with vt_client(ctx, api_key=api_key) as client:
            ruleset_resp = await client.get_async(f"/sigma_rules/{ruleset_id}")
            ruleset_data = await ruleset_resp.json_async()
        ruleset_data = ruleset_data.get("data", {})
        ruleset_attributes = ruleset_data.get("attributes", {})
        if ruleset_data and ruleset_attributes:
            return {
                "rule_id": ruleset_data.get("id", ""),
                "rule_name": rule.get("value", {}).get("title", ""),
                "rule_source": ruleset_attributes.get("source_url", ""),
                "rule_content": ruleset_attributes.get("rule", ""),
                "count": rule.get("count", 0),
                "rule_type": rule_type,
            }
        return {"error": f"No data found for Sigma ruleset {ruleset_id}"}
    except Exception as e:
        logging.exception("Error fetching Sigma ruleset %s: %s", ruleset_id, e)
        return {"error": f"Error fetching Sigma ruleset {ruleset_id}: {e}"}
```

- gti_mcp/tools/collections.py:717-740 (schema) Schema definition for the get_collection_rules tool through the @server.tool() decorator, function signature type hints, and docstring. Defines parameters (collection_id, ctx, top_n, rule_types, api_key) and return type documentation.

```python
@server.tool()
async def get_collection_rules(
    collection_id: str,
    ctx: Context,
    top_n: int = 4,
    rule_types: typing.List[str] = None,
    api_key: str = None,
) -> typing.Union[typing.List[typing.Dict[str, typing.Any]], typing.Dict[str, str]]:
    """Retrieve top N community rules and all curated hunting rules for a specific collection.

    Note:
      The `rule_types` argument filters the types of rules returned. Available types are:
        - 'crowdsourced_ids'
        - 'crowdsourced_sigma'
        - 'crowdsourced_yara'
        - 'curated_yara_rule'
      If `rule_types` is not provided, all types are returned.

    Example:
      - `rule_types=['crowdsourced_yara']`: Only crowdsourced YARA rules.
      - `rule_types=['crowdsourced_ids', 'curated_yara_rule']`: Crowdsourced IDS and curated YARA rules.

    Args:
      collection_id (required): The ID of the collection.
      top_n (optional): The number of top community rules to return from each category. Defaults to 4.
      rule_types (optional): List of rule types to fetch.
    Returns:
      A list of dictionaries, where each dictionary contains a rule and its metadata, or an error dictionary.
    """
```

- gti_mcp/server.py:67-73 (registration) Server setup and tool registration. The FastMCP server instance is created at lines 67-70, and all tools (including get_collection_rules) are automatically registered through the import at line 73, when the @server.tool() decorator is applied.

```python
server = FastMCP(
    "Google Threat Intelligence MCP server",
    dependencies=["vt-py"],
    stateless_http=stateless)

# Load tools.
from gti_mcp.tools import *
```
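The registration-on-import behavior described above can be illustrated with a toy stand-in. `ToyServer` below is a hypothetical class written only for this sketch; it is not the FastMCP API, and it only shows the general idea that a decorator can record a function in a registry at the moment the defining module is executed.

```python
from typing import Callable, Dict


class ToyServer:
    """Toy stand-in for a tool server (assumption: NOT the FastMCP API)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tools: Dict[str, Callable] = {}

    def tool(self) -> Callable[[Callable], Callable]:
        """Return a decorator that records the function under its own name."""
        def decorator(func: Callable) -> Callable:
            self.tools[func.__name__] = func
            return func  # the function itself is left unchanged
        return decorator


server = ToyServer("toy server")


# In a real package this definition would live in a separate module, and
# importing that module would run the decorator and register the tool.
@server.tool()
def get_collection_rules(collection_id: str, top_n: int = 4) -> str:
    return f"rules for {collection_id} (top {top_n})"


print(sorted(server.tools))  # ['get_collection_rules']
```

Because registration is a side effect of executing the decorated definition, a wildcard import of the tools package is enough to populate the server without an explicit registration call per tool.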