Google Threat Intelligence MCP Server

by googleSandy

get_file_behavior_report

Analyze file behavior in sandbox environments to identify malware threats and security risks using Google's threat intelligence data.

Instructions

Retrieve the file behaviour report of the given file behaviour identifier.

You can get all the file behaviours of a given file by calling get_entities_related_to_a_file with the file hash and behaviours as the relationship name.

The file behaviour ID is composed using the following pattern: "{file hash}_{sandbox name}".
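
The ID pattern above can be illustrated with a minimal sketch. The helper names `build_file_behaviour_id` and `split_file_behaviour_id`, the placeholder hash, and the sandbox name `ExampleSandbox` are hypothetical and not part of the server. Splitting on the first underscore assumes the hash itself never contains one, which holds for hexadecimal digests.

```python
def build_file_behaviour_id(file_hash: str, sandbox_name: str) -> str:
    """Compose an ID following the documented "{file hash}_{sandbox name}" pattern."""
    return f"{file_hash}_{sandbox_name}"


def split_file_behaviour_id(file_behaviour_id: str) -> tuple[str, str]:
    """Split an ID back into (file hash, sandbox name).

    Splits on the FIRST underscore only, so a sandbox name that itself
    contains underscores is preserved intact.
    """
    file_hash, sep, sandbox_name = file_behaviour_id.partition("_")
    if not sep or not file_hash or not sandbox_name:
        raise ValueError(
            'expected "{file hash}_{sandbox name}", got: ' + repr(file_behaviour_id)
        )
    return file_hash, sandbox_name


# Placeholder values for illustration only (not a real hash or sandbox).
placeholder_hash = "a" * 64
behaviour_id = build_file_behaviour_id(placeholder_hash, "ExampleSandbox")
print(split_file_behaviour_id(behaviour_id)[1])
```

Validating the ID shape before calling the tool avoids a round trip to the API for an input that cannot match the pattern.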

Args:
  file_behaviour_id (required): File behaviour ID.

Returns:
  The file behaviour report.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| file_behaviour_id | Yes | | |
| api_key | No | | |
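
To show how the two inputs fit together, here is a sketch of the JSON-RPC `tools/call` request an MCP client could send for this tool. The helper `build_tool_call` and the placeholder ID are hypothetical. In practice an MCP client library would usually construct this message for you, so treat this as an illustration of the payload shape rather than a recommended way to call the server.

```python
import json
import typing


def build_tool_call(
    file_behaviour_id: str,
    api_key: typing.Optional[str] = None,
    request_id: int = 1,
) -> dict[str, typing.Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for get_file_behavior_report."""
    arguments: dict[str, typing.Any] = {"file_behaviour_id": file_behaviour_id}
    # api_key is optional in the input schema, so only send it when provided.
    if api_key is not None:
        arguments["api_key"] = api_key
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "get_file_behavior_report",
            "arguments": arguments,
        },
    }


# Placeholder ID for illustration only.
request = build_tool_call("a" * 64 + "_ExampleSandbox")
print(json.dumps(request, indent=2))
```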

Implementation Reference

  • Main implementation of the get_file_behavior_report tool. The function retrieves the report for a file behaviour ID (pattern: '{file hash}_{sandbox name}') from the file_behaviours collection and requests these relationships with it: contacted domains, IPs, and URLs; dropped files; embedded domains, IPs, and URLs; and associations. The response is passed through sanitize_response before it is returned.
    @server.tool()
    async def get_file_behavior_report(
        file_behaviour_id: str, ctx: Context, api_key: str = None
    ) -> typing.Dict[str, typing.Any]:
      """Retrieve the file behaviour report of the given file behaviour identifier.
    
      You can get all the file behaviours of a given file by calling `get_entities_related_to_a_file` with the file hash and `behaviours` as the relationship name.
    
      The file behaviour ID is composed using the following pattern: "{file hash}_{sandbox name}".
    
      Args:
        file_behaviour_id (required): File behaviour ID.
      Returns:
        The file behaviour report.
      """
      async with vt_client(ctx, api_key=api_key) as client:
        res = await utils.fetch_object(
            client,
            "file_behaviours",
            "file_behaviour",
            file_behaviour_id,
            relationships=[
                "contacted_domains",
                "contacted_ips",
                "contacted_urls",
                "dropped_files",
                "embedded_domains",
                "embedded_ips",
                "embedded_urls",
                "associations",
            ],
        )
      return utils.sanitize_response(res)
  • Tool registration with @server.tool() decorator that registers get_file_behavior_report with the FastMCP server
    @server.tool()
  • Helper function fetch_object used by get_file_behavior_report to retrieve objects from Google Threat Intelligence API. It handles API errors, builds request parameters for attributes and relationships, and returns structured responses.
    async def fetch_object(
        vt_client: vt.Client,
        resource_collection_type: str,
        resource_type: str,
        resource_id: str,
        attributes: list[str] | None = None,
        relationships: list[str] | None = None,
        params: dict[str, typing.Any] | None = None):
      """Fetches objects from Google Threat Intelligence API."""
      logging.info(
          f"Fetching comprehensive {resource_collection_type} "
          f"report for id: {resource_id}")
      
      params = {k: v for k, v in params.items()} if params else {}
    
      # Retrieve a selection of object attributes and/or relationships.
      if attributes:
        params["attributes"] = ",".join(attributes)
      if relationships:
        params["relationships"] = ",".join(relationships)
    
      try:
        obj = await vt_client.get_object_async(
            f"/{resource_collection_type}/{resource_id}", params=params)
    
        if obj.error:
          logging.error(
              f"Error fetching main {resource_type} report for {resource_id}: {obj.error}"
          )
          return {
              "error": f"Failed to get main {resource_type} report: {obj.error}",
              # "details": report.get("details"),
          }
      except vt.error.APIError as e:
        logging.warning(
            f"VirusTotal API Error fetching {resource_type} {resource_id}: {e.code} - {e.message}"
        )
        return {
            "error": f"VirusTotal API Error: {e.code} - {e.message}",
            "details": f"The requested {resource_type} '{resource_id}' could not be found or there was an issue with the API request."
        }
      except Exception as e:
        logging.exception(
            f"Unexpected error fetching {resource_type} {resource_id}: {e}"
        )
        return {"error": "An unexpected internal error occurred."}
    
      # Build response.
      obj_dict = obj.to_dict()
      obj_dict['id'] = obj.id
      if 'aggregations' in obj_dict['attributes']:
        del obj_dict['attributes']['aggregations']
    
      logging.info(
          f"Successfully generated concise threat summary for id: {resource_id}")
      return obj_dict
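  • Helper function sanitize_response used by get_file_behavior_report to clean API responses before returning them to clients. It walks dictionaries and lists recursively and drops None values and empty strings. Although its docstring mentions removing empty dictionaries and lists, the code as shown keeps empty containers, because an empty dict or list is returned as-is rather than as None.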
    def sanitize_response(data: typing.Any) -> typing.Any:
      """Removes empty dictionaries and lists recursively from a response."""
      if isinstance(data, dict):
        sanitized_dict = {}
        for key, value in data.items():
          sanitized_value = sanitize_response(value)
          if sanitized_value is not None:
            sanitized_dict[key] = sanitized_value
        return sanitized_dict
      elif isinstance(data, list):
        sanitized_list = []
        for item in data:
          sanitized_item = sanitize_response(item)
          if sanitized_item is not None:
            sanitized_list.append(sanitized_item)
        return sanitized_list
      elif isinstance(data, str):
        return data if data else None
      else:
        return data
  • Context manager vt_client that provides a vt.Client instance for the current request, ensuring proper cleanup after use. Used by get_file_behavior_report to get authenticated API access.
    @asynccontextmanager
    async def vt_client(ctx: Context, api_key: str = None) -> AsyncIterator[vt.Client]:
      """Provides a vt.Client instance for the current request."""
      client = vt_client_factory(ctx, api_key)
    
      try:
        yield client
      finally:
        await client.close_async()
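
The effect of `sanitize_response` is easiest to see on a small payload. The sketch below copies the function from the listing above and runs it on made-up data (the field names are illustrative, not a real API response). It also includes `prune_empty`, a hypothetical variant that is not part of the server, showing one way to drop empty containers as well if that is the behaviour you want.

```python
import typing


def sanitize_response(data: typing.Any) -> typing.Any:
    """Copy of the helper shown above, unchanged in behaviour."""
    if isinstance(data, dict):
        sanitized_dict = {}
        for key, value in data.items():
            sanitized_value = sanitize_response(value)
            if sanitized_value is not None:
                sanitized_dict[key] = sanitized_value
        return sanitized_dict
    elif isinstance(data, list):
        sanitized_list = []
        for item in data:
            sanitized_item = sanitize_response(item)
            if sanitized_item is not None:
                sanitized_list.append(sanitized_item)
        return sanitized_list
    elif isinstance(data, str):
        return data if data else None
    else:
        return data


def prune_empty(data: typing.Any) -> typing.Any:
    """Hypothetical variant: also drops containers that end up empty."""
    if isinstance(data, dict):
        cleaned = {k: prune_empty(v) for k, v in data.items()}
        cleaned = {k: v for k, v in cleaned.items() if v is not None}
        return cleaned or None
    if isinstance(data, list):
        items = [prune_empty(item) for item in data]
        items = [item for item in items if item is not None]
        return items or None
    if isinstance(data, str):
        return data or None
    return data  # numbers and booleans (including 0 and False) are kept


# Made-up payload for illustration only.
sample = {
    "id": "x",
    "attributes": {"verdict": "", "tags": [], "calls": ["a", "", None]},
    "relationships": {},
}

print(sanitize_response(sample))
# {'id': 'x', 'attributes': {'tags': [], 'calls': ['a']}, 'relationships': {}}
print(prune_empty(sample))
# {'id': 'x', 'attributes': {'calls': ['a']}}
```

Note that `prune_empty` returns None for a payload that is empty at the top level, so a caller would need to handle that case.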

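The way `fetch_object` turns its `attributes` and `relationships` lists into query parameters can be isolated in a few lines. `build_query_params` is a hypothetical name used only for this sketch; the logic mirrors the parameter-building step in the listing above and makes no network calls.

```python
import typing


def build_query_params(
    attributes: typing.Optional[list[str]] = None,
    relationships: typing.Optional[list[str]] = None,
    params: typing.Optional[dict[str, typing.Any]] = None,
) -> dict[str, typing.Any]:
    """Merge caller params with comma-joined attribute and relationship lists."""
    # Copy so the caller's dictionary is never mutated.
    query = dict(params) if params else {}
    if attributes:
        query["attributes"] = ",".join(attributes)
    if relationships:
        query["relationships"] = ",".join(relationships)
    return query


# The relationships requested by get_file_behavior_report.
query = build_query_params(
    relationships=[
        "contacted_domains",
        "contacted_ips",
        "contacted_urls",
        "dropped_files",
        "embedded_domains",
        "embedded_ips",
        "embedded_urls",
        "associations",
    ]
)
print(query["relationships"])
```

Sending one comma-separated `relationships` parameter lets the report and its related entities come back in a single request.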
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/googleSandy/gti-mcp-standalone'