get_threat_profile_recommendations
Retrieve ML-recommended threat objects (actors, malware, campaigns) tied to a threat profile, enabling focused threat intelligence for targeted industries and regions.
Instructions
Returns the list of objects associated to a given Threat Profile.
Each of these objects has one of the following types:
Threat Actors
Malware Families
Software or Toolkits
Campaigns
IoC Collections
Reports
Vulnerabilities
We can distinguish between two other types of objects based on how they were associated with the Threat Profile:
Recommended objects are automatically recommended or assigned to a Threat Profile based on our proprietary ML that takes into account the Threat Profile's configured interests such as the targeted industries, target regions, source regions, malware roles and actor motivations to recommend the most relevant threats. These objects are identified by the presence of "source": "SOURCE_RECOMMENDATION" within the "context_attributes" response parameter below.
Added objects are assigned or added by users to a Threat Profile, when users find other relevant threats not automatically recommended by our ML module. These objects are identified by the presence of "source": "SOURCE_DIRECT_FOLLOW" within the "context_attributes" response parameter below.
Args: profile_id (str): Threat Profile identifier at Google Threat Intelligence. limit: Limit the number of objects to retrieve. 10 by default.
Returns: List of Threat (collection) objects identifiers associated to the Threat Profile. Use
get_collection_reportto retrieve the full objects.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| profile_id | Yes | ||
| limit | No | ||
| api_key | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- gti_mcp/tools/threat_profiles.py:102-144 (handler)The async handler function for the 'get_threat_profile_recommendations' tool. It uses vt_client to fetch recommendations for a given threat profile via fetch_object_relationships with the 'recommendations' relationship, then sanitizes the response.
@server.tool() async def get_threat_profile_recommendations( profile_id: str, ctx: Context, limit: int = 10, api_key: str = None ) -> typing.List[typing.Dict[str, typing.Any]]: """Returns the list of objects associated to a given Threat Profile. Each of these objects has one of the following types: - Threat Actors - Malware Families - Software or Toolkits - Campaigns - IoC Collections - Reports - Vulnerabilities We can distinguish between two other types of objects based on how they were associated with the Threat Profile: - **Recommended objects** are automatically recommended or assigned to a Threat Profile based on our proprietary ML that takes into account the Threat Profile's configured interests such as the targeted industries, target regions, source regions, malware roles and actor motivations to recommend the most relevant threats. These objects are identified by the presence of "source": "SOURCE_RECOMMENDATION" within the "context_attributes" response parameter below. - **Added objects** are assigned or added by users to a Threat Profile, when users find other relevant threats not automatically recommended by our ML module. These objects are identified by the presence of "source": "SOURCE_DIRECT_FOLLOW" within the "context_attributes" response parameter below. Args: profile_id (str): Threat Profile identifier at Google Threat Intelligence. limit: Limit the number of objects to retrieve. 10 by default. Returns: List of Threat (collection) objects identifiers associated to the Threat Profile. Use `get_collection_report` to retrieve the full objects. """ async with vt_client(ctx, api_key=api_key) as client: res = await utils.fetch_object_relationships( client, "threat_profiles", profile_id, ['recommendations'], limit=limit) return utils.sanitize_response(res.get('recommendations', [])) - gti_mcp/tools/threat_profiles.py:102-103 (registration)The tool is registered with the MCP server via the @server.tool() decorator on line 102, which makes it available as an MCP tool named 'get_threat_profile_recommendations'.
@server.tool() async def get_threat_profile_recommendations( - gti_mcp/utils.py:87-116 (helper)The fetch_object_relationships helper function called by the handler. It fetches relationship descriptors from VT API, in this case the 'recommendations' relationship for a threat profile.
async def fetch_object_relationships( vt_client: vt.Client, resource_collection_type: str, resource_id: str, relationships: typing.List[str], params: dict[str, typing.Any] | None = None, descriptors_only: bool = True, limit: int = 10): """Fetches the given relationships descriptors from the given object.""" rel_futures = {} # If true, returns descriptors instead of full objects. descriptors = '/relationship' if descriptors_only else '' async with asyncio.TaskGroup() as tg: for rel_name in relationships: rel_futures[rel_name] = tg.create_task( consume_vt_iterator( vt_client, f"/{resource_collection_type}/{resource_id}" f"{descriptors}/{rel_name}", params=params, limit=limit)) data = {} for name, items in rel_futures.items(): data[name] = [] for obj in items.result(): obj_dict = obj.to_dict() if 'aggregations' in obj_dict['attributes']: del obj_dict['attributes']['aggregations'] data[name].append(obj_dict) return data - gti_mcp/utils.py:119-138 (helper)The sanitize_response helper function called by the handler to clean empty dicts/lists from the response before returning.
def sanitize_response(data: typing.Any) -> typing.Any: """Removes empty dictionaries and lists recursively from a response.""" if isinstance(data, dict): sanitized_dict = {} for key, value in data.items(): sanitized_value = sanitize_response(value) if sanitized_value is not None: sanitized_dict[key] = sanitized_value return sanitized_dict elif isinstance(data, list): sanitized_list = [] for item in data: sanitized_item = sanitize_response(item) if sanitized_item is not None: sanitized_list.append(sanitized_item) return sanitized_list elif isinstance(data, str): return data if data else None else: return data - gti_mcp/server.py:19-67 (registration)The FastMCP server instance creation and the import of tools from gti_mcp.tools (line 73), which triggers the registration of all tools including get_threat_profile_recommendations.
import logging import os import vt from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.middleware.base import BaseHTTPMiddleware from starlette.responses import JSONResponse, Response from starlette.routing import Route, Mount from starlette.requests import Request from mcp.server.sse import SseServerTransport from mcp.server.fastmcp import FastMCP, Context logging.basicConfig(level=logging.ERROR) # If True, creates a completely fresh transport for each request # with no session tracking or state persistence between requests. stateless = False if os.getenv("STATELESS") == "1": stateless = True def _vt_client_factory(ctx: Context, api_key: str = None) -> vt.Client: # Prioritize the passed argument if not api_key: api_key = os.getenv("VT_APIKEY") # Try to get from context if not in env (placeholder for future ctx inspection) # if not api_key and ctx and hasattr(ctx, 'init_options'): # api_key = ctx.init_options.get('vtApiKey') if not api_key: raise ValueError("VT API Key is required. Please provide it as an argument 'api_key' or set VT_APIKEY environment variable.") return vt.Client(api_key) vt_client_factory = _vt_client_factory @asynccontextmanager async def vt_client(ctx: Context, api_key: str = None) -> AsyncIterator[vt.Client]: """Provides a vt.Client instance for the current request.""" client = vt_client_factory(ctx, api_key) try: yield client finally: await client.close_async() # Create a named server and specify dependencies for deployment and development server = FastMCP(