tools.py (14.1 kB)
# mcp_server/tools.py
from datetime import date, datetime
import logging

import numpy as np
import pandas as pd
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter

from mcp_server.schema import DROPDOWNS
from utils.config_loader import load_config
from utils.backup import create_backup

# Load configuration
_config = load_config()
EXCEL_PATH = _config.get("excel_path", "data/employees_mcp.xlsx")
BACKUP_ENABLED = _config.get("processing", {}).get("backup_before_update", True)
BACKUP_DIR = _config.get("processing", {}).get("backup_directory", "backups")

# Set up logger
logger = logging.getLogger("excel_mcp.tools")


# Helper to convert pandas/numpy types to JSON-serializable types
def convert_to_json_serializable(value):
    """Convert pandas/numpy types to Python native types for JSON serialization."""
    if pd.isna(value):
        return None
    elif isinstance(value, (np.integer, np.int64, np.int32, np.int8)):
        return int(value)
    elif isinstance(value, (np.floating, np.float64, np.float32, np.float16)):
        return float(value)
    elif isinstance(value, (np.bool_, bool)):
        return bool(value)
    elif hasattr(value, "item"):
        return value.item()
    elif isinstance(value, datetime):
        return value.isoformat()
    else:
        return value


def get_unprocessed_employees():
    """Fetch all unprocessed employee rows from Excel."""
    try:
        logger.info(f"Reading Excel file: {EXCEL_PATH}")
        df = pd.read_excel(EXCEL_PATH)

        # Handle case where the Is_Processed column might not exist or may hold NaN values
        if "Is_Processed" not in df.columns:
            rows = df
            logger.warning("Is_Processed column not found, returning all rows")
        else:
            rows = df[df["Is_Processed"].fillna("No") != "Yes"]

        employees = []
        # Iterate through the filtered rows and record their positional indices
        for idx, row in rows.iterrows():
            data = {}
            for k, v in row.to_dict().items():
                # Convert pandas/numpy types to Python native types for JSON serialization
                data[k] = convert_to_json_serializable(v)

            # Get the positional index (iloc position) in the original DataFrame.
            # This works even after saving/reading because iloc uses position, not label.
            try:
                positional_idx = df.index.get_loc(idx)
                if isinstance(positional_idx, (slice, np.ndarray)):
                    # get_loc can return a slice/array for a non-unique index
                    # (shouldn't happen here); take the first position.
                    positional_idx = int(positional_idx[0]) if hasattr(positional_idx, "__len__") else int(positional_idx)
                else:
                    positional_idx = int(positional_idx)
            except (KeyError, TypeError):
                # Fallback: use the index value directly if get_loc fails
                positional_idx = int(convert_to_json_serializable(idx)) if idx is not None else 0

            employees.append({"row_id": positional_idx, "data": data})

        logger.info(f"Found {len(employees)} unprocessed employees")
        # Return an object, not a bare list
        return {"employees": employees}
    except Exception as e:
        logger.error(f"Error fetching unprocessed employees: {e}", exc_info=True)
        raise


def get_all_employees():
    """Fetch ALL employee rows from Excel for comprehensive scanning."""
    try:
        logger.info(f"Reading Excel file: {EXCEL_PATH}")
        df = pd.read_excel(EXCEL_PATH)

        employees = []
        # Iterate through ALL rows
        for idx, row in df.iterrows():
            data = {}
            for k, v in row.to_dict().items():
                # Convert pandas/numpy types to Python native types for JSON serialization
                data[k] = convert_to_json_serializable(v)

            # Get the positional index
            try:
                positional_idx = df.index.get_loc(idx)
                if isinstance(positional_idx, (slice, np.ndarray)):
                    positional_idx = int(positional_idx[0]) if hasattr(positional_idx, "__len__") else int(positional_idx)
                else:
                    positional_idx = int(positional_idx)
            except (KeyError, TypeError):
                positional_idx = int(convert_to_json_serializable(idx)) if idx is not None else 0

            employees.append({"row_id": positional_idx, "data": data})

        logger.info(f"Found {len(employees)} total employees for scanning")
        return {"employees": employees}
    except Exception as e:
        logger.error(f"Error fetching all employees: {e}", exc_info=True)
        raise


def update_employee_row(row_id: int, updates: dict, reason: str, confidence: float):
    """Update employee row with AI decisions, with backup and validation."""
    try:
        logger.info(f"Updating row {row_id} with updates: {updates}")

        # Create backup before updating
        if BACKUP_ENABLED:
            backup_path = create_backup(EXCEL_PATH, BACKUP_DIR)
            if backup_path:
                logger.info(f"Created backup: {backup_path}")
            else:
                logger.warning("Backup creation failed, continuing without backup")

        df = pd.read_excel(EXCEL_PATH)

        # Validate row_id is a valid positional index
        if row_id < 0 or row_id >= len(df):
            error_msg = f"Invalid row_id: {row_id}. DataFrame has {len(df)} rows (0-{len(df) - 1})"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Ensure required columns exist
        if "Is_Processed" not in df.columns:
            df["Is_Processed"] = "No"
        if "AI_Decision_Reason" not in df.columns:
            df["AI_Decision_Reason"] = ""
        if "Confidence_Score" not in df.columns:
            df["Confidence_Score"] = 0.0
        if "Last_Processed_On" not in df.columns:
            df["Last_Processed_On"] = None
        if "Processing_Version" not in df.columns:
            df["Processing_Version"] = None
        if "Rule_Applied" not in df.columns:
            df["Rule_Applied"] = None

        # 🔐 MCP VALIDATION - only validate non-empty values
        for col, val in updates.items():
            if col in DROPDOWNS and val and str(val).strip():
                if val not in DROPDOWNS[col]:
                    error_msg = f"Invalid value '{val}' for {col}. Allowed values: {DROPDOWNS[col]}"
                    logger.error(error_msg)
                    raise ValueError(error_msg)

        # Use iloc for positional indexing (since we save with index=False).
        # Only update columns that have non-empty values.
        for col, val in updates.items():
            if val:  # Only update if value is not empty/None
                if col in df.columns:
                    df.iloc[row_id, df.columns.get_loc(col)] = val
                else:
                    logger.warning(f"Column '{col}' not found in DataFrame")

        df.iloc[row_id, df.columns.get_loc("Is_Processed")] = "Yes"
        df.iloc[row_id, df.columns.get_loc("AI_Decision_Reason")] = reason
        df.iloc[row_id, df.columns.get_loc("Confidence_Score")] = confidence
        df.iloc[row_id, df.columns.get_loc("Last_Processed_On")] = datetime.now()

        # Save while preserving dropdowns/data validation
        try:
            df.to_excel(EXCEL_PATH, index=False, engine="openpyxl")
            # Reapply data validation (dropdowns) after saving
            _reapply_data_validation(EXCEL_PATH, df.columns)
            logger.info(f"Successfully saved Excel file: {EXCEL_PATH}")
        except Exception as e:
            logger.error(f"Error saving Excel file: {e}", exc_info=True)
            raise

        return {"status": "updated", "row_id": row_id, "updates_applied": updates}
    except Exception as e:
        logger.error(f"Error updating employee row {row_id}: {e}", exc_info=True)
        raise


def _reapply_data_validation(file_path, columns):
    """
    Reapply data validation (dropdowns) to columns after saving.
    This preserves dropdown functionality in Excel.
    """
    try:
        from openpyxl.worksheet.datavalidation import DataValidation

        wb = load_workbook(file_path)
        ws = wb.active

        # Find and re-validate each dropdown column
        for col_name in columns:
            if col_name in DROPDOWNS:
                col_idx = list(columns).index(col_name) + 1  # +1 because Excel is 1-indexed
                col_letter = get_column_letter(col_idx)

                # Create data validation rule
                dropdown_values = DROPDOWNS[col_name]
                dv = DataValidation(
                    type="list",
                    formula1=f'"{",".join(dropdown_values)}"',
                    allow_blank=True,
                )

                # Apply to all data rows (skip header row 1)
                dv_range = f"{col_letter}2:{col_letter}{ws.max_row}"
                dv.add(dv_range)
                ws.add_data_validation(dv)

        wb.save(file_path)
        wb.close()
    except Exception as e:
        logger.warning(f"Could not reapply data validation: {e}")
        # Continue even if validation fails - at least the data is saved


def update_experience_from_doj():
    """
    Recalculate Experience_Years for all employees based on their DOJ (Date of Joining).
    This should be run periodically (e.g., once a year) to update experience.
    """
    try:
        logger.info("Starting experience update from DOJ")

        # Create backup before updating
        if BACKUP_ENABLED:
            backup_path = create_backup(EXCEL_PATH, BACKUP_DIR)
            if backup_path:
                logger.info(f"Created backup: {backup_path}")

        df = pd.read_excel(EXCEL_PATH)

        if "DOJ" not in df.columns:
            error_msg = "DOJ column not found"
            logger.error(error_msg)
            return {"status": "error", "message": error_msg}

        if "Experience_Years" not in df.columns:
            df["Experience_Years"] = 0.0

        today = datetime.now().date()
        updated_count = 0
        doj_col_idx = df.columns.get_loc("DOJ")
        exp_col_idx = df.columns.get_loc("Experience_Years")

        for row_idx in range(len(df)):
            doj = df.iloc[row_idx, doj_col_idx]
            if pd.isna(doj):
                continue

            # Convert DOJ to a date if it's not one already
            if isinstance(doj, str):
                try:
                    doj = pd.to_datetime(doj).date()
                except (ValueError, TypeError):
                    logger.warning(f"Could not parse DOJ for row {row_idx}: {doj}")
                    continue
            elif hasattr(doj, "date"):
                # Covers datetime.datetime and pd.Timestamp values
                doj = doj.date()

            # Calculate years of experience
            if isinstance(doj, date):
                years_diff = today - doj
                experience_years = years_diff.days / 365.25  # Account for leap years
                df.iloc[row_idx, exp_col_idx] = round(experience_years, 1)
                updated_count += 1

        # Save the updated DataFrame while preserving dropdowns
        try:
            df.to_excel(EXCEL_PATH, index=False, engine="openpyxl")
            _reapply_data_validation(EXCEL_PATH, df.columns)
            logger.info(f"Updated experience for {updated_count} employees")
            return {"status": "success", "message": f"Updated experience for {updated_count} employees"}
        except Exception as e:
            logger.error(f"Error saving Excel file: {e}", exc_info=True)
            return {"status": "error", "message": str(e)}
    except Exception as e:
        logger.error(f"Error updating experience: {e}", exc_info=True)
        return {"status": "error", "message": str(e)}


def reset_processed_flag_for_reprocessing():
    """
    Reset Is_Processed flag to "No" for all employees to allow reprocessing.
    Useful when you want to reprocess all employees (e.g., after updating experience).
    """
    try:
        logger.info("Resetting Is_Processed flag for all employees")

        # Create backup before updating
        if BACKUP_ENABLED:
            backup_path = create_backup(EXCEL_PATH, BACKUP_DIR)
            if backup_path:
                logger.info(f"Created backup: {backup_path}")

        df = pd.read_excel(EXCEL_PATH)

        # Reset all rows to "No" (this also creates the column if it is missing)
        df["Is_Processed"] = "No"
        reset_count = len(df)

        try:
            df.to_excel(EXCEL_PATH, index=False, engine="openpyxl")
            _reapply_data_validation(EXCEL_PATH, df.columns)
            logger.info(f"Reset Is_Processed flag for {reset_count} employees")
            return {
                "status": "success",
                "message": f"Reset Is_Processed flag for {reset_count} employees. They can now be reprocessed.",
            }
        except Exception as e:
            logger.error(f"Error saving Excel file: {e}", exc_info=True)
            return {"status": "error", "message": str(e)}
    except Exception as e:
        logger.error(f"Error resetting processed flag: {e}", exc_info=True)
        return {"status": "error", "message": str(e)}

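As a quick orientation, a driver script might call these tools in sequence: fetch the unprocessed rows, decide on updates, then write them back. The sketch below is a minimal, hypothetical example; the column name and value passed to update_employee_row are placeholders, and in the real project these functions would typically be invoked through the MCP server rather than called directly.

# run_once.py - hypothetical driver, assuming it runs from the project root
# so that mcp_server.tools and its config/schema imports resolve.
from mcp_server.tools import get_unprocessed_employees, update_employee_row

result = get_unprocessed_employees()
for emp in result["employees"]:
    # "Employment_Status" / "Active" are placeholder examples; valid
    # columns and values come from DROPDOWNS in mcp_server.schema.
    update_employee_row(
        row_id=emp["row_id"],
        updates={"Employment_Status": "Active"},
        reason="Example rule: mark fetched rows as active",
        confidence=0.9,
    )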
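tools.py also leans on utils.backup.create_backup, which is not shown here. From the call sites, it takes the source path and a backup directory and returns the backup's path on success or a falsy value on failure. A minimal compatible implementation might look like the following; this is a sketch of the assumed contract, not the project's actual code.

# utils/backup.py - hypothetical sketch matching how tools.py calls it:
# create_backup(EXCEL_PATH, BACKUP_DIR) -> backup path, or None on failure.
import shutil
from datetime import datetime
from pathlib import Path

def create_backup(source_path: str, backup_dir: str):
    """Copy source_path into backup_dir with a timestamp; return the new path."""
    try:
        src = Path(source_path)
        dest_dir = Path(backup_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        dest = dest_dir / f"{src.stem}_{stamp}{src.suffix}"
        shutil.copy2(src, dest)
        return str(dest)
    except OSError:
        # tools.py treats a falsy return as "backup failed" and logs a warning
        return None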