BitSight Community MCP Server

by boecht
service.py (35.3 kB)
"""Company rating tools and helpers for BiRRe.""" from __future__ import annotations import asyncio import heapq import json from collections import defaultdict from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from typing import Any from fastmcp import Context, FastMCP from fastmcp.tools.tool import FunctionTool from pydantic import BaseModel, Field, model_validator from birre.config.settings import DEFAULT_MAX_FINDINGS, DEFAULT_RISK_VECTOR_FILTER from birre.domain.common import CallV1Tool from birre.domain.company_rating.constants import ( DEFAULT_SEVERITY_FLOOR, SEVERITY_LOW, SEVERITY_MATERIAL, SEVERITY_MODERATE, SEVERITY_RANK_LOW, SEVERITY_RANK_MATERIAL, SEVERITY_RANK_MODERATE, SEVERITY_RANK_SEVERE, SEVERITY_RANK_UNKNOWN, SEVERITY_SCORE_UNKNOWN, SEVERITY_SEVERE, TIMESTAMP_INVALID, ) from birre.domain.subscription import ( SubscriptionAttempt, cleanup_ephemeral_subscription, create_ephemeral_subscription, ) from birre.infrastructure.logging import BoundLogger, log_rating_event class TrendSummary(BaseModel): direction: str change: float class CurrentRating(BaseModel): value: float | None = None color: str | None = None class FindingSummary(BaseModel): top: int finding: str | None = None details: str | None = None asset: str | None = None first_seen: str | None = None last_seen: str | None = None class TopFindingsPolicy(BaseModel): severity_floor: str supplements: list[str] max_items: int profile: str class TopFindings(BaseModel): policy: TopFindingsPolicy findings: list[FindingSummary] = Field(default_factory=list) count: int = Field(default=0, ge=0) @model_validator(mode="before") @classmethod def _default_count(cls, values: Any) -> dict[str, Any]: if not isinstance(values, dict): return {"policy": values, "findings": [], "count": 0} findings = values.get("findings") or [] values_dict: dict[str, Any] = dict(values) values_dict.setdefault("findings", findings) values_dict.setdefault("count", len(findings)) return values_dict class RatingLegendEntry(BaseModel): color: str min: int max: int class RatingLegend(BaseModel): rating: list[RatingLegendEntry] class CompanyRatingResponse(BaseModel): error: str | None = None warning: str | None = None name: str | None = None domain: str | None = None current_rating: CurrentRating | None = None trend_8_weeks: TrendSummary | None = None trend_1_year: TrendSummary | None = None top_findings: TopFindings | None = None legend: RatingLegend | None = None def to_payload(self) -> dict[str, Any]: if self.error: return {"error": self.error} data = self.model_dump(exclude_unset=True) data.pop("error", None) return data COMPANY_RATING_OUTPUT_SCHEMA: dict[str, Any] = CompanyRatingResponse.model_json_schema() def _rating_color(value: float | None) -> str | None: if value is None: return None if value >= 740: return "green" if value >= 630: return "yellow" return "red" def _aggregate_ratings( raw_ratings: list[dict[str, Any]], *, horizon_days: int, mode: str, ) -> list[tuple[datetime, float]]: cutoff = datetime.now(UTC).date() - timedelta(days=horizon_days) buckets: dict[tuple[int, int], list[float]] = defaultdict(list) anchors: dict[tuple[int, int], datetime] = {} for entry in raw_ratings: if not isinstance(entry, dict): continue date_str = entry.get("rating_date") rating_value = entry.get("rating") if not date_str or rating_value is None: continue try: rating_date = datetime.strptime(str(date_str), "%Y-%m-%d").date() except ValueError: continue if rating_date < cutoff: continue if mode == "weekly": iso = 
def _aggregate_ratings(
    raw_ratings: list[dict[str, Any]],
    *,
    horizon_days: int,
    mode: str,
) -> list[tuple[datetime, float]]:
    cutoff = datetime.now(UTC).date() - timedelta(days=horizon_days)
    buckets: dict[tuple[int, int], list[float]] = defaultdict(list)
    anchors: dict[tuple[int, int], datetime] = {}
    for entry in raw_ratings:
        if not isinstance(entry, dict):
            continue
        date_str = entry.get("rating_date")
        rating_value = entry.get("rating")
        if not date_str or rating_value is None:
            continue
        try:
            rating_date = datetime.strptime(str(date_str), "%Y-%m-%d").date()
        except ValueError:
            continue
        if rating_date < cutoff:
            continue
        if mode == "weekly":
            iso = rating_date.isocalendar()
            key = (iso.year, iso.week)
            anchor = datetime.fromisocalendar(iso.year, iso.week, 1)
        elif mode == "monthly":
            key = (rating_date.year, rating_date.month)
            anchor = datetime(rating_date.year, rating_date.month, 1)
        else:
            key = (rating_date.year, rating_date.timetuple().tm_yday)
            anchor = datetime.combine(rating_date, datetime.min.time())
        buckets[key].append(float(rating_value))
        anchors[key] = anchor
    series: list[tuple[datetime, float]] = []
    for key, values in buckets.items():
        anchor = anchors[key]
        avg_rating = sum(values) / len(values)
        series.append((anchor, avg_rating))
    series.sort(key=lambda item: item[0])
    return series


def _compute_trend(series: list[tuple[datetime, float]]) -> dict[str, object]:
    if len(series) < 2:
        return {
            "direction": "insufficient data",
            "change": 0.0,
        }
    xs = [point[0].toordinal() for point in series]
    ys = [point[1] for point in series]
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    denom = sum((x - x_mean) ** 2 for x in xs)
    if denom == 0:
        slope = 0.0
    else:
        slope = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys)) / denom
    smoothed_delta = slope * (xs[-1] - xs[0])

    def classify(delta: float) -> str:
        if delta >= 40:
            return "up"
        if delta >= 20:
            return "slightly up"
        if delta <= -40:
            return "down"
        if delta <= -20:
            return "slightly down"
        return "stable"

    label = classify(smoothed_delta)
    return {
        "direction": label,
        "change": round(smoothed_delta, 1),
    }
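# Example (illustrative): the trend is an ordinary least-squares slope over the
# bucketed series, scaled to the full window. With exactly two points the fit
# is exact, so a 700 -> 740 move over 56 days classifies as "up":
#
#     _compute_trend([(datetime(2024, 1, 1), 700.0), (datetime(2024, 2, 26), 740.0)])
#     # -> {"direction": "up", "change": 40.0}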
def _rank_severity_category_value(val: Any) -> int:
    if isinstance(val, str):
        v = val.lower()
        if v == SEVERITY_SEVERE:
            return SEVERITY_RANK_SEVERE
        if v == SEVERITY_MATERIAL:
            return SEVERITY_RANK_MATERIAL
        if v == SEVERITY_MODERATE:
            return SEVERITY_RANK_MODERATE
        if v == SEVERITY_LOW:
            return SEVERITY_RANK_LOW
    return SEVERITY_RANK_UNKNOWN


def _derive_numeric_severity_score(item: Any) -> float:
    def _extract_numeric(value: Any) -> float | None:
        return float(value) if isinstance(value, int | float) else None

    if not isinstance(item, dict):
        return SEVERITY_SCORE_UNKNOWN
    direct = _extract_numeric(item.get("severity"))
    if direct is not None:
        return direct
    details = item.get("details")
    if not isinstance(details, dict):
        return SEVERITY_SCORE_UNKNOWN
    for key in ("severity", "grade"):
        candidate = _extract_numeric(details.get(key))
        if candidate is not None:
            return candidate
    cvss = details.get("cvss")
    if isinstance(cvss, dict):
        base_score = _extract_numeric(cvss.get("base"))
        if base_score is not None:
            return base_score
    return SEVERITY_SCORE_UNKNOWN


def _parse_timestamp_seconds(val: Any) -> int:
    if isinstance(val, str) and val:
        for fmt in (
            "%Y-%m-%d",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%d %H:%M:%S",
        ):
            try:
                return int(datetime.strptime(val, fmt).timestamp())
            except Exception:
                continue
    return TIMESTAMP_INVALID


def _derive_asset_importance_score(obj: Any) -> float:
    if isinstance(obj, dict):
        assets = obj.get("assets") if isinstance(obj.get("assets"), dict) else {}
        if isinstance(assets, dict):
            for key in ("combined_importance", "importance"):
                val = assets.get(key)
                if isinstance(val, int | float):
                    return float(val)
    return 0.0


def _build_finding_sort_key(item: Any) -> tuple[float, int, float, float, str]:
    sev_num = _derive_numeric_severity_score(item)
    sev_cat = _rank_severity_category_value(
        item.get("severity") if isinstance(item, dict) else None
    )
    imp = _derive_asset_importance_score(item)
    last = _parse_timestamp_seconds(
        item.get("last_seen") if isinstance(item, dict) else None
    )
    rv = (item.get("risk_vector") or "") if isinstance(item, dict) else ""
    # Desc numeric severity, then desc categorical rank, desc importance,
    # desc last_seen; asc risk_vector
    return (-sev_num, -sev_cat, -imp, -last, rv)


def _build_finding_score_tuple(item: Any) -> tuple[float, int, float, float]:
    # Positive score tuple for heapq.nlargest (descending desired)
    sev_num = _derive_numeric_severity_score(item)
    sev_cat = _rank_severity_category_value(
        item.get("severity") if isinstance(item, dict) else None
    )
    imp = _derive_asset_importance_score(item)
    last = _parse_timestamp_seconds(
        item.get("last_seen") if isinstance(item, dict) else None
    )
    return (sev_num, sev_cat, imp, last)


def _select_top_finding_candidates(
    results: list[dict[str, Any]], k: int
) -> list[dict[str, Any]]:
    if not results:
        return []
    # Keep only the top-k by primary numeric keys; finalize ordering with the
    # full _build_finding_sort_key (which adds the risk_vector tie-breaker)
    candidates = heapq.nlargest(k, results, key=_build_finding_score_tuple)
    candidates.sort(key=_build_finding_sort_key)
    return candidates
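# Example (illustrative): given two findings, the higher numeric severity wins
# regardless of recency; ties fall through to category rank, asset importance,
# last_seen, and finally risk_vector (ascending):
#
#     a = {"severity": 9.1, "last_seen": "2024-01-01", "risk_vector": "patching_cadence"}
#     b = {"severity": 7.4, "last_seen": "2024-06-01", "risk_vector": "open_ports"}
#     _select_top_finding_candidates([b, a], k=2)
#     # -> [a, b]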
# ---- Finding normalization helpers ----

# Infection vectors whose narrative should take precedence when available
INFECTION_RISK_VECTORS = {
    "botnet_infections",
    "spam_propagation",
    "malware_servers",
    "unsolicited_comm",
    "potentially_exploited",
}


def _determine_finding_label(
    item: dict[str, Any], details: dict[str, Any]
) -> str | None:
    """Choose a finding label from details.name/display_name or risk_vector_label."""
    if isinstance(details.get("name"), str):
        return details.get("name")
    if isinstance(details.get("display_name"), str):
        return details.get("display_name")
    rv_label = item.get("risk_vector_label")
    return rv_label if isinstance(rv_label, str) else None


def _compose_base_details_text(details: dict[str, Any]) -> str | None:
    """
    Build the base details text from display_name/description/
    searchable_details/infection.family.
    """
    display_name: str | None = (
        details.get("display_name")
        if isinstance(details.get("display_name"), str)
        else None
    )
    long_desc: str | None = (
        details.get("description")
        if isinstance(details.get("description"), str)
        else None
    )
    if display_name and long_desc:
        return f"{display_name} — {long_desc}"
    if long_desc:
        return long_desc
    if display_name:
        return display_name
    searchable_details = details.get("searchable_details")
    if isinstance(searchable_details, str):
        return searchable_details
    inf = details.get("infection")
    if isinstance(inf, dict) and isinstance(inf.get("family"), str):
        return f"Infection: {inf['family']}"
    return None


def _find_first_remediation_text(details: dict[str, Any]) -> str | None:
    """Return the first available remediation hint text if present."""
    rem_list = (
        details.get("remediations")
        if isinstance(details.get("remediations"), list)
        else []
    )
    for rem in rem_list or []:
        if isinstance(rem, dict):
            text = (
                rem.get("help_text")
                or rem.get("remediation_tip")
                or rem.get("message")
            )
            if isinstance(text, str) and text:
                return text
    return None


def _normalize_detected_service_summary(text: str, remediation_hint: str | None) -> str:
    """Rewrite 'Detected service: ...' text to include a concise remediation hint when available."""
    if not remediation_hint:
        return text
    try:
        after = text.split(":", 1)[1].strip()
        service = after.split(",", 1)[0].strip()
        return f"Detected service: {service} — {remediation_hint}"
    except Exception:
        return f"{text} — {remediation_hint}" if remediation_hint not in text else text


def _append_remediation_hint(
    text: str | None, remediation_hint: str | None
) -> str | None:
    """Append remediation hint to text, preserving punctuation and avoiding duplication."""
    if not remediation_hint:
        return text
    if isinstance(text, str):
        if remediation_hint in text:
            return text
        if text.endswith((".", "!", "?")):
            return f"{text} {remediation_hint}"
        return f"{text}. {remediation_hint}"
    return remediation_hint


def _apply_infection_narrative_preference(
    text: str | None, risk_vector: Any, details: dict[str, Any]
) -> str | None:
    """Prefer infection narrative for infection vectors when description/family are present."""
    if not isinstance(risk_vector, str) or risk_vector not in INFECTION_RISK_VECTORS:
        return text
    inf = details.get("infection")
    if not isinstance(inf, dict):
        return text
    family = inf.get("family") if isinstance(inf.get("family"), str) else None
    desc_val = inf.get("description")
    inf_desc = desc_val.strip() if isinstance(desc_val, str) else None
    if family and inf_desc:
        return f"Infection: {family} — {inf_desc}"
    if inf_desc:
        if text and inf_desc not in (text or ""):
            return f"{text} — {inf_desc}"
        return inf_desc or text
    return text


def _determine_primary_port(details: dict[str, Any]) -> int | None:
    """Return a port from details.dest_port or the first of details.port_list."""
    dest_port = details.get("dest_port")
    if isinstance(dest_port, int):
        return dest_port
    ports = details.get("port_list")
    if isinstance(ports, list) and ports:
        p0 = ports[0]
        if isinstance(p0, int):
            return p0
    return None


def _determine_primary_asset(
    item: dict[str, Any], details: dict[str, Any]
) -> str | None:
    """Choose an asset from evidence_key, then details.assets[0] (+port), then observed_ips[0]."""
    asset: str | None = (
        item.get("evidence_key")
        if isinstance(item.get("evidence_key"), str)
        else None
    )
    if asset:
        return asset
    assets = details.get("assets")
    if isinstance(assets, list) and assets:
        first = assets[0]
        if isinstance(first, dict) and isinstance(first.get("asset"), str):
            port = _determine_primary_port(details)
            return f"{first['asset']}:{port}" if port else first["asset"]
    observed = details.get("observed_ips")
    if isinstance(observed, list) and observed:
        ip0 = observed[0]
        if isinstance(ip0, str):
            return ip0
    return None


def _normalize_finding_entry(item: dict[str, Any]) -> dict[str, Any]:
    """Normalize one API finding item into the compact summary shape used in outputs."""
    raw_details = item.get("details")
    details_obj: dict[str, Any] = raw_details if isinstance(raw_details, dict) else {}
    finding_label = _determine_finding_label(item, details_obj)
    text = _compose_base_details_text(details_obj)
    remediation = _find_first_remediation_text(details_obj)
    if isinstance(text, str) and text.startswith("Detected service:") and remediation:
        text = _normalize_detected_service_summary(text, remediation)
    else:
        text = _append_remediation_hint(text, remediation)
    text = _apply_infection_narrative_preference(
        text, item.get("risk_vector"), details_obj
    )
    asset = _determine_primary_asset(item, details_obj)
    first_seen_raw = item.get("first_seen")
    # BitSight's Finding schema (apis/v1/components/schemas.json) does not require
    # first_seen/last_seen, so the fields may be absent entirely in the payload.
    # When the source omits them we surface ``null`` in our normalized output to
    # preserve the key without claiming a timestamp we do not have.
    first_seen = first_seen_raw if isinstance(first_seen_raw, str) else None
    last_seen_raw = item.get("last_seen")
    last_seen = last_seen_raw if isinstance(last_seen_raw, str) else None
    return {
        "finding": finding_label,
        "details": text,
        "asset": asset,
        "first_seen": first_seen,
        "last_seen": last_seen,
    }
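# Example (illustrative): a raw open-ports finding collapses into the compact
# summary shape. The hypothetical input below mimics the v1 findings payload:
#
#     raw = {
#         "risk_vector": "open_ports",
#         "risk_vector_label": "Open Ports",
#         "evidence_key": "203.0.113.10:8080",
#         "first_seen": "2024-03-01",
#         "last_seen": "2024-05-20",
#         "details": {
#             "display_name": "Detected service: HTTP",
#             "remediations": [{"help_text": "Restrict access to this service."}],
#         },
#     }
#     _normalize_finding_entry(raw)
#     # -> {"finding": "Detected service: HTTP",
#     #     "details": "Detected service: HTTP — Restrict access to this service.",
#     #     "asset": "203.0.113.10:8080",
#     #     "first_seen": "2024-03-01", "last_seen": "2024-05-20"}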
def _normalize_top_findings(results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    items: list[dict[str, Any]] = []
    for item in results or []:
        if not isinstance(item, dict):
            continue
        items.append(_normalize_finding_entry(item))
    return items


def _default_top_findings_payload(limit: int) -> dict[str, Any]:
    return {
        "policy": {
            "severity_floor": "material",
            "supplements": [],
            "max_items": limit,
            "profile": "strict",
        },
        "count": 0,
        "findings": [],
    }


def _emit_sorted_preview(
    ctx: Context, items: list[Any], label: str, *, debug_enabled: bool
) -> None:
    try:
        preview_items = heapq.nlargest(15, items, key=_build_finding_score_tuple)
        preview_items.sort(key=_build_finding_sort_key)
        preview = []
        for i, it in enumerate(preview_items[:15], start=1):
            preview.append(
                {
                    "idx": i,
                    "sev_num": _derive_numeric_severity_score(it),
                    "sev_cat": it.get("severity") if isinstance(it, dict) else None,
                    "importance": _derive_asset_importance_score(it),
                    "last_seen": it.get("last_seen") if isinstance(it, dict) else None,
                    "risk_vector": it.get("risk_vector") if isinstance(it, dict) else None,
                }
            )
        _debug(
            ctx,
            f"Sort preview ({label})",
            preview,
            debug_enabled=debug_enabled,
        )
    except Exception:
        pass


def _extract_results_from_payload(
    payload: dict[str, Any],
    ctx: Context,
    label: str,
    *,
    debug_enabled: bool,
) -> list[Any]:
    results = payload.get("results") or []
    if not isinstance(results, list):
        return []
    _emit_sorted_preview(ctx, results, label, debug_enabled=debug_enabled)
    return results


async def _fetch_and_normalize_findings(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    params: dict[str, Any],
    limit: int,
    label: str,
    *,
    debug_enabled: bool,
) -> tuple[list[dict[str, Any]], bool]:
    result = call_v1_tool("getCompaniesFindings", ctx, params)
    # Await only when the adapter hands back an awaitable; the dunder is
    # "__await__" (coroutines and futures define it, plain values do not).
    raw = await result if hasattr(result, "__await__") else result
    if not isinstance(raw, dict):
        return [], False
    _debug(
        ctx,
        f"getCompaniesFindings raw response ({label})",
        raw,
        debug_enabled=debug_enabled,
    )
    results = _extract_results_from_payload(
        raw, ctx, label, debug_enabled=debug_enabled
    )
    top_raw = _select_top_finding_candidates(results, limit)
    findings = _normalize_top_findings(top_raw)
    return findings, True


@dataclass
class _TopFindingsSelection:
    findings: list[dict[str, Any]]
    severity_floor: str = "material"
    profile: str = "strict"
    supplements: list[str] = field(default_factory=list)
    max_items: int = 0

    def policy(self) -> dict[str, Any]:
        return {
            "severity_floor": self.severity_floor,
            "supplements": self.supplements,
            "max_items": self.max_items,
            "profile": self.profile,
        }


def _normalize_top_finding_limit(max_findings: int) -> int:
    if isinstance(max_findings, int) and max_findings > 0:
        return max_findings
    return DEFAULT_MAX_FINDINGS


async def _request_top_findings(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    params: dict[str, Any],
    limit: int,
    label: str,
    *,
    debug_enabled: bool,
) -> list[dict[str, Any]] | None:
    findings, ok = await _fetch_and_normalize_findings(
        call_v1_tool,
        ctx,
        params,
        limit,
        label,
        debug_enabled=debug_enabled,
    )
    if not ok:
        return None
    return findings[:limit]
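# Example (illustrative): the selection built below escalates in three stages
# until at least three findings are available or the options are exhausted:
#
#     stage 1 "strict":       severity_category=severe,material (as configured)
#     stage 2 "relaxed":      severity_category=severe,material,moderate
#     stage 3 "+web_appsec":  append Web Application Security findings up to the limit
#
# The resulting profile strings are "strict", "relaxed", and "relaxed+web_appsec".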
async def _build_top_findings_selection(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    base_params: dict[str, Any],
    limit: int,
    *,
    debug_enabled: bool,
) -> _TopFindingsSelection | None:
    strict_findings = await _request_top_findings(
        call_v1_tool,
        ctx,
        base_params,
        limit,
        "strict",
        debug_enabled=debug_enabled,
    )
    if strict_findings is None:
        return None
    selection = _TopFindingsSelection(
        findings=list(strict_findings),
        max_items=limit,
    )
    if len(selection.findings) >= 3:
        return selection
    selection.profile = "relaxed"
    selection.severity_floor = DEFAULT_SEVERITY_FLOOR
    relaxed_params = dict(base_params)
    relaxed_params["severity_category"] = "severe,material,moderate"
    relaxed_findings = await _request_top_findings(
        call_v1_tool,
        ctx,
        relaxed_params,
        limit,
        "relaxed",
        debug_enabled=debug_enabled,
    )
    if relaxed_findings:
        selection.findings = list(relaxed_findings)
    if len(selection.findings) >= 3:
        return selection
    web_params = dict(relaxed_params)
    web_params["risk_vector"] = "web_appsec"
    web_findings = await _request_top_findings(
        call_v1_tool,
        ctx,
        web_params,
        limit,
        "web_appsec",
        debug_enabled=debug_enabled,
    )
    if web_findings:
        needed = max(0, limit - len(selection.findings))
        if needed > 0:
            selection.findings.extend(web_findings[:needed])
            selection.profile = "relaxed+web_appsec"
            selection.supplements = ["web_appsec"]
    return selection


async def _assemble_top_findings_section(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    guid: str,
    risk_vector_filter: str,
    max_findings: int,
    *,
    debug_enabled: bool,
) -> dict[str, Any]:
    limit = _normalize_top_finding_limit(max_findings)
    params = {
        "guid": guid,
        "affects_rating": True,
        "risk_vector": risk_vector_filter,
        "severity_category": "severe,material",
        # Intentionally omit server-side sort/limit; sort & cap locally.
        # Request only used fields to reduce payload size while preserving help_text
        "fields": (
            "severity,details,evidence_key,assets,risk_vector,"
            "risk_vector_label,first_seen,last_seen"
        ),
    }
    selection = await _build_top_findings_selection(
        call_v1_tool,
        ctx,
        params,
        limit,
        debug_enabled=debug_enabled,
    )
    if selection is None:
        return _default_top_findings_payload(limit)
    for idx, entry in enumerate(selection.findings, start=1):
        if isinstance(entry, dict):
            entry["top"] = idx
    _debug(
        ctx,
        "Normalized top findings",
        selection.findings,
        debug_enabled=debug_enabled,
    )
    return {
        "policy": selection.policy(),
        "count": len(selection.findings),
        "findings": selection.findings,
    }


def _debug(ctx: Context, message: str, obj: Any, *, debug_enabled: bool) -> None:
    """Emit a structured debug log when the debug flag is enabled."""
    try:
        if not debug_enabled:
            return None
        try:
            pretty = json.dumps(obj, indent=2, ensure_ascii=False)
        except Exception:
            pretty = str(obj)
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return None
        loop.create_task(ctx.info(f"{message}: {pretty}"))
    except Exception:
        return None
    return None


def _top_findings_unavailable_payload() -> TopFindings:
    return TopFindings.model_validate(
        {
            "policy": {
                "severity_floor": "material",
                "supplements": [],
                "max_items": 0,
                "profile": "unavailable",
            },
            "count": 0,
            "findings": [],
        }
    )


async def _retrieve_top_findings_payload(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    guid: str,
    effective_filter: str,
    effective_findings: int,
    *,
    debug_enabled: bool,
) -> TopFindings:
    try:
        payload = await _assemble_top_findings_section(
            call_v1_tool,
            ctx,
            guid,
            effective_filter,
            effective_findings,
            debug_enabled=debug_enabled,
        )
        return TopFindings.model_validate(payload)
    except Exception as exc:  # pragma: no cover - defensive log
        await ctx.warning(f"Failed to fetch top findings: {exc}")
        return _top_findings_unavailable_payload()
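# Example (illustrative): when findings retrieval fails entirely, callers still
# receive a well-formed TopFindings payload rather than an exception:
#
#     _top_findings_unavailable_payload().model_dump()
#     # -> {"policy": {"severity_floor": "material", "supplements": [],
#     #                "max_items": 0, "profile": "unavailable"},
#     #     "findings": [], "count": 0}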
async def _fetch_company_profile_dict(
    call_v1_tool: CallV1Tool, ctx: Context, guid: str
) -> dict[str, Any]:
    """Fetch and validate the company profile object from BitSight v1."""
    company = await call_v1_tool("getCompany", ctx, {"guid": guid})
    if not isinstance(company, dict):
        raise ValueError("Unexpected response format from BitSight company endpoint")
    return company


def _summarize_current_rating(company: dict[str, Any]) -> tuple[Any, Any]:
    """Return a (value, color) tuple for the company's current rating."""
    value = company.get("current_rating")
    return value, _rating_color(value)


def _calculate_rating_trend_summaries(
    company: dict[str, Any],
) -> tuple[TrendSummary, TrendSummary]:
    """Calculate 8-week and 1-year rating trends from the ratings series."""
    raw_ratings = company.get("ratings", [])
    weekly_series = _aggregate_ratings(raw_ratings, horizon_days=56, mode="weekly")
    yearly_series = _aggregate_ratings(raw_ratings, horizon_days=365, mode="monthly")
    weekly = TrendSummary.model_validate(_compute_trend(weekly_series))
    yearly = TrendSummary.model_validate(_compute_trend(yearly_series))
    return weekly, yearly


def _build_rating_legend_entries() -> list[RatingLegendEntry]:
    return [
        RatingLegendEntry(color="red", min=250, max=629),
        RatingLegendEntry(color="yellow", min=630, max=739),
        RatingLegendEntry(color="green", min=740, max=900),
    ]


def _extract_policy_profile(top_findings_payload: Any) -> str | None:
    if isinstance(top_findings_payload, TopFindings):
        return top_findings_payload.policy.profile
    if isinstance(top_findings_payload, dict):
        policy = top_findings_payload.get("policy")
        if isinstance(policy, dict):
            profile = policy.get("profile")
            if isinstance(profile, str):
                return profile
    return None
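# Example (illustrative): the two trend windows use different bucketing so
# short-term noise does not dominate the 1-year view:
#
#     8-week trend: horizon_days=56,  weekly buckets  (ISO week, Monday anchor)
#     1-year trend: horizon_days=365, monthly buckets (first-of-month anchor)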
async def _build_rating_payload(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    guid: str,
    effective_filter: str,
    effective_findings: int,
    logger: BoundLogger,
    *,
    debug_enabled: bool,
) -> CompanyRatingResponse:
    log_rating_event(logger, "fetch_start", ctx=ctx, company_guid=guid)
    company = await _fetch_company_profile_dict(call_v1_tool, ctx, guid)
    current_value, color = _summarize_current_rating(company)
    weekly_trend, yearly_trend = _calculate_rating_trend_summaries(company)
    top_findings = await _retrieve_top_findings_payload(
        call_v1_tool,
        ctx,
        guid,
        effective_filter,
        effective_findings,
        debug_enabled=debug_enabled,
    )
    primary_domain = company.get("primary_domain") or company.get("display_url") or ""
    response = CompanyRatingResponse(
        name=company.get("name", ""),
        domain=primary_domain,
        current_rating=CurrentRating(value=current_value, color=color),
        trend_8_weeks=weekly_trend,
        trend_1_year=yearly_trend,
        top_findings=top_findings,
        legend=RatingLegend(rating=_build_rating_legend_entries()),
    )
    log_rating_event(
        logger,
        "fetch_success",
        ctx=ctx,
        company_guid=guid,
        findings_count=top_findings.count,
        policy=_extract_policy_profile(top_findings),
    )
    return response


def register_company_rating_tool(
    business_server: FastMCP,
    call_v1_tool: CallV1Tool,
    *,
    logger: BoundLogger,
    risk_vector_filter: str | None = None,
    max_findings: int | None = None,
    default_folder: str | None = None,
    default_type: str | None = None,
    debug_enabled: bool = False,
) -> FunctionTool:
    effective_filter = (
        risk_vector_filter.strip()
        if isinstance(risk_vector_filter, str) and risk_vector_filter.strip()
        else DEFAULT_RISK_VECTOR_FILTER
    )
    effective_findings = (
        max_findings
        if isinstance(max_findings, int) and max_findings > 0
        else DEFAULT_MAX_FINDINGS
    )

    @business_server.tool(output_schema=COMPANY_RATING_OUTPUT_SCHEMA)
    async def get_company_rating(ctx: Context, guid: str) -> dict[str, Any]:
        """Fetch normalized BitSight rating analytics for a company.

        Parameters
        - guid: BitSight company GUID from `company_search`.

        Behavior
        - Ensures subscription: creates an ephemeral subscription if needed; if
          already subscribed, it does not unsubscribe. Ephemeral subscriptions
          are cleaned up after data retrieval.

        Returns
        - {
            "name": str,
            "domain": str,
            "current_rating": {"value": float|int, "color": "red|yellow|green"},
            "trend_8_weeks": {"direction": str, "change": float},
            "trend_1_year": {"direction": str, "change": float},
            "top_findings": {
              "policy": {
                "severity_floor": "material" | "moderate",  # minimum severity included (always includes 'severe')
                "supplements": ["web_appsec"] | [],  # vectors appended after fallback (kept at the end)
                "max_items": 5 | 10,  # cap used for this response
                "profile": "strict" | "relaxed" | "relaxed+web_appsec"  # human-readable summary
              },
              "count": int,
              "findings": [
                {
                  "top": int, "finding": str, "details": str, "asset": str,
                  "first_seen": str, "last_seen": str
                }
              ]
            },
            "legend": {"rating": [{"color": str, "min": int, "max": int}, ...]}
          }

        Output semantics
        - current_rating.value: Numeric BitSight rating on a 250–900 scale
          (higher is better). May be null if unavailable.
        - current_rating.color: Traffic-light bucket derived from value:
          red (250–629), yellow (630–739), green (740–900).
        - trend_8_weeks / trend_1_year: {direction, change}
          - direction ∈ {up, slightly up, stable, slightly down, down} or
            "insufficient data"
          - change is the approximate rating delta over the window (float)
          - if there are fewer than 2 data points, direction is
            "insufficient data" and change is 0.0
        - top_findings: The top findings impacting the rating (compact summary
          per finding).
          - policy:
            - severity_floor: "material" (includes severe+material) or
              "moderate" (includes severe+material+moderate).
            - supplements: ["web_appsec"] when fallback was needed; otherwise [].
              Appended items come last.
            - max_items: Configured `max_findings` (default 10). When web-appsec
              padding is applied, the list remains capped at this value.
            - profile: quick summary: "strict" | "relaxed" | "relaxed+web_appsec".
          - Behavior: Start strict (severe,material). If <3 items, relax to
            include 'moderate'. If still <3, append from Web Application
            Security until the configured limit is reached (appended findings
            remain last).
        - legend.rating: Explicit color thresholds used to compute
          current_rating.color.

        Error contract
        - On failure returns {"error": str} (e.g., subscription could not be
          ensured or API error).

        Example (GitHub, Inc.)
        >>> get_company_rating(guid="e90b389b-0b7e-4722-9411-97d81c8e2bc6")
        {
          "name": "GitHub, Inc.",
          "domain": "github.com",
          "current_rating": {"value": 740, "color": "green"},
          "trend_8_weeks": {"direction": "up", "change": 52.0},
          "trend_1_year": {"direction": "stable", "change": 14.3},
          "top_findings": {"count": 3, "findings": [
            {
              "top": 1, "finding": "Open Ports", "details": "Detected service: …",
              "asset": "…", "first_seen": "…", "last_seen": "…"
            },
            {
              "top": 2, "finding": "Patching Cadence", "details": "CVE-… — …",
              "asset": "…", "first_seen": "…", "last_seen": "…"
            }
          ]},
          "legend": {"rating": [
            {"color": "red", "min": 250, "max": 629},
            {"color": "yellow", "min": 630, "max": 739},
            {"color": "green", "min": 740, "max": 900}
          ]}
        }
        """
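        # Overall flow: ensure access (creating an ephemeral subscription if
        # needed), build the rating payload, then always clean up an
        # auto-created subscription in the finally block below.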
        await ctx.info(f"Getting rating analytics for company: {guid}")

        # 1) Ensure access via subscription
        attempt: SubscriptionAttempt = await create_ephemeral_subscription(
            call_v1_tool,
            ctx,
            guid,
            logger=logger,
            default_folder=default_folder,
            subscription_type=default_type,
            debug_enabled=debug_enabled,
        )
        if not attempt.success:
            msg = attempt.message or (
                "Unable to access company rating; subscription required and "
                "could not be created"
            )
            await ctx.error(msg)
            return {"error": msg}
        auto_subscribed = attempt.created

        result_model: CompanyRatingResponse
        try:
            result_model = await _build_rating_payload(
                call_v1_tool,
                ctx,
                guid,
                effective_filter,
                effective_findings,
                logger,
                debug_enabled=debug_enabled,
            )
        except Exception as exc:  # pragma: no cover - defensive log
            error_message = f"Failed to build rating payload: {exc}"
            await ctx.error(error_message)
            log_rating_event(
                logger,
                "fetch_failure",
                ctx=ctx,
                company_guid=guid,
                error=str(exc),
            )
            result_model = CompanyRatingResponse(error=error_message)
        finally:
            if auto_subscribed:
                await cleanup_ephemeral_subscription(
                    call_v1_tool,
                    ctx,
                    guid,
                    debug_enabled=debug_enabled,
                )
        return result_model.to_payload()

    return get_company_rating


__all__ = ["register_company_rating_tool"]
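Usage sketch (illustrative): wiring the tool into a FastMCP server. The module
path, `my_call_v1_tool`, and `my_bound_logger` below are assumptions for the
example, not part of this file:

    from fastmcp import FastMCP

    # Assumed module path for this file
    from birre.domain.company_rating.service import register_company_rating_tool

    server = FastMCP("birre-business")
    tool = register_company_rating_tool(
        server,
        call_v1_tool=my_call_v1_tool,  # hypothetical CallV1Tool adapter
        logger=my_bound_logger,        # hypothetical BoundLogger instance
        max_findings=10,
        debug_enabled=False,
    )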
