Unstructured Document Processor MCP
by MKhalusova
import os
from dotenv import load_dotenv
import json
from unstructured_client import UnstructuredClient
from typing import AsyncIterator
from dataclasses import dataclass
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP, Context
from unstructured_client.models import operations, shared
@dataclass
class AppContext:
client: UnstructuredClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""Manage Unstructured API client lifecycle"""
api_key = os.getenv("UNSTRUCTURED_API_KEY")
if not api_key:
raise ValueError("UNSTRUCTURED_API_KEY environment variable is required")
client = UnstructuredClient(api_key_auth=api_key)
try:
yield AppContext(client=client)
finally:
# No cleanup needed for the API client
pass
# Create MCP server instance
mcp = FastMCP("unstructured-mcp", lifespan=app_lifespan, dependencies=["unstructured-client", "python-dotenv"])
# Local directory to store processed files
PROCESSED_FILES_FOLDER = "processed_files"
def load_environment_variables() -> None:
"""
Load environment variables from .env file.
Raises an error if critical environment variables are missing.
"""
load_dotenv()
required_vars = [
"UNSTRUCTURED_API_KEY"
]
for var in required_vars:
if not os.getenv(var):
raise ValueError(f"Missing required environment variable: {var}")
def json_to_text(file_path) -> str:
with open(file_path, 'r') as file:
elements = json.load(file)
doc_texts = []
for element in elements:
text = element.get("text", "").strip()
element_type = element.get("type", "")
metadata = element.get("metadata", {})
if element_type == "Title":
doc_texts.append(f"<h1> {text}</h1><br>")
elif element_type == "Header":
doc_texts.append(f"<h2> {text}</h2><br/>")
elif element_type == "NarrativeText" or element_type == "UncategorizedText":
doc_texts.append(f"<p>{text}</p>")
elif element_type == "ListItem":
doc_texts.append(f"<li>{text}</li>")
elif element_type == "PageNumber":
doc_texts.append(f"Page number: {text}")
elif element_type == "Table":
table_html = metadata.get("text_as_html", "")
doc_texts.append(table_html) # Keep the table as HTML
else:
doc_texts.append(text)
return " ".join(doc_texts)
@mcp.tool()
async def process_document(ctx: Context, filepath: str) -> str:
"""
Sends document to process with Unstructured, return the content of the document
Args:
filepath: path to the document
"""
if not os.path.isfile(filepath):
return "File does not exist"
# Check is file extension is supported
_, ext = os.path.splitext(filepath)
supported_extensions = {".abw", ".bmp", ".csv", ".cwk", ".dbf", ".dif", ".doc", ".docm", ".docx", ".dot",
".dotm", ".eml", ".epub", ".et", ".eth", ".fods", ".gif", ".heic", ".htm", ".html",
".hwp", ".jpeg", ".jpg", ".md", ".mcw", ".mw", ".odt", ".org", ".p7s", ".pages",
".pbd", ".pdf", ".png", ".pot", ".potm", ".ppt", ".pptm", ".pptx", ".prn", ".rst",
".rtf", ".sdp", ".sgl", ".svg", ".sxg", ".tiff", ".txt", ".tsv", ".uof", ".uos1",
".uos2", ".web", ".webp", ".wk2", ".xls", ".xlsb", ".xlsm", ".xlsx", ".xlw", ".xml",
".zabw"}
if ext.lower() not in supported_extensions:
return "File extension not supported by Unstructured"
client = ctx.request_context.lifespan_context.client
file_basename = os.path.basename(filepath)
req = operations.PartitionRequest(
partition_parameters=shared.PartitionParameters(
files=shared.Files(
content=open(filepath, "rb"),
file_name=filepath,
),
strategy=shared.Strategy.AUTO,
),
)
os.makedirs(PROCESSED_FILES_FOLDER, exist_ok=True)
try:
res = client.general.partition(request=req)
element_dicts = [element for element in res.elements]
json_elements = json.dumps(element_dicts, indent=2)
output_json_file_path = os.path.join(PROCESSED_FILES_FOLDER, f"{file_basename}.json")
with open(output_json_file_path, "w") as file:
file.write(json_elements)
return json_to_text(output_json_file_path)
except Exception as e:
return f"The following exception happened during file processing: {e}"
if __name__ == "__main__":
load_environment_variables()
# Initialize and run the server
mcp.run(transport='stdio')