SQLite MCP Server
by direkt
#!/usr/bin/env python3
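"""Build a SQLite database (logs.db) from gzipped application log files.

The script scans the current directory for *.gz files, parses each line into
structured fields (timestamp, thread, level, module, message), collects
multi-line Java stack traces into a separate table, and records any lines it
cannot parse in a parsing_errors table.
"""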
import gzip
import sqlite3
import os
import re
import glob
from datetime import datetime
import sys
import traceback
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler("create_log_db.log"), logging.StreamHandler()]
)
logger = logging.getLogger("create_log_db")
# Define the database file
DB_FILE = "logs.db"
# Improved regular expression pattern for the log format
# Format: 2025-03-06 00:00:00,024 [UserServer-2] INFO c.d.s.r.user.EnterpriseUserRPCServer - [USER]: Channel ...
LOG_PATTERN = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[([^\]]+)\] (\w+)\s+([^\s-]+) - (.+)$'
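# Capture groups: timestamp, thread, level, logger name (module), message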
# Alternative pattern for logs that might not match the primary pattern
ALT_LOG_PATTERN = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[([^\]]+)\] (\w+)\s+(.+)$'
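# Captures timestamp, thread, and level; everything after the level is treated as the message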
# Pattern to detect the start of a Java stack trace
STACK_TRACE_START_PATTERN = r'^(java\.\w+\.\w+Exception|Caused by:|at [\w\.]+\()'
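# Matches e.g. "java.lang.RuntimeException", "Caused by: ...", or "at com.example.Foo(Foo.java:42)"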
def create_database():
"""Create the SQLite database and tables"""
# Remove existing database if it exists
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
logger.info(f"Removed existing {DB_FILE}")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# Create logs table
cursor.execute('''
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
thread TEXT,
level TEXT,
module TEXT,
message TEXT,
source_file TEXT,
raw_log TEXT,
has_stack_trace INTEGER DEFAULT 0
)
''')
# Create a table for parsing errors
cursor.execute('''
CREATE TABLE IF NOT EXISTS parsing_errors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
line TEXT,
source_file TEXT,
error_message TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# Create a table for stack traces
cursor.execute('''
CREATE TABLE IF NOT EXISTS stack_traces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
log_id INTEGER,
stack_trace TEXT,
FOREIGN KEY (log_id) REFERENCES logs (id)
)
''')
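    # Note: SQLite enforces this FOREIGN KEY only when "PRAGMA foreign_keys = ON"
    # is set on the connection; here the reference is informational.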
conn.commit()
return conn, cursor
def is_stack_trace_line(line):
"""Check if a line is part of a stack trace"""
return re.match(STACK_TRACE_START_PATTERN, line) is not None or line.strip().startswith("at ") or "Exception" in line
def parse_log_line(line, source_file):
"""Parse a log line and return a dictionary of values"""
# Try to parse as structured log with primary pattern
match = re.match(LOG_PATTERN, line)
if match:
timestamp, thread, level, module, message = match.groups()
return {
"timestamp": timestamp,
"thread": thread,
"level": level,
"module": module,
"message": message,
"source_file": source_file,
"raw_log": line,
"parsed": True
}
# Try alternative pattern
match = re.match(ALT_LOG_PATTERN, line)
if match:
timestamp, thread, level, message = match.groups()
# In this case, we don't have a separate module, so we'll use an empty string
return {
"timestamp": timestamp,
"thread": thread,
"level": level,
"module": "",
"message": message,
"source_file": source_file,
"raw_log": line,
"parsed": True
}
# Check if it's a stack trace line
if is_stack_trace_line(line):
return {
"is_stack_trace": True,
"stack_trace_line": line,
"parsed": True
}
# If we can't parse it in a structured way, just store the raw log
# and record it as a parsing error
return {
"timestamp": "",
"thread": "",
"level": "",
"module": "",
"message": line,
"source_file": source_file,
"raw_log": line,
"parsed": False,
"error": "Could not parse log line with any pattern"
}
def find_gz_files():
"""Find all .gz files in the current directory"""
gz_files = glob.glob("*.gz")
logger.info(f"Found {len(gz_files)} .gz files: {', '.join(gz_files)}")
return gz_files
def process_log_files(conn, cursor):
"""Process all log files and insert into database"""
# Find all .gz files in the current directory
log_files = find_gz_files()
if not log_files:
logger.warning("No .gz files found in the current directory.")
        # Return zeroed counters so the caller can still unpack three values
        return 0, 0, 0
total_logs = 0
total_stack_traces = 0
total_errors = 0
for log_file in log_files:
if not os.path.exists(log_file):
logger.warning(f"Log file {log_file} not found, skipping.")
continue
logger.info(f"Processing {log_file}...")
file_logs = 0
file_stack_traces = 0
file_errors = 0
try:
# Open and decompress the gzip file
with gzip.open(log_file, 'rt', encoding='utf-8', errors='replace') as f:
batch = []
error_batch = []
stack_trace_batch = []
batch_size = 1000
line_count = 0
# Variables to track multi-line log entries
current_log_entry = None
current_stack_trace = []
last_log_id = None
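                # last_log_id holds the rowid of the most recently inserted log entry,
                # so a stack trace that follows can be linked to it in stack_traces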
for line in f:
line_count += 1
line = line.strip()
if not line:
continue
try:
# Check if this is a new log entry or continuation of previous
is_new_log = re.match(LOG_PATTERN, line) is not None or re.match(ALT_LOG_PATTERN, line) is not None
# If we have a stack trace and this is a new log, save the stack trace
if current_stack_trace and is_new_log and last_log_id is not None:
stack_trace_text = "\n".join(current_stack_trace)
stack_trace_batch.append((last_log_id, stack_trace_text))
current_stack_trace = []
file_stack_traces += 1
# Parse the line
parsed = parse_log_line(line, log_file)
if is_new_log:
# This is a new log entry
if parsed.get("parsed", False):
has_stack_trace = 0
batch.append((
parsed["timestamp"],
parsed.get("thread", ""),
parsed["level"],
parsed["module"],
parsed["message"],
parsed["source_file"],
parsed["raw_log"],
has_stack_trace
))
file_logs += 1
current_log_entry = parsed
else:
error_batch.append((
line,
log_file,
parsed.get("error", "Unknown parsing error")
))
file_errors += 1
current_log_entry = None
                        elif parsed.get("is_stack_trace", False):
                            # This is a stack trace line
                            if not current_stack_trace and current_log_entry and batch:
                                # First line of a new trace: mark the parent log entry and
                                # flush the batch so its rowid can be linked in stack_traces
                                # (executemany() does not update cursor.lastrowid)
                                last_entry = list(batch[-1])
                                last_entry[7] = 1  # Set has_stack_trace to 1
                                batch[-1] = tuple(last_entry)
                                cursor.executemany(
                                    "INSERT INTO logs (timestamp, thread, level, module, message, source_file, raw_log, has_stack_trace) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                                    batch
                                )
                                batch = []
                                cursor.execute("SELECT last_insert_rowid()")
                                last_log_id = cursor.fetchone()[0]
                            current_stack_trace.append(parsed["stack_trace_line"])
else:
# This is a continuation of the previous log or an error
if current_log_entry:
# Append to the message of the current log entry
if batch:
last_entry = list(batch[-1])
last_entry[4] += "\n" + line # Append to message
last_entry[6] += "\n" + line # Append to raw_log
batch[-1] = tuple(last_entry)
else:
error_batch.append((
line,
log_file,
"Continuation line without a parent log entry"
))
file_errors += 1
# Insert in batches for better performance
if len(batch) >= batch_size:
cursor.executemany(
"INSERT INTO logs (timestamp, thread, level, module, message, source_file, raw_log, has_stack_trace) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
batch
)
                                # Remember the rowid of the last inserted log so a following
                                # stack trace can be linked to it (executemany() does not
                                # update cursor.lastrowid)
                                cursor.execute("SELECT last_insert_rowid()")
                                last_log_id = cursor.fetchone()[0]
batch = []
if len(error_batch) >= batch_size:
cursor.executemany(
"INSERT INTO parsing_errors (line, source_file, error_message) VALUES (?, ?, ?)",
error_batch
)
error_batch = []
if len(stack_trace_batch) >= batch_size:
cursor.executemany(
"INSERT INTO stack_traces (log_id, stack_trace) VALUES (?, ?)",
stack_trace_batch
)
stack_trace_batch = []
# Commit every 100,000 lines to avoid transaction getting too large
if line_count % 100000 == 0:
conn.commit()
logger.info(f" Processed {line_count} lines...")
except Exception as e:
error_batch.append((
line,
log_file,
f"Exception: {str(e)}"
))
file_errors += 1
# Insert any remaining logs
if batch:
cursor.executemany(
"INSERT INTO logs (timestamp, thread, level, module, message, source_file, raw_log, has_stack_trace) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
batch
)
                    # Remember the rowid of the last inserted log for a trailing stack trace
                    # (executemany() does not update cursor.lastrowid)
                    cursor.execute("SELECT last_insert_rowid()")
                    last_log_id = cursor.fetchone()[0]
# Insert the final stack trace if there is one
if current_stack_trace and last_log_id is not None:
stack_trace_text = "\n".join(current_stack_trace)
stack_trace_batch.append((last_log_id, stack_trace_text))
file_stack_traces += 1
if error_batch:
cursor.executemany(
"INSERT INTO parsing_errors (line, source_file, error_message) VALUES (?, ?, ?)",
error_batch
)
if stack_trace_batch:
cursor.executemany(
"INSERT INTO stack_traces (log_id, stack_trace) VALUES (?, ?)",
stack_trace_batch
)
conn.commit()
total_logs += file_logs
total_stack_traces += file_stack_traces
total_errors += file_errors
logger.info(f"Finished processing {log_file}: {file_logs} logs, {file_stack_traces} stack traces, {file_errors} errors")
except Exception as e:
logger.error(f"Error processing {log_file}: {str(e)}")
traceback.print_exc()
conn.rollback()
return total_logs, total_stack_traces, total_errors
def create_indexes(conn, cursor):
"""Create indexes for better query performance"""
logger.info("Creating indexes...")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs (timestamp)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_level ON logs (level)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_module ON logs (module)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_thread ON logs (thread)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_has_stack_trace ON logs (has_stack_trace)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_parsing_errors_source_file ON parsing_errors (source_file)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_stack_traces_log_id ON stack_traces (log_id)")
conn.commit()
def main():
logger.info(f"Creating log database: {DB_FILE}")
try:
conn, cursor = create_database()
try:
total_logs, total_stack_traces, total_errors = process_log_files(conn, cursor)
create_indexes(conn, cursor)
# Get stats
cursor.execute("SELECT COUNT(*) FROM logs")
log_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM parsing_errors")
error_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM stack_traces")
stack_trace_count = cursor.fetchone()[0]
# Get level distribution
cursor.execute("SELECT level, COUNT(*) as count FROM logs GROUP BY level ORDER BY count DESC")
level_counts = cursor.fetchall()
logger.info(f"\nDatabase created successfully!")
logger.info(f"Total regular logs: {log_count}")
logger.info(f"Total stack traces: {stack_trace_count}")
logger.info(f"Total parsing errors: {error_count}")
if level_counts:
level_distribution = "\nLog level distribution:"
for level, count in level_counts:
if level: # Only show non-empty levels
level_distribution += f"\n {level}: {count}"
logger.info(level_distribution)
logger.info(f"Database file: {os.path.abspath(DB_FILE)}")
finally:
conn.close()
except Exception as e:
logger.error(f"Error: {str(e)}")
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
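Once the database has been built, it can be queried directly with the standard sqlite3 module (or through the MCP server). A minimal sketch, assuming logs.db was created by the script above:

import sqlite3

conn = sqlite3.connect("logs.db")
cursor = conn.cursor()

# Ten most recent ERROR entries
cursor.execute(
    "SELECT timestamp, module, message FROM logs "
    "WHERE level = 'ERROR' ORDER BY timestamp DESC LIMIT 10"
)
for timestamp, module, message in cursor.fetchall():
    print(timestamp, module, message)

# Stack traces joined to the log entries that produced them
cursor.execute(
    "SELECT l.timestamp, l.message, s.stack_trace "
    "FROM stack_traces s JOIN logs l ON l.id = s.log_id LIMIT 5"
)
for row in cursor.fetchall():
    print(row)

conn.close()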