#!/usr/bin/env python3
"""
news_mcp_server.py
Full-text news harvester, summarizer and FastMCP server.
"""
import os
import uuid
import time
import logging
import threading
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional
import feedparser
import psycopg2
from psycopg2.extras import execute_values
import trafilatura
import openai
import requests
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP, Context # type: ignore
# ────────────────────────── Configuration ────────────────────────── #
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"))
FEED_SOURCES: List[Dict[str, str]] = [
{"name": "TechCrunch", "url": "https://techcrunch.com/feed/", "category": "tech"},
{"name": "The Verge", "url": "https://www.theverge.com/tech/rss/index.xml", "category": "tech"},
{"name": "Ars Technica", "url": "http://feeds.arstechnica.com/arstechnica/index","category": "tech"},
{"name": "KDnuggets", "url": "https://www.kdnuggets.com/feed", "category": "data_science"},
{"name": "Data Science Central", "url": "https://www.datasciencecentral.com/feed/", "category": "data_science"},
{"name": "Machine Learning Mastery","url":"https://machinelearningmastery.com/feed/", "category": "data_science"},
{"name": "arXiv cs.LG", "url": "https://arxiv.org/rss/cs.LG", "category": "data_science"},
{"name": "arXiv stat.ML", "url": "https://arxiv.org/rss/stat.ML", "category": "data_science"},
{"name": "OpenAI News", "url": "https://openai.com/news/rss.xml", "category": "llm_tools"},
{"name": "Hugging Face Blog", "url": "https://huggingface.co/blog/rss.xml", "category": "llm_tools"},
{"name": "Google AI Blog", "url": "https://ai.googleblog.com/atom.xml", "category": "llm_tools"},
{"name": "The Hacker News", "url": "https://feeds.feedburner.com/TheHackersNews", "category": "cybersecurity"},
{"name": "Krebs on Security", "url": "https://krebsonsecurity.com/feed/", "category": "cybersecurity"},
{"name": "Schneier on Security", "url": "https://www.schneier.com/blog/atom.xml", "category": "cybersecurity"},
{"name": "Phoronix", "url": "https://www.phoronix.com/rss.php", "category": "linux"},
{"name": "LWN.net", "url": "https://lwn.net/headlines/rss", "category": "linux"},
{"name": "ServeTheHome", "url": "https://www.servethehome.com/feed/", "category": "linux"},
{"name": "The Audio Programmer", "url": "https://theaudioprogrammer.com/rss/", "category": "audio_dsp"},
{"name": "Designing Sound", "url": "https://designingsound.org/feed/", "category": "audio_dsp"},
{"name": "KVR Audio", "url": "https://www.kvraudio.com/xml/rss.xml", "category": "audio_dsp"},
{"name": "VentureBeat", "url": "https://venturebeat.com/feed/", "category": "startups"},
{"name": "First Round Review", "url": "https://review.firstround.com/rss", "category": "startups"},
]
KEYWORD_FILTER = [
"tech","technology","data science","machine learning","ml",
"foundation model","self-supervised","causal","llm",
"large language model","prompt","agent","hack","cyber",
"linux","open-source","homelab","audio","dsp","creative",
"startup","entrepreneur","funding",
]
DB_URL = os.getenv("DATABASE_URL", "postgresql://localhost/mcp_news")
openai.api_key = os.getenv("OPENAI_API_KEY")
SUMMARY_WORD_TARGET = 500
FEED_REFRESH_INTERVAL = 2 * 3600  # refresh feeds every 2 hours
MAX_ARTICLES_PER_SUMMARY = 25
# Silence noisy loggers
logging.getLogger('trafilatura').setLevel(logging.ERROR)
# ────────────────────────── Database helpers ─────────────────────── #
def get_connection():
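    """Open a new PostgreSQL connection using DATABASE_URL."""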
return psycopg2.connect(DB_URL, connect_timeout=5)
def init_db():
"""Create table once if it doesn't exist."""
with get_connection() as conn, conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS entries (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
link TEXT NOT NULL,
published TIMESTAMPTZ,
source TEXT,
category TEXT,
content TEXT,
summarized_at TIMESTAMPTZ
);
""")
conn.commit()
# ────────────────────────── Feed harvesting ──────────────────────── #
def _extract_content(url: str) -> Optional[str]:
"""Download article and return cleaned text."""
try:
r = requests.get(
url,
headers={
"User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/121.0.0.0 Safari/537.36")
},
timeout=10,
)
if r.status_code != 200:
return None
except Exception as exc:
logging.debug("Request failed for %s: %s", url, exc)
return None
text = trafilatura.extract(r.text, include_comments=False,
include_tables=False, no_fallback=True)
return text.strip() if text else None
def upsert_entries(rows: List[tuple]):
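    """Bulk-insert article rows; duplicates are skipped via ON CONFLICT DO NOTHING."""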
if not rows:
return
query = """
INSERT INTO entries
(id,title,link,published,source,category,content,summarized_at)
VALUES %s
ON CONFLICT (id) DO NOTHING;
"""
with get_connection() as conn, conn.cursor() as cur:
execute_values(cur, query, rows)
conn.commit()
def fetch_and_store() -> int:
"""Poll all RSS feeds, insert new articles, return count."""
new_cnt = 0
for src in FEED_SOURCES:
feed = feedparser.parse(
src["url"],
request_headers={"User-Agent": "Mozilla/5.0"}
)
for entry in feed.entries:
entry_id = (
entry.get("id") or entry.get("guid") or entry.get("link") or
str(uuid.uuid5(uuid.NAMESPACE_URL, entry.get("link", str(uuid.uuid4()))))
)
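            # Cheap existence check so we skip the full-text download for articles
            # that are already stored.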
with get_connection() as conn, conn.cursor() as cur:
cur.execute("SELECT 1 FROM entries WHERE id=%s;", (entry_id,))
if cur.fetchone():
continue # already present
pub_struct = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*pub_struct[:6], tzinfo=timezone.utc) if pub_struct else None
content = _extract_content(entry.link) or \
entry.get("summary") or entry.get("description") or ""
if not content:
continue
upsert_entries([(
entry_id, entry.title, entry.link, pub_dt,
src["name"], src["category"], content, None
)])
new_cnt += 1
return new_cnt
# ────────────────────────── Summarisation ────────────────────────── #
def _summarize_articles(records: List[dict]) -> str:
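    """Build a single prompt from the given article records and return the model's
    combined briefing, with a footnoted source list appended."""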
articles_md, sources = [], []
for idx, rec in enumerate(records, 1):
articles_md.append(
f"### Article {idx}\n"
f"Title: {rec['title']}\n"
f"Source: {rec['source']} ({rec['link']})\n\n"
f"{rec['content']}\n"
)
sources.append(f"[{idx}] {rec['title']} – {rec['link']}")
    # Join the articles outside the f-string: backslashes inside f-string
    # expressions are only allowed on Python 3.12+.
    articles_block = "\n".join(articles_md)
    prompt = f"""
You are a professional technology analyst.
Write a cohesive {SUMMARY_WORD_TARGET}-word briefing combining the articles below,
grouping by theme. Cover only these domains: {', '.join(KEYWORD_FILTER)}.
Cite with footnotes like [1], [2]. Start with a short headline.
=== ARTICLES ===
{articles_block}
=== END ===
"""
resp = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=3000,
)
summary = resp.choices[0].message.content.strip()
summary += "\n\nSources\n" + "\n".join(sources)
return summary
def summarize_unsummarized(category: Optional[str] = None,
limit: int = MAX_ARTICLES_PER_SUMMARY) -> str:
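    """Summarize up to `limit` not-yet-summarized articles (optionally filtered by
    category) and stamp them with summarized_at so they are not summarized twice."""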
with get_connection() as conn, conn.cursor() as cur:
sql = """
SELECT id,title,link,source,content
FROM entries
WHERE summarized_at IS NULL
"""
params: list = []
if category:
sql += "AND category=%s"
params.append(category)
sql += "ORDER BY published DESC NULLS LAST LIMIT %s"
params.append(limit)
cur.execute(sql, tuple(params))
rows = cur.fetchall()
if not rows:
return "No new articles to summarize."
records = [dict(id=r[0], title=r[1], link=r[2], source=r[3], content=r[4])
for r in rows]
summary = _summarize_articles(records)
ids = [r["id"] for r in records]
with get_connection() as conn, conn.cursor() as cur:
cur.execute("UPDATE entries SET summarized_at=%s WHERE id=ANY(%s);",
(datetime.now(timezone.utc), ids))
conn.commit()
return summary
# ────────────────────────── FastMCP ──────────────────────────────── #
mcp = FastMCP("News Feeds")
@mcp.tool()
def refresh_feeds(ctx: Context) -> str:
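    """Poll all configured RSS feeds now and store any new articles."""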
inserted = fetch_and_store()
return f"Fetched feeds; inserted {inserted} new items."
@mcp.tool()
def summarize_news(ctx: Context, category: str = "", limit: int = 20) -> List[Dict]:
"""Return raw articles; the calling LLM can decide how to summarise them."""
cat = category or None
lim = min(limit, MAX_ARTICLES_PER_SUMMARY)
with get_connection() as conn, conn.cursor() as cur:
if cat:
cur.execute("""
SELECT id,title,link,published::text,source,content
FROM entries
WHERE category=%s
ORDER BY published DESC NULLS LAST
LIMIT %s;
""", (cat, lim))
else:
cur.execute("""
SELECT id,title,link,published::text,source,content
FROM entries
ORDER BY published DESC NULLS LAST
LIMIT %s;
""", (lim,))
rows = cur.fetchall()
return [
dict(id=r[0], title=r[1], link=r[2], published=r[3],
source=r[4], content=r[5])
for r in rows
]
@mcp.resource("news://{category}/{limit}")
def get_latest_news(category: str, limit: int = 10) -> List[dict]:
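    """Resource handler: return the latest headlines for a category,
    e.g. news://linux/5 yields the five most recent Linux items."""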
with get_connection() as conn, conn.cursor() as cur:
cur.execute("""
SELECT title,link,published::text,source
FROM entries
WHERE category=%s
ORDER BY published DESC NULLS LAST
LIMIT %s;
""", (category, limit))
return [
dict(title=t, link=l, published=p, source=s)
for t, l, p, s in cur.fetchall()
]
# ────────────────────────── Startup & Server Run ──────────────────── #
def initialize_resources() -> None:
"""Initial DB setup and kick off background fetch loop."""
init_db()
# Define initial fetch job
def _initial_fetch():
try:
inserted = fetch_and_store()
logging.info("Startup fetch inserted %d new items.", inserted)
except Exception as exc:
logging.warning("Startup fetch failed: %s", exc)
    # Define the periodic refresh job (runs every FEED_REFRESH_INTERVAL seconds)
    def _refresh_job():
        while True:
            try:
                n = fetch_and_store()
                logging.info("Periodic fetch inserted %d new items.", n)
            except Exception as exc:
                logging.warning("Periodic fetch failed: %s", exc)
            time.sleep(FEED_REFRESH_INTERVAL)
    # Start both the initial fetch and the periodic refresh in background threads
    threading.Thread(target=_initial_fetch, daemon=True).start()
    threading.Thread(target=_refresh_job, daemon=True).start()
logging.info("News MCP server initializing - feed fetch running in background")
# Run the FastMCP server when executed directly
if __name__ == "__main__":
initialize_resources()
# Use SSE transport so clients can connect over HTTP
mcp.run(transport="sse")