search_digital_threat_monitoring
Search historical data from surface, deep, and dark web sources using Lucene syntax to monitor digital threats like malware, phishing, and information leaks.
Instructions
Search for historical data in Digital Threat Monitoring (DTM) using Lucene syntax.
Digital Threat Monitoring is a collection of documents gathered from surface, deep, and dark web sources.
To filter by document type or threat type, include the conditions within the query string
using the fields __type and label_threat, respectively. Combine multiple conditions
using Lucene boolean operators (AND, OR, NOT).
Examples of filtering in the query:
- Single document type: `(__type:forum_post) AND (body:security)`
- Multiple document types: `(__type:(forum_post OR paste)) AND (body:security)`
- Single threat type: `(label_threat:information-security/malware) AND (body:exploit)`
- Multiple threat types: `(label_threat:(information-security/malware OR information-security/phishing)) AND (body:exploit)`
- Combined: `(__type:document_analysis) AND (label_threat:information-security/information-leak/credentials) AND (body:password)`
Important Considerations for Effective Querying:
Date/Time Filtering (`since` and `until`):
- Input parameters `since` and `until` filter documents by their creation/modification time.
- These must be strings in RFC3339 format, specifically ending with 'Z' to denote UTC.
- Example: `'2025-04-23T00:00:00Z'`
Pagination for More Than 25 Results:
- A single API call returns at most `size` results (maximum 25).
- To retrieve more results, you must paginate:
  1. Make your initial search request.
  2. The response dictionary will contain a key named `page`.
  3. If this `page` key holds a non-empty string value, there are more results available.
  4. To fetch the next page, make a subsequent API call. This call MUST include the exact same parameters as your original request (query, size, since, until, etc.), PLUS the `page` parameter set to the token value received in the previous response's `page` field.
  5. Continue this process, using the new `page` token from each response, until the `page` field is absent or empty in the response, indicating the end of the results.
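The pagination steps above can be sketched as a loop. Here `dtm_search` is a placeholder for whatever callable actually issues the search request; it is assumed to return a dict shaped like this tool's response (`docs` list plus an optional `page` token):

```python
def fetch_all(dtm_search, query, size=25, since=None, until=None, max_pages=100):
    """Collect results across pages by threading the `page` token
    from each response into the next request."""
    docs, page = [], None
    for _ in range(max_pages):  # hard cap as a safety net against runaway loops
        resp = dtm_search(query=query, size=size, since=since, until=until, page=page)
        docs.extend(resp.get("docs", []))
        page = resp.get("page")  # non-empty string means more results remain
        if not page:
            break
    return docs
```

Note that every call repeats the original query parameters unchanged; only `page` varies between requests.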
Tokenization:
DTM breaks documents into tokens.
Example: "some-domain.com" -> "some", "domain", "com".
Wildcard/Regex queries match single tokens, not phrases.
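The exact DTM analyzer is not documented here, but a rough approximation of this tokenization, for intuition only, is a split on non-alphanumeric characters:

```python
import re

def approx_tokens(text: str) -> list[str]:
    """Roughly mimic index tokenization: lowercase, then split on
    any run of non-alphanumeric characters."""
    return [t for t in re.split(r"[^a-z0-9]+", text.lower()) if t]
```

This is why a wildcard such as `some*` matches the token `some` but never the full phrase `some-domain.com`.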
Special Characters:
Escape with \: + - & | ! ( ) { } [ ] ^ " ~ * ? : / and space.
Example: To find "(1+1):2", query \(1\+1\)\:2
Case Sensitivity:
DTM entity values are often lowercased.
Boolean operators (AND, OR, NOT) MUST be UPPERCASE.
Domain Search Nuances:
Use wildcards/regex on fields like `doc.domain`.
Example: doc.domain:google.*.dev
Avoid pattern searches on `group_network`.
Performance Limit:
Searches timeout after 60 seconds.
For broad or complex queries, it is highly recommended to use the `since` and `until` parameters to add time delimiters. This narrows the search scope and helps prevent timeouts.
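Time delimiters in the required RFC3339/'Z' format can be produced with the standard library; for example, a trailing window of N days:

```python
from datetime import datetime, timedelta, timezone

def utc_window(days: int) -> tuple[str, str]:
    """Return (since, until) strings in RFC3339 format ending in 'Z' (UTC)."""
    until = datetime.now(timezone.utc)
    since = until - timedelta(days=days)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return since.strftime(fmt), until.strftime(fmt)
```

Passing the resulting pair as `since` and `until` keeps broad queries within the 60-second timeout more reliably.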
Noise Reduction:
Use typed entities for higher precision.
Example: organization:"Acme Corp"
Prefer typed entities over free text searches.
The following fields and their meanings can be used to compose a query using Lucene syntax (including combining them with AND, OR, and NOT operators along with parentheses):
author.identity.name - The handle used by the forum post author
subject - The subject line of the forum post
body - The body text of the content
inet_location.url - What URL content was found
language - The content language
title - The title of the web page
channel.name - The Telegram channel name
domain - A DNS domain name
cve - A CVE entry by ID
__type: one of the following
web_content_publish - General website content
domain_discovery - Newly discovered domain names
forum_post - Darkweb forum posts
message - Chat messages like Telegram
paste - Paste site content like Pastebin
shop_listing - Items for sale on the dark web
email_analysis - Suspicious emails
tweet - Tweets from Twitter on cybersecurity topics.
document_analysis - Documents (PDF, Office, text) from VirusTotal, including malicious and corporate confidential files.
label_threat: one of the following
information-security/anonymization - Anonymization
information-security/apt - Advanced Persistent Threat
information-security/botnet - Botnet
information-security/compromised - Compromised Infrastructure
information-security/doxing - Personal Information Disclosure
information-security/exploit - Exploits
information-security/phishing - Phishing
information-security/information-leak - Information Leak
information-security/information-leak/confidential - Confidential Information Leak
information-security/information-leak/credentials - Credential Leak
information-security/information-leak/payment-cards - Credit Card Leak
information-security/malicious-activity - Malicious Activity
information-security/malicious-infrastructure - Malicious Infrastructure
information-security/malware - Malware
information-security/malware/ransomware - Ransomware
information-security/malware/ransomware-victim-listing - Ransomware Victim Listing
information-security/security-research - Security Research
information-security/spam - Spam
Args:
- query (required): The Lucene-like query string for your document search.
- size (optional): The number of results to return in each page (0 to 25). Defaults to 10.
- since (optional): The timestamp to search for documents since (RFC3339 format).
- until (optional): The timestamp to search for documents until (RFC3339 format).
- page (optional): The page token to fetch. Only used when paginating beyond the first page of results.
- truncate (optional): The number of characters (as a string) to truncate all document fields in the response (e.g., '500').
- sanitize (optional): If true (default), any HTML content in the document fields is sanitized to remove links, scripts, etc.
Returns: A dictionary containing the list of documents found and search metadata.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | Lucene-like query string for the document search | |
| size | No | Number of results per page (0 to 25) | 10 |
| since | No | Search for documents since this timestamp (RFC3339) | |
| until | No | Search for documents until this timestamp (RFC3339) | |
| page | No | Page token for fetching pages after the first | |
| truncate | No | Character limit (as a string) applied to document fields | |
| sanitize | No | Sanitize HTML content in document fields | true |
| api_key | No | API key to use for the request | |
Implementation Reference
- gti_mcp/tools/files.py:271-447 (handler): The main handler function for the search_digital_threat_monitoring tool. This async function executes a search query against the Digital Threat Monitoring (DTM) API using Lucene syntax. It accepts parameters like query, size, since, until, page, truncate, sanitize, and api_key. The function makes a POST request to the DTM API, handles timeouts and errors, processes the response by removing metadata, parses pagination info from link headers, and returns sanitized results.
```python
@server.tool()
async def search_digital_threat_monitoring(
    query: str,
    ctx: Context,
    size: int = 10,
    since: str = None,
    until: str = None,
    page: str = None,
    truncate: str = None,
    sanitize: bool = True,
    api_key: str = None,
) -> dict:
    """Search for historical data in Digital Threat Monitoring (DTM) using Lucene syntax.

    (The full docstring repeats the Instructions section above and is omitted here.)
    """
    async with vt_client(ctx, api_key=api_key) as client:
        params = {
            "size": size,
            "since": since,
            "until": until,
            "page": page,
            "truncate": truncate,
            "sanitize": str(sanitize).lower(),
        }
        params = {k: v for k, v in params.items() if v is not None}
        path = f"/dtm/docs/search?{urllib.parse.urlencode(params)}"
        try:
            res = await client.post_async(path=path, json_data={"query": query})
            if "text/html" in res.headers.get("Content-Type", ""):
                response_text = await res.text_async()
                if "request timed out" in response_text.lower():
                    return {"error": "The request timed out. Please try reducing the scope of your query by using `since` and `until` parameters to add time delimiters"}
                logging.error(response_text)
                return {"error": f"API returned an HTML error page instead of JSON: {response_text}"}
            res_json = await res.json_async()
        except (asyncio.TimeoutError, TimeoutError):  # Catch both
            return {"error": "The request timed out. Please try reducing the scope of your query by using `since` and `until` parameters to add time delimiters"}
        except json.JSONDecodeError as json_error:
            logging.error(f"Failed to parse JSON response: {json_error}")
            return {"error": f"Failed to parse server response: {json_error}."}
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e} (type: {type(e)})")
            return {"error": f"An unexpected error occurred: {e}"}
        # Remove unnecessary information
        if "docs" in res_json:
            for i in range(len(res_json["docs"])):
                res_json["docs"][i].pop("__meta", None)
                res_json["docs"][i].pop("entities", None)
        link_header = res.headers.get("link")
        if link_header and 'rel="next"' in link_header:
            try:
                url_part = link_header.split(';')[0].strip().strip('<>')
                query_string = urllib.parse.urlparse(url_part).query
                next_page = urllib.parse.parse_qs(query_string).get('page', [None])[0]
                if next_page:
                    res_json["page"] = next_page
            except (IndexError, AttributeError):
                # Could not parse link header, proceed without it
                pass
        return utils.sanitize_response(res_json)
```
- gti_mcp/tools/files.py:271-394 (schema): The function signature and comprehensive docstring define the schema for search_digital_threat_monitoring. The docstring (lines 283-394) contains detailed parameter documentation including types (query: str, size: int, since: str, until: str, page: str, truncate: str, sanitize: bool, api_key: str), descriptions, default values, examples of Lucene query syntax, and the return type (dict). This serves as the input/output schema definition.
- gti_mcp/tools/files.py:271-282 (registration): Tool registration via the @server.tool() decorator at line 271. The decorator automatically registers the search_digital_threat_monitoring function as an available MCP tool on the FastMCP server instance. The server object is imported from gti_mcp.server, and all tools in the gti_mcp.tools package are loaded via 'from gti_mcp.tools import *' in server.py line 73.
```python
@server.tool()
async def search_digital_threat_monitoring(
    query: str,
    ctx: Context,
    size: int = 10,
    since: str = None,
    until: str = None,
    page: str = None,
    truncate: str = None,
    sanitize: bool = True,
    api_key: str = None,
) -> dict:
```
- gti_mcp/utils.py:119-138 (helper): The sanitize_response helper function used by search_digital_threat_monitoring. This utility recursively removes empty values from the API response data before returning it to the client. It processes dicts, lists, and strings, returning None for empty values to clean up the response structure.
```python
def sanitize_response(data: typing.Any) -> typing.Any:
    """Removes empty dictionaries and lists recursively from a response."""
    if isinstance(data, dict):
        sanitized_dict = {}
        for key, value in data.items():
            sanitized_value = sanitize_response(value)
            if sanitized_value is not None:
                sanitized_dict[key] = sanitized_value
        return sanitized_dict
    elif isinstance(data, list):
        sanitized_list = []
        for item in data:
            sanitized_item = sanitize_response(item)
            if sanitized_item is not None:
                sanitized_list.append(sanitized_item)
        return sanitized_list
    elif isinstance(data, str):
        return data if data else None
    else:
        return data
```