get_hight_speed_stream
Retrieve high-speed solar wind stream data from NASA for space weather analysis. Specify date ranges to access historical HSS information for research and monitoring purposes.
Instructions
Get high speed stream (HSS) data.
Args: start_date: Start date in YYYY-MM-DD format. Defaults to 30 days before current date. end_date: End date in YYYY-MM-DD format. Defaults to current date.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
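| start_date | No | Start date in YYYY-MM-DD format. | 30 days before current date |
| end_date | No | End date in YYYY-MM-DD format. | Current date |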
Implementation Reference
# src/nasa_mcp/server.py:601-633 (handler)
# The main handler function for the 'get_hight_speed_stream' tool. It makes an
# API request to NASA's DONKI/HSS endpoint, handles responses and errors, and
# uses the 'format_donki_results' helper to format the output.
async def get_hight_speed_stream(start_date: str = None, end_date: str = None) -> str:  # Note: High* Speed Stream
    """Get high speed stream (HSS) data.

    Args:
        start_date: Start date in YYYY-MM-DD format. Defaults to 30 days before current date.
        end_date: End date in YYYY-MM-DD format. Defaults to current date.
    """
    # Only forward the date bounds the caller actually supplied.
    params = {}
    if start_date:
        params["startDate"] = start_date
    if end_date:
        params["endDate"] = end_date

    url = f"{NASA_API_BASE}/DONKI/HSS"
    data = await make_nasa_request(url, params)

    # Only a missing response counts as a failed request. An empty list is a
    # valid "no events in this period" answer and must reach
    # format_donki_results, which reports it as such.
    if data is None:
        return "Could not retrieve HSS data due to a connection error."

    # Error and binary-content markers are always delivered as dictionaries.
    if isinstance(data, dict):
        if "error" in data:
            return f"API Error: {data.get('error')} - Details: {data.get('details', 'N/A')}"
        if data.get("binary_content"):
            return f"Received unexpected binary content from HSS API. URL: {data.get('url')}"

    # format_donki_results expects a list of event records.
    if not isinstance(data, list):
        logger.error("Unexpected non-list response from HSS API: %s", data)
        return "Received unexpected data format from HSS API."

    try:
        return format_donki_results(data, "High Speed Streams", "hssID")
    except Exception as e:
        # Tool boundary: always hand a readable string back to the caller.
        logger.error("Error processing HSS data: %s", e)
        return f"Error processing high speed stream data: {str(e)}"
# src/nasa_mcp/server.py:318-365 (helper)
# Helper function used by get_hight_speed_stream (and other DONKI tools) to
# format the list of results into a readable string, limiting display to 10
# items.
def format_donki_results(data: list, title_prefix: str, id_key: str) -> str:
    """Format a list of DONKI event records as a readable multi-line string.

    Args:
        data: Event records (dicts) to format.
        title_prefix: Human-readable name of the event family; used in the
            header line and, lower-cased, in the empty-result message.
        id_key: Key that holds each record's identifier (e.g. "hssID").

    Returns:
        A newline-separated summary: a header with the total count, then up
        to 10 records each terminated by a dashed rule, then a trailer line
        with the number of omitted records when there are more than 10.
        For empty input, a single "No ... data" sentence.
    """
    if not data:
        return f"No {title_prefix.lower()} data for the specified period."

    display_limit = 10
    # (record key, display label) for fields shared by the DONKI event types.
    common_fields = (
        ("startTime", "Start Time"),
        ("eventTime", "Event Time"),
        ("sourceLocation", "Source Location"),
        ("note", "Note"),
        ("link", "Link"),
    )

    result = [f"{title_prefix} found: {len(data)}"]

    for item in data[:display_limit]:
        result.append(f"\nID: {item.get(id_key, 'Unknown')}")

        # Add common fields if they exist
        for key, label in common_fields:
            if key in item:
                result.append(f"{label}: {item[key]}")

        # CME records: show at most two analyses.
        if id_key == 'activityID':
            analyses = item.get('cmeAnalyses')
            if analyses:
                result.append(" Analyses:")
                for analysis in analyses[:2]:
                    result.append(
                        f" - Time: {analysis.get('time21_5', 'N/A')}, "
                        f"Speed: {analysis.get('speed', 'N/A')} km/s, "
                        f"Type: {analysis.get('type', 'N/A')}"
                    )

        # GST records: show at most two Kp index readings.
        if id_key == 'gstID':
            kp_indices = item.get('allKpIndex')
            if kp_indices:
                result.append(" Kp Indices (first 2):")
                for kp in kp_indices[:2]:
                    result.append(
                        f" - Time: {kp.get('observedTime', 'N/A')}, "
                        f"Index: {kp.get('kpIndex', 'N/A')}"
                    )

        # Linked events; the value may be missing or null, both are skipped.
        linked_events = item.get('linkedEvents', [])
        if linked_events:
            result.append(" Related event IDs (first 5):")
            # str() keeps join from failing when an ID is present but null.
            result.append(
                " " + ", ".join(str(le.get('activityID', 'N/A')) for le in linked_events[:5])
            )

        result.append("-" * 40)

    if len(data) > display_limit:
        result.append(f"\n... and {len(data) - display_limit} more entries.")

    return "\n".join(result)
- src/nasa_mcp/server.py:601-601 (registration) The @mcp.tool() decorator registers the get_hight_speed_stream function as an MCP tool: `async def get_hight_speed_stream(start_date: str = None, end_date: str = None) -> str:  # Note: High* Speed Stream`
# src/nasa_mcp/server.py:28-83 (helper)
# Shared helper function used by all tools to make HTTP requests to NASA APIs,
# handling JSON, binary images, and errors uniformly.
async def make_nasa_request(url: str, params: dict = None) -> Union[dict[str, Any], List[Any], None]:
    """Make a request to the NASA API with proper error handling.

    Handles both JSON and binary (image) responses.

    Args:
        url: Endpoint URL to GET.
        params: Query parameters. The dict is not modified; ``api_key`` is
            added to a private copy when the caller did not supply one.

    Returns:
        - The decoded JSON body when the response is ``application/json``.
        - ``{"binary_content": True, "content_type": ..., "url": ...}`` when
          the response is an image.
        - ``{"error": ..., "details"/"content": ...}`` for undecodable JSON,
          unexpected content types, HTTP error statuses, request failures and
          any other exception.
    """
    logger.info("Making request to: %s with params: %s", url, params)

    # Work on a copy so the caller's dict never silently gains the API key.
    query = dict(params) if params else {}
    query.setdefault("api_key", API_KEY)

    # NOTE(review): the URLs logged and returned below are the final request
    # URLs, which carry the api_key query parameter -- consider redacting.
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, params=query, timeout=30.0, follow_redirects=True)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)

            content_type = response.headers.get("Content-Type", "").lower()

            if "application/json" in content_type:
                try:
                    return response.json()
                except json.JSONDecodeError as json_err:
                    logger.error("JSON decode error for URL %s: %s", response.url, json_err)
                    logger.error("Response text: %s", response.text[:500])  # Log beginning of text
                    return {"error": "Failed to decode JSON response", "details": str(json_err)}

            if content_type.startswith("image/"):
                logger.info("Received binary image content (%s) from %s", content_type, response.url)
                # Return a dictionary indicating binary content was received
                return {
                    "binary_content": True,
                    "content_type": content_type,
                    "url": str(response.url),  # Return the final URL after redirects
                }

            # Handle other unexpected content types
            logger.warning("Unexpected content type '%s' received from %s", content_type, response.url)
            return {"error": f"Unexpected content type: {content_type}", "content": response.text[:500]}

        except httpx.HTTPStatusError as http_err:
            logger.error(
                "HTTP error occurred: %s - %s for URL %s",
                http_err,
                http_err.response.status_code,
                http_err.request.url,
            )
            try:
                # Try to get more details from response body if possible
                error_details = http_err.response.json()
            except Exception:
                error_details = http_err.response.text[:500]
            return {"error": f"HTTP error: {http_err.response.status_code}", "details": error_details}
        except httpx.RequestError as req_err:
            logger.error("Request error occurred: %s for URL %s", req_err, req_err.request.url)
            return {"error": "Request failed", "details": str(req_err)}
        except Exception as e:
            # Last-resort boundary: callers expect a dict, never a raised error.
            logger.error("An unexpected error occurred: %s", e)
            return {"error": "An unexpected error occurred", "details": str(e)}