analyze_observable
Submit observables (IP, domain, URL, hash, extension ID) for threat analysis using the specified engines. Returns enriched reputation data from the Cyberbro API.
Instructions
Trigger an analysis for a given observable (IP, domain, URL, hash, Chrome extension ID) using Cyberbro. Multiple observables can be submitted at once, separated by spaces.

Args:
- text: Observable(s) to analyze.
- engines: List of engine names.

Returns: The analysis response from the Cyberbro API.
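As a minimal sketch of the space-separated format described above, the following joins several observables into a single `text` value. The helper name `build_arguments` and the engine names are illustrative assumptions, not part of Cyberbro or of this server.

```python
def build_arguments(observables: list[str], engines: list[str]) -> dict:
    """Combine observables into the space-separated `text` form the tool expects."""
    # Strip stray whitespace and drop empty entries so the joined string
    # contains exactly one space between observables.
    cleaned = [obs.strip() for obs in observables if obs.strip()]
    return {"text": " ".join(cleaned), "engines": list(engines)}


# Engine names are placeholders; substitute the engines enabled on your instance.
args = build_arguments(["8.8.8.8", " example.com "], ["engine_a", "engine_b"])
print(args)
```

Normalizing whitespace before joining keeps a stray space from being read as an extra, empty observable.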
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| text | Yes | Observable(s) to analyze; separate multiple observables with spaces. | |
| engines | Yes | List of engine names. | |
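MCP clients invoke tools through a JSON-RPC 2.0 `tools/call` request. The sketch below shows roughly what such a request could look like for this tool, with both required arguments filled in. The helper name `tools_call_request` and the engine name are hypothetical, and in practice an MCP client library would normally build this message for you.

```python
import json


def tools_call_request(request_id: int, text: str, engines: list[str]) -> str:
    """Serialize a JSON-RPC 2.0 `tools/call` request for analyze_observable."""
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "analyze_observable",
            # Both arguments are marked as required in the input schema.
            "arguments": {"text": text, "engines": engines},
        },
    }
    return json.dumps(request)


# "engine_a" is a placeholder engine name.
message = tools_call_request(1, "example.com", ["engine_a"])
print(message)
```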
Implementation Reference
- src/mcp_cyberbro/tools/analysis.py:12-29 (handler): The async function analyze_observable is the handler for the tool. It takes text and engines parameters, POSTs to Cyberbro's /analyze endpoint, and returns the API response (or an error dict on exception).

  ```python
  @mcp.tool()
  async def analyze_observable(text: str, engines: list[str]) -> Any:
      """
      Trigger an analysis for a given observable (IP, domain, URL, hash, chrome extension id)
      using Cyberbro. It can support multiple observables at once separated by spaces.

      Args:
          text: Observable(s) to analyze.
          engines: List of engine names.

      Returns:
          The analysis response from Cyberbro API.
      """
      try:
          payload = {"text": text, "engines": engines}
          return await post_json(
              f"{config.cyberbro_api}/analyze", payload=payload, verify=config.ssl_verify
          )
      except Exception as exc:
          return {"error": f"Error executing tool analyze_observable: {exc!s}"}
  ```

- src/mcp_cyberbro/tools/analysis.py:11-11 (registration): The function register_analysis_tools wraps the @mcp.tool() decorator registration for analyze_observable (and sibling tools).

  ```python
  def register_analysis_tools(mcp: FastMCP, config: CyberbroConfig) -> None:
  ```

- src/mcp_cyberbro/utils/http.py:6-10 (helper): The post_json helper function used by analyze_observable to send the HTTP POST request to the Cyberbro API.

  ```python
  async def post_json(url: str, payload: dict, verify: bool):
      async with httpx.AsyncClient(verify=verify) as client:
          response = await client.post(url, json=payload)
          response.raise_for_status()
          return response.json()
  ```

- CyberbroConfig.cyberbro_api property builds the base API URL used as the endpoint for the analyze_observable tool.

  ```python
  @property
  def cyberbro_api(self) -> str:
      return f"{self.cyberbro_url}/{self.api_prefix}"
  ```
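To see how these pieces fit together without a running Cyberbro instance, here is a self-contained sketch that mirrors the handler's control flow with stand-ins. `StubConfig`, its field values, and `failing_post_json` are assumptions made for this example; only the URL composition and the error-dict pattern are taken from the code above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class StubConfig:
    """Stand-in for CyberbroConfig; the field values are made up for this example."""

    cyberbro_url: str = "http://localhost:5000"
    api_prefix: str = "api"
    ssl_verify: bool = True

    @property
    def cyberbro_api(self) -> str:
        return f"{self.cyberbro_url}/{self.api_prefix}"


async def failing_post_json(url: str, payload: dict, verify: bool) -> Any:
    """Replacement for post_json that simulates an unreachable server."""
    raise ConnectionError(f"cannot reach {url}")


async def analyze_observable(text: str, engines: list[str], config: StubConfig) -> Any:
    # Same shape as the handler above, but with config passed in explicitly
    # and the HTTP helper swapped for one that always fails.
    try:
        payload = {"text": text, "engines": engines}
        return await failing_post_json(
            f"{config.cyberbro_api}/analyze", payload=payload, verify=config.ssl_verify
        )
    except Exception as exc:
        return {"error": f"Error executing tool analyze_observable: {exc!s}"}


config = StubConfig()
result = asyncio.run(analyze_observable("example.com", ["engine_a"], config))
print(config.cyberbro_api)
print(result)
```

The sketch highlights that failures come back as an ordinary dict with an `error` key rather than as a raised exception, so callers need to check for that key before treating the result as analysis data.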