Skip to main content
Glama

SharePoint MCP Server

by Sofias-ai
resources.py14.1 kB
"""SharePoint document-library resources.

Helpers for listing folders/documents, building a folder tree, retrieving
document content (with text extraction for PDF / Excel / Word) and
downloading files to the local filesystem.
"""
import base64
import io
import logging
import os
import time
from typing import Any, Dict, List, Optional

import fitz  # PyMuPDF
import pandas as pd
from docx import Document
from openpyxl import load_workbook  # noqa: F401 -- kept for backward compatibility

# NOTE: the logger exported by .common is intentionally not imported here;
# this module uses its own named logger (bound below), matching the
# original final binding.
from .common import SHP_DOC_LIBRARY, sp_context

logger = logging.getLogger(__name__)

# Extensions routed to each text-extraction backend; anything else is
# returned base64-encoded as 'binary'.
FILE_TYPES = {
    'text': ['.txt', '.csv', '.json', '.xml', '.html', '.md', '.js', '.css', '.py'],
    'pdf': ['.pdf'],
    'excel': ['.xlsx', '.xls'],
    'word': ['.docx', '.doc'],
}

# Tree-traversal limits, overridable via environment variables.
TREE_CONFIG = {
    'max_depth': int(os.getenv('SHP_MAX_DEPTH', '15')),
    'max_folders_per_level': int(os.getenv('SHP_MAX_FOLDERS_PER_LEVEL', '100')),
    'level_delay': float(os.getenv('SHP_LEVEL_DELAY', '0.5')),
}

# Download configuration: where to save files when the requested path fails.
DOWNLOAD_CONFIG = {
    'fallback_dir': './downloads',
}


def _get_sp_path(sub_path: Optional[str] = None) -> str:
    """Create a properly formatted server-relative SharePoint path.

    Joins the configured document library with *sub_path* and strips any
    trailing slash (so a ``None``/empty sub_path yields the library root).
    """
    return f"{SHP_DOC_LIBRARY}/{sub_path or ''}".rstrip('/')


def _ensure_directory_exists(directory: str) -> bool:
    """Ensure *directory* exists, creating it if necessary.

    Returns True on success, False (with a logged error) on failure.
    """
    try:
        os.makedirs(directory, exist_ok=True)
        return True
    except Exception as e:
        logger.error(f"Failed to create directory {directory}: {e}")
        return False


def _get_fallback_path(file_name: str) -> str:
    """Generate a path inside the fallback download directory for *file_name*."""
    fallback_dir = DOWNLOAD_CONFIG['fallback_dir']
    _ensure_directory_exists(fallback_dir)
    return os.path.join(fallback_dir, file_name)


def _save_content_to_file(content_bytes: bytes, file_path: str) -> Dict[str, Any]:
    """Save binary content to a local file with error handling.

    Returns ``{"success": True, "path": <abs path>, "size": <bytes>}`` on
    success or ``{"success": False, "error": <message>}`` on failure.
    """
    try:
        # Ensure the target directory exists before writing.
        directory = os.path.dirname(file_path)
        if directory and not _ensure_directory_exists(directory):
            raise Exception(f"Cannot create directory: {directory}")

        with open(file_path, 'wb') as f:
            f.write(content_bytes)

        # Verify the file landed on disk with the expected size.
        if os.path.exists(file_path) and os.path.getsize(file_path) == len(content_bytes):
            return {"success": True, "path": os.path.abspath(file_path),
                    "size": len(content_bytes)}
        raise Exception("File verification failed")
    except Exception as e:
        logger.error(f"Failed to save file to {file_path}: {e}")
        return {"success": False, "error": str(e)}


def _load_sp_items(path: str, item_type: str) -> List[Dict[str, Any]]:
    """Load folders or files from a SharePoint folder.

    :param path: server-relative folder URL.
    :param item_type: ``"folders"`` or ``"files"`` (attribute of the folder
        object; files additionally carry a ``size``).
    :returns: list of dicts with name/url/created/modified (and size for files).
    """
    folder = sp_context.web.get_folder_by_server_relative_url(path)
    items = getattr(folder, item_type)
    # "Length" (byte size) only exists on files.
    props = (["ServerRelativeUrl", "Name", "TimeCreated", "TimeLastModified"]
             + (["Length"] if item_type == "files" else []))
    sp_context.load(items, props)
    sp_context.execute_query()
    return [{
        "name": item.name,
        "url": item.properties.get("ServerRelativeUrl"),
        **({"size": item.properties.get("Length")} if item_type == "files" else {}),
        "created": item.properties.get("TimeCreated").isoformat()
                   if item.properties.get("TimeCreated") else None,
        "modified": item.properties.get("TimeLastModified").isoformat()
                    if item.properties.get("TimeLastModified") else None,
    } for item in items]


def list_folders(parent_folder: Optional[str] = None) -> List[Dict[str, Any]]:
    """List folders in the specified directory, or the root if not specified."""
    logger.info(f"Listing folders in {parent_folder or 'root directory'}")
    return _load_sp_items(_get_sp_path(parent_folder), "folders")


def list_documents(folder_name: str) -> List[Dict[str, Any]]:
    """List all documents in a specified folder."""
    logger.info(f"Listing documents in folder: {folder_name}")
    return _load_sp_items(_get_sp_path(folder_name), "files")


def extract_text_from_pdf(pdf_content):
    """Extract text from a PDF using PyMuPDF.

    :param pdf_content: raw PDF bytes.
    :returns: ``(text, page_count)``; re-raises extraction errors after logging.
    """
    try:
        # Context manager guarantees the document is closed even if a page
        # fails to render (the original close() could be skipped on error).
        with fitz.open(stream=pdf_content, filetype="pdf") as pdf_document:
            page_count = len(pdf_document)
            text_content = "".join(page.get_text() + "\n" for page in pdf_document)
        return text_content.strip(), page_count
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        raise


def extract_text_from_excel(content_bytes):
    """Extract text from Excel workbooks.

    :param content_bytes: raw .xlsx/.xls bytes.
    :returns: ``(text, sheet_count)`` — each sheet rendered as a header line
        followed by up to 50 rows joined with `` | ``.
    """
    try:
        sheets = pd.read_excel(io.BytesIO(content_bytes), sheet_name=None)
        text_parts = []
        for sheet_name, df in sheets.items():
            text_parts.append(f"=== {sheet_name} ===")
            # Cap at 50 rows per sheet to keep responses bounded.
            text_parts.extend(
                df.head(50).fillna('').astype(str).apply(' | '.join, axis=1).tolist())
        return "\n".join(text_parts), len(sheets)
    except Exception as e:
        logger.error(f"Error extracting text from Excel: {e}")
        raise


def extract_text_from_word(content_bytes):
    """Extract text from Word documents.

    :param content_bytes: raw .docx bytes.
    :returns: ``(text, paragraph_count)`` — non-empty paragraphs plus table
        rows rendered with `` | `` separators.
    """
    try:
        doc = Document(io.BytesIO(content_bytes))
        text_parts = [p.text for p in doc.paragraphs if p.text.strip()]
        for table in doc.tables:
            for row in table.rows:
                text_parts.append(" | ".join(cell.text.strip() for cell in row.cells))
        return "\n".join(text_parts), len(doc.paragraphs)
    except Exception as e:
        logger.error(f"Error extracting text from Word: {e}")
        raise


def get_folder_tree(parent_folder: Optional[str] = None) -> Dict[str, Any]:
    """Iteratively build a folder tree level by level to avoid recursion limits.

    Traverses breadth-first up to ``TREE_CONFIG['max_depth']`` levels,
    processing each level in batches of ``max_folders_per_level`` with small
    delays to avoid overwhelming SharePoint. Returns a nested dict of
    folder/file nodes; on failure returns a node with an ``error`` field.
    """
    root_path, tree_nodes = _get_sp_path(parent_folder), {}
    logger.info(f"Building iterative tree for {parent_folder or 'root'}")
    try:
        # Load root folder metadata for the returned top-level node.
        root = sp_context.web.get_folder_by_server_relative_url(root_path)
        sp_context.load(root, ["Name", "ServerRelativeUrl", "TimeCreated", "TimeLastModified"])
        sp_context.execute_query()

        # Breadth-first traversal: 'pending' holds the folder paths of the
        # current level, relative to the document library.
        pending = [parent_folder or ""]
        for level in range(TREE_CONFIG['max_depth']):
            if not pending:
                break
            logger.info(f"Level {level + 1}: {len(pending)} folders")

            current_level_folders = pending.copy()
            next_level_folders = []
            pending = []  # Reset for next level

            # Process the current level in batches to handle large folder counts.
            while current_level_folders:
                batch = current_level_folders[:TREE_CONFIG['max_folders_per_level']]
                current_level_folders = current_level_folders[TREE_CONFIG['max_folders_per_level']:]
                for folder_path in batch:
                    try:
                        subfolders = [f["name"] for f in list_folders(folder_path)]
                        files = list_documents(folder_path)
                        tree_nodes[folder_path] = [
                            {"name": name, "type": "folder", "children": []}
                            for name in subfolders
                        ] + [
                            {"name": f["name"], "path": f["url"], "type": "file",
                             **{k: v for k, v in f.items() if k not in ["name", "url"]}}
                            for f in files
                        ]
                        # Queue subfolders for the next level.
                        next_level_folders.extend(
                            f"{folder_path}/{name}".strip('/') for name in subfolders)
                    except Exception:
                        # Narrowed from a bare except: a best-effort skip must
                        # not swallow KeyboardInterrupt/SystemExit.
                        logger.warning(f"Failed to process: {folder_path}")
                # Small delay between batches to avoid overwhelming SharePoint.
                if current_level_folders:  # Only delay if more batches remain
                    time.sleep(0.1)

            pending = next_level_folders
            if level < TREE_CONFIG['max_depth'] - 1:
                time.sleep(TREE_CONFIG['level_delay'])

        def build_node(path: str) -> List[Dict]:
            """Recursively stitch the flat tree_nodes map into nested children."""
            children = tree_nodes.get(path, [])
            for child in children:
                if child["type"] == "folder":
                    child["children"] = build_node(f"{path}/{child['name']}".strip('/'))
            return children

        return {
            "name": root.name,
            "path": root.properties.get("ServerRelativeUrl"),
            "type": "folder",
            "created": root.properties.get("TimeCreated").isoformat()
                       if root.properties.get("TimeCreated") else None,
            "modified": root.properties.get("TimeLastModified").isoformat()
                        if root.properties.get("TimeLastModified") else None,
            "children": build_node(parent_folder or ""),
        }
    except Exception as e:
        logger.error(f"Failed to build tree for '{root_path}': {e}")
        return {"name": os.path.basename(root_path), "path": root_path,
                "type": "folder", "error": "Could not access folder", "children": []}


def get_document_content(folder_name: str, file_name: str) -> dict:
    """Retrieve document content; supports PDF/Excel/Word text extraction.

    Text-like files are returned decoded; extractable formats fall back to a
    base64 'binary' payload if extraction fails; everything else is base64.
    """
    file_path = _get_sp_path(f"{folder_name}/{file_name}")
    file = sp_context.web.get_file_by_server_relative_url(file_path)
    sp_context.load(file, ["Exists", "Length", "Name"])
    sp_context.execute_query()
    logger.info(f"File exists: {file.exists}, size: {file.length}")

    content = io.BytesIO()
    file.download(content)
    sp_context.execute_query()
    content_bytes = content.getvalue()

    # Classify by extension; unknown extensions are treated as binary.
    lower_name = file_name.lower()
    file_type = next((t for t, exts in FILE_TYPES.items()
                      if any(lower_name.endswith(ext) for ext in exts)), 'binary')

    if file_type == 'pdf':
        try:
            text, pages = extract_text_from_pdf(content_bytes)
            return {"name": file_name, "content_type": "text", "content": text,
                    "original_type": "pdf", "page_count": pages,
                    "size": len(content_bytes)}
        except Exception as e:
            logger.warning(f"PDF processing failed: {e}")
            return {"name": file_name, "content_type": "binary",
                    "content_base64": base64.b64encode(content_bytes).decode(),
                    "original_type": "pdf", "size": len(content_bytes)}
    if file_type == 'excel':
        try:
            text, sheets = extract_text_from_excel(content_bytes)
            return {"name": file_name, "content_type": "text", "content": text,
                    "original_type": "excel", "sheet_count": sheets,
                    "size": len(content_bytes)}
        except Exception as e:
            logger.warning(f"Excel processing failed: {e}")
            return {"name": file_name, "content_type": "binary",
                    "content_base64": base64.b64encode(content_bytes).decode(),
                    "original_type": "excel", "size": len(content_bytes)}
    if file_type == 'word':
        try:
            text, paragraphs = extract_text_from_word(content_bytes)
            return {"name": file_name, "content_type": "text", "content": text,
                    "original_type": "word", "paragraph_count": paragraphs,
                    "size": len(content_bytes)}
        except Exception as e:
            logger.warning(f"Word processing failed: {e}")
            return {"name": file_name, "content_type": "binary",
                    "content_base64": base64.b64encode(content_bytes).decode(),
                    "original_type": "word", "size": len(content_bytes)}
    if file_type == 'text':
        try:
            return {"name": file_name, "content_type": "text",
                    "content": content_bytes.decode('utf-8'),
                    "size": len(content_bytes)}
        except UnicodeDecodeError:
            # Not valid UTF-8 after all; fall through to the binary payload.
            pass
    return {"name": file_name, "content_type": "binary",
            "content_base64": base64.b64encode(content_bytes).decode(),
            "size": len(content_bytes)}


def download_document(folder_name: str, file_name: str, local_path: str) -> Dict[str, Any]:
    """Download a document from SharePoint to the local filesystem.

    Tries *local_path* first; if that fails, falls back to
    ``DOWNLOAD_CONFIG['fallback_dir']``. Returns a dict with ``success``,
    ``path``/``size``/``method`` on success or ``error`` details on failure.
    """
    logger.info(f"Downloading {folder_name}/{file_name} to {local_path}")
    try:
        # Resolve and validate the file on SharePoint.
        file_path = _get_sp_path(f"{folder_name}/{file_name}")
        file = sp_context.web.get_file_by_server_relative_url(file_path)
        sp_context.load(file, ["Exists", "Length", "Name"])
        sp_context.execute_query()
        if not file.exists:
            return {"success": False,
                    "error": f"File {file_name} does not exist in folder {folder_name}"}

        # Download file content into memory.
        content = io.BytesIO()
        file.download(content)
        sp_context.execute_query()
        content_bytes = content.getvalue()

        # Try to save to the requested path first.
        save_result = _save_content_to_file(content_bytes, local_path)
        if save_result["success"]:
            return {"success": True, "path": save_result["path"],
                    "size": save_result["size"], "method": "primary"}

        # Fallback: save to the fallback directory.
        logger.warning(f"Primary path failed: {save_result['error']}, trying fallback")
        fallback_path = _get_fallback_path(file_name)
        fallback_result = _save_content_to_file(content_bytes, fallback_path)
        if fallback_result["success"]:
            return {"success": True, "path": fallback_result["path"],
                    "size": fallback_result["size"], "method": "fallback",
                    "primary_error": save_result["error"]}

        # Both paths failed.
        return {"success": False,
                "error": "Both primary and fallback paths failed",
                "primary_error": save_result["error"],
                "fallback_error": fallback_result["error"]}
    except Exception as e:
        logger.error(f"Download failed for {folder_name}/{file_name}: {e}")
        return {"success": False, "error": f"Download operation failed: {str(e)}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Sofias-ai/mcp-sharepoint'

If you have feedback or need assistance with the MCP directory API, please join our Discord server