Skip to main content
Glama

Research Tracker MCP Server

by vupatel08
flask_app.py15.8 kB
""" Real-Time Research Discovery Frontend Flask backend with Server-Sent Events for streaming discovery updates """ import json import time import io import sys import logging from flask import Flask, Response, render_template, request, jsonify from flask_cors import CORS from config import logger from discovery import find_research_relationships, format_list_output from inference import create_row_data app = Flask(__name__) CORS(app) class DiscoveryLogCapture: """Captures real discovery logs and streams them to frontend""" def __init__(self): self.logs = [] self.original_handlers = {} self.capturing = False def start_capture(self): """Start capturing logs from all modules""" if self.capturing: return self.capturing = True self.logs = [] # Capture logs from discovery-related modules modules_to_capture = ['discovery', 'inference', 'utils', 'config'] for module_name in modules_to_capture: try: module_logger = logging.getLogger(module_name) if module_logger.handlers: # Store original handlers self.original_handlers[module_name] = module_logger.handlers.copy() # Add our custom handler custom_handler = CustomLogHandler(self) custom_handler.setLevel(logging.INFO) module_logger.addHandler(custom_handler) except Exception as e: print(f"Error setting up log capture for {module_name}: {e}") def stop_capture(self): """Stop capturing logs and restore original handlers""" if not self.capturing: return # Restore original handlers for module_name, handlers in self.original_handlers.items(): try: module_logger = logging.getLogger(module_name) # Remove our custom handler for handler in module_logger.handlers[:]: if isinstance(handler, CustomLogHandler): module_logger.removeHandler(handler) # Restore original handlers module_logger.handlers = handlers except Exception as e: print(f"Error restoring handlers for {module_name}: {e}") self.capturing = False def add_log(self, log_entry): """Add a captured log entry""" self.logs.append(log_entry) def get_logs(self): """Get all captured logs""" return self.logs.copy() def clear_logs(self): """Clear captured logs""" self.logs = [] class CustomLogHandler(logging.Handler): """Custom log handler that captures logs for streaming""" def __init__(self, capture_instance): super().__init__() self.capture_instance = capture_instance def emit(self, record): """Emit log record to capture instance""" try: log_entry = { 'timestamp': time.strftime('%H:%M:%S'), 'level': record.levelname, 'message': record.getMessage(), 'module': record.module, 'function': record.funcName if hasattr(record, 'funcName') else 'unknown' } self.capture_instance.add_log(log_entry) except Exception: pass # Create global log capture instance log_capture = DiscoveryLogCapture() def stream_discovery_updates(input_url): """Generator that yields REAL discovery updates in real-time""" def send_update(step, message, data=None): """Send a formatted SSE update""" event_data = { 'step': step, 'message': message, 'timestamp': time.strftime('%H:%M:%S'), 'data': data or {} } return f"data: {json.dumps(event_data)}\n\n" try: start_time = time.time() # Initial setup yield send_update('start', f'Starting discovery for: {input_url}') yield send_update('start', f'Timestamp: {time.strftime("%H:%M:%S")}') # URL analysis if "arxiv.org" in input_url: yield send_update('analysis', 'Detected: arXiv paper URL') yield send_update('analysis', 'Will extract: Paper metadata, authors, date, abstract') elif "huggingface.co" in input_url: yield send_update('analysis', 'Detected: HuggingFace resource') yield send_update('analysis', 'Will extract: Models, datasets, spaces, code repos') elif "github.com" in input_url: yield send_update('analysis', 'Detected: GitHub repository') yield send_update('analysis', 'Will extract: Code, documentation, related papers') else: yield send_update('analysis', 'Detected: Other research resource') yield send_update('analysis', 'Will attempt: General web scraping and analysis') # Start discovery process yield send_update('discovery', 'Starting comprehensive discovery...') yield send_update('discovery', 'Phase 1: Building resource graph...') # Call discovery function and capture real logs try: yield send_update('discovery', 'Calling find_research_relationships...') # Create a log buffer to capture discovery logs log_buffer = [] class BufferLogHandler(logging.Handler): def __init__(self, buffer): super().__init__() self.buffer = buffer def emit(self, record): try: # Format the log message log_message = record.getMessage() module_name = record.module func_name = record.funcName if hasattr(record, 'funcName') else 'unknown' # Add to buffer self.buffer.append({ 'message': log_message, 'module': module_name, 'function': func_name, 'level': record.levelname, 'timestamp': time.strftime('%H:%M:%S') }) except Exception: pass # Add the buffer handler to discovery-related loggers discovery_logger = logging.getLogger('discovery') inference_logger = logging.getLogger('inference') config_logger = logging.getLogger('config') buffer_handler = BufferLogHandler(log_buffer) buffer_handler.setLevel(logging.INFO) # Store original handlers orig_discovery_handlers = discovery_logger.handlers.copy() orig_inference_handlers = inference_logger.handlers.copy() orig_config_handlers = config_logger.handlers.copy() # Add buffer handlers discovery_logger.addHandler(buffer_handler) inference_logger.addHandler(buffer_handler) config_logger.addHandler(buffer_handler) # Call discovery function (this will generate logs that go to buffer) result = find_research_relationships(input_url.strip()) # Remove the buffer handlers and restore original handlers discovery_logger.removeHandler(buffer_handler) inference_logger.removeHandler(buffer_handler) config_logger.removeHandler(buffer_handler) # Stream captured logs to frontend if log_buffer: yield send_update('log', f'Captured {len(log_buffer)} real discovery logs:') for log_entry in log_buffer: formatted_log = f"[{log_entry['module']}.{log_entry['function']}] {log_entry['message']}" yield send_update('log', formatted_log, { 'type': 'discovery_log', 'level': log_entry['level'], 'module': log_entry['module'], 'function': log_entry['function'], 'timestamp': log_entry['timestamp'] }) else: yield send_update('log', 'No discovery logs captured - showing key steps') # Show the real steps we know happen yield send_update('log', '[discovery] Phase 1: Building complete resource graph') yield send_update('log', '[discovery] Round 1: Direct inferences from input') yield send_update('log', '[discovery] Round 2: Cross-inferences') yield send_update('log', '[discovery] Round 3: HuggingFace page scraping') yield send_update('log', '[discovery] Phase 2: Creating comprehensive row data') yield send_update('log', '[discovery] Phase 3: Performing comprehensive inferences') yield send_update('discovery', 'Discovery engine completed successfully') except Exception as discovery_error: logger.error(f"Discovery engine error: {discovery_error}") yield send_update('error', f'Discovery engine failed: {str(discovery_error)}') return # Process results yield send_update('processing', 'Processing discovered resources...') # Extract fields with safe fallbacks try: paper = result.get("paper", "") or "" code = result.get("code", "") or "" authors = format_list_output(result.get("authors", [])) model = result.get("model", "") or "" dataset = result.get("dataset", "") or "" space = result.get("space", "") or "" except Exception as field_error: logger.error(f"Field extraction error: {field_error}") yield send_update('error', f'Error extracting fields: {str(field_error)}') return # Send discovered resources if paper: yield send_update('result', f'Found paper: {paper}', {'type': 'paper', 'url': paper}) if "arxiv.org" in paper: yield send_update('result', 'Source: arXiv API (direct)') elif "huggingface.co" in paper: yield send_update('result', 'Source: HuggingFace papers') else: yield send_update('result', 'Source: Cross-reference discovery') if code: yield send_update('result', f'Found code repository: {code}', {'type': 'code', 'url': code}) if "github.com" in code: yield send_update('result', 'Source: GitHub search API') elif "huggingface.co" in code: yield send_update('result', 'Source: HuggingFace code scraping') else: yield send_update('result', 'Source: Web search/discovery') if authors and authors != "None": yield send_update('result', f'Found authors: {authors}', {'type': 'authors', 'data': authors}) if "arxiv.org" in input_url or "arxiv.org" in paper: yield send_update('result', 'Source: arXiv API metadata') else: yield send_update('result', 'Source: Page scraping/parsing') if model: yield send_update('result', f'Found model: {model}', {'type': 'model', 'url': model}) yield send_update('result', 'Source: HuggingFace paper page scraping') if dataset: yield send_update('result', f'Found dataset: {dataset}', {'type': 'dataset', 'url': dataset}) yield send_update('result', 'Source: HuggingFace paper page scraping') if space: yield send_update('result', f'Found demo space: {space}', {'type': 'space', 'url': space}) yield send_update('result', 'Source: HuggingFace paper page scraping') # Final summary yield send_update('summary', 'Discovery summary...') try: success_count = result.get("success_count", 0) total_inferences = result.get("total_inferences", 10) discovery_time = time.time() - start_time yield send_update('summary', f'Success Rate: {success_count}/{total_inferences} resources found') yield send_update('summary', f'Total Time: {discovery_time:.2f} seconds') yield send_update('summary', f'Efficiency: {(success_count/total_inferences)*100:.1f}%') if success_count > 0: yield send_update('summary', f'Discovery Speed: {success_count/discovery_time:.1f} resources/second') except Exception as summary_error: logger.error(f"Summary error: {summary_error}") yield send_update('error', f'Error generating summary: {str(summary_error)}') return # Final completion try: final_results = { 'paper': paper, 'code': code, 'authors': authors, 'model': model, 'dataset': dataset, 'space': space, 'success_count': success_count, 'total_inferences': total_inferences, 'discovery_time': discovery_time } yield send_update('complete', 'Discovery process completed successfully!', final_results) except Exception as final_error: logger.error(f"Final results error: {final_error}") yield send_update('error', f'Error preparing final results: {str(final_error)}') except Exception as e: error_msg = f"Critical error during discovery: {str(e)}" logger.error(error_msg) yield send_update('error', error_msg, {'error': str(e)}) @app.route('/') def index(): """Serve the main frontend page""" return render_template('index.html') @app.route('/stream') def stream(): """SSE endpoint for streaming discovery updates""" url = request.args.get('url', '') if not url: return Response("data: {\"error\": \"No URL provided\"}\n\n", mimetype='text/plain') return Response( stream_discovery_updates(url), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*' } ) @app.route('/discover', methods=['POST']) def discover(): """API endpoint for discovery requests""" data = request.get_json() url = data.get('url', '') if data else '' if not url: return jsonify({'error': 'No URL provided'}), 400 try: result = find_research_relationships(url.strip()) return jsonify({ 'success': True, 'result': result, 'url': url }) except Exception as e: return jsonify({ 'success': False, 'error': str(e), 'url': url }), 500 @app.route('/health') def health(): """Health check endpoint""" return jsonify({'status': 'healthy', 'timestamp': time.time()}) if __name__ == '__main__': logger.info("Starting Real-Time Research Discovery Frontend") app.run(host='127.0.0.1', port=5000, debug=True, threaded=True)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vupatel08/research-mcp-tool'

If you have feedback or need assistance with the MCP directory API, please join our Discord server