Skip to main content
Glama
googleSandy

Google Threat Intelligence MCP Server

by googleSandy

get_collection_report

Retrieve threat intelligence collections from Google's platform to analyze malware families, threat actors, campaigns, and security reports.

Instructions

At Google Threat Intelligence, threats are modeled as "collections". This tool retrieves them from the platform.

They have different collections types like:

  • "malware-family"

  • "threat-actor"

  • "campaign"

  • "report"

  • "collection".

You can find the collection type in the "collection_type" field.

Args: id (required): Google Threat Intelligence identifier. Returns: A collection object. Put attention to the collection type to correctly understand what it represents.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
idYes
api_keyNo

Implementation Reference

  • Main handler function for get_collection_report tool. Uses @server.tool() decorator for registration and calls utils.fetch_object to retrieve collection data from the API.
    @server.tool()
    async def get_collection_report(id: str, ctx: Context, api_key: str = None) -> typing.Dict[str, typing.Any]:
      """At Google Threat Intelligence, threats are modeled as "collections". This tool retrieves them from the platform.
    
      They have different collections types like: 
        - "malware-family"
        - "threat-actor"
        - "campaign"
        - "report"
        - "collection". 
    
      You can find the collection type in the "collection_type" field.
    
      Args:
        id (required): Google Threat Intelligence identifier.
      Returns:
        A collection object. Put attention to the collection type to correctly understand what it represents.
      """
      async with vt_client(ctx, api_key=api_key) as client:
        res = await utils.fetch_object(
            client,
            "collections",
            "collection",
            id,
            relationships=COLLECTION_KEY_RELATIONSHIPS,
            params={"exclude_attributes": COLLECTION_EXCLUDED_ATTRS})
      return res
  • Helper utility function that performs the actual API call to fetch collection data from VirusTotal. Handles API errors and processes the response.
    async def fetch_object(
        vt_client: vt.Client,
        resource_collection_type: str,
        resource_type: str,
        resource_id: str,
        attributes: list[str] | None = None,
        relationships: list[str] | None = None,
        params: dict[str, typing.Any] | None = None):
      """Fetches objects from Google Threat Intelligence API."""
      logging.info(
          f"Fetching comprehensive {resource_collection_type} "
          f"report for id: {resource_id}")
      
      params = {k: v for k, v in params.items()} if params else {}
    
      # Retrieve a selection of object attributes and/or relationships.
      if attributes:
        params["attributes"] = ",".join(attributes)
      if relationships:
        params["relationships"] = ",".join(relationships)
    
      try:
        obj = await vt_client.get_object_async(
            f"/{resource_collection_type}/{resource_id}", params=params)
    
        if obj.error:
          logging.error(
              f"Error fetching main {resource_type} report for {resource_id}: {obj.error}"
          )
          return {
              "error": f"Failed to get main {resource_type} report: {obj.error}",
              # "details": report.get("details"),
          }
      except vt.error.APIError as e:
        logging.warning(
            f"VirusTotal API Error fetching {resource_type} {resource_id}: {e.code} - {e.message}"
        )
        return {
            "error": f"VirusTotal API Error: {e.code} - {e.message}",
            "details": f"The requested {resource_type} '{resource_id}' could not be found or there was an issue with the API request."
        }
      except Exception as e:
        logging.exception(
            f"Unexpected error fetching {resource_type} {resource_id}: {e}"
        )
        return {"error": "An unexpected internal error occurred."}
    
      # Build response.
      obj_dict = obj.to_dict()
      obj_dict['id'] = obj.id
      if 'aggregations' in obj_dict['attributes']:
        del obj_dict['attributes']['aggregations']
    
      logging.info(
          f"Successfully generated concise threat summary for id: {resource_id}")
      return obj_dict
  • The @server.tool() decorator registers the get_collection_report function as an MCP tool in the server.
    @server.tool()
  • Schema constants defining valid collection types, relationships, and excluded attributes used by get_collection_report and other collection-related tools.
    COLLECTION_RELATIONSHIPS = [
        "associations",
        "attack_techniques",
        "domains",
        "files",
        "ip_addresses",
        "urls",
        "threat_actors",
        "malware_families",
        "software_toolkits",
        "campaigns",
        "vulnerabilities",
        "reports",
        "suspected_threat_actors",
        "hunting_rulesets",
    ]
    
    COLLECTION_KEY_RELATIONSHIPS = [
        "associations",
    ]
    COLLECTION_EXCLUDED_ATTRS = ",".join(["aggregations"])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/googleSandy/gti-mcp-standalone'

If you have feedback or need assistance with the MCP directory API, please join our Discord server