get-streaming-results
Retrieve real-time mortgage rate updates using a session ID, with customizable output formats, polling intervals, and stability checks for accurate results.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| format | No | Output format | markdown |
| maxAttempts | No | Maximum number of polling attempts | |
| pollInterval | No | Polling interval in milliseconds | |
| sessionId | Yes | Session ID from get-mortgage-rates | |
| stopOnStable | No | Stop polling when record count stabilizes |
Implementation Reference
- ratespot_mcp_server_streaming.ts:387-503 (handler)Main handler for 'get-streaming-results' tool. Retrieves active session, performs polling if needed, formats results (markdown/CSV/JSON), handles CSV file export with download URL, manages status and errors.async (params) => { try { const session = activeSessions.get(params.sessionId); if (!session) { return { content: [{ type: "text", text: `Session ${params.sessionId} not found or expired` }], isError: true }; } // Initialize polling metadata if not present if (!session.metadata.pollCount) { session.metadata.pollCount = 0; session.metadata.previousCount = 0; session.metadata.stableCount = 0; } // Update polling metadata session.metadata.pollCount++; const currentCount = session.data.length; const countChanged = currentCount !== session.metadata.previousCount; if (countChanged) { session.metadata.stableCount = 0; } else { session.metadata.stableCount++; } session.metadata.previousCount = currentCount; // Check if we should continue polling const shouldPoll = session.status === 'streaming' && session.metadata.pollCount < params.maxAttempts && (!params.stopOnStable || session.metadata.stableCount < 2); if (shouldPoll) { // Schedule next poll and wait for it await new Promise(resolve => setTimeout(resolve, params.pollInterval)); try { // Fetch additional data with polling flag const queryParams = session.metadata.searchParams; await streamMortgageRates(queryParams, session.id, true); // Return early to trigger another poll if needed return { content: [{ type: "text", text: `๐ **Auto-polling (Attempt ${session.metadata.pollCount})**\n\n` + `Current products: ${session.data.length}\n` + `Previous count: ${session.metadata.previousCount}\n` + `Status: ${session.status}\n\n` + `โณ Still receiving data. Polling will continue automatically...` }] }; } catch (error) { console.error('Polling error:', error); session.status = 'error'; session.metadata.error = error instanceof Error ? error.message : String(error); } } let response = `๐ **Streaming Results**\n\n`; response += `Status: ${session.status}\n`; response += `Products: ${session.data.length} (Previous: ${session.metadata.previousCount})\n`; response += `Poll Count: ${session.metadata.pollCount}\n`; response += `Stable Count: ${session.metadata.stableCount}\n`; response += `Last update: ${session.metadata.lastUpdate.toLocaleString()}\n\n`; if (session.status === 'error') { response += `โ Error: ${session.metadata.error}\n\n`; } if (session.data.length > 0) { if (params.format === 'csv') { const csvData = formatResults(session, 'csv'); const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); const fileName = `mortgage_rates_${timestamp}.csv`; const filePath = path.join(DATA_DIR, fileName); await fs.promises.writeFile(filePath, csvData, 'utf8'); const manager = await getFileServerManager(); await manager.ensureServerRunning(DATA_DIR); const downloadUrl = await manager.getDownloadUrl(fileName); response += `โ CSV file saved\n`; response += `๐ File: ${fileName}\n`; response += `๐ Download: ${downloadUrl}\n`; } else { response += formatResults(session, params.format); } } if (session.status === 'streaming') { response += `\nโณ Still receiving data. Check again for more results.\n`; } return { content: [{ type: "text", text: response }] }; } catch (error) { return { content: [{ type: "text", text: `Error getting results: ${error instanceof Error ? error.message : String(error)}` }], isError: true }; } }
- Input schema using Zod validators for tool parameters including required sessionId and optional formatting/polling options.{ sessionId: z.string().describe("Session ID from get-mortgage-rates"), format: z.enum(["markdown", "csv", "json"]).optional().default("markdown").describe("Output format"), pollInterval: z.number().optional().default(3000).describe("Polling interval in milliseconds"), maxAttempts: z.number().optional().default(10).describe("Maximum number of polling attempts"), stopOnStable: z.boolean().optional().default(true).describe("Stop polling when record count stabilizes") },
- ratespot_mcp_server_streaming.ts:378-504 (registration)Registers the 'get-streaming-results' tool on the MCP server with name, input schema, and handler function.server.tool( "get-streaming-results", { sessionId: z.string().describe("Session ID from get-mortgage-rates"), format: z.enum(["markdown", "csv", "json"]).optional().default("markdown").describe("Output format"), pollInterval: z.number().optional().default(3000).describe("Polling interval in milliseconds"), maxAttempts: z.number().optional().default(10).describe("Maximum number of polling attempts"), stopOnStable: z.boolean().optional().default(true).describe("Stop polling when record count stabilizes") }, async (params) => { try { const session = activeSessions.get(params.sessionId); if (!session) { return { content: [{ type: "text", text: `Session ${params.sessionId} not found or expired` }], isError: true }; } // Initialize polling metadata if not present if (!session.metadata.pollCount) { session.metadata.pollCount = 0; session.metadata.previousCount = 0; session.metadata.stableCount = 0; } // Update polling metadata session.metadata.pollCount++; const currentCount = session.data.length; const countChanged = currentCount !== session.metadata.previousCount; if (countChanged) { session.metadata.stableCount = 0; } else { session.metadata.stableCount++; } session.metadata.previousCount = currentCount; // Check if we should continue polling const shouldPoll = session.status === 'streaming' && session.metadata.pollCount < params.maxAttempts && (!params.stopOnStable || session.metadata.stableCount < 2); if (shouldPoll) { // Schedule next poll and wait for it await new Promise(resolve => setTimeout(resolve, params.pollInterval)); try { // Fetch additional data with polling flag const queryParams = session.metadata.searchParams; await streamMortgageRates(queryParams, session.id, true); // Return early to trigger another poll if needed return { content: [{ type: "text", text: `๐ **Auto-polling (Attempt ${session.metadata.pollCount})**\n\n` + `Current products: ${session.data.length}\n` + `Previous count: ${session.metadata.previousCount}\n` + `Status: ${session.status}\n\n` + `โณ Still receiving data. Polling will continue automatically...` }] }; } catch (error) { console.error('Polling error:', error); session.status = 'error'; session.metadata.error = error instanceof Error ? error.message : String(error); } } let response = `๐ **Streaming Results**\n\n`; response += `Status: ${session.status}\n`; response += `Products: ${session.data.length} (Previous: ${session.metadata.previousCount})\n`; response += `Poll Count: ${session.metadata.pollCount}\n`; response += `Stable Count: ${session.metadata.stableCount}\n`; response += `Last update: ${session.metadata.lastUpdate.toLocaleString()}\n\n`; if (session.status === 'error') { response += `โ Error: ${session.metadata.error}\n\n`; } if (session.data.length > 0) { if (params.format === 'csv') { const csvData = formatResults(session, 'csv'); const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); const fileName = `mortgage_rates_${timestamp}.csv`; const filePath = path.join(DATA_DIR, fileName); await fs.promises.writeFile(filePath, csvData, 'utf8'); const manager = await getFileServerManager(); await manager.ensureServerRunning(DATA_DIR); const downloadUrl = await manager.getDownloadUrl(fileName); response += `โ CSV file saved\n`; response += `๐ File: ${fileName}\n`; response += `๐ Download: ${downloadUrl}\n`; } else { response += formatResults(session, params.format); } } if (session.status === 'streaming') { response += `\nโณ Still receiving data. Check again for more results.\n`; } return { content: [{ type: "text", text: response }] }; } catch (error) { return { content: [{ type: "text", text: `Error getting results: ${error instanceof Error ? error.message : String(error)}` }], isError: true }; } } );
- Helper function to format session data into markdown table, CSV, or JSON strings, used by the handler.function formatResults(session: StreamSession, format: string = 'markdown'): string { const products = session.data; // Sort by rate products.sort((a, b) => a.rate - b.rate); if (format === 'markdown') { let output = `# Mortgage Rate Results\n\n`; output += `Found ${products.length} mortgage products\n\n`; output += `| Lender | Rate | APR | Payment | Points | Upfront Costs | Type | Quote |\n`; output += `|---------|------|-----|---------|--------|---------------|------|--------|\n`; for (const product of products) { output += `| ${product.lender} | ${product.rate.toFixed(3)}% | ${product.apr.toFixed(3)}% | $${product.payment.toLocaleString()} | ${product.points.toFixed(3)} | $${product.upfrontCosts.toLocaleString()} | ${product.loanType} | ${product.quoteType} |\n`; } return output; } else if (format === 'csv') { let output = 'Lender,Rate,APR,Payment,Points,Upfront_Costs,Loan_Type,Quote_Type\n'; for (const product of products) { output += `${escapeCSV(product.lender)},${product.rate}%,${product.apr}%,$${product.payment},${product.points},$${product.upfrontCosts},${product.loanType},${product.quoteType}\n`; } return output; } else { return JSON.stringify(products, null, 2); } }
- Helper to fetch and parse streaming mortgage data from Ratespot API SSE endpoint, adds to session, avoids duplicates, used for initial and polling fetches.async function streamMortgageRates(params: any, sessionId: string, isPolling: boolean = false): Promise<void> { const session = activeSessions.get(sessionId); if (!session) { throw new Error('Invalid session ID'); } try { const queryString = new URLSearchParams({ apikey: getApiKey(), ...params, offset: isPolling ? session.data.length.toString() : '0' }).toString(); const url = `${RATESPOT_BASE_URL}/v1/mortgage_products?${queryString}`; const response = await fetch(url, { method: 'GET', headers: { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache' } }); if (!response.ok) { throw new Error(`API request failed: ${response.status} ${response.statusText}`); } const reader = response.body?.getReader(); if (!reader) { throw new Error('No response body available'); } const decoder = new TextDecoder(); let buffer = ''; let newProducts = 0; while (true) { const { done, value } = await reader.read(); if (done) { if (buffer.trim()) { newProducts += processBuffer(buffer, session); } break; } buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() || ''; for (const line of lines) { if (line.startsWith('data:')) { const product = processSSEData(line.substring(5).trim()); if (product) { // Check for duplicates before adding const isDuplicate = session.data.some(p => p.lender === product.lender && p.rate === product.rate && p.apr === product.apr ); if (!isDuplicate) { session.data.push(product); session.metadata.totalProducts++; session.metadata.lastUpdate = new Date(); session.status = 'streaming'; newProducts++; } } } } } // Only mark as complete if no new products were found during polling if (isPolling && newProducts === 0) { session.status = 'complete'; console.error(`No new products found after ${session.metadata.pollCount} polls. Marking as complete.`); } } catch (error) { session.status = 'error'; session.metadata.error = error instanceof Error ? error.message : String(error); throw error; } }