Skip to main content
Glama

Stealthee MCP

by rainbowgore
stealth_server.py (15.8 kB)
#!/usr/bin/env python3
"""
FastMCP Server for Stealth Launch Radar
Smithery-compatible MCP server using FastMCP
"""

import asyncio
import json
import logging
import os
import re
import sqlite3
import sys
import traceback
from contextlib import closing
from datetime import datetime
from typing import Dict, List, Any, Optional

import aiohttp
import openai
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from mcp.server.fastmcp import Context, FastMCP
from pydantic import BaseModel, Field
from smithery.decorators import smithery

# Load environment variables
load_dotenv()

# Configure logging (stderr only, so stdout stays free for the MCP transport)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger(__name__)

# Endpoints and tunables shared by the tools below.
TAVILY_SEARCH_URL = "https://api.tavily.com/search"
NIMBLE_PARSE_URL = "https://api.nimbleparser.com/parse"
OPENAI_MODEL = "gpt-4"
OPENAI_TEMPERATURE = 0.3
DB_DIR = "data"
DB_PATH = "data/signals.db"
FETCH_TIMEOUT_SECONDS = 30
MAX_EXCERPT_CHARS = 1000
RESULTS_PER_TECH_SITE = 2
TECH_SITE_FILTERS = (
    "site:techcrunch.com",
    "site:producthunt.com",
    "site:venturebeat.com",
    "site:theverge.com",
    "site:arstechnica.com",
)

CREATE_SIGNALS_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS signals (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT,
        title TEXT,
        html_excerpt TEXT,
        changelog TEXT,
        pricing TEXT,
        score REAL,
        confidence TEXT,
        reasoning TEXT,
        created_at TEXT
    )
"""


class ConfigSchema(BaseModel):
    # Per-session configuration supplied through Smithery.
    tavily_api_key: str = Field(..., description="Tavily API key for web search")
    openai_api_key: str = Field(..., description="OpenAI API key for AI scoring")
    nimble_api_key: Optional[str] = Field(None, description="Nimble API key for parsing")
    slack_webhook_url: Optional[str] = Field(None, description="Slack webhook for alerts")


def _init_database() -> None:
    """Create the data directory and the ``signals`` table if they are missing."""
    os.makedirs(DB_DIR, exist_ok=True)
    # closing() guarantees the connection is released even if execute() raises;
    # sqlite3's own context manager only manages the transaction, not the handle.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(CREATE_SIGNALS_TABLE_SQL)
        conn.commit()


def _bearer_headers(api_key: str) -> Dict[str, str]:
    """Return JSON request headers carrying *api_key* as a bearer token."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }


async def _tavily_search(session: aiohttp.ClientSession, api_key: str,
                         query: str, max_results: int):
    """POST one basic-depth Tavily search.

    Returns a ``(status, results)`` tuple; ``results`` is an empty list
    whenever the HTTP status is not 200.
    """
    payload = {
        "query": query,
        "search_depth": "basic",
        "include_answer": False,
        "include_raw_content": False,
        "max_results": max_results
    }
    async with session.post(TAVILY_SEARCH_URL, headers=_bearer_headers(api_key),
                            json=payload) as response:
        if response.status != 200:
            return response.status, []
        body = await response.json()
        return response.status, body.get("results", [])


async def _search_tech_sites(api_key: str, query: str) -> List[Dict[str, Any]]:
    """Query every site in TECH_SITE_FILTERS and return the raw result dicts.

    Sites that answer with a non-200 status are logged and skipped, so one
    failing site does not discard the results of the others.
    """
    collected: List[Dict[str, Any]] = []
    async with aiohttp.ClientSession() as session:
        for site in TECH_SITE_FILTERS:
            status, results = await _tavily_search(
                session, api_key, f"{query} {site}", RESULTS_PER_TECH_SITE
            )
            if status != 200:
                logger.warning("Tech site search failed for %s: HTTP %s", site, status)
                continue
            collected.extend(results)
    return collected


def _format_search_results(items: List[Dict[str, Any]]) -> str:
    """Render search result dicts as the markdown-ish text the tools return."""
    return "\n".join(
        f"**{item.get('title', 'No title')}**\n"
        f"{item.get('content', 'No content')}\n"
        f"URL: {item.get('url', 'No URL')}\n"
        for item in items
    )


def _html_to_text(html: str) -> str:
    """Strip scripts/styles from *html* and collapse it to single-spaced text."""
    soup = BeautifulSoup(html, 'html.parser')
    # Remove scripts and styles
    for tag in soup(["script", "style"]):
        tag.decompose()
    lines = (line.strip() for line in soup.get_text().splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
    return ' '.join(chunk for chunk in chunks if chunk)


async def _complete(api_key: str, prompt: str) -> str:
    """Send *prompt* as a single user message and return the model's reply.

    Uses the async OpenAI client so the event loop is not blocked while the
    completion is in flight.
    """
    async with openai.AsyncOpenAI(api_key=api_key) as client:
        response = await client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=OPENAI_TEMPERATURE
        )
    return response.choices[0].message.content


@smithery.server(config_schema=ConfigSchema)
def create_server():
    """Create and return a FastMCP server instance with session config."""
    server = FastMCP(name="Stealth Launch Radar")

    # Initialize database
    _init_database()

    # NOTE: the tool docstrings below are kept verbatim on purpose -- FastMCP
    # publishes them to clients as the tool descriptions. Every tool returns a
    # human-readable string and reports failures as a "❌ ..." message rather
    # than raising.

    @server.tool()
    async def web_search(ctx: Context, query: str, num_results: int = 3) -> str:
        """Search the web for information about stealth product launches."""
        config = ctx.session_config
        try:
            async with aiohttp.ClientSession() as session:
                status, results = await _tavily_search(
                    session, config.tavily_api_key, query, num_results
                )
            if status != 200:
                return f"❌ Search failed: HTTP {status}"
            return _format_search_results(results)
        except Exception as e:
            logger.error("Web search error: %s", e)
            return f"❌ Search error: {str(e)}"

    @server.tool()
    async def url_extract(ctx: Context, url: str, parsing_type: str = "plain_text", wait_time: int = 2) -> str:
        """Extract and clean text content from a given URL."""
        # parsing_type is accepted for interface compatibility but is not
        # consulted; the output is always cleaned plain text.
        try:
            await asyncio.sleep(wait_time)  # Artificial delay
            # An explicit ClientTimeout replaces the deprecated bare-int timeout.
            timeout = aiohttp.ClientTimeout(total=FETCH_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        return f"❌ Failed to fetch URL: HTTP {response.status}"
                    html = await response.text()
                    content_type = response.headers.get('content-type', 'unknown')

            text = _html_to_text(html)
            ellipsis = '...' if len(text) > MAX_EXCERPT_CHARS else ''
            return (
                f"**URL:** {url}\n"
                f"**Content Type:** {content_type}\n"
                f"**Extracted Text:**\n{text[:MAX_EXCERPT_CHARS]}{ellipsis}"
            )
        except Exception as e:
            logger.error("URL extraction error: %s", e)
            return f"❌ Extraction error: {str(e)}"

    @server.tool()
    async def score_signal(ctx: Context, signal_text: str, signal_title: str = "") -> str:
        """Score a textual signal for likelihood of being a stealth product launch."""
        config = ctx.session_config
        try:
            prompt = f"""
            Analyze this text for stealth product launch indicators:

            Title: {signal_title}
            Text: {signal_text}

            Rate the likelihood of this being a stealth product launch (0.0 to 1.0).
            Consider: stealth keywords, launch timing, product mentions, pricing changes, etc.

            Respond with JSON:
            {{
                "launch_likelihood": 0.85,
                "confidence": "High",
                "reasoning": "Brief explanation of why this appears to be a stealth launch"
            }}
            """
            result = await _complete(config.openai_api_key, prompt)
            return f"**Signal Analysis:**\n{result}"
        except Exception as e:
            logger.error("Signal scoring error: %s", e)
            return f"❌ Scoring error: {str(e)}"

    @server.tool()
    async def search_tech_sites(ctx: Context, query: str, num_results: int = 5) -> str:
        """Search tech news sites and Product Hunt for stealth launches."""
        config = ctx.session_config
        try:
            items = await _search_tech_sites(config.tavily_api_key, query)
            return _format_search_results(items[:num_results])
        except Exception as e:
            logger.error("Tech sites search error: %s", e)
            return f"❌ Search error: {str(e)}"

    @server.tool()
    async def batch_score_signals(ctx: Context, signals: List[Dict[str, str]]) -> str:
        """Score multiple signals for stealth launch likelihood in batch."""
        config = ctx.session_config
        # Nothing to score: avoid a pointless (and billable) model call.
        if not signals:
            return "❌ Batch scoring error: no signals provided"
        try:
            # Prepare batch prompt
            signals_text = [
                f"Signal {i}: {signal.get('signal_title', f'Signal {i}')}\n"
                f"{signal.get('signal_text', '')}\n"
                for i, signal in enumerate(signals, 1)
            ]
            prompt = f"""
            Analyze these signals for stealth product launch indicators:

            {''.join(signals_text)}

            For each signal, rate the likelihood of being a stealth product launch (0.0 to 1.0).
            Consider: stealth keywords, launch timing, product mentions, pricing changes, etc.

            Respond with JSON array:
            [
                {{
                    "signal_id": 1,
                    "launch_likelihood": 0.85,
                    "confidence": "High",
                    "reasoning": "Brief explanation"
                }},
                ...
            ]
            """
            result = await _complete(config.openai_api_key, prompt)
            return f"**Batch Signal Analysis:**\n{result}"
        except Exception as e:
            logger.error("Batch scoring error: %s", e)
            return f"❌ Batch scoring error: {str(e)}"

    @server.tool()
    async def parse_fields(ctx: Context, html: str, target_fields: Optional[List[str]] = None) -> str:
        """Extract structured stealth-launch indicators from raw HTML using Nimble's AI Parsing Skills."""
        if target_fields is None:
            target_fields = ["pricing", "changelog", "launch_announcement"]
        config = ctx.session_config
        if not config.nimble_api_key:
            return "❌ Nimble API key not configured"
        try:
            payload = {"html": html, "fields": target_fields}
            async with aiohttp.ClientSession() as session:
                async with session.post(NIMBLE_PARSE_URL,
                                        headers=_bearer_headers(config.nimble_api_key),
                                        json=payload) as response:
                    if response.status != 200:
                        return f"❌ Parsing failed: HTTP {response.status}"
                    result = await response.json()

            parsed_fields = result.get("parsed_fields", {})
            # Only fields that came back with a truthy value are reported.
            formatted_fields = [
                f"**{field.title()}:** {value}"
                for field, value in parsed_fields.items()
                if value
            ]
            if not formatted_fields:
                return "No fields extracted"
            return "**Parsed Fields:**\n" + "\n".join(formatted_fields)
        except Exception as e:
            logger.error("Field parsing error: %s", e)
            return f"❌ Parsing error: {str(e)}"

    @server.tool()
    async def run_pipeline(ctx: Context, query: str, num_results: int = 5, target_fields: Optional[List[str]] = None) -> str:
        """End-to-end detection pipeline for stealth product launches."""
        if target_fields is None:
            target_fields = ["pricing", "changelog"]
        config = ctx.session_config
        try:
            logger.info("[Pipeline] Starting pipeline for query: %s", query)

            # Step 1: Search tech sites. Work on the structured results directly
            # instead of re-parsing the formatted text of the search tool, which
            # broke on error strings and on content lines starting with "**".
            hits = (await _search_tech_sites(config.tavily_api_key, query))[:num_results]
            logger.info("[Pipeline] Search completed")
            logger.info("[Pipeline] Found %d results", len(hits))

            # Steps 2-3: extract, optionally parse, and score each result
            report_sections = []
            for position, hit in enumerate(hits, 1):
                url = hit.get('url')
                if not url:
                    continue
                title = hit.get('title', 'No title')
                logger.info("[Pipeline] Processing result %d: %s", position, title)

                # Extract content from URL
                extracted = await url_extract(ctx, url)

                # Parse fields if Nimble is available
                # NOTE(review): parse_fields is documented as taking raw HTML but
                # receives the cleaned excerpt here, as before -- confirm intent.
                parsed_fields = ""
                if config.nimble_api_key:
                    parsed_fields = await parse_fields(ctx, extracted, target_fields)

                # Score the signal
                signal_text = f"{title} {hit.get('content', '')} {extracted}"
                score_result = await score_signal(ctx, signal_text, title)

                # Step 4: Format this result's section of the report
                report_sections.append(
                    f"**{title}**\nURL: {url}\n{score_result}\n{parsed_fields}\n---"
                )

            logger.info("[Pipeline] Pipeline completed successfully")
            return "\n".join(report_sections)
        except Exception as e:
            logger.error("Pipeline error: %s", e)
            return f"❌ Pipeline error: {str(e)}"

    return server

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rainbowgore/stealthee-MCP-tools'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.