search_campaigns
Search for threat campaigns using queries, returning collections of related threats that can be analyzed with detailed reports.
Instructions
Search threat campaigns in the Google Threat Intelligence platform.
Campaigns are modeled as collections. Once you get collections from this tool, you can use get_collection_report to fetch the full reports and their relationships.
Use order_by to sort the results by "relevance" or "creation_date". Append "+" for ascending order or "-" for descending order. The default is "relevance-".
Args:
- query (required): Search query to find threats.
- limit: Limit the number of threats to retrieve. 10 by default.
- order_by: Order results by the given order key. "relevance-" by default.
Returns: List of collections, aka threats.
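The order_by convention described above (a sort key followed by "+" or "-") can be sketched with a small helper. `build_order_by` and `VALID_ORDER_KEYS` are hypothetical names used only for illustration and are not part of gti_mcp; the two keys come from the description above.

```python
# Hypothetical helper, not part of gti_mcp: composes an order_by value.
VALID_ORDER_KEYS = ("relevance", "creation_date")  # keys named in the tool description


def build_order_by(key: str = "relevance", ascending: bool = False) -> str:
    """Return an order_by string such as "relevance-" or "creation_date+"."""
    if key not in VALID_ORDER_KEYS:
        raise ValueError(f"unknown order key: {key!r}")
    # "+" sorts ascending, "-" sorts descending.
    return key + ("+" if ascending else "-")


print(build_order_by())                                  # relevance-
print(build_order_by("creation_date", ascending=True))   # creation_date+
```

Validating the key on the client side is a design choice of this sketch; the source does not say how the server reacts to an unknown key.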
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
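| query | Yes | Search query to find threats. | |
| limit | No | Limit the number of threats to retrieve. | 10 |
| order_by | No | Order results by the given order key. | relevance- |
| api_key | No | | |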
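As a rough illustration, the inputs above could be packed into an MCP `tools/call` request like the one below. The query text, limit, and request id are invented example values, and api_key is simply left out because it is optional.

```python
import json

# Example values only: the query text and request id are invented for illustration.
arguments = {
    "query": "ransomware",
    "limit": 5,
    "order_by": "creation_date-",
}

# JSON-RPC 2.0 envelope that MCP uses for tool invocation.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "search_campaigns", "arguments": arguments},
}

print(json.dumps(request, indent=2))
```

In practice an MCP client library would normally build this envelope for you; the sketch only shows where each input ends up.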
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | List of collections, aka threats. | |
Implementation Reference
- gti_mcp/tools/collections.py:244-263 (handler) The 'search_campaigns' tool handler function. It delegates to _search_threats_by_collection_type with collection_type='campaign'.
```python
async def search_campaigns(
    query: str,
    ctx: Context,
    limit: int = 10,
    order_by: str = "relevance-",
    api_key: str = None
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Search threat campaigns in the Google Threat Intelligence platform.

    Campaigns are modeled as collections. Once you get collections from this tool,
    you can use `get_collection_report` to fetch the full reports and their
    relationships.

    You can use order_by to sort the results by: "relevance", "creation_date".
    You can use the sign "+" to make it order ascending, or "-" to make it
    descending. By default is "relevance-"

    Args:
      query (required): Search query to find threats.
      limit: Limit the number of threats to retrieve. 10 by default.
      order_by: Order results by the given order key. "relevance-" by default.

    Returns:
      List of collections, aka threats.
    """
    res = await _search_threats_by_collection_type(
        query, "campaign", ctx, limit, order_by, api_key=api_key)
    return res
```

- gti_mcp/tools/collections.py:140-175 (helper) The underlying helper function _search_threats_by_collection_type used by search_campaigns. It queries the VirusTotal /collections API with a filter for the specific collection type (campaign).
```python
async def _search_threats_by_collection_type(
    query: str,
    collection_type: str,
    ctx: Context,
    limit: int = 10,
    order_by: str = "relevance-",
    api_key: str = None,
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Search a given threat type in the Google Threat Intelligence platform,

    Args:
      query (required): Search query to find threats. If you want any threat, just pass an empty string.
      collection_type (required): Collection type. One of: "threat-actor", "malware-family", "campaign", "report", "vulnerability", "collection".
      limit: Limit the number of threats to retrieve. 10 by default.
      order_by: Order results by the given order key. "relevance-" by default.

    Returns:
      List of collections, aka threats.
    """
    if collection_type not in COLLECTION_TYPES:
        raise ValueError(
            f"wrong collection_type. Available collection_type are: {','.join(COLLECTION_TYPES)} ")

    async with vt_client(ctx, api_key=api_key) as client:
        res = await utils.consume_vt_iterator(
            client,
            "/collections",
            params={
                "filter": f"collection_type:{collection_type} {query}",
                "order": order_by,
                "relationships": COLLECTION_KEY_RELATIONSHIPS,
                "exclude_attributes": COLLECTION_EXCLUDED_ATTRS,
            },
            limit=limit,
        )
    return utils.sanitize_response([o.to_dict() for o in res])
```

- gti_mcp/tools/collections.py:242-243 (registration) The @server.tool() decorators that register 'search_campaigns' as an MCP tool with the FastMCP server.
```python
@server.tool()
@server.tool()
```

- gti_mcp/utils.py:20-75 (helper) The consume_vt_iterator utility that interfaces with the VT API to paginate through collection results.
```python
async def consume_vt_iterator(
    vt_client: vt.Client, endpoint: str, params: dict | None = None, limit: int = 10):
    """Consumes a vt.Iterator iterator and return the list of objects."""
    res = []
    async for obj in vt_client.iterator(endpoint, params=params, limit=limit):
        res.append(obj)
    return res


async def fetch_object(
    vt_client: vt.Client,
    resource_collection_type: str,
    resource_type: str,
    resource_id: str,
    attributes: list[str] | None = None,
    relationships: list[str] | None = None,
    params: dict[str, typing.Any] | None = None):
    """Fetches objects from Google Threat Intelligence API."""
    logging.info(
        f"Fetching comprehensive {resource_collection_type} "
        f"report for id: {resource_id}")

    params = {k: v for k, v in params.items()} if params else {}

    # Retrieve a selection of object attributes and/or relationships.
    if attributes:
        params["attributes"] = ",".join(attributes)
    if relationships:
        params["relationships"] = ",".join(relationships)

    try:
        obj = await vt_client.get_object_async(
            f"/{resource_collection_type}/{resource_id}", params=params)

        if obj.error:
            logging.error(
                f"Error fetching main {resource_type} report for {resource_id}: {obj.error}"
            )
            return {
                "error": f"Failed to get main {resource_type} report: {obj.error}",
                # "details": report.get("details"),
            }
    except vt.error.APIError as e:
        logging.warning(
            f"VirusTotal API Error fetching {resource_type} {resource_id}: {e.code} - {e.message}"
        )
        return {
            "error": f"VirusTotal API Error: {e.code} - {e.message}",
            "details": f"The requested {resource_type} '{resource_id}' could not be found or there was an issue with the API request."
        }
    except Exception as e:
        logging.exception(
            f"Unexpected error fetching {resource_type} {resource_id}: {e}"
        )
        return {"error": "An unexpected internal error occurred."}
```
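To see how the filter string and the `limit` argument interact without calling the real API, the sketch below runs the same consume loop against a stand-in client. `FakeClient` and `build_params` are hypothetical names invented for this example, and the `relationships` and `exclude_attributes` parameters are omitted because the constants holding their values are not shown in this section.

```python
import asyncio


def build_params(collection_type: str, query: str, order_by: str = "relevance-") -> dict:
    """Build the filter/order parameters the same way the helper above does.

    "relationships" and "exclude_attributes" are left out on purpose: their
    values are not shown in this section.
    """
    return {
        "filter": f"collection_type:{collection_type} {query}",
        "order": order_by,
    }


class FakeClient:
    """Stand-in for vt.Client (an assumption of this sketch); no network access."""

    def __init__(self, items):
        self._items = items
        self.calls = []

    async def iterator(self, endpoint, params=None, limit=10):
        # Record the call, then yield at most `limit` items.
        self.calls.append((endpoint, params, limit))
        for item in self._items[:limit]:
            yield item


async def consume_vt_iterator(vt_client, endpoint, params=None, limit=10):
    """Same loop as the utility above, with type hints dropped so vt is not required."""
    res = []
    async for obj in vt_client.iterator(endpoint, params=params, limit=limit):
        res.append(obj)
    return res


async def main():
    client = FakeClient([f"campaign-{i}" for i in range(25)])
    params = build_params("campaign", "ransomware")
    results = await consume_vt_iterator(client, "/collections", params=params, limit=3)
    return client, results


client, results = asyncio.run(main())
print(results)                       # ['campaign-0', 'campaign-1', 'campaign-2']
print(client.calls[0][1]["filter"])  # collection_type:campaign ransomware
```

Because the query is appended after the collection type with a single space, an empty query yields the filter `collection_type:campaign ` (with a trailing space), which matches the helper's note that an empty string requests any threat of that type.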