import base64
import io
import os
import time
from pathlib import Path
from typing import Any
import fitz
import pandas as pd
from docx import Document
from .common import SHP_DOC_LIBRARY, logger, sp_context
# Configuration
FILE_TYPES = {
"text": [".txt", ".csv", ".json", ".xml", ".html", ".md", ".js", ".css", ".py"],
"pdf": [".pdf"],
"excel": [".xlsx", ".xls"],
"word": [".docx", ".doc"],
}
# Tree configuration from environment variables with defaults
TREE_CONFIG = {
"max_depth": int(os.getenv("SHP_MAX_DEPTH", "15")),
"max_folders_per_level": int(os.getenv("SHP_MAX_FOLDERS_PER_LEVEL", "100")),
"level_delay": float(os.getenv("SHP_LEVEL_DELAY", "0.5")),
"batch_delay": float(os.getenv("SHP_BATCH_DELAY", "0.1")),
}
# Download configuration
DOWNLOAD_CONFIG = {"fallback_dir": "./downloads"}
def _get_sp_path(sub_path: str | None = None) -> str:
"""Create a properly formatted SharePoint path"""
return f"{SHP_DOC_LIBRARY}/{sub_path or ''}".rstrip("/")
def _ensure_directory_exists(directory: str | Path) -> bool:
"""Ensure target directory exists, create if necessary"""
try:
Path(directory).mkdir(parents=True, exist_ok=True)
return True
except Exception as e:
logger.error(f"Failed to create directory {directory}: {e}")
return False
def _get_fallback_path(file_name: str) -> str:
"""Generate fallback path for downloads"""
fallback_dir = Path(DOWNLOAD_CONFIG["fallback_dir"])
_ensure_directory_exists(fallback_dir)
return str(fallback_dir / file_name)
def _save_content_to_file(content_bytes: bytes, file_path: str) -> dict[str, Any]:
"""Save binary content to local file with error handling"""
try:
path = Path(file_path)
# Ensure directory exists
if path.parent and not _ensure_directory_exists(path.parent):
raise Exception(f"Cannot create directory: {path.parent}")
# Write file
path.write_bytes(content_bytes)
# Verify file was created
if path.exists() and path.stat().st_size == len(content_bytes):
return {"success": True, "path": str(path.resolve()), "size": len(content_bytes)}
else:
raise Exception("File verification failed")
except Exception as e:
logger.error(f"Failed to save file to {file_path}: {e}")
return {"success": False, "error": str(e)}
def _load_sp_items(path: str, item_type: str) -> list[dict[str, Any]]:
"""Generic function to load folders or files from SharePoint"""
folder = sp_context.web.get_folder_by_server_relative_url(path)
items = getattr(folder, item_type)
props = ["ServerRelativeUrl", "Name", "TimeCreated", "TimeLastModified"] + (
["Length"] if item_type == "files" else []
)
sp_context.load(items, props)
sp_context.execute_query()
return [
{
"name": item.name,
"url": item.properties.get("ServerRelativeUrl"),
**({"size": item.properties.get("Length")} if item_type == "files" else {}),
"created": item.properties.get("TimeCreated").isoformat()
if item.properties.get("TimeCreated")
else None,
"modified": item.properties.get("TimeLastModified").isoformat()
if item.properties.get("TimeLastModified")
else None,
}
for item in items
]
def list_folders(parent_folder: str | None = None) -> list[dict[str, Any]]:
"""List folders in the specified directory or root if not specified"""
logger.info(f"Listing folders in {parent_folder or 'root directory'}")
return _load_sp_items(_get_sp_path(parent_folder), "folders")
def list_documents(folder_name: str) -> list[dict[str, Any]]:
"""List all documents in a specified folder"""
logger.info(f"Listing documents in folder: {folder_name}")
return _load_sp_items(_get_sp_path(folder_name), "files")
def extract_text_from_pdf(pdf_content):
"""Extract text from PDF using PyMuPDF"""
try:
pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
text_content = "".join(pdf_document[i].get_text() + "\n" for i in range(len(pdf_document)))
page_count = len(pdf_document)
pdf_document.close()
return text_content.strip(), page_count
except Exception as e:
logger.error(f"Error extracting text from PDF: {e}")
raise
def extract_text_from_excel(content_bytes):
"""Extract text from Excel files"""
try:
sheets = pd.read_excel(io.BytesIO(content_bytes), sheet_name=None)
text_parts = []
for sheet_name, df in sheets.items():
text_parts.append(f"=== {sheet_name} ===")
text_parts.extend(df.head(50).fillna("").astype(str).apply(" | ".join, axis=1).tolist())
return "\n".join(text_parts), len(sheets)
except Exception as e:
logger.error(f"Error extracting text from Excel: {e}")
raise
def extract_text_from_word(content_bytes):
"""Extract text from Word documents"""
try:
doc = Document(io.BytesIO(content_bytes))
text_parts = [p.text for p in doc.paragraphs if p.text.strip()]
for table in doc.tables:
for row in table.rows:
text_parts.append(" | ".join(cell.text.strip() for cell in row.cells))
return "\n".join(text_parts), len(doc.paragraphs)
except Exception as e:
logger.error(f"Error extracting text from Word: {e}")
raise
def get_folder_tree(parent_folder: str | None = None) -> dict[str, Any]:
"""Iteratively build folder tree level by level to avoid recursion limits"""
root_path, tree_nodes = _get_sp_path(parent_folder), {}
logger.info(f"Building iterative tree for {parent_folder or 'root'}")
try:
# Get root folder
root = sp_context.web.get_folder_by_server_relative_url(root_path)
sp_context.load(root, ["Name", "ServerRelativeUrl", "TimeCreated", "TimeLastModified"])
sp_context.execute_query()
# Process folders level by level
pending = [parent_folder or ""]
max_depth = int(TREE_CONFIG["max_depth"])
for level in range(max_depth):
if not pending:
break
logger.info(f"Level {level + 1}: {len(pending)} folders")
# Process all folders in this level by batches
current_level_folders = pending.copy()
next_level_folders = []
pending = [] # Reset for next level
# Process current level in batches to handle large numbers of folders
while current_level_folders:
batch = current_level_folders[: TREE_CONFIG["max_folders_per_level"]]
current_level_folders = current_level_folders[
TREE_CONFIG["max_folders_per_level"] :
]
for folder_path in batch:
try:
subfolders = [f["name"] for f in list_folders(folder_path)]
files = list_documents(folder_path)
tree_nodes[folder_path] = [
{"name": name, "type": "folder", "children": []} for name in subfolders
] + [
{
"name": f["name"],
"path": f["url"],
"type": "file",
**{k: v for k, v in f.items() if k not in ["name", "url"]},
}
for f in files
]
# Add subfolders to next level processing
next_level_folders.extend(
[f"{folder_path}/{name}".strip("/") for name in subfolders]
)
except Exception as e:
logger.warning(f"Failed to process: {folder_path}: {e}")
# Small delay between batches to avoid overwhelming SharePoint
if current_level_folders: # Only delay if more batches remain
time.sleep(TREE_CONFIG["batch_delay"])
# Set up for next level
pending = next_level_folders
if level < max_depth - 1:
time.sleep(TREE_CONFIG["level_delay"])
# Build nested structure
def build_node(path: str) -> list[dict]:
children = tree_nodes.get(path, [])
for child in children:
if child["type"] == "folder":
child["children"] = build_node(f"{path}/{child['name']}".strip("/"))
return children
return {
"name": root.name,
"path": root.properties.get("ServerRelativeUrl"),
"type": "folder",
"created": root.properties.get("TimeCreated").isoformat()
if root.properties.get("TimeCreated")
else None,
"modified": root.properties.get("TimeLastModified").isoformat()
if root.properties.get("TimeLastModified")
else None,
"children": build_node(parent_folder or ""),
}
except Exception as e:
logger.error(f"Failed to build tree for '{root_path}': {e}")
return {
"name": Path(root_path).name,
"path": root_path,
"type": "folder",
"error": "Could not access folder",
"children": [],
}
def get_document_content(folder_name: str, file_name: str) -> dict:
"""Retrieve document content; supports PDF text extraction"""
file_path = _get_sp_path(f"{folder_name}/{file_name}")
file = sp_context.web.get_file_by_server_relative_url(file_path)
sp_context.load(file, ["Exists", "Length", "Name"])
sp_context.execute_query()
logger.info(f"File exists: {file.exists}, size: {file.length}")
content = io.BytesIO()
file.download(content)
sp_context.execute_query()
content_bytes = content.getvalue()
# Determine file type and process accordingly
lower_name = file_name.lower()
file_type = next(
(t for t, exts in FILE_TYPES.items() if any(lower_name.endswith(ext) for ext in exts)),
"binary",
)
if file_type == "pdf":
try:
text, pages = extract_text_from_pdf(content_bytes)
return {
"name": file_name,
"content_type": "text",
"content": text,
"original_type": "pdf",
"page_count": pages,
"size": len(content_bytes),
}
except Exception as e:
logger.warning(f"PDF processing failed: {e}")
return {
"name": file_name,
"content_type": "binary",
"content_base64": base64.b64encode(content_bytes).decode(),
"original_type": "pdf",
"size": len(content_bytes),
}
if file_type == "excel":
try:
text, sheets = extract_text_from_excel(content_bytes)
return {
"name": file_name,
"content_type": "text",
"content": text,
"original_type": "excel",
"sheet_count": sheets,
"size": len(content_bytes),
}
except Exception as e:
logger.warning(f"Excel processing failed: {e}")
return {
"name": file_name,
"content_type": "binary",
"content_base64": base64.b64encode(content_bytes).decode(),
"original_type": "excel",
"size": len(content_bytes),
}
if file_type == "word":
try:
text, paragraphs = extract_text_from_word(content_bytes)
return {
"name": file_name,
"content_type": "text",
"content": text,
"original_type": "word",
"paragraph_count": paragraphs,
"size": len(content_bytes),
}
except Exception as e:
logger.warning(f"Word processing failed: {e}")
return {
"name": file_name,
"content_type": "binary",
"content_base64": base64.b64encode(content_bytes).decode(),
"original_type": "word",
"size": len(content_bytes),
}
if file_type == "text":
try:
return {
"name": file_name,
"content_type": "text",
"content": content_bytes.decode("utf-8"),
"size": len(content_bytes),
}
except UnicodeDecodeError:
pass
return {
"name": file_name,
"content_type": "binary",
"content_base64": base64.b64encode(content_bytes).decode(),
"size": len(content_bytes),
}
def download_document(folder_name: str, file_name: str, local_path: str) -> dict[str, Any]:
"""Download document from SharePoint to local filesystem with fallback support"""
logger.info(f"Downloading {folder_name}/{file_name} to {local_path}")
try:
# Get file from SharePoint
file_path = _get_sp_path(f"{folder_name}/{file_name}")
file = sp_context.web.get_file_by_server_relative_url(file_path)
sp_context.load(file, ["Exists", "Length", "Name"])
sp_context.execute_query()
if not file.exists:
return {
"success": False,
"error": f"File {file_name} does not exist in folder {folder_name}",
}
# Download file content
content = io.BytesIO()
file.download(content)
sp_context.execute_query()
content_bytes = content.getvalue()
# Try to save to requested path first
save_result = _save_content_to_file(content_bytes, local_path)
if save_result["success"]:
return {
"success": True,
"path": save_result["path"],
"size": save_result["size"],
"method": "primary",
}
# Fallback: save to fallback directory
logger.warning(f"Primary path failed: {save_result['error']}, trying fallback")
fallback_path = _get_fallback_path(file_name)
fallback_result = _save_content_to_file(content_bytes, fallback_path)
if fallback_result["success"]:
return {
"success": True,
"path": fallback_result["path"],
"size": fallback_result["size"],
"method": "fallback",
"primary_error": save_result["error"],
}
# Both paths failed
return {
"success": False,
"error": "Both primary and fallback paths failed",
"primary_error": save_result["error"],
"fallback_error": fallback_result["error"],
}
except Exception as e:
logger.error(f"Download failed for {folder_name}/{file_name}: {e}")
return {"success": False, "error": f"Download operation failed: {e!s}"}