wait_for_request
Poll for new HTTP, DNS, SMTP, or TCP requests until timeout to capture and inspect incoming network traffic.
Instructions
Poll for a new request until timeout.
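
The handler shown under Implementation Reference returns a dictionary with `found`, `timeout`, `request_type`, and `request` keys. As a minimal sketch of how a caller might branch on that result, the helper below summarizes it in one line. `summarize_result` and the sample dictionaries are hypothetical and written by hand for illustration; they are not part of requestrepo_mcp.

```python
from typing import Any


def summarize_result(result: dict[str, Any]) -> str:
    """Hypothetical helper: describe a wait_for_request result in one line."""
    if result["found"]:
        request = result["request"]
        return f"captured {request['type']} request {request['id']} from {request['ip']}"
    # When nothing new arrived, "found" is False and "request" is None.
    return "timed out without a new request"


# Hand-written samples shaped like the handler's two return values.
hit = {
    "found": True,
    "timeout": False,
    "request_type": "dns",
    "request": {"id": "abc123", "type": "dns", "ip": "203.0.113.7"},
}
miss = {"found": False, "timeout": True, "request_type": "dns", "request": None}

print(summarize_result(hit))
print(summarize_result(miss))
```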
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
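| request_type | No | Restrict matches to one request type: `http`, `dns`, `smtp`, or `tcp`. When omitted, any type matches. | None |
| timeout_seconds | No | Maximum time to keep polling, in seconds. Must be >= 0. | 30 |
| poll_interval_seconds | No | Delay between polls, in seconds. Must be > 0. | 1.0 |
| include_raw | No | Include the raw request bytes in the result. | false |
| include_body | No | Include the body of HTTP requests in the result. | false |
| max_bytes | No | Byte limit passed to `bytes_envelope` for the body and raw payloads. | 65536 |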
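
As a rough illustration of how these parameters might be supplied, the sketch below builds the JSON-RPC `tools/call` message that an MCP client would send to invoke the tool. The argument values and the message `id` are arbitrary examples, and transport and session setup are left out.

```python
import json

# Example arguments for wait_for_request; every parameter is optional,
# so any subset of these keys may be sent.
arguments = {
    "request_type": "http",
    "timeout_seconds": 60,
    "poll_interval_seconds": 0.5,
    "include_body": True,
    "max_bytes": 4096,
}

# JSON-RPC 2.0 envelope that MCP uses for tool invocation.
message = {
    "jsonrpc": "2.0",
    "id": 1,  # placeholder request id
    "method": "tools/call",
    "params": {"name": "wait_for_request", "arguments": arguments},
}

print(json.dumps(message, indent=2))
```

Omitted keys fall back to the defaults declared in the tool registration.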
Implementation Reference
- src/requestrepo_mcp/server.py:94-148 (handler) The core implementation of wait_for_request in the RequestrepoMCPService class. This method polls for new requests until the timeout expires, tracking seen request IDs and returning either the most recent matching request or a timeout status.

      def wait_for_request(
          self,
          *,
          request_type: RequestType | None = None,
          timeout_seconds: int | None = None,
          poll_interval_seconds: float = 1.0,
          include_raw: bool = False,
          include_body: bool = False,
          max_bytes: int | None = None,
      ) -> dict[str, Any]:
          if poll_interval_seconds <= 0:
              raise ValueError("poll_interval_seconds must be > 0.")

          resolved_timeout = self.config.default_timeout_seconds if timeout_seconds is None else timeout_seconds
          if resolved_timeout < 0:
              raise ValueError("timeout_seconds must be >= 0.")

          resolved_max_bytes = self._resolved_max_bytes(max_bytes)
          client = self._client()
          deadline = time.monotonic() + resolved_timeout
          seen_ids = {request.id for request in client.list_requests(limit=100, offset=0)}

          while time.monotonic() <= deadline:
              requests = client.list_requests(limit=100, offset=0)
              new_requests = [request for request in requests if request.id not in seen_ids]
              seen_ids.update(request.id for request in requests)
              if request_type is not None:
                  new_requests = [request for request in new_requests if request.type == request_type]
              if new_requests:
                  selected = max(new_requests, key=lambda request: request.date)
                  return {
                      "found": True,
                      "timeout": False,
                      "request_type": request_type,
                      "request": serialize_request(
                          selected,
                          include_raw=include_raw,
                          include_body=include_body,
                          max_bytes=resolved_max_bytes,
                      ),
                  }
              sleep_for = min(poll_interval_seconds, max(0.0, deadline - time.monotonic()))
              if sleep_for <= 0:
                  break
              time.sleep(sleep_for)

          return {
              "found": False,
              "timeout": True,
              "request_type": request_type,
              "request": None,
          }

- src/requestrepo_mcp/server.py:363-380 (registration) MCP tool registration using the @mcp.tool() decorator in the create_mcp_server function. Exposes wait_for_request as an MCP tool with default parameter values and delegates to the service method.

      @mcp.tool()
      def wait_for_request(
          request_type: RequestType | None = None,
          timeout_seconds: int = 30,
          poll_interval_seconds: float = 1.0,
          include_raw: bool = False,
          include_body: bool = False,
          max_bytes: int = 65536,
      ) -> dict[str, Any]:
          """Poll for a new request until timeout."""
          return resolved_service.wait_for_request(
              request_type=request_type,
              timeout_seconds=timeout_seconds,
              poll_interval_seconds=poll_interval_seconds,
              include_raw=include_raw,
              include_body=include_body,
              max_bytes=max_bytes,
          )

- src/requestrepo_mcp/schemas.py:9-9 (schema) RequestType type definition (Literal['http', 'dns', 'smtp', 'tcp']), used as the type of the request_type parameter so the wait_for_request tool can filter requests by type.

      RequestType = Literal["http", "dns", "smtp", "tcp"]

- serialize_request function used by wait_for_request to convert request objects into dictionaries, optionally including body and raw data depending on the include_body and include_raw flags.

      def serialize_request(
          request: HttpRequest | DnsRequest | SmtpRequest | TcpRequest,
          *,
          include_raw: bool,
          include_body: bool,
          max_bytes: int,
      ) -> dict[str, Any]:
          payload: dict[str, Any] = {
              "id": request.id,
              "type": request.type,
              "uid": request.uid,
              "ip": request.ip,
              "country": request.country,
              "date_unix": request.date,
              "date_iso": _iso_from_unix(request.date),
          }

          if isinstance(request, HttpRequest):
              payload.update(
                  {
                      "method": request.method,
                      "path": request.path,
                      "http_version": request.http_version,
                      "headers": request.headers,
                  }
              )
              if include_body and request.body is not None:
                  payload["body"] = bytes_envelope(request.body, max_bytes=max_bytes)
          elif isinstance(request, DnsRequest):
              payload.update(
                  {
                      "port": request.port,
                      "query_type": request.query_type,
                      "domain": request.domain,
                      "reply": request.reply,
                  }
              )
          elif isinstance(request, SmtpRequest):
              payload.update(
                  {
                      "command": request.command,
                      "data": request.data,
                      "subject": request.subject,
                      "from_addr": request.from_addr,
                      "to": request.to,
                      "cc": request.cc,
                      "bcc": request.bcc,
                  }
              )
          elif isinstance(request, TcpRequest):
              payload.update({"port": request.port})

          if include_raw:
              payload["raw"] = bytes_envelope(request.raw, max_bytes=max_bytes)

          return payload
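
To see the handler's polling logic in isolation, the sketch below reproduces the same snapshot-then-poll loop against an in-memory stand-in for the client. `FakeRequest`, `FakeClient`, and `poll_for_new` are hypothetical names invented for this illustration; only the loop structure mirrors the handler above, and serialization is left out.

```python
import time
from dataclasses import dataclass


@dataclass
class FakeRequest:
    # Hypothetical stand-in carrying only the fields the loop reads.
    id: str
    type: str
    date: int


class FakeClient:
    """Hypothetical client that returns a longer request list on each call."""

    def __init__(self, batches):
        self._batches = batches
        self._calls = 0

    def list_requests(self, limit=100, offset=0):
        index = min(self._calls, len(self._batches) - 1)
        self._calls += 1
        return list(self._batches[index])


def poll_for_new(client, request_type=None, timeout_seconds=1.0, poll_interval_seconds=0.01):
    deadline = time.monotonic() + timeout_seconds
    # Snapshot first: anything already listed does not count as new.
    seen_ids = {r.id for r in client.list_requests()}
    while time.monotonic() <= deadline:
        requests = client.list_requests()
        new = [r for r in requests if r.id not in seen_ids]
        # IDs are marked seen before the type filter is applied.
        seen_ids.update(r.id for r in requests)
        if request_type is not None:
            new = [r for r in new if r.type == request_type]
        if new:
            return max(new, key=lambda r: r.date)
        # Never sleep past the deadline.
        sleep_for = min(poll_interval_seconds, max(0.0, deadline - time.monotonic()))
        if sleep_for <= 0:
            break
        time.sleep(sleep_for)
    return None


old = FakeRequest("a", "http", 100)
dns = FakeRequest("b", "dns", 101)
new_http = FakeRequest("c", "http", 102)
client = FakeClient([[old], [old, dns], [old, dns, new_http]])

result = poll_for_new(client, request_type="http")
print(result)
```

Two consequences of this structure are worth noting. Requests that arrived before the initial snapshot are never returned, and a new request of a non-matching type is still recorded as seen, so it is skipped on later polls within the same call.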