database.py • 10.3 kB
"""
MCP Database Manager - Adapted from original database.py for MCP compatibility
"""
import sqlite3
import logging
from datetime import datetime
logger = logging.getLogger("mcp-web-scraper")
class DatabaseManager:
    """SQLite-backed store for scraped URLs, keywords, and their matches.

    Tables (created lazily by setup_database):
      urls(id, url UNIQUE, title, content, timestamp)
      keywords(id, keyword UNIQUE)
      url_keywords(id, url_id, keyword_id, matches, context)  -- junction

    Every public method opens its own short-lived connection to ``db_path``
    and closes it in a ``finally`` block, so one instance can be shared
    across sequential calls.
    """

    def __init__(self, db_path="scraper_results.db"):
        # db_path: filesystem path of the SQLite database file.
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create database tables and indexes if they don't exist."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # One row per scraped page; url is the natural key.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS urls (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                title TEXT,
                content TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # One row per distinct keyword.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS keywords (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT UNIQUE NOT NULL
            )
        ''')
        # Junction table: which keywords matched which URL, how often,
        # plus the most recent context snippet for display.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS url_keywords (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url_id INTEGER,
                keyword_id INTEGER,
                matches INTEGER DEFAULT 1,
                context TEXT,
                FOREIGN KEY (url_id) REFERENCES urls (id),
                FOREIGN KEY (keyword_id) REFERENCES keywords (id),
                UNIQUE(url_id, keyword_id)
            )
        ''')
        # Indexes covering the lookups and joins used by the query methods.
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_urls_url ON urls(url)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_keywords_keyword ON keywords(keyword)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_url_keywords_url_id ON url_keywords(url_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_url_keywords_keyword_id ON url_keywords(keyword_id)')
        conn.commit()
        conn.close()
        logger.info("Database setup completed")

    def insert_url(self, url, title="", content=""):
        """Insert a URL, or refresh its title/content/timestamp if it exists.

        Uses INSERT OR IGNORE + UPDATE instead of INSERT OR REPLACE: REPLACE
        deletes and re-inserts the conflicting row, which assigns a *new* id
        and silently orphans every url_keywords row that pointed at the old
        one. This variant keeps the row id stable across re-scrapes.

        Returns:
            The urls.id of the row, or None on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT OR IGNORE INTO urls (url, title, content, timestamp)
                VALUES (?, ?, ?, ?)
            ''', (url, title, content, datetime.now()))
            if cursor.rowcount == 0:
                # Row already existed: update in place, preserving its id.
                cursor.execute('''
                    UPDATE urls SET title = ?, content = ?, timestamp = ?
                    WHERE url = ?
                ''', (title, content, datetime.now(), url))
            # Look the id up explicitly; lastrowid is stale after OR IGNORE.
            cursor.execute('SELECT id FROM urls WHERE url = ?', (url,))
            row = cursor.fetchone()
            conn.commit()
            return row[0] if row else None
        except sqlite3.Error as e:
            logger.error(f"Database error inserting URL {url}: {e}")
            return None
        finally:
            conn.close()

    def get_or_create_keyword(self, keyword):
        """Return the id of ``keyword``, creating the row if needed.

        INSERT OR IGNORE followed by SELECT replaces the previous
        SELECT-then-INSERT sequence, which could raise on a UNIQUE conflict
        if another connection inserted the same keyword in between.

        Returns:
            The keywords.id, or None on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT OR IGNORE INTO keywords (keyword) VALUES (?)', (keyword,))
            conn.commit()
            cursor.execute('SELECT id FROM keywords WHERE keyword = ?', (keyword,))
            row = cursor.fetchone()
            return row[0] if row else None
        except sqlite3.Error as e:
            logger.error(f"Database error with keyword {keyword}: {e}")
            return None
        finally:
            conn.close()

    def add_keyword_match(self, url_id, keyword_id, context=""):
        """Record one more match of ``keyword_id`` on ``url_id``.

        The upsert increments ``matches`` (starting from 1) and stores the
        latest context snippet. REPLACE is safe here, unlike in insert_url:
        the junction row's surrogate id is never referenced elsewhere, and
        the subquery carries the previous count over.

        Returns:
            True on success, False on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT OR REPLACE INTO url_keywords (url_id, keyword_id, context, matches)
                VALUES (?, ?, ?, COALESCE((SELECT matches FROM url_keywords WHERE url_id = ? AND keyword_id = ?), 0) + 1)
            ''', (url_id, keyword_id, context, url_id, keyword_id))
            conn.commit()
            return True
        except sqlite3.Error as e:
            logger.error(f"Database error adding keyword match: {e}")
            return False
        finally:
            conn.close()

    def get_scraping_results(self, keyword_filter=None, limit=50):
        """Return scraping results grouped by URL.

        Args:
            keyword_filter: if given, restrict results to this exact keyword.
            limit: maximum number of rows fetched from the aggregate query.

        Returns:
            A list of dicts with keys 'url', 'title', 'keywords',
            'keyword_matches' ((keyword, context) tuples) and 'match_count';
            an empty list on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            if keyword_filter:
                # One row per (url, keyword) pair for the requested keyword.
                cursor.execute('''
                    SELECT
                        u.url,
                        u.title,
                        k.keyword,
                        uk.context,
                        uk.matches
                    FROM urls u
                    JOIN url_keywords uk ON u.id = uk.url_id
                    JOIN keywords k ON uk.keyword_id = k.id
                    WHERE k.keyword = ?
                    ORDER BY uk.matches DESC
                    LIMIT ?
                ''', (keyword_filter, limit))
                url_results = {}
                for url, title, keyword, context, matches in cursor.fetchall():
                    entry = url_results.setdefault(url, {
                        'url': url,
                        'title': title,
                        'keywords': [],
                        'keyword_matches': [],
                        'match_count': 0
                    })
                    entry['keywords'].append(keyword)
                    entry['keyword_matches'].append((keyword, context))
                    entry['match_count'] += matches
                return list(url_results.values())
            # No filter: aggregate totals per URL, then fetch per-URL details.
            cursor.execute('''
                SELECT
                    u.url,
                    u.title,
                    SUM(uk.matches) as total_matches
                FROM urls u
                JOIN url_keywords uk ON u.id = uk.url_id
                JOIN keywords k ON uk.keyword_id = k.id
                GROUP BY u.id
                ORDER BY total_matches DESC
                LIMIT ?
            ''', (limit,))
            formatted_results = []
            for url, title, total_matches in cursor.fetchall():
                cursor.execute('''
                    SELECT k.keyword, uk.context, uk.matches
                    FROM url_keywords uk
                    JOIN keywords k ON uk.keyword_id = k.id
                    JOIN urls u ON uk.url_id = u.id
                    WHERE u.url = ?
                ''', (url,))
                keyword_matches = cursor.fetchall()
                formatted_results.append({
                    'url': url,
                    'title': title,
                    # Built from the detail rows instead of splitting a
                    # GROUP_CONCAT string, which broke for keywords that
                    # themselves contain a comma.
                    'keywords': [kw for kw, _ctx, _m in keyword_matches],
                    'keyword_matches': [(kw, ctx) for kw, ctx, _m in keyword_matches],
                    'match_count': total_matches
                })
            return formatted_results
        except sqlite3.Error as e:
            logger.error(f"Database error getting results: {e}")
            return []
        finally:
            conn.close()

    def get_keyword_stats(self):
        """Return (keyword, url_count, total_matches) rows, most-matched first.

        LEFT JOIN keeps keywords with no matches; COALESCE reports their
        total as 0 instead of NULL.

        Returns:
            A list of tuples, or an empty list on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute('''
                SELECT
                    k.keyword,
                    COUNT(DISTINCT uk.url_id) as url_count,
                    COALESCE(SUM(uk.matches), 0) as total_matches
                FROM keywords k
                LEFT JOIN url_keywords uk ON k.id = uk.keyword_id
                GROUP BY k.id
                ORDER BY total_matches DESC
            ''')
            return cursor.fetchall()
        except sqlite3.Error as e:
            logger.error(f"Database error getting keyword stats: {e}")
            return []
        finally:
            conn.close()

    def get_url_stats(self):
        """Return a (total_urls, unique_keywords, total_matches) tuple.

        COUNT(DISTINCT u.id) is required: the LEFT JOIN yields one row per
        keyword match, so the previous COUNT(*) over-counted URLs that
        matched more than one keyword. COALESCE reports 0 instead of NULL
        for an empty database.

        Returns:
            A 3-tuple; (0, 0, 0) on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute('''
                SELECT
                    COUNT(DISTINCT u.id) as total_urls,
                    COUNT(DISTINCT uk.keyword_id) as unique_keywords,
                    COALESCE(SUM(uk.matches), 0) as total_matches
                FROM urls u
                LEFT JOIN url_keywords uk ON u.id = uk.url_id
            ''')
            return cursor.fetchone()
        except sqlite3.Error as e:
            logger.error(f"Database error getting URL stats: {e}")
            return (0, 0, 0)
        finally:
            conn.close()

    def clear_database(self):
        """Delete all rows from every table.

        Returns:
            True on success, False on database error.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            # Children first so the deletes respect the foreign keys.
            cursor.execute('DELETE FROM url_keywords')
            cursor.execute('DELETE FROM keywords')
            cursor.execute('DELETE FROM urls')
            conn.commit()
            logger.info("Database cleared")
            return True
        except sqlite3.Error as e:
            logger.error(f"Database error clearing data: {e}")
            return False
        finally:
            conn.close()