get_entities_related_to_a_hunting_ruleset
Retrieve files or other entities associated with a specific hunting ruleset to analyze matches and investigate potential threats.
Instructions
Retrieve entities related to the the given Hunting Ruleset.
The following table shows a summary of available relationships for Hunting ruleset objects.
Relationship | Return object type |
hunting_notification_files | Files that matched with the ruleset filters |
Args: ruleset_id (required): Hunting ruleset identifier. relationship_name (required): Relationship name. limit: Limit the number of entities to retrieve. 10 by default. Returns: List of objects related to the Hunting ruleset.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| ruleset_id | Yes | ||
| relationship_name | Yes | ||
| limit | No | ||
| api_key | No |
Implementation Reference
- gti_mcp/tools/intelligence.py:107-138 (handler)Main handler function that retrieves entities related to a hunting ruleset. It validates the relationship_name parameter, uses the vt_client context manager to get a VirusTotal API client, calls fetch_object_relationships to get the data, and returns sanitized results.
async def get_entities_related_to_a_hunting_ruleset( ruleset_id: str, relationship_name: str, ctx: Context, limit: int = 10, api_key: str = None ) -> list[dict[str, typing.Any]]: """Retrieve entities related to the the given Hunting Ruleset. The following table shows a summary of available relationships for Hunting ruleset objects. | Relationship | Return object type | | :------------------- | :------------------------------------------------ | | hunting_notification_files | Files that matched with the ruleset filters | Args: ruleset_id (required): Hunting ruleset identifier. relationship_name (required): Relationship name. limit: Limit the number of entities to retrieve. 10 by default. Returns: List of objects related to the Hunting ruleset. """ if not relationship_name in HUNTING_RULESET_RELATIONSHIPS: return { "error": f"Relationship {relationship_name} does not exist. " f"Available relationships are: {','.join(HUNTING_RULESET_RELATIONSHIPS)}" } async with vt_client(ctx, api_key=api_key) as client: res = await utils.fetch_object_relationships( client, "intelligence/hunting_rulesets", ruleset_id, [relationship_name], limit=limit) return utils.sanitize_response(res.get(relationship_name, [])) - gti_mcp/tools/intelligence.py:22-24 (schema)Schema constant defining valid relationship names for hunting rulesets. Used for validation in the handler.
HUNTING_RULESET_RELATIONSHIPS = [ "hunting_notification_files", ] - gti_mcp/utils.py:87-116 (helper)Helper function that fetches relationship descriptors from a VirusTotal object. Used by the handler to get related entities for a hunting ruleset.
async def fetch_object_relationships( vt_client: vt.Client, resource_collection_type: str, resource_id: str, relationships: typing.List[str], params: dict[str, typing.Any] | None = None, descriptors_only: bool = True, limit: int = 10): """Fetches the given relationships descriptors from the given object.""" rel_futures = {} # If true, returns descriptors instead of full objects. descriptors = '/relationship' if descriptors_only else '' async with asyncio.TaskGroup() as tg: for rel_name in relationships: rel_futures[rel_name] = tg.create_task( consume_vt_iterator( vt_client, f"/{resource_collection_type}/{resource_id}" f"{descriptors}/{rel_name}", params=params, limit=limit)) data = {} for name, items in rel_futures.items(): data[name] = [] for obj in items.result(): obj_dict = obj.to_dict() if 'aggregations' in obj_dict['attributes']: del obj_dict['attributes']['aggregations'] data[name].append(obj_dict) return data - gti_mcp/utils.py:119-138 (helper)Helper function that recursively removes empty dictionaries and lists from API responses. Used by the handler to sanitize the output before returning.
def sanitize_response(data: typing.Any) -> typing.Any: """Removes empty dictionaries and lists recursively from a response.""" if isinstance(data, dict): sanitized_dict = {} for key, value in data.items(): sanitized_value = sanitize_response(value) if sanitized_value is not None: sanitized_dict[key] = sanitized_value return sanitized_dict elif isinstance(data, list): sanitized_list = [] for item in data: sanitized_item = sanitize_response(item) if sanitized_item is not None: sanitized_list.append(sanitized_item) return sanitized_list elif isinstance(data, str): return data if data else None else: return data - gti_mcp/tools/intelligence.py:106-106 (registration)The @server.tool() decorator registers this function as an MCP tool. The server instance is imported from gti_mcp.server and all tools are automatically loaded via the wildcard import in gti_mcp/tools/__init__.py.
@server.tool()