Skip to main content
Glama

MCP SQLite Demo

by ELISHA1994
hr_server.py58.7 kB
import sqlite3 import argparse from mcp.server.fastmcp import FastMCP from datetime import datetime, date, timedelta from typing import Dict, List, Optional, Any import json import re mcp = FastMCP('hr-management-system') # Database connection management def get_db_connection(): conn = sqlite3.connect('hr_management.db') conn.row_factory = sqlite3.Row return conn def init_db(): """Initialize the HR management database with all required tables""" conn = get_db_connection() cursor = conn.cursor() # Departments table cursor.execute(''' CREATE TABLE IF NOT EXISTS departments ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, parent_id INTEGER REFERENCES departments(id), manager_id INTEGER REFERENCES employees(id), budget DECIMAL(15,2), cost_center TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Positions table cursor.execute(''' CREATE TABLE IF NOT EXISTS positions ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, department_id INTEGER REFERENCES departments(id), level TEXT, min_salary DECIMAL(10,2), max_salary DECIMAL(10,2), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Employees table (comprehensive) cursor.execute(''' CREATE TABLE IF NOT EXISTS employees ( id INTEGER PRIMARY KEY AUTOINCREMENT, employee_id TEXT UNIQUE NOT NULL, first_name TEXT NOT NULL, last_name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, phone TEXT, date_of_birth DATE, gender TEXT, marital_status TEXT, address TEXT, city TEXT, state TEXT, country TEXT, postal_code TEXT, department_id INTEGER REFERENCES departments(id), position_id INTEGER REFERENCES positions(id), manager_id INTEGER REFERENCES employees(id), hire_date DATE NOT NULL, employment_status TEXT DEFAULT 'active', employment_type TEXT DEFAULT 'full-time', work_location TEXT DEFAULT 'office', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Salaries table cursor.execute(''' CREATE TABLE IF NOT EXISTS salaries ( id INTEGER PRIMARY KEY AUTOINCREMENT, employee_id INTEGER REFERENCES employees(id), base_salary DECIMAL(10,2), bonus DECIMAL(10,2), commission DECIMAL(10,2), effective_date DATE, end_date DATE, currency TEXT DEFAULT 'USD', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Leave types cursor.execute(''' CREATE TABLE IF NOT EXISTS leave_types ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, days_per_year INTEGER, carry_forward BOOLEAN DEFAULT FALSE, max_carry_forward INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Leave balances cursor.execute(''' CREATE TABLE IF NOT EXISTS leave_balances ( employee_id INTEGER REFERENCES employees(id), leave_type_id INTEGER REFERENCES leave_types(id), year INTEGER, entitled_days INTEGER, used_days DECIMAL(5,2) DEFAULT 0, carried_forward DECIMAL(5,2) DEFAULT 0, remaining_days DECIMAL(5,2), PRIMARY KEY (employee_id, leave_type_id, year) ) ''') # Leave requests cursor.execute(''' CREATE TABLE IF NOT EXISTS leave_requests ( id INTEGER PRIMARY KEY AUTOINCREMENT, employee_id INTEGER REFERENCES employees(id), leave_type_id INTEGER REFERENCES leave_types(id), start_date DATE, end_date DATE, days_requested DECIMAL(5,2), reason TEXT, status TEXT DEFAULT 'pending', approved_by INTEGER REFERENCES employees(id), approved_date TIMESTAMP, comments TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Performance reviews cursor.execute(''' CREATE TABLE IF NOT EXISTS performance_reviews ( id INTEGER PRIMARY KEY AUTOINCREMENT, employee_id INTEGER REFERENCES employees(id), reviewer_id INTEGER REFERENCES employees(id), review_period_start DATE, review_period_end DATE, overall_rating INTEGER CHECK(overall_rating >= 1 AND overall_rating <= 5), goals_achieved TEXT, areas_of_improvement TEXT, accomplishments TEXT, next_review_date DATE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Training programs cursor.execute(''' CREATE TABLE IF NOT EXISTS training_programs ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, duration_hours INTEGER, is_mandatory BOOLEAN DEFAULT FALSE, department_specific INTEGER REFERENCES departments(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Employee training records cursor.execute(''' CREATE TABLE IF NOT EXISTS employee_training ( employee_id INTEGER REFERENCES employees(id), training_id INTEGER REFERENCES training_programs(id), enrollment_date DATE, completion_date DATE, score DECIMAL(5,2), certificate_url TEXT, status TEXT DEFAULT 'enrolled', PRIMARY KEY (employee_id, training_id) ) ''') # Audit log for compliance cursor.execute(''' CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, action TEXT NOT NULL, entity_type TEXT NOT NULL, entity_id INTEGER, old_values TEXT, new_values TEXT, ip_address TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Insert default leave types cursor.execute(''' INSERT OR IGNORE INTO leave_types (name, days_per_year, carry_forward, max_carry_forward) VALUES ('Annual Leave', 21, TRUE, 10), ('Sick Leave', 10, FALSE, 0), ('Personal Leave', 5, FALSE, 0), ('Maternity Leave', 90, FALSE, 0), ('Paternity Leave', 14, FALSE, 0), ('Bereavement Leave', 3, FALSE, 0) ''') conn.commit() conn.close() # Helper functions def validate_email(email: str) -> bool: """Validate email format""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def generate_employee_id() -> str: """Generate unique employee ID""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) as count FROM employees") count = cursor.fetchone()['count'] conn.close() return f"EMP{str(count + 1).zfill(5)}" def calculate_leave_days(start_date: str, end_date: str) -> float: """Calculate number of days between two dates""" start = datetime.strptime(start_date, '%Y-%m-%d') end = datetime.strptime(end_date, '%Y-%m-%d') return (end - start).days + 1 def log_audit(action: str, entity_type: str, entity_id: int, old_values: dict = None, new_values: dict = None): """Log actions for audit trail""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(''' INSERT INTO audit_log (action, entity_type, entity_id, old_values, new_values) VALUES (?, ?, ?, ?, ?) ''', (action, entity_type, entity_id, json.dumps(old_values) if old_values else None, json.dumps(new_values) if new_values else None)) conn.commit() conn.close() # Employee Management Tools @mcp.tool() def manage_employee( action: str, employee_data: Dict[str, Any] ) -> Dict[str, Any]: """ Comprehensive employee management - add, update, terminate, or reactivate employees. Args: action: One of 'add', 'update', 'terminate', 'reactivate' employee_data: Dictionary containing employee information For 'add': first_name, last_name, email (required), hire_date (required, use today if not specified) Optional: phone, department_name, position_title, manager_name, salary For 'update': employee_id and fields to update For 'terminate': employee_id and termination_date For 'reactivate': employee_id Returns: Dictionary with success status and employee details Note: Department and position can be specified by name and will be created if they don't exist. """ conn = get_db_connection() cursor = conn.cursor() try: if action == 'add': # Validate required fields required = ['first_name', 'last_name', 'email'] for field in required: if field not in employee_data or not employee_data[field]: return {"success": False, "error": f"Missing required field: {field}"} # Validate email if not validate_email(employee_data['email']): return {"success": False, "error": "Invalid email format"} # Set hire_date to today if not provided if 'hire_date' not in employee_data or not employee_data['hire_date']: employee_data['hire_date'] = date.today().isoformat() # Generate employee ID employee_id = generate_employee_id() # Handle department by name department_id = None if 'department_name' in employee_data and employee_data['department_name']: cursor.execute('SELECT id FROM departments WHERE name = ?', (employee_data['department_name'],)) dept = cursor.fetchone() if dept: department_id = dept['id'] else: # Create department if it doesn't exist cursor.execute('INSERT INTO departments (name) VALUES (?)', (employee_data['department_name'],)) department_id = cursor.lastrowid elif 'department_id' in employee_data: department_id = employee_data['department_id'] # Handle position by title position_id = None if 'position_title' in employee_data and employee_data['position_title']: cursor.execute('SELECT id FROM positions WHERE title = ? AND (department_id = ? OR department_id IS NULL)', (employee_data['position_title'], department_id)) pos = cursor.fetchone() if pos: position_id = pos['id'] else: # Create position if it doesn't exist cursor.execute('INSERT INTO positions (title, department_id) VALUES (?, ?)', (employee_data['position_title'], department_id)) position_id = cursor.lastrowid elif 'position_id' in employee_data: position_id = employee_data['position_id'] # Handle manager by name manager_id = None if 'manager_name' in employee_data and employee_data['manager_name']: # Try to find manager by full name manager_parts = employee_data['manager_name'].split() if len(manager_parts) >= 2: cursor.execute(''' SELECT id FROM employees WHERE first_name = ? AND last_name = ? AND employment_status = 'active' ''', (manager_parts[0], ' '.join(manager_parts[1:]))) manager = cursor.fetchone() if manager: manager_id = manager['id'] elif 'manager_id' in employee_data: manager_id = employee_data['manager_id'] # Insert employee cursor.execute(''' INSERT INTO employees ( employee_id, first_name, last_name, email, phone, date_of_birth, gender, marital_status, address, city, state, country, postal_code, department_id, position_id, manager_id, hire_date, employment_type, work_location ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( employee_id, employee_data['first_name'], employee_data['last_name'], employee_data['email'], employee_data.get('phone'), employee_data.get('date_of_birth'), employee_data.get('gender'), employee_data.get('marital_status'), employee_data.get('address'), employee_data.get('city'), employee_data.get('state'), employee_data.get('country', 'USA'), employee_data.get('postal_code'), department_id, position_id, manager_id, employee_data['hire_date'], employee_data.get('employment_type', 'full-time'), employee_data.get('work_location', 'office') )) emp_id = cursor.lastrowid # Add initial salary if provided if 'salary' in employee_data: cursor.execute(''' INSERT INTO salaries (employee_id, base_salary, effective_date) VALUES (?, ?, ?) ''', (emp_id, employee_data['salary'], employee_data['hire_date'])) # Initialize leave balances for current year current_year = datetime.now().year cursor.execute('SELECT id, days_per_year FROM leave_types') leave_types = cursor.fetchall() for leave_type in leave_types: # Pro-rate leave days based on hire date hire_date = datetime.strptime(employee_data['hire_date'], '%Y-%m-%d') if hire_date.year == current_year: days_remaining = (date(current_year, 12, 31) - hire_date.date()).days prorated_days = round((leave_type['days_per_year'] * days_remaining) / 365, 1) else: prorated_days = leave_type['days_per_year'] cursor.execute(''' INSERT INTO leave_balances (employee_id, leave_type_id, year, entitled_days, remaining_days) VALUES (?, ?, ?, ?, ?) ''', (emp_id, leave_type['id'], current_year, prorated_days, prorated_days)) conn.commit() log_audit('CREATE', 'employee', emp_id, None, employee_data) return { "success": True, "employee_id": employee_id, "message": f"Employee {employee_data['first_name']} {employee_data['last_name']} added successfully" } elif action == 'update': if 'employee_id' not in employee_data: return {"success": False, "error": "employee_id required for update"} # Get current employee data for audit cursor.execute('SELECT * FROM employees WHERE employee_id = ?', (employee_data['employee_id'],)) current = cursor.fetchone() if not current: return {"success": False, "error": "Employee not found"} # Build update query update_fields = [] values = [] for field, value in employee_data.items(): if field != 'employee_id' and field in ['first_name', 'last_name', 'email', 'phone', 'department_id', 'position_id', 'manager_id', 'employment_status', 'work_location']: update_fields.append(f"{field} = ?") values.append(value) if update_fields: values.append(employee_data['employee_id']) cursor.execute(f''' UPDATE employees SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP WHERE employee_id = ? ''', values) conn.commit() log_audit('UPDATE', 'employee', current['id'], dict(current), employee_data) return {"success": True, "message": "Employee updated successfully"} return {"success": False, "error": "No valid fields to update"} elif action == 'terminate': if 'employee_id' not in employee_data: return {"success": False, "error": "employee_id required for termination"} cursor.execute(''' UPDATE employees SET employment_status = 'terminated', updated_at = CURRENT_TIMESTAMP WHERE employee_id = ? ''', (employee_data['employee_id'],)) # End current salary record if 'termination_date' in employee_data: cursor.execute(''' UPDATE salaries SET end_date = ? WHERE employee_id = (SELECT id FROM employees WHERE employee_id = ?) AND end_date IS NULL ''', (employee_data['termination_date'], employee_data['employee_id'])) conn.commit() log_audit('TERMINATE', 'employee', employee_data['employee_id'], None, employee_data) return {"success": True, "message": "Employee terminated successfully"} elif action == 'reactivate': if 'employee_id' not in employee_data: return {"success": False, "error": "employee_id required for reactivation"} cursor.execute(''' UPDATE employees SET employment_status = 'active', updated_at = CURRENT_TIMESTAMP WHERE employee_id = ? ''', (employee_data['employee_id'],)) conn.commit() log_audit('REACTIVATE', 'employee', employee_data['employee_id'], None, None) return {"success": True, "message": "Employee reactivated successfully"} else: return {"success": False, "error": f"Unknown action: {action}"} except Exception as e: conn.rollback() return {"success": False, "error": str(e)} finally: conn.close() @mcp.tool() def search_employees( criteria: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Advanced employee search with multiple filters. Args: criteria: Search criteria dictionary - name: Search by first or last name (partial match) - department: Department name or ID - position: Position title (partial match) - status: Employment status (active, terminated, on_leave) - manager: Manager's name or employee_id - hire_date_from: Employees hired after this date - hire_date_to: Employees hired before this date - location: Work location Returns: List of employee records matching criteria """ conn = get_db_connection() cursor = conn.cursor() query = ''' SELECT e.employee_id, e.first_name, e.last_name, e.email, e.phone, e.employment_status, e.hire_date, d.name as department_name, p.title as position_title, m.first_name || ' ' || m.last_name as manager_name, s.base_salary as current_salary FROM employees e LEFT JOIN departments d ON e.department_id = d.id LEFT JOIN positions p ON e.position_id = p.id LEFT JOIN employees m ON e.manager_id = m.id LEFT JOIN ( SELECT employee_id, base_salary FROM salaries WHERE end_date IS NULL ) s ON e.id = s.employee_id WHERE 1=1 ''' params = [] if criteria.get('name'): query += " AND (e.first_name LIKE ? OR e.last_name LIKE ?)" name_pattern = f"%{criteria['name']}%" params.extend([name_pattern, name_pattern]) if criteria.get('department'): if isinstance(criteria['department'], int): query += " AND e.department_id = ?" params.append(criteria['department']) else: query += " AND d.name LIKE ?" params.append(f"%{criteria['department']}%") if criteria.get('position'): query += " AND p.title LIKE ?" params.append(f"%{criteria['position']}%") if criteria.get('status'): query += " AND e.employment_status = ?" params.append(criteria['status']) if criteria.get('manager'): if criteria['manager'].startswith('EMP'): query += " AND m.employee_id = ?" params.append(criteria['manager']) else: query += " AND (m.first_name LIKE ? OR m.last_name LIKE ?)" manager_pattern = f"%{criteria['manager']}%" params.extend([manager_pattern, manager_pattern]) if criteria.get('hire_date_from'): query += " AND e.hire_date >= ?" params.append(criteria['hire_date_from']) if criteria.get('hire_date_to'): query += " AND e.hire_date <= ?" params.append(criteria['hire_date_to']) if criteria.get('location'): query += " AND e.work_location = ?" params.append(criteria['location']) cursor.execute(query, params) results = [] for row in cursor.fetchall(): results.append(dict(row)) conn.close() return results # Organizational Structure Tools @mcp.tool() def get_org_chart( department_id: Optional[int] = None, include_all_levels: bool = True ) -> Dict[str, Any]: """ Generate organizational hierarchy chart. Args: department_id: Specific department ID (None for entire organization) include_all_levels: Whether to include all subordinate levels Returns: Hierarchical organization structure """ conn = get_db_connection() cursor = conn.cursor() def build_hierarchy(manager_id=None, dept_id=None): if dept_id: cursor.execute(''' SELECT e.id, e.employee_id, e.first_name, e.last_name, p.title, d.name as department FROM employees e LEFT JOIN positions p ON e.position_id = p.id LEFT JOIN departments d ON e.department_id = d.id WHERE e.department_id = ? AND e.employment_status = 'active' AND (e.manager_id = ? OR (? IS NULL AND e.manager_id IS NULL)) ''', (dept_id, manager_id, manager_id)) else: cursor.execute(''' SELECT e.id, e.employee_id, e.first_name, e.last_name, p.title, d.name as department FROM employees e LEFT JOIN positions p ON e.position_id = p.id LEFT JOIN departments d ON e.department_id = d.id WHERE e.employment_status = 'active' AND (e.manager_id = ? OR (? IS NULL AND e.manager_id IS NULL)) ''', (manager_id, manager_id)) employees = cursor.fetchall() org_structure = [] for emp in employees: emp_dict = { "employee_id": emp['employee_id'], "name": f"{emp['first_name']} {emp['last_name']}", "position": emp['title'], "department": emp['department'] } if include_all_levels: subordinates = build_hierarchy(emp['id'], dept_id if dept_id else None) if subordinates: emp_dict["reports"] = subordinates org_structure.append(emp_dict) return org_structure org_chart = build_hierarchy(None, department_id) # Get department info if specified if department_id: cursor.execute('SELECT name FROM departments WHERE id = ?', (department_id,)) dept = cursor.fetchone() result = { "department": dept['name'] if dept else "Unknown", "structure": org_chart } else: result = {"organization": org_chart} conn.close() return result @mcp.tool() def manage_department( action: str, department_data: Dict[str, Any] ) -> Dict[str, Any]: """ Create, update, merge, or close departments. Args: action: One of 'create', 'update', 'merge', 'close' department_data: Department information For 'create': name, parent_id (optional), manager_id, budget For 'update': department_id and fields to update For 'merge': source_id, target_id For 'close': department_id Returns: Operation result """ conn = get_db_connection() cursor = conn.cursor() try: if action == 'create': if 'name' not in department_data: return {"success": False, "error": "Department name required"} cursor.execute(''' INSERT INTO departments (name, parent_id, manager_id, budget, cost_center) VALUES (?, ?, ?, ?, ?) ''', ( department_data['name'], department_data.get('parent_id'), department_data.get('manager_id'), department_data.get('budget'), department_data.get('cost_center') )) dept_id = cursor.lastrowid conn.commit() log_audit('CREATE', 'department', dept_id, None, department_data) return {"success": True, "department_id": dept_id, "message": "Department created successfully"} elif action == 'update': if 'department_id' not in department_data: return {"success": False, "error": "department_id required"} update_fields = [] values = [] for field in ['name', 'parent_id', 'manager_id', 'budget', 'cost_center']: if field in department_data: update_fields.append(f"{field} = ?") values.append(department_data[field]) if update_fields: values.append(department_data['department_id']) cursor.execute(f''' UPDATE departments SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', values) conn.commit() return {"success": True, "message": "Department updated successfully"} return {"success": False, "error": "No fields to update"} elif action == 'merge': if 'source_id' not in department_data or 'target_id' not in department_data: return {"success": False, "error": "source_id and target_id required"} # Move all employees from source to target department cursor.execute(''' UPDATE employees SET department_id = ? WHERE department_id = ? ''', (department_data['target_id'], department_data['source_id'])) # Update positions cursor.execute(''' UPDATE positions SET department_id = ? WHERE department_id = ? ''', (department_data['target_id'], department_data['source_id'])) # Delete source department cursor.execute('DELETE FROM departments WHERE id = ?', (department_data['source_id'],)) conn.commit() log_audit('MERGE', 'department', department_data['source_id'], {'source': department_data['source_id']}, {'target': department_data['target_id']}) return {"success": True, "message": "Departments merged successfully"} else: return {"success": False, "error": f"Unknown action: {action}"} except Exception as e: conn.rollback() return {"success": False, "error": str(e)} finally: conn.close() # Leave Management Tools @mcp.tool() def request_leave( employee_id: str, leave_type: str, start_date: str, end_date: str, reason: str = "" ) -> Dict[str, Any]: """ Submit a leave request. Args: employee_id: Employee ID leave_type: Type of leave (e.g., 'Annual Leave', 'Sick Leave') start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) reason: Reason for leave Returns: Request submission result """ conn = get_db_connection() cursor = conn.cursor() try: # Get employee and leave type IDs cursor.execute('SELECT id FROM employees WHERE employee_id = ?', (employee_id,)) emp = cursor.fetchone() if not emp: return {"success": False, "error": "Employee not found"} cursor.execute('SELECT id FROM leave_types WHERE name = ?', (leave_type,)) leave = cursor.fetchone() if not leave: return {"success": False, "error": "Invalid leave type"} # Calculate days requested days_requested = calculate_leave_days(start_date, end_date) # Check leave balance year = datetime.strptime(start_date, '%Y-%m-%d').year cursor.execute(''' SELECT remaining_days FROM leave_balances WHERE employee_id = ? AND leave_type_id = ? AND year = ? ''', (emp['id'], leave['id'], year)) balance = cursor.fetchone() if not balance or balance['remaining_days'] < days_requested: return {"success": False, "error": "Insufficient leave balance"} # Create leave request cursor.execute(''' INSERT INTO leave_requests ( employee_id, leave_type_id, start_date, end_date, days_requested, reason, status ) VALUES (?, ?, ?, ?, ?, ?, 'pending') ''', (emp['id'], leave['id'], start_date, end_date, days_requested, reason)) request_id = cursor.lastrowid conn.commit() return { "success": True, "request_id": request_id, "message": f"Leave request submitted for {days_requested} days", "remaining_balance": balance['remaining_days'] - days_requested } except Exception as e: conn.rollback() return {"success": False, "error": str(e)} finally: conn.close() @mcp.tool() def approve_leave( request_id: int, approver_id: str, action: str = "approve", comments: str = "" ) -> Dict[str, Any]: """ Approve or reject leave requests. Args: request_id: Leave request ID approver_id: Approver's employee ID action: 'approve' or 'reject' comments: Optional comments Returns: Approval result """ conn = get_db_connection() cursor = conn.cursor() try: # Get approver ID cursor.execute('SELECT id FROM employees WHERE employee_id = ?', (approver_id,)) approver = cursor.fetchone() if not approver: return {"success": False, "error": "Approver not found"} # Get leave request details cursor.execute(''' SELECT lr.*, e.first_name, e.last_name, lt.name as leave_type FROM leave_requests lr JOIN employees e ON lr.employee_id = e.id JOIN leave_types lt ON lr.leave_type_id = lt.id WHERE lr.id = ? ''', (request_id,)) request = cursor.fetchone() if not request: return {"success": False, "error": "Leave request not found"} if request['status'] != 'pending': return {"success": False, "error": f"Request already {request['status']}"} if action == 'approve': # Update request status cursor.execute(''' UPDATE leave_requests SET status = 'approved', approved_by = ?, approved_date = CURRENT_TIMESTAMP, comments = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (approver['id'], comments, request_id)) # Update leave balance year = datetime.strptime(request['start_date'], '%Y-%m-%d').year cursor.execute(''' UPDATE leave_balances SET used_days = used_days + ?, remaining_days = remaining_days - ? WHERE employee_id = ? AND leave_type_id = ? AND year = ? ''', (request['days_requested'], request['days_requested'], request['employee_id'], request['leave_type_id'], year)) message = f"Leave request approved for {request['first_name']} {request['last_name']}" else: # reject cursor.execute(''' UPDATE leave_requests SET status = 'rejected', approved_by = ?, approved_date = CURRENT_TIMESTAMP, comments = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (approver['id'], comments, request_id)) message = f"Leave request rejected for {request['first_name']} {request['last_name']}" conn.commit() log_audit(action.upper(), 'leave_request', request_id, None, {'comments': comments}) return {"success": True, "message": message} except Exception as e: conn.rollback() return {"success": False, "error": str(e)} finally: conn.close() @mcp.tool() def get_leave_balance( employee_id: str, year: Optional[int] = None ) -> Dict[str, Any]: """ Check leave balances for an employee. Args: employee_id: Employee ID year: Year (defaults to current year) Returns: Leave balance details """ conn = get_db_connection() cursor = conn.cursor() if not year: year = datetime.now().year # Get employee cursor.execute('SELECT id, first_name, last_name FROM employees WHERE employee_id = ?', (employee_id,)) emp = cursor.fetchone() if not emp: return {"success": False, "error": "Employee not found"} # Get leave balances cursor.execute(''' SELECT lt.name as leave_type, lb.entitled_days, lb.used_days, lb.carried_forward, lb.remaining_days FROM leave_balances lb JOIN leave_types lt ON lb.leave_type_id = lt.id WHERE lb.employee_id = ? AND lb.year = ? ''', (emp['id'], year)) balances = [] for row in cursor.fetchall(): balances.append(dict(row)) # Get pending requests cursor.execute(''' SELECT lt.name as leave_type, lr.start_date, lr.end_date, lr.days_requested FROM leave_requests lr JOIN leave_types lt ON lr.leave_type_id = lt.id WHERE lr.employee_id = ? AND lr.status = 'pending' AND strftime('%Y', lr.start_date) = ? ''', (emp['id'], str(year))) pending = [] for row in cursor.fetchall(): pending.append(dict(row)) conn.close() return { "employee": f"{emp['first_name']} {emp['last_name']}", "year": year, "balances": balances, "pending_requests": pending } # Compensation & Benefits Tools @mcp.tool() def update_salary( employee_id: str, new_salary: float, effective_date: str, bonus: Optional[float] = None, reason: str = "" ) -> Dict[str, Any]: """ Update employee salary with history tracking. Args: employee_id: Employee ID new_salary: New base salary effective_date: Effective date (YYYY-MM-DD) bonus: Optional bonus amount reason: Reason for salary change Returns: Update result """ conn = get_db_connection() cursor = conn.cursor() try: # Get employee cursor.execute('SELECT id FROM employees WHERE employee_id = ?', (employee_id,)) emp = cursor.fetchone() if not emp: return {"success": False, "error": "Employee not found"} # End current salary record cursor.execute(''' UPDATE salaries SET end_date = date(?, '-1 day') WHERE employee_id = ? AND end_date IS NULL ''', (effective_date, emp['id'])) # Insert new salary record cursor.execute(''' INSERT INTO salaries (employee_id, base_salary, bonus, effective_date) VALUES (?, ?, ?, ?) ''', (emp['id'], new_salary, bonus, effective_date)) conn.commit() log_audit('SALARY_UPDATE', 'employee', emp['id'], None, {'new_salary': new_salary, 'reason': reason}) return { "success": True, "message": f"Salary updated to {new_salary} effective {effective_date}" } except Exception as e: conn.rollback() return {"success": False, "error": str(e)} finally: conn.close() @mcp.tool() def generate_compensation_report( filters: Dict[str, Any] = {} ) -> Dict[str, Any]: """ Generate salary analysis and compensation reports. Args: filters: Optional filters - department: Department name or ID - position: Position title - date: Report date (defaults to current) Returns: Compensation analysis report """ conn = get_db_connection() cursor = conn.cursor() # Base query for salary statistics query = ''' SELECT d.name as department, p.title as position, COUNT(DISTINCT e.id) as employee_count, AVG(s.base_salary) as avg_salary, MIN(s.base_salary) as min_salary, MAX(s.base_salary) as max_salary, SUM(s.base_salary) as total_payroll, AVG(s.bonus) as avg_bonus FROM employees e JOIN salaries s ON e.id = s.employee_id LEFT JOIN departments d ON e.department_id = d.id LEFT JOIN positions p ON e.position_id = p.id WHERE e.employment_status = 'active' AND s.end_date IS NULL ''' params = [] if filters.get('department'): if isinstance(filters['department'], int): query += " AND e.department_id = ?" params.append(filters['department']) else: query += " AND d.name = ?" params.append(filters['department']) if filters.get('position'): query += " AND p.title LIKE ?" params.append(f"%{filters['position']}%") # Group by department and position query += " GROUP BY d.name, p.title" cursor.execute(query, params) report_data = [] total_payroll = 0 total_employees = 0 for row in cursor.fetchall(): report_data.append(dict(row)) total_payroll += row['total_payroll'] or 0 total_employees += row['employee_count'] # Get salary distribution cursor.execute(''' SELECT CASE WHEN base_salary < 50000 THEN 'Under 50k' WHEN base_salary < 75000 THEN '50k-75k' WHEN base_salary < 100000 THEN '75k-100k' WHEN base_salary < 150000 THEN '100k-150k' ELSE 'Over 150k' END as salary_range, COUNT(*) as count FROM salaries s JOIN employees e ON s.employee_id = e.id WHERE e.employment_status = 'active' AND s.end_date IS NULL GROUP BY salary_range ''') salary_distribution = [] for row in cursor.fetchall(): salary_distribution.append(dict(row)) conn.close() return { "summary": { "total_employees": total_employees, "total_payroll": total_payroll, "average_salary": total_payroll / total_employees if total_employees > 0 else 0 }, "by_department_position": report_data, "salary_distribution": salary_distribution, "generated_at": datetime.now().isoformat() } # Analytics & Reporting Tools @mcp.tool() def generate_hr_dashboard() -> Dict[str, Any]: """ Generate comprehensive HR metrics dashboard. Returns: Dashboard with key HR metrics """ conn = get_db_connection() cursor = conn.cursor() # Employee statistics cursor.execute(''' SELECT COUNT(*) as total, SUM(CASE WHEN employment_status = 'active' THEN 1 ELSE 0 END) as active, SUM(CASE WHEN employment_status = 'terminated' THEN 1 ELSE 0 END) as terminated, SUM(CASE WHEN employment_type = 'full-time' THEN 1 ELSE 0 END) as full_time, SUM(CASE WHEN employment_type = 'part-time' THEN 1 ELSE 0 END) as part_time, SUM(CASE WHEN employment_type = 'contractor' THEN 1 ELSE 0 END) as contractors FROM employees ''') employee_stats = dict(cursor.fetchone()) # Department distribution cursor.execute(''' SELECT d.name, COUNT(e.id) as count FROM departments d LEFT JOIN employees e ON d.id = e.department_id AND e.employment_status = 'active' GROUP BY d.name ''') dept_distribution = [] for row in cursor.fetchall(): dept_distribution.append(dict(row)) # Recent hires (last 90 days) cursor.execute(''' SELECT COUNT(*) as new_hires FROM employees WHERE hire_date >= date('now', '-90 days') ''') new_hires = cursor.fetchone()['new_hires'] # Upcoming reviews cursor.execute(''' SELECT COUNT(DISTINCT e.id) as pending_reviews FROM employees e LEFT JOIN performance_reviews pr ON e.id = pr.employee_id WHERE e.employment_status = 'active' AND (pr.id IS NULL OR pr.next_review_date <= date('now', '+30 days')) ''') pending_reviews = cursor.fetchone()['pending_reviews'] # Leave metrics cursor.execute(''' SELECT COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending_leaves, COUNT(CASE WHEN status = 'approved' AND start_date >= date('now') THEN 1 END) as upcoming_leaves FROM leave_requests ''') leave_metrics = dict(cursor.fetchone()) # Gender diversity cursor.execute(''' SELECT gender, COUNT(*) as count FROM employees WHERE employment_status = 'active' AND gender IS NOT NULL GROUP BY gender ''') gender_diversity = [] for row in cursor.fetchall(): gender_diversity.append(dict(row)) # Average tenure cursor.execute(''' SELECT AVG(julianday('now') - julianday(hire_date)) / 365 as avg_tenure_years FROM employees WHERE employment_status = 'active' ''') avg_tenure = cursor.fetchone()['avg_tenure_years'] conn.close() return { "employee_statistics": employee_stats, "department_distribution": dept_distribution, "recent_activity": { "new_hires_90_days": new_hires, "pending_performance_reviews": pending_reviews, "pending_leave_requests": leave_metrics['pending_leaves'], "upcoming_leaves": leave_metrics['upcoming_leaves'] }, "diversity_metrics": { "gender_distribution": gender_diversity, "average_tenure_years": round(avg_tenure, 1) if avg_tenure else 0 }, "generated_at": datetime.now().isoformat() } @mcp.tool() def analyze_turnover( department_id: Optional[int] = None, period: str = "year" ) -> Dict[str, Any]: """ Analyze employee turnover rates and patterns. Args: department_id: Specific department (None for all) period: Analysis period ('month', 'quarter', 'year') Returns: Turnover analysis report """ conn = get_db_connection() cursor = conn.cursor() # Calculate date range if period == "month": date_from = date.today() - timedelta(days=30) elif period == "quarter": date_from = date.today() - timedelta(days=90) else: # year date_from = date.today() - timedelta(days=365) # Base query base_where = " WHERE 1=1" params = [] if department_id: base_where += " AND department_id = ?" params.append(department_id) # Get termination data cursor.execute(f''' SELECT COUNT(*) as terminations, strftime('%Y-%m', updated_at) as month FROM employees {base_where} AND employment_status = 'terminated' AND updated_at >= ? GROUP BY month ''', params + [date_from.isoformat()]) monthly_terminations = {} total_terminations = 0 for row in cursor.fetchall(): monthly_terminations[row['month']] = row['terminations'] total_terminations += row['terminations'] # Get average headcount cursor.execute(f''' SELECT COUNT(*) as active_count FROM employees {base_where} AND employment_status = 'active' ''', params) current_headcount = cursor.fetchone()['active_count'] # Calculate turnover rate avg_headcount = current_headcount + (total_terminations / 2) # Simple average if period == "month": periods = 1 elif period == "quarter": periods = 3 else: # year periods = 12 annual_turnover_rate = (total_terminations / avg_headcount / periods * 12 * 100) if avg_headcount > 0 else 0 # Get reasons if tracked (would need additional field in real implementation) # For now, we'll analyze by department cursor.execute(''' SELECT d.name as department, COUNT(e.id) as terminations FROM employees e LEFT JOIN departments d ON e.department_id = d.id WHERE e.employment_status = 'terminated' AND e.updated_at >= ? GROUP BY d.name ''', [date_from.isoformat()]) by_department = [] for row in cursor.fetchall(): by_department.append(dict(row)) conn.close() return { "period": period, "date_range": { "from": date_from.isoformat(), "to": date.today().isoformat() }, "summary": { "total_terminations": total_terminations, "current_headcount": current_headcount, "annual_turnover_rate": round(annual_turnover_rate, 1) }, "monthly_trend": monthly_terminations, "by_department": by_department } # Performance Management Tools @mcp.tool() def create_performance_review( employee_id: str, reviewer_id: str, period_start: str, period_end: str ) -> Dict[str, Any]: """ Initiate performance review process. Args: employee_id: Employee being reviewed reviewer_id: Reviewer's employee ID period_start: Review period start date period_end: Review period end date Returns: Review creation result """ conn = get_db_connection() cursor = conn.cursor() try: # Get employee and reviewer IDs cursor.execute('SELECT id FROM employees WHERE employee_id = ?', (employee_id,)) emp = cursor.fetchone() if not emp: return {"success": False, "error": "Employee not found"} cursor.execute('SELECT id FROM employees WHERE employee_id = ?', (reviewer_id,)) reviewer = cursor.fetchone() if not reviewer: return {"success": False, "error": "Reviewer not found"} # Create review cursor.execute(''' INSERT INTO performance_reviews ( employee_id, reviewer_id, review_period_start, review_period_end ) VALUES (?, ?, ?, ?) ''', (emp['id'], reviewer['id'], period_start, period_end)) review_id = cursor.lastrowid conn.commit() return { "success": True, "review_id": review_id, "message": "Performance review initiated" } except Exception as e: conn.rollback() return {"success": False, "error": str(e)} finally: conn.close() # Simple wrapper tools for better LLM understanding @mcp.tool() def add_employee( first_name: str, last_name: str, email: str, department: str, position: str, salary: Optional[float] = None, phone: Optional[str] = None, hire_date: Optional[str] = None ) -> Dict[str, Any]: """ Add a new employee to the company. Simple interface for adding employees. Args: first_name: Employee's first name last_name: Employee's last name email: Employee's email address department: Department name (will be created if doesn't exist) position: Position/job title (will be created if doesn't exist) salary: Starting salary (optional) phone: Phone number (optional) hire_date: Hire date YYYY-MM-DD (optional, defaults to today) Returns: Success status and employee ID Example: add_employee("John", "Doe", "john.doe@company.com", "Engineering", "Software Engineer", 85000) """ employee_data = { 'first_name': first_name, 'last_name': last_name, 'email': email, 'department_name': department, 'position_title': position, 'phone': phone, 'hire_date': hire_date } if salary: employee_data['salary'] = salary return manage_employee('add', employee_data) @mcp.tool() def list_all_employees() -> List[Dict[str, Any]]: """ List all active employees in the company. Returns: List of all active employees with their details """ return search_employees({'status': 'active'}) @mcp.tool() def find_employees_by_department(department: str) -> List[Dict[str, Any]]: """ Find all employees in a specific department. Args: department: Department name Returns: List of employees in the department """ return search_employees({'department': department}) @mcp.tool() def check_employee_leave_balance( employee_name: str, year: Optional[int] = None ) -> Dict[str, Any]: """ Check leave balance for an employee by their name. Args: employee_name: Full name of the employee (e.g., "John Doe" or "Sarah Johnson") year: Year to check balance for (defaults to current year) Returns: Leave balance details including all leave types and remaining days Example: check_employee_leave_balance("Sarah Johnson") check_employee_leave_balance("John Doe", 2024) """ # Search for employee by name name_parts = employee_name.strip().split(' ', 1) if len(name_parts) < 2: return {"success": False, "error": "Please provide both first and last name"} first_name = name_parts[0] last_name = name_parts[1] # Search for the employee employees = search_employees({'name': first_name}) # Filter by exact name match matching_employees = [ emp for emp in employees if emp['first_name'].lower() == first_name.lower() and emp['last_name'].lower() == last_name.lower() ] if not matching_employees: return {"success": False, "error": f"Employee '{employee_name}' not found"} if len(matching_employees) > 1: return { "success": False, "error": f"Multiple employees found with name '{employee_name}'. Please use employee ID.", "employees": [{"id": emp['employee_id'], "name": f"{emp['first_name']} {emp['last_name']}", "department": emp.get('department_name', 'N/A')} for emp in matching_employees] } # Get leave balance using employee_id employee = matching_employees[0] return get_leave_balance(employee['employee_id'], year) @mcp.tool() def update_employee_salary( employee_name: str, new_salary: float, effective_date: str, bonus: Optional[float] = None, reason: str = "" ) -> Dict[str, Any]: """ Update salary for an employee by their name. Args: employee_name: Full name of the employee (e.g., "John Doe") new_salary: New base salary amount effective_date: Date when new salary takes effect (YYYY-MM-DD) bonus: Optional bonus amount reason: Reason for salary change Returns: Success status and updated salary details Example: update_employee_salary("John Doe", 95000, "2024-01-01", reason="Annual raise") """ # Search for employee by name name_parts = employee_name.strip().split(' ', 1) if len(name_parts) < 2: return {"success": False, "error": "Please provide both first and last name"} first_name = name_parts[0] last_name = name_parts[1] # Search for the employee employees = search_employees({'name': first_name}) # Filter by exact name match matching_employees = [ emp for emp in employees if emp['first_name'].lower() == first_name.lower() and emp['last_name'].lower() == last_name.lower() ] if not matching_employees: return {"success": False, "error": f"Employee '{employee_name}' not found"} if len(matching_employees) > 1: return { "success": False, "error": f"Multiple employees found with name '{employee_name}'. Please use employee ID.", "employees": [{"id": emp['employee_id'], "name": f"{emp['first_name']} {emp['last_name']}", "department": emp.get('department_name', 'N/A')} for emp in matching_employees] } # Update salary using employee_id employee = matching_employees[0] return update_salary(employee['employee_id'], new_salary, effective_date, bonus, reason) @mcp.tool() def submit_leave_request( employee_name: str, leave_type: str, start_date: str, end_date: str, reason: str = "" ) -> Dict[str, Any]: """ Submit a leave request for an employee by their name. Args: employee_name: Full name of the employee (e.g., "John Doe") leave_type: Type of leave (e.g., "Annual Leave", "Sick Leave", "Personal Leave") start_date: Start date of leave (YYYY-MM-DD) end_date: End date of leave (YYYY-MM-DD) reason: Reason for leave (optional) Returns: Success status and leave request details Example: submit_leave_request("Sarah Johnson", "Annual Leave", "2024-12-20", "2024-12-27", "Holiday vacation") """ # Search for employee by name name_parts = employee_name.strip().split(' ', 1) if len(name_parts) < 2: return {"success": False, "error": "Please provide both first and last name"} first_name = name_parts[0] last_name = name_parts[1] # Search for the employee employees = search_employees({'name': first_name}) # Filter by exact name match matching_employees = [ emp for emp in employees if emp['first_name'].lower() == first_name.lower() and emp['last_name'].lower() == last_name.lower() ] if not matching_employees: return {"success": False, "error": f"Employee '{employee_name}' not found"} if len(matching_employees) > 1: return { "success": False, "error": f"Multiple employees found with name '{employee_name}'. Please use employee ID.", "employees": [{"id": emp['employee_id'], "name": f"{emp['first_name']} {emp['last_name']}", "department": emp.get('department_name', 'N/A')} for emp in matching_employees] } # Submit leave request using employee_id employee = matching_employees[0] return request_leave(employee['employee_id'], leave_type, start_date, end_date, reason) # Initialize database when module loads init_db() if __name__ == "__main__": # Start the server print("🚀 Starting HR Management System MCP Server...") parser = argparse.ArgumentParser() parser.add_argument( "--server_type", type=str, default="sse", choices=["sse", "stdio"] ) args = parser.parse_args() mcp.run(args.server_type)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ELISHA1994/mcp-sqlite-demo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server