Google Threat Intelligence MCP Server

by googleSandy

search_malware_families

Search for malware families in Google Threat Intelligence to identify and investigate threats. Use this tool to find collections of threats and retrieve detailed reports for security analysis.

Instructions

Search malware families in the Google Threat Intelligence platform.

Malware families are modeled as collections. Once you get collections from this tool, you can use get_collection_report to fetch the full reports and their relationships.

Use order_by to sort the results by "relevance" or "creation_date". Append "+" to the key for ascending order or "-" for descending order. The default is "relevance-".

Args:
  query (required): Search query to find threats.
  limit: Limit the number of threats to retrieve. 10 by default.
  order_by: Order results by the given order key. "relevance-" by default.

Returns: List of collections, aka threats.
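
The order_by convention described above (a sort field followed by "+" or "-") can be sketched with a small helper. The names build_order_by and VALID_SORT_FIELDS are hypothetical and are not part of the server; this is only an illustration of how such a value might be assembled on the caller's side.

```python
# Hypothetical helper, not part of the server: builds an order_by value
# from a sort field and a direction, following the rules described above.
VALID_SORT_FIELDS = ("relevance", "creation_date")


def build_order_by(field: str = "relevance", descending: bool = True) -> str:
    """Return the sort field with "-" (descending) or "+" (ascending) appended."""
    if field not in VALID_SORT_FIELDS:
        raise ValueError(f"unsupported sort field: {field!r}")
    return field + ("-" if descending else "+")


# The defaults reproduce the documented default order key.
default_key = build_order_by()
# Oldest collections first.
oldest_first = build_order_by("creation_date", descending=False)
```

With the defaults, the helper yields "relevance-", which matches the documented default.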

Input Schema

Name      Required  Description  Default
query     Yes
limit     No
order_by  No                     relevance-
api_key   No
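
As a rough illustration of the input schema above, the following sketch checks a tool-call argument dictionary for the required field and fills in defaults. The function normalize_arguments and the constants are hypothetical; the limit default of 10 is taken from the handler signature shown in the implementation reference, and the server itself derives its schema from that signature rather than from code like this.

```python
# Illustrative only: mirrors the parameter table; names here are assumptions.
REQUIRED = {"query"}
DEFAULTS = {"limit": 10, "order_by": "relevance-", "api_key": None}


def normalize_arguments(arguments: dict) -> dict:
    """Validate a tool-call argument dict and fill in default values."""
    missing = REQUIRED - arguments.keys()
    if missing:
        raise ValueError(f"missing required arguments: {sorted(missing)}")
    unknown = arguments.keys() - REQUIRED - DEFAULTS.keys()
    if unknown:
        raise ValueError(f"unknown arguments: {sorted(unknown)}")
    # Caller-supplied values override the defaults.
    return {**DEFAULTS, **arguments}


call_arguments = normalize_arguments({"query": "emotet", "limit": 5})
```

Here call_arguments keeps the supplied query and limit and picks up the default order_by and api_key values.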

Implementation Reference

  • Main handler function search_malware_families, decorated with @server.tool(), which registers it as an MCP tool. It takes the query, ctx, limit, order_by, and api_key parameters and calls _search_threats_by_collection_type with the 'malware-family' collection type.
    @server.tool()
    async def search_malware_families(
        query: str, ctx: Context, limit: int = 10, order_by: str = "relevance-", api_key: str = None
    ) -> typing.List[typing.Dict[str, typing.Any]]:
      """Search malware families in the Google Threat Intelligence platform.
    
      Malware families are modeled as collections. Once you get collections from this tool, you can use `get_collection_report` to fetch the full reports and their relationships.
    
      Use order_by to sort the results by "relevance" or "creation_date". Append "+" to the key for ascending order or "-" for descending order. The default is "relevance-".
    
      Args:
        query (required): Search query to find threats.
        limit: Limit the number of threats to retrieve. 10 by default.
        order_by: Order results by the given order key. "relevance-" by default.
    
      Returns:
        List of collections, aka threats.
      """
      res = await _search_threats_by_collection_type(
          query, "malware-family", ctx, limit, order_by, api_key=api_key)
      return res
  • Core helper function _search_threats_by_collection_type that performs the actual API call to VirusTotal. Validates collection_type, uses vt_client context manager, calls consume_vt_iterator to fetch collections with appropriate filters, and sanitizes the response.
    async def _search_threats_by_collection_type(
        query: str,
        collection_type: str,
        ctx: Context,
        limit: int = 10,
        order_by: str = "relevance-",
        api_key: str = None,
    ) -> typing.List[typing.Dict[str, typing.Any]]:
      """Search a given threat type in the Google Threat Intelligence platform.
    
      Args:
        query (required): Search query to find threats. If you want any threat, just pass an empty string.
        collection_type (required): Collection type. One of: "threat-actor", "malware-family", "campaign", "report", "software-toolkit", "vulnerability", "collection".
        limit: Limit the number of threats to retrieve. 10 by default.
        order_by: Order results by the given order key. "relevance-" by default.
    
      Returns:
        List of collections, aka threats.
      """
      if collection_type not in COLLECTION_TYPES:
          raise ValueError(
              f"wrong collection_type. Available collection_type are: {','.join(COLLECTION_TYPES)} ")
    
      async with vt_client(ctx, api_key=api_key) as client:
        res = await utils.consume_vt_iterator(
            client,
            "/collections",
            params={
                "filter": f"collection_type:{collection_type} {query}",
                "order": order_by,
                "relationships": COLLECTION_KEY_RELATIONSHIPS,
                "exclude_attributes": COLLECTION_EXCLUDED_ATTRS,
            },
            limit=limit,
        )
      return utils.sanitize_response([o.to_dict() for o in res])
  • Schema definition - COLLECTION_TYPES constant defining valid collection types including 'malware-family', 'threat-actor', 'campaign', 'report', 'software-toolkit', 'vulnerability', and 'collection'. Used for validation in the search function.
    COLLECTION_TYPES = {
        "threat-actor",
        "malware-family",
        "campaign",
        "report",
        "software-toolkit",
        "vulnerability",
        "collection",
    }
  • vt_client context manager that creates and manages a VirusTotal client instance for API requests. Handles API key resolution and proper client cleanup after use.
    @asynccontextmanager
    async def vt_client(ctx: Context, api_key: str = None) -> AsyncIterator[vt.Client]:
      """Provides a vt.Client instance for the current request."""
      client = vt_client_factory(ctx, api_key)
    
      try:
        yield client
      finally:
        await client.close_async()
  • consume_vt_iterator helper function that consumes a VirusTotal iterator asynchronously and returns a list of objects, used by the search function to retrieve collection results.
    async def consume_vt_iterator(
        vt_client: vt.Client, endpoint: str, params: dict | None = None, limit: int = 10):
      """Consumes a vt.Iterator and returns the list of objects."""
      res = []
      async for obj in vt_client.iterator(endpoint, params=params, limit=limit):
        res.append(obj)
      return res
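
The flow described in the list above (validate the collection type, build the filter string, open a client in an async context manager, drain an async iterator up to a limit, and close the client) can be sketched without network access. FakeClient, fake_vt_client, build_collection_filter, and search are hypothetical stand-ins written for this illustration; they only imitate the shape of the calls shown above and are not the vt library or the server's code.

```python
import asyncio
from contextlib import asynccontextmanager

COLLECTION_TYPES = {
    "threat-actor", "malware-family", "campaign", "report",
    "software-toolkit", "vulnerability", "collection",
}


def build_collection_filter(collection_type: str, query: str) -> str:
    """Validate the type and build the filter string used in the request."""
    if collection_type not in COLLECTION_TYPES:
        raise ValueError(f"wrong collection_type: {collection_type!r}")
    return f"collection_type:{collection_type} {query}"


class FakeClient:
    """Stand-in for a real API client so the sketch runs offline."""

    def __init__(self, items):
        self._items = items
        self.closed = False

    async def iterator(self, endpoint, params=None, limit=10):
        # Async generator: yields at most `limit` canned items.
        for item in self._items[:limit]:
            yield item

    async def close_async(self):
        self.closed = True


@asynccontextmanager
async def fake_vt_client(client):
    """Yield the client and always close it afterwards."""
    try:
        yield client
    finally:
        await client.close_async()


async def search(client, query, collection_type, limit=10, order_by="relevance-"):
    params = {
        "filter": build_collection_filter(collection_type, query),
        "order": order_by,
    }
    async with fake_vt_client(client) as c:
        # Drain the async iterator into a list, as consume_vt_iterator does.
        return [obj async for obj in c.iterator("/collections", params=params, limit=limit)]


client = FakeClient([{"id": f"malware--{i}"} for i in range(20)])
results = asyncio.run(search(client, "emotet", "malware-family", limit=3))
```

Because validation happens before the context manager is entered, an invalid collection type raises without ever opening a client, which matches the ordering in the helper shown above.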

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/googleSandy/gti-mcp-standalone'