Skip to main content
Glama

MCP News Server

by eluc1a
news_mcp_server.py (12.7 kB)
#!/usr/bin/env python
"""
news_mcp_server.py

FastMCP server that exposes the harvested articles to LLM agents and offers
on-demand refreshing & summarisation.
"""
import os
import logging
import sys
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional

import openai
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP, Context  # type: ignore

# Import DB helpers & harvester from the separate module
from news_gatherer import (
    get_connection,
    init_db,
    fetch_and_store,
)

# ───────────────────────── configuration ────────────────────────── #
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"))

SUMMARY_WORD_TARGET = 500
MAX_ARTICLES_PER_SUMMARY = 10_000
# Maximum articles per API response due to transport layer limitations
MAX_ARTICLES_PER_RESPONSE = 10_000
# Number of rows requested from the database per paginated query.
DB_BATCH_SIZE = 100
# Category queried when the caller does not supply one.
# NOTE(review): "news" is not among the categories listed in the
# summarize_news docstring - confirm this is the intended fallback.
DEFAULT_CATEGORY = "news"

KEYWORD_FILTER = [
    "tech", "technology", "data science", "machine learning", "ml",
    "foundation model", "self-supervised", "causal", "llm",
    "large language model", "prompt", "agent", "hack", "cyber",
    "linux", "open-source", "homelab", "audio", "dsp", "creative",
    "startup", "entrepreneur", "funding",
]

openai.api_key = os.getenv("OPENAI_API_KEY")

# Configure logging.
# The server speaks MCP over stdio (see the __main__ block), so stdout is
# reserved for protocol messages; diagnostics go to stderr and the log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("news_mcp_server.log"),
        logging.StreamHandler(sys.stderr),
    ],
)
logger = logging.getLogger("news_mcp")


# ──────────────────────── summarisation ─────────────────────────── #
def _summarize_articles(records: List[dict]) -> str:
    """Ask the OpenAI chat API for one briefing covering all *records*.

    Args:
        records: Article dicts; the keys ``title``, ``source``, ``link`` and
            ``content`` are read from each.

    Returns:
        The model's briefing followed by a numbered "Sources" list whose
        indices match the ``[n]`` footnotes requested in the prompt.
    """
    articles_md, sources = [], []
    for idx, rec in enumerate(records, 1):
        articles_md.append(
            f"### Article {idx}\n"
            f"Title: {rec['title']}\n"
            f"Source: {rec['source']} ({rec['link']})\n\n"
            f"{rec['content']}\n"
        )
        sources.append(f"[{idx}] {rec['title']} – {rec['link']}")

    articles_text = '\n'.join(articles_md)
    prompt = f"""
You are a professional technology analyst.
Write a cohesive {SUMMARY_WORD_TARGET}-word briefing combining the articles below, grouping by theme.
Cover only these domains: {', '.join(KEYWORD_FILTER)}.
Cite with footnotes like [1], [2]. Start with a short headline.

=== ARTICLES ===
{articles_text}
=== END ===
"""
    # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK interface;
    # confirm the pinned openai version still provides it.
    resp = openai.ChatCompletion.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=3000,
    )
    summary = resp.choices[0].message.content.strip()
    summary += "\n\nSources\n" + "\n".join(sources)
    return summary


def summarize_unsummarized(
    category: Optional[str] = None,
    limit: int = MAX_ARTICLES_PER_SUMMARY,
) -> str:
    """Summarise entries with no ``summarized_at`` value and mark them done.

    Args:
        category: When given, only entries of this category are considered.
        limit: Maximum number of entries to include in the summary.

    Returns:
        The generated briefing, or a fixed message when nothing is pending.
    """
    with get_connection() as conn, conn.cursor() as cur:
        sql = """
            SELECT id,title,link,source,content
            FROM   entries
            WHERE  summarized_at IS NULL
        """
        params: list = []
        # Each appended fragment starts with a space so the clauses never
        # fuse with the preceding token, whatever the base string ends with.
        if category:
            sql += " AND category=%s"
            params.append(category)
        sql += " ORDER BY published DESC NULLS LAST LIMIT %s"
        params.append(limit)
        cur.execute(sql, tuple(params))
        rows = cur.fetchall()

    if not rows:
        return "No new articles to summarize."

    records = [
        dict(id=r[0], title=r[1], link=r[2], source=r[3], content=r[4])
        for r in rows
    ]
    summary = _summarize_articles(records)

    # Stamp the rows only after the summary was produced, so a failed API
    # call leaves them eligible for the next run.
    ids = [r["id"] for r in records]
    with get_connection() as conn, conn.cursor() as cur:
        cur.execute(
            "UPDATE entries SET summarized_at=%s WHERE id=ANY(%s);",
            (datetime.now(timezone.utc), ids),
        )
        conn.commit()
    return summary


# ──────────────────────── Fast-MCP API ──────────────────────────── #
mcp = FastMCP("News Feeds")

# @mcp.tool()
# def refresh_feeds(ctx: Context) -> str:
#     """Manually trigger the harvester (rarely needed if you schedule it)."""
#     inserted = fetch_and_store()
#     return f"Fetched feeds; inserted {inserted} new items."


def get_articles_with_pagination(
    category: Optional[str | List[str]],
    cutoff_time: datetime,
    limit: int,
    offset: int = 0,
) -> List[Dict]:
    """
    Get articles with pagination to support retrieving large numbers of articles.
    This function is internal and not exposed as an MCP tool.

    Args:
        category: A single category, a list of categories, or a falsy value
            to fall back to ``DEFAULT_CATEGORY``.
        cutoff_time: Only rows with ``uploaded_at >= cutoff_time`` are returned.
        limit: Maximum number of articles to return.
        offset: Number of leading rows to skip; negative values count as 0.

    Returns:
        At most ``limit`` dicts with the keys id, title, link, published,
        source and content, in the order produced by the query.
    """
    logger.debug(
        "*** RETRIEVING ARTICLES: category=%s, cutoff=%s, limit=%s, offset=%s",
        category, cutoff_time, limit, offset,
    )

    # Only the category predicate differs between the three cases, so pick
    # the predicate and its bound value once and share one query.
    if category and isinstance(category, list):
        category_clause = "category = ANY(%s)"
        category_param = category
    elif category:
        category_clause = "category = %s"
        category_param = category
    else:
        category_clause = "category = %s"
        category_param = DEFAULT_CATEGORY

    # category_clause is one of the fixed fragments above, never caller
    # input; every value is still passed as a bound parameter.
    query = f"""
        SELECT id,title,link,published::text,source,content
        FROM   entries
        WHERE  {category_clause} AND uploaded_at >= %s
        ORDER  BY uploaded_at DESC NULLS LAST, published DESC NULLS LAST
        LIMIT  %s OFFSET %s;"""

    all_articles: List[Dict] = []
    db_offset = max(0, offset)

    with get_connection() as conn:
        while len(all_articles) < limit:
            # Never ask for more rows than are still needed.
            page_size = min(DB_BATCH_SIZE, limit - len(all_articles))
            with conn.cursor() as cur:
                logger.debug(
                    "*** EXECUTING QUERY: OFFSET=%s, LIMIT=%s",
                    db_offset, page_size,
                )
                cur.execute(
                    query,
                    (category_param, cutoff_time, page_size, db_offset),
                )
                rows = cur.fetchall()
            logger.debug("*** FETCHED %s ARTICLES", len(rows))

            if not rows:
                # No more results
                break

            all_articles.extend(
                dict(id=r[0], title=r[1], link=r[2], published=r[3],
                     source=r[4], content=r[5])
                for r in rows
            )
            db_offset += len(rows)

            if len(rows) < page_size:
                # Got fewer rows than requested, meaning we've reached the end
                break

    logger.info("*** TOTAL ARTICLES FOUND: %s", len(all_articles))
    return all_articles


def _cutoff_for(hours: int, now: datetime) -> datetime:
    """Return the earliest upload time to include for a look-back of *hours*.

    For 24 hours or more the window is aligned to midnight: whole days are
    subtracted from the start of the current day (in *now*'s timezone).
    Shorter windows are measured back from *now* directly.
    """
    if hours >= 24:
        days = hours // 24
        today_start = datetime(now.year, now.month, now.day, tzinfo=now.tzinfo)
        return today_start - timedelta(days=days)
    return now - timedelta(hours=hours)


def _concatenate_articles(articles: List[Dict]) -> str:
    """Render *articles* as one text blob with explicit separators."""
    delimiter = "\n==========ARTICLE_SEPARATOR==========\n"
    body = delimiter.join(
        f"ID: {art['id']}\nTITLE: {art['title']}\nLINK: {art['link']}\n"
        f"PUBLISHED: {art['published']}\nSOURCE: {art['source']}\n"
        f"CONTENT:\n{art['content']}"
        for art in articles
    )
    return f"FOUND {len(articles)} ARTICLES\n\n" + body


@mcp.tool()
def summarize_news(
    ctx: Context,
    category: str | List[str] = "",
    hours: int = 24,
    limit: int = 10_000,
    offset: int = 0,
) -> Dict:
    """
    Returns raw articles so the caller can summarise them (LLM-side).

    Args:
        ctx: MCP context (not used by this tool).
        category: Category filter - either a single category string or a list of categories.
                  Available categories include: international_news, research, data_science,
                  regional_international_news, business_finance_news, us_local_news,
                  business_tech, tech, policy, linux, science, cybersecurity, startups,
                  business, us_national_news, investigative_journalism, llm_tools
                  If empty, the server queries the "news" category.
        hours: Number of hours to look back for articles. Values of 24 or more
               are rounded down to whole days counted from the start of today (UTC).
        limit: Maximum number of articles to return.
        offset: Starting position for pagination (default 0).

    Returns:
        A dict with:
          - articles: a single-element list holding one concatenated article
                      (id, title, link, published, source, content) whose content
                      contains every matching article, or an empty list
          - meta: metadata about the query (total_count, limit, offset, has_more, next_offset)
    """
    if isinstance(category, list):
        category_str = ', '.join(category)
        logger.info(
            "summarize_news called with categories=[%s], hours=%s, limit=%s, offset=%s",
            category_str, hours, limit, offset,
        )
    else:
        logger.info(
            "summarize_news called with category='%s', hours=%s, limit=%s, offset=%s",
            category, hours, limit, offset,
        )

    current_time = datetime.now(timezone.utc)
    cutoff_time = _cutoff_for(hours, current_time)
    logger.info("Calculated cutoff_time: %s", cutoff_time)

    cat = category or None
    # Only fetch what we can actually return; guard against negative input.
    lim = max(0, min(limit, MAX_ARTICLES_PER_RESPONSE))
    start = max(0, offset)

    # Ask for one row beyond the page so has_more can be reported truthfully.
    fetched = get_articles_with_pagination(cat, cutoff_time, lim + 1, start)
    articles = fetched[:lim]
    has_more = bool(articles) and len(fetched) > lim

    meta = {
        "total_count": len(articles),
        "limit": lim,
        "offset": start,
        "has_more": has_more,
        "next_offset": start + len(articles) if has_more else None,
    }

    if not articles:
        # Return an empty result if no articles found
        return {"articles": [], "meta": meta}

    # Create a single "mega-article" with all content to bypass pagination limits
    mega_article = [{
        "id": 0,
        "title": f"News Feed ({len(articles)} articles)",
        "link": "concat://articles",
        "published": str(current_time),
        "source": "News Concatenator",
        "content": _concatenate_articles(articles),
    }]

    # Return both the article and metadata
    return {"articles": mega_article, "meta": meta}


@mcp.resource("news://{category}/{limit}")
def get_latest_news(category: str, limit: int = 10_000) -> List[dict]:
    """Return the most recently published entries of *category*.

    Args:
        category: Category to filter on.
        limit: Maximum number of entries to return.

    Returns:
        Dicts with the keys title, link, published and source, newest
        ``published`` first (entries without a value sort last).
    """
    with get_connection() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT title,link,published::text,source
            FROM   entries
            WHERE  category=%s
            ORDER  BY published DESC NULLS LAST
            LIMIT  %s;""", (category, limit))
        return [
            dict(title=t, link=l, published=p, source=s)
            for t, l, p, s in cur.fetchall()
        ]


# ────────────────────── server start-up ─────────────────────────── #
def initialize_resources() -> None:
    """Ensure DB exists; harvesting is handled by the external scheduler."""
    init_db()
    # Logged rather than printed: stdout is the MCP protocol channel.
    logger.info("News MCP server initialised – waiting for client calls.")


if __name__ == "__main__":
    initialize_resources()
    # stdio transport: the MCP client exchanges protocol messages with this
    # process over stdin/stdout, so nothing else may be written to stdout.
    mcp.run(transport="stdio")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eluc1a/mcp-news'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.