Skip to main content
Glama
local_storage.py28.8 kB
"""
Local SQLite Storage Module
Personal data storage for job search, contacts, and preferences

Every public function returns a dict with a boolean ``'success'`` key. On
failure the dict carries an ``'error'`` string instead of raising, so callers
never need to catch database exceptions themselves (``init_database`` is the
one exception: it raises, because it runs at import time).
"""

import sqlite3
import json
import os
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path

# Database file location - centralized in pbdb folder
DB_PATH = os.path.join(os.path.expanduser('~'), 'Documents', 'pbdb', 'mcp', 'local_data.db')

# Optional columns each add_* function accepts through **kwargs.
# Anything not listed here is silently ignored on insert.
_INSERT_OPTIONAL = {
    'job_applications': (
        'location', 'salary_range', 'job_url', 'date_applied', 'status',
        'contact_name', 'contact_email', 'notes', 'follow_up_date',
        'resume_version', 'cover_letter',
    ),
    'interviews': (
        'interviewer_name', 'interviewer_email', 'status', 'notes',
        'feedback', 'next_steps',
    ),
    'contacts': (
        'email', 'phone', 'company', 'title', 'linkedin_url',
        'relationship', 'notes', 'last_contact',
    ),
    'work_experience': (
        'start_date', 'end_date', 'is_current', 'description',
        'achievements', 'technologies',
    ),
    'education': (
        'degree', 'field_of_study', 'start_date', 'end_date', 'gpa',
        'achievements',
    ),
    'tasks': (
        'description', 'due_date', 'priority', 'status', 'category',
        'related_application_id',
    ),
}

# Columns the update_* functions may write. Column names cannot be bound as
# SQL parameters, so kwargs keys are checked against this whitelist before
# they are interpolated into the statement.
_UPDATABLE = {
    'job_applications': ('company', 'position') + _INSERT_OPTIONAL['job_applications'],
    'interviews': (
        ('application_id', 'interview_date', 'interview_type')
        + _INSERT_OPTIONAL['interviews']
    ),
    'contacts': ('name',) + _INSERT_OPTIONAL['contacts'],
}

# One CREATE TABLE statement per table; all are idempotent (IF NOT EXISTS).
_SCHEMA = (
    # Personal profile table
    '''
    CREATE TABLE IF NOT EXISTS profile (
        id INTEGER PRIMARY KEY,
        field TEXT UNIQUE NOT NULL,
        value TEXT,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
    # Job applications table
    '''
    CREATE TABLE IF NOT EXISTS job_applications (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        company TEXT NOT NULL,
        position TEXT NOT NULL,
        location TEXT,
        salary_range TEXT,
        job_url TEXT,
        date_applied DATE,
        status TEXT DEFAULT 'applied',
        contact_name TEXT,
        contact_email TEXT,
        notes TEXT,
        follow_up_date DATE,
        resume_version TEXT,
        cover_letter TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
    # Interviews table
    '''
    CREATE TABLE IF NOT EXISTS interviews (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        application_id INTEGER,
        interview_date TIMESTAMP,
        interview_type TEXT,
        interviewer_name TEXT,
        interviewer_email TEXT,
        status TEXT DEFAULT 'scheduled',
        notes TEXT,
        feedback TEXT,
        next_steps TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (application_id) REFERENCES job_applications(id)
    )
    ''',
    # Contacts/Network table
    '''
    CREATE TABLE IF NOT EXISTS contacts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT,
        phone TEXT,
        company TEXT,
        title TEXT,
        linkedin_url TEXT,
        relationship TEXT,
        notes TEXT,
        last_contact DATE,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
    # Skills table
    '''
    CREATE TABLE IF NOT EXISTS skills (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        skill_name TEXT UNIQUE NOT NULL,
        category TEXT,
        proficiency TEXT,
        years_experience INTEGER,
        notes TEXT
    )
    ''',
    # Work experience table
    '''
    CREATE TABLE IF NOT EXISTS work_experience (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        company TEXT NOT NULL,
        title TEXT NOT NULL,
        start_date DATE,
        end_date DATE,
        is_current BOOLEAN DEFAULT 0,
        description TEXT,
        achievements TEXT,
        technologies TEXT
    )
    ''',
    # Education table
    '''
    CREATE TABLE IF NOT EXISTS education (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        institution TEXT NOT NULL,
        degree TEXT,
        field_of_study TEXT,
        start_date DATE,
        end_date DATE,
        gpa TEXT,
        achievements TEXT
    )
    ''',
    # Tasks/reminders table
    '''
    CREATE TABLE IF NOT EXISTS tasks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        description TEXT,
        due_date DATE,
        priority TEXT DEFAULT 'medium',
        status TEXT DEFAULT 'pending',
        category TEXT,
        related_application_id INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        completed_at TIMESTAMP,
        FOREIGN KEY (related_application_id) REFERENCES job_applications(id)
    )
    ''',
    # Notes/journal table
    '''
    CREATE TABLE IF NOT EXISTS notes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT,
        content TEXT NOT NULL,
        category TEXT,
        tags TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
    # Email templates table
    '''
    CREATE TABLE IF NOT EXISTS email_templates (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL,
        subject TEXT,
        body TEXT NOT NULL,
        template_type TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
)


# ==================== INTERNAL HELPERS ====================

def get_connection():
    """Get database connection with row factory.

    Creates the parent directory of ``DB_PATH`` first, because
    ``sqlite3.connect`` creates the database file but not missing folders.
    The caller is responsible for closing the returned connection.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _open_db():
    """Yield a connection; commit on success, roll back on error, always close."""
    conn = get_connection()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _now():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS[.ffffff]`` text.

    This is the same text the default sqlite3 datetime adapter produced;
    binding the string directly avoids that adapter, which is deprecated
    since Python 3.12.
    """
    return datetime.now().isoformat(sep=' ')


def _failure(error):
    """Build the standard failure payload from an exception."""
    return {'success': False, 'error': str(error)}


def _fetch_all(query, params=()):
    """Run a SELECT and return every row as a plain dict."""
    with _open_db() as conn:
        return [dict(row) for row in conn.execute(query, params).fetchall()]


def _fetch_one(query, params=()):
    """Run a SELECT and return the first row as a dict, or None if no row."""
    with _open_db() as conn:
        row = conn.execute(query, params).fetchone()
        return dict(row) if row else None


def _insert_row(table, required, extras):
    """Insert one row and return its rowid.

    ``required`` maps the mandatory column names to their values; ``extras``
    is the caller's **kwargs, from which only the table's known optional
    columns are taken (unknown keys are ignored).
    """
    columns = dict(required)
    for field in _INSERT_OPTIONAL[table]:
        if field in extras:
            columns[field] = extras[field]

    field_names = ', '.join(columns)
    placeholders = ', '.join('?' * len(columns))
    with _open_db() as conn:
        cursor = conn.execute(
            f'INSERT INTO {table} ({field_names}) VALUES ({placeholders})',
            list(columns.values()),
        )
        return cursor.lastrowid


def _update_row(table, row_id, changes, touch_updated_at=False):
    """Update whitelisted columns of the row with the given id.

    Raises ValueError when ``changes`` names a column outside the table's
    whitelist, or when there is nothing at all to set.
    """
    unknown = sorted(set(changes) - set(_UPDATABLE[table]))
    if unknown:
        raise ValueError(f"Invalid field(s) for {table}: {', '.join(unknown)}")

    assignments = [f'{field} = ?' for field in changes]
    values = list(changes.values())
    if touch_updated_at:
        assignments.append('updated_at = ?')
        values.append(_now())
    if not assignments:
        # "UPDATE t SET WHERE ..." is a syntax error; report it clearly.
        raise ValueError('No fields to update')

    values.append(row_id)
    with _open_db() as conn:
        conn.execute(
            f"UPDATE {table} SET {', '.join(assignments)} WHERE id = ?",
            values,
        )


def init_database():
    """Initialize all database tables.

    Safe to call repeatedly: every statement is CREATE TABLE IF NOT EXISTS.
    Unlike the other functions in this module, errors propagate as
    exceptions.
    """
    with _open_db() as conn:
        for statement in _SCHEMA:
            conn.execute(statement)
    return {'success': True, 'message': 'Database initialized successfully'}


# Initialize database on module import
init_database()


# ==================== PROFILE FUNCTIONS ====================

def set_profile(field, value):
    """Set a profile field, replacing any existing value for that field."""
    try:
        with _open_db() as conn:
            conn.execute(
                'INSERT OR REPLACE INTO profile (field, value, updated_at) VALUES (?, ?, ?)',
                (field, value, _now()),
            )
        return {'success': True, 'field': field, 'value': value}
    except Exception as error:
        return _failure(error)


def get_profile(field=None):
    """Get profile field(s).

    With ``field`` given, returns that field's value (None when unset);
    otherwise returns every stored field as a ``{field: value}`` dict.
    """
    try:
        if field:
            row = _fetch_one('SELECT value FROM profile WHERE field = ?', (field,))
            return {'success': True, 'field': field, 'value': row['value'] if row else None}

        rows = _fetch_all('SELECT field, value FROM profile')
        return {
            'success': True,
            'profile': {row['field']: row['value'] for row in rows},
        }
    except Exception as error:
        return _failure(error)


def set_profile_bulk(profile_data):
    """Set multiple profile fields at once, in a single transaction."""
    try:
        stamp = _now()
        records = [(field, value, stamp) for field, value in profile_data.items()]
        with _open_db() as conn:
            conn.executemany(
                'INSERT OR REPLACE INTO profile (field, value, updated_at) VALUES (?, ?, ?)',
                records,
            )
        return {'success': True, 'updated_fields': list(profile_data.keys())}
    except Exception as error:
        return _failure(error)


# ==================== JOB APPLICATION FUNCTIONS ====================

def add_job_application(company, position, **kwargs):
    """Add a new job application.

    Optional kwargs: location, salary_range, job_url, date_applied, status,
    contact_name, contact_email, notes, follow_up_date, resume_version,
    cover_letter. Other keys are ignored.
    """
    try:
        application_id = _insert_row(
            'job_applications',
            {'company': company, 'position': position},
            kwargs,
        )
        return {
            'success': True,
            'applicationId': application_id,
            'company': company,
            'position': position,
        }
    except Exception as error:
        return _failure(error)


def get_job_applications(status=None, company=None, limit=50):
    """Get job applications with optional filters.

    ``status`` matches exactly; ``company`` is a substring (LIKE) match.
    Results are ordered by date_applied, newest first.
    """
    try:
        query = 'SELECT * FROM job_applications WHERE 1=1'
        params = []
        if status:
            query += ' AND status = ?'
            params.append(status)
        if company:
            query += ' AND company LIKE ?'
            params.append(f'%{company}%')
        query += ' ORDER BY date_applied DESC LIMIT ?'
        params.append(limit)

        applications = _fetch_all(query, params)
        return {'success': True, 'applications': applications, 'total': len(applications)}
    except Exception as error:
        return _failure(error)


def get_job_application(application_id):
    """Get a specific job application by id."""
    try:
        row = _fetch_one('SELECT * FROM job_applications WHERE id = ?', (application_id,))
        if row:
            return {'success': True, 'application': row}
        return {'success': False, 'error': 'Application not found'}
    except Exception as error:
        return _failure(error)


def update_job_application(application_id, **kwargs):
    """Update a job application.

    kwargs keys must be job_applications columns (company, position, or any
    optional field accepted by add_job_application); anything else yields a
    failure result. ``updated_at`` is always refreshed.
    """
    try:
        _update_row('job_applications', application_id, kwargs, touch_updated_at=True)
        return {'success': True, 'applicationId': application_id, 'updated': list(kwargs.keys())}
    except Exception as error:
        return _failure(error)


def delete_job_application(application_id):
    """Delete a job application by id."""
    try:
        with _open_db() as conn:
            conn.execute('DELETE FROM job_applications WHERE id = ?', (application_id,))
        return {'success': True, 'message': 'Application deleted'}
    except Exception as error:
        return _failure(error)


# ==================== INTERVIEW FUNCTIONS ====================

def add_interview(application_id, interview_date, interview_type, **kwargs):
    """Add an interview for a job application.

    Optional kwargs: interviewer_name, interviewer_email, status, notes,
    feedback, next_steps. Other keys are ignored.
    """
    try:
        interview_id = _insert_row(
            'interviews',
            {
                'application_id': application_id,
                'interview_date': interview_date,
                'interview_type': interview_type,
            },
            kwargs,
        )
        return {'success': True, 'interviewId': interview_id}
    except Exception as error:
        return _failure(error)


def get_interviews(application_id=None, status=None, upcoming_only=False):
    """Get interviews with optional filters.

    Each row also carries the company and position of its job application.
    Because this is an inner join, interviews without a matching application
    are not returned.
    """
    try:
        query = '''
            SELECT i.*, ja.company, ja.position
            FROM interviews i
            JOIN job_applications ja ON i.application_id = ja.id
            WHERE 1=1
        '''
        params = []
        if application_id:
            query += ' AND i.application_id = ?'
            params.append(application_id)
        if status:
            query += ' AND i.status = ?'
            params.append(status)
        if upcoming_only:
            # Text comparison against the stored interview_date.
            # NOTE(review): assumes dates are stored as ISO-8601 text - confirm.
            query += ' AND i.interview_date >= ?'
            params.append(datetime.now().isoformat())
        query += ' ORDER BY i.interview_date ASC'

        return {'success': True, 'interviews': _fetch_all(query, params)}
    except Exception as error:
        return _failure(error)


def update_interview(interview_id, **kwargs):
    """Update an interview.

    kwargs keys must be interviews columns; unknown keys or an empty update
    yield a failure result.
    """
    try:
        _update_row('interviews', interview_id, kwargs)
        return {'success': True, 'interviewId': interview_id}
    except Exception as error:
        return _failure(error)


# ==================== CONTACTS FUNCTIONS ====================

def add_contact(name, **kwargs):
    """Add a new contact.

    Optional kwargs: email, phone, company, title, linkedin_url,
    relationship, notes, last_contact. Other keys are ignored.
    """
    try:
        contact_id = _insert_row('contacts', {'name': name}, kwargs)
        return {'success': True, 'contactId': contact_id, 'name': name}
    except Exception as error:
        return _failure(error)


def get_contacts(company=None, search=None, limit=50):
    """Get contacts with optional filters.

    ``company`` is a substring match on company; ``search`` is a substring
    match across name, email and company.
    """
    try:
        query = 'SELECT * FROM contacts WHERE 1=1'
        params = []
        if company:
            query += ' AND company LIKE ?'
            params.append(f'%{company}%')
        if search:
            query += ' AND (name LIKE ? OR email LIKE ? OR company LIKE ?)'
            params.extend([f'%{search}%'] * 3)
        query += ' ORDER BY name ASC LIMIT ?'
        params.append(limit)

        return {'success': True, 'contacts': _fetch_all(query, params)}
    except Exception as error:
        return _failure(error)


def update_contact(contact_id, **kwargs):
    """Update a contact.

    kwargs keys must be contacts columns; unknown keys or an empty update
    yield a failure result.
    """
    try:
        _update_row('contacts', contact_id, kwargs)
        return {'success': True, 'contactId': contact_id}
    except Exception as error:
        return _failure(error)


# ==================== SKILLS FUNCTIONS ====================

def add_skill(skill_name, category=None, proficiency=None, years_experience=None, notes=None):
    """Add a skill, replacing any existing row with the same skill_name."""
    try:
        with _open_db() as conn:
            conn.execute(
                '''
                INSERT OR REPLACE INTO skills
                    (skill_name, category, proficiency, years_experience, notes)
                VALUES (?, ?, ?, ?, ?)
                ''',
                (skill_name, category, proficiency, years_experience, notes),
            )
        return {'success': True, 'skill': skill_name}
    except Exception as error:
        return _failure(error)


def get_skills(category=None):
    """Get all skills, optionally filtered by category."""
    try:
        if category:
            skills = _fetch_all(
                'SELECT * FROM skills WHERE category = ? ORDER BY skill_name',
                (category,),
            )
        else:
            skills = _fetch_all('SELECT * FROM skills ORDER BY category, skill_name')
        return {'success': True, 'skills': skills}
    except Exception as error:
        return _failure(error)


# ==================== WORK EXPERIENCE FUNCTIONS ====================

def add_work_experience(company, title, **kwargs):
    """Add work experience.

    Optional kwargs: start_date, end_date, is_current, description,
    achievements, technologies. Other keys are ignored.
    """
    try:
        exp_id = _insert_row('work_experience', {'company': company, 'title': title}, kwargs)
        return {'success': True, 'experienceId': exp_id}
    except Exception as error:
        return _failure(error)


def get_work_experience():
    """Get all work experience, most recent start_date first."""
    try:
        rows = _fetch_all('SELECT * FROM work_experience ORDER BY start_date DESC')
        return {'success': True, 'experience': rows}
    except Exception as error:
        return _failure(error)


# ==================== EDUCATION FUNCTIONS ====================

def add_education(institution, **kwargs):
    """Add education entry.

    Optional kwargs: degree, field_of_study, start_date, end_date, gpa,
    achievements. Other keys are ignored.
    """
    try:
        edu_id = _insert_row('education', {'institution': institution}, kwargs)
        return {'success': True, 'educationId': edu_id}
    except Exception as error:
        return _failure(error)


def get_education():
    """Get all education entries, most recent end_date first."""
    try:
        rows = _fetch_all('SELECT * FROM education ORDER BY end_date DESC')
        return {'success': True, 'education': rows}
    except Exception as error:
        return _failure(error)


# ==================== TASKS FUNCTIONS ====================

def add_task(title, **kwargs):
    """Add a task/reminder.

    Optional kwargs: description, due_date, priority, status, category,
    related_application_id. Other keys are ignored.
    """
    try:
        task_id = _insert_row('tasks', {'title': title}, kwargs)
        return {'success': True, 'taskId': task_id}
    except Exception as error:
        return _failure(error)


def get_tasks(status=None, category=None, include_completed=False):
    """Get tasks with optional filters.

    Completed tasks are hidden by default. An explicit ``status`` filter
    takes precedence, so ``get_tasks(status='completed')`` returns completed
    tasks even though ``include_completed`` defaults to False.
    """
    try:
        query = 'SELECT * FROM tasks WHERE 1=1'
        params = []
        if status:
            query += ' AND status = ?'
            params.append(status)
        elif not include_completed:
            # Only applied without an explicit status: combining it with
            # status='completed' would make the filter unsatisfiable.
            query += ' AND status != ?'
            params.append('completed')
        if category:
            query += ' AND category = ?'
            params.append(category)
        # NOTE(review): priority is TEXT, so "priority DESC" sorts
        # alphabetically rather than by urgency - confirm this is intended.
        query += ' ORDER BY due_date ASC, priority DESC'

        return {'success': True, 'tasks': _fetch_all(query, params)}
    except Exception as error:
        return _failure(error)


def complete_task(task_id):
    """Mark a task as completed and record the completion time."""
    try:
        with _open_db() as conn:
            conn.execute(
                "UPDATE tasks SET status = 'completed', completed_at = ? WHERE id = ?",
                (_now(), task_id),
            )
        return {'success': True, 'taskId': task_id, 'status': 'completed'}
    except Exception as error:
        return _failure(error)


# ==================== NOTES FUNCTIONS ====================

def add_note(content, title=None, category=None, tags=None):
    """Add a note.

    ``tags`` may be a list or tuple of strings (stored comma-joined) or an
    already-joined string.
    """
    try:
        tags_str = ','.join(tags) if isinstance(tags, (list, tuple)) else tags
        with _open_db() as conn:
            cursor = conn.execute(
                'INSERT INTO notes (title, content, category, tags) VALUES (?, ?, ?, ?)',
                (title, content, category, tags_str),
            )
            note_id = cursor.lastrowid
        return {'success': True, 'noteId': note_id}
    except Exception as error:
        return _failure(error)


def get_notes(category=None, search=None, limit=50):
    """Get notes with optional filters.

    ``search`` is a substring match across title, content and tags.
    Results are ordered by updated_at, newest first.
    """
    try:
        query = 'SELECT * FROM notes WHERE 1=1'
        params = []
        if category:
            query += ' AND category = ?'
            params.append(category)
        if search:
            query += ' AND (title LIKE ? OR content LIKE ? OR tags LIKE ?)'
            params.extend([f'%{search}%'] * 3)
        query += ' ORDER BY updated_at DESC LIMIT ?'
        params.append(limit)

        return {'success': True, 'notes': _fetch_all(query, params)}
    except Exception as error:
        return _failure(error)


# ==================== EMAIL TEMPLATES FUNCTIONS ====================

def add_email_template(name, body, subject=None, template_type=None):
    """Add an email template, replacing any existing template with that name."""
    try:
        with _open_db() as conn:
            conn.execute(
                '''
                INSERT OR REPLACE INTO email_templates (name, subject, body, template_type)
                VALUES (?, ?, ?, ?)
                ''',
                (name, subject, body, template_type),
            )
        return {'success': True, 'templateName': name}
    except Exception as error:
        return _failure(error)


def get_email_template(name):
    """Get an email template by name."""
    try:
        row = _fetch_one('SELECT * FROM email_templates WHERE name = ?', (name,))
        if row:
            return {'success': True, 'template': row}
        return {'success': False, 'error': 'Template not found'}
    except Exception as error:
        return _failure(error)


def list_email_templates(template_type=None):
    """List all email templates, optionally filtered by template_type."""
    try:
        if template_type:
            templates = _fetch_all(
                'SELECT * FROM email_templates WHERE template_type = ? ORDER BY name',
                (template_type,),
            )
        else:
            templates = _fetch_all('SELECT * FROM email_templates ORDER BY name')
        return {'success': True, 'templates': templates}
    except Exception as error:
        return _failure(error)


# ==================== SUMMARY/ANALYTICS FUNCTIONS ====================

def get_job_search_summary():
    """Get summary statistics of job search.

    Reports total applications, a per-status breakdown, applications in the
    last 7 days, upcoming scheduled interviews, and pending tasks.
    """
    try:
        with _open_db() as conn:
            # Total applications
            total = conn.execute(
                'SELECT COUNT(*) as total FROM job_applications'
            ).fetchone()['total']

            # By status
            status_rows = conn.execute(
                'SELECT status, COUNT(*) as count FROM job_applications GROUP BY status'
            ).fetchall()
            by_status = {row['status']: row['count'] for row in status_rows}

            # Recent applications (last 7 days)
            recent = conn.execute(
                '''
                SELECT COUNT(*) as recent FROM job_applications
                WHERE date_applied >= date('now', '-7 days')
                '''
            ).fetchone()['recent']

            # Upcoming interviews
            upcoming_interviews = conn.execute(
                '''
                SELECT COUNT(*) as upcoming FROM interviews
                WHERE interview_date >= datetime('now') AND status = 'scheduled'
                '''
            ).fetchone()['upcoming']

            # Pending tasks
            pending_tasks = conn.execute(
                "SELECT COUNT(*) as pending FROM tasks WHERE status = 'pending'"
            ).fetchone()['pending']

        return {
            'success': True,
            'summary': {
                'total_applications': total,
                'applications_by_status': by_status,
                'applications_last_7_days': recent,
                'upcoming_interviews': upcoming_interviews,
                'pending_tasks': pending_tasks,
            },
        }
    except Exception as error:
        return _failure(error)


def get_full_profile():
    """Get complete profile data including all stored information.

    Best-effort: a section whose lookup failed is returned empty rather than
    failing the whole call.
    """
    try:
        profile = get_profile()
        skills = get_skills()
        experience = get_work_experience()
        education = get_education()

        return {
            'success': True,
            'full_profile': {
                'personal_info': profile.get('profile', {}),
                'skills': skills.get('skills', []),
                'work_experience': experience.get('experience', []),
                'education': education.get('education', []),
            },
        }
    except Exception as error:
        return _failure(error)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server