Skip to main content
Glama
googleSandy

Google Threat Intelligence MCP Server

by googleSandy

get_entities_related_to_a_hunting_ruleset

Retrieve files or other entities associated with a specific hunting ruleset to analyze matches and investigate potential threats.

Instructions

Retrieve entities related to the the given Hunting Ruleset.

The following table shows a summary of available relationships for Hunting ruleset objects.

Relationship

Return object type

hunting_notification_files

Files that matched with the ruleset filters

Args: ruleset_id (required): Hunting ruleset identifier. relationship_name (required): Relationship name. limit: Limit the number of entities to retrieve. 10 by default. Returns: List of objects related to the Hunting ruleset.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
ruleset_idYes
relationship_nameYes
limitNo
api_keyNo

Implementation Reference

  • Main handler function that retrieves entities related to a hunting ruleset. It validates the relationship_name parameter, uses the vt_client context manager to get a VirusTotal API client, calls fetch_object_relationships to get the data, and returns sanitized results.
    async def get_entities_related_to_a_hunting_ruleset(
        ruleset_id: str, relationship_name: str, ctx: Context, limit: int = 10, api_key: str = None
    ) -> list[dict[str, typing.Any]]:
      """Retrieve entities related to the the given Hunting Ruleset.
    
        The following table shows a summary of available relationships for Hunting ruleset objects.
    
        | Relationship         | Return object type                                |
        | :------------------- | :------------------------------------------------ |
        | hunting_notification_files | Files that matched with the ruleset filters |
    
        Args:
          ruleset_id (required): Hunting ruleset identifier.
          relationship_name (required): Relationship name.
          limit: Limit the number of entities to retrieve. 10 by default.
        Returns:
          List of objects related to the Hunting ruleset.
      """
      if not relationship_name in HUNTING_RULESET_RELATIONSHIPS:
          return {
              "error": f"Relationship {relationship_name} does not exist. "
              f"Available relationships are: {','.join(HUNTING_RULESET_RELATIONSHIPS)}"
          }
    
      async with vt_client(ctx, api_key=api_key) as client:
        res = await utils.fetch_object_relationships(
            client,
            "intelligence/hunting_rulesets",
            ruleset_id,
            [relationship_name],
            limit=limit)
      return utils.sanitize_response(res.get(relationship_name, []))
  • Schema constant defining valid relationship names for hunting rulesets. Used for validation in the handler.
    HUNTING_RULESET_RELATIONSHIPS = [
        "hunting_notification_files",
    ]
  • Helper function that fetches relationship descriptors from a VirusTotal object. Used by the handler to get related entities for a hunting ruleset.
    async def fetch_object_relationships(
        vt_client: vt.Client,
        resource_collection_type: str,
        resource_id: str,
        relationships: typing.List[str],
        params: dict[str, typing.Any] | None = None,
        descriptors_only: bool = True,
        limit: int = 10):
      """Fetches the given relationships descriptors from the given object."""
      rel_futures = {}
      # If true, returns descriptors instead of full objects.
      descriptors = '/relationship' if descriptors_only else ''
      async with asyncio.TaskGroup() as tg:
        for rel_name in relationships:
          rel_futures[rel_name] = tg.create_task(
              consume_vt_iterator(
                  vt_client,
                  f"/{resource_collection_type}/{resource_id}"
                  f"{descriptors}/{rel_name}", params=params, limit=limit))
    
      data = {}
      for name, items in rel_futures.items():
        data[name] = []
        for obj in items.result():
          obj_dict = obj.to_dict()
          if 'aggregations' in obj_dict['attributes']:
            del obj_dict['attributes']['aggregations']
          data[name].append(obj_dict)
    
      return data
  • Helper function that recursively removes empty dictionaries and lists from API responses. Used by the handler to sanitize the output before returning.
    def sanitize_response(data: typing.Any) -> typing.Any:
      """Removes empty dictionaries and lists recursively from a response."""
      if isinstance(data, dict):
        sanitized_dict = {}
        for key, value in data.items():
          sanitized_value = sanitize_response(value)
          if sanitized_value is not None:
            sanitized_dict[key] = sanitized_value
        return sanitized_dict
      elif isinstance(data, list):
        sanitized_list = []
        for item in data:
          sanitized_item = sanitize_response(item)
          if sanitized_item is not None:
            sanitized_list.append(sanitized_item)
        return sanitized_list
      elif isinstance(data, str):
        return data if data else None
      else:
        return data
  • The @server.tool() decorator registers this function as an MCP tool. The server instance is imported from gti_mcp.server and all tools are automatically loaded via the wildcard import in gti_mcp/tools/__init__.py.
    @server.tool()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/googleSandy/gti-mcp-standalone'

If you have feedback or need assistance with the MCP directory API, please join our Discord server