Unstructured Document Processor MCP

import os from dotenv import load_dotenv import boto3 import time import json from botocore.exceptions import ClientError import logging from unstructured_client import UnstructuredClient from unstructured_client.models.shared import ( S3SourceConnectorConfigInput, CreateSourceConnector, SourceConnectorType) from unstructured_client.models.operations import (CreateSourceRequest, CreateDestinationRequest, DeleteWorkflowRequest, DeleteSourceRequest, DeleteDestinationRequest) from unstructured_client.models.shared import ( S3DestinationConnectorConfigInput, CreateDestinationConnector, DestinationConnectorType) from unstructured_client.models.shared import ( WorkflowNode, WorkflowNodeType, WorkflowType ) from datetime import datetime from mcp.server.fastmcp import FastMCP mcp = FastMCP("unstructured_doc_processor") def load_environment_variables() -> None: """ Load environment variables from .env file. Raises an error if critical environment variables are missing. """ load_dotenv() required_vars = [ "AWS_S3_SOURCE_BUCKET", "AWS_S3_DESTINATION_BUCKET", "AWS_KEY", "AWS_SECRET", "UNSTRUCTURED_API_KEY" ] for var in required_vars: if not os.getenv(var): raise ValueError(f"Missing required environment variable: {var}") def upload_to_s3(file_path, bucket_name, aws_access_key, aws_secret_key, region_name="us-east-2", object_name=None): """ Upload a file to an S3 bucket Parameters: file_path (str): Path to the file to upload bucket_name (str): Name of the bucket to upload to object_name (str): S3 object name. If not specified, file_name from file_path will be used """ # If S3 object_name was not specified, use file_name from file_path if object_name is None: object_name = os.path.basename(file_path) s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=region_name ) try: s3_client.upload_file(file_path, bucket_name, object_name) print(f"Successfully uploaded {file_path} to {bucket_name}/{object_name}") except ClientError as e: logging.error(e) print(f"Error uploading file: {e}") def download_s3_file(bucket_name, file_name, local_dir, aws_access_key, aws_secret_key, region_name="us-east-2"): """ Downloads a specific file from an S3 bucket to a local directory. :param bucket_name: Name of the S3 bucket. :param file_name: Name of the original file to download JSON for from S3. :param local_dir: Local directory where the file should be saved. :param aws_access_key: (Optional) AWS Access Key ID. :param aws_secret_key: (Optional) AWS Secret Access Key. :param region_name: AWS region where the bucket is located. :return: Path to the downloaded file if successful, None otherwise """ file_key = f"{file_name}.json" if aws_access_key and aws_secret_key: s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=region_name ) else: s3_client = boto3.client("s3", region_name=region_name) try: # Ensure the local directory exists os.makedirs(local_dir, exist_ok=True) # Set the local file path local_file_path = os.path.join(local_dir, os.path.basename(file_key)) # Create subdirectories if necessary os.makedirs(os.path.dirname(local_file_path), exist_ok=True) print(f"Downloading {file_key} from bucket {bucket_name} to {local_file_path}...") s3_client.download_file(bucket_name, file_key, local_file_path) print("Download complete.") return local_file_path except Exception as e: print(f"Error downloading file {file_key}: {str(e)}") return None def get_unstructured_client(unstructured_api_key): return UnstructuredClient(api_key_auth=unstructured_api_key) def create_s3_source_connector(unstructured_client, s3_bucket_name, aws_key, aws_secret): """ Creates an S3 source connector in Unstructured platform and returns a connector's id """ source_connector_config = S3SourceConnectorConfigInput( remote_url=f"s3://{s3_bucket_name}", key=aws_key, secret=aws_secret, ) unique_source_connector_suffix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") response = unstructured_client.sources.create_source( request=CreateSourceRequest( create_source_connector=CreateSourceConnector( name=f"s3-source-{unique_source_connector_suffix}", type=SourceConnectorType.S3, config=source_connector_config, ) ) ) return response.source_connector_information.id def create_s3_destination_connector(unstructured_client, s3_bucket_name, aws_key, aws_secret): """ Creates an S3 source connector in Unstructured platform and returns a connector's id """ destination_connector_config = S3DestinationConnectorConfigInput( remote_url=f"s3://{s3_bucket_name}", key=aws_key, secret=aws_secret, ) unique_destination_connector_suffix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") response = unstructured_client.destinations.create_destination( request=CreateDestinationRequest( create_destination_connector=CreateDestinationConnector( name=f"s3-destination-{unique_destination_connector_suffix}", type=DestinationConnectorType.S3, config=destination_connector_config, ) ) ) return response.destination_connector_information.id def create_auto_workflow(unstructured_client, source_connector_id, destination_connector_id): """ Creates a workflow from a source to destination in Unstructured platform, returns workflow id """ # Partition the content by using a vision language model (VLM). parition_node = WorkflowNode( name="Partitioner", subtype="vlm", type=WorkflowNodeType.PARTITION, settings={ "provider": "anthropic", "provider_api_key": None, "model": "claude-3-5-sonnet-20241022", "output_format": "text/html", "user_prompt": None, "format_html": True, "unique_element_ids": True, "is_dynamic": True, "allow_fast": True } ) # Summarize each detected image. image_summarizer_node = WorkflowNode( name="Image summarizer", subtype="openai_image_description", type=WorkflowNodeType.PROMPTER, settings={} ) # Summarize each detected table. table_summarizer_node = WorkflowNode( name="Table summarizer", subtype="anthropic_table_description", type=WorkflowNodeType.PROMPTER, settings={} ) unique_workflow_suffix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") response = unstructured_client.workflows.create_workflow( request={ "create_workflow": { "name": f"s3-to-s3-custom-workflow-{unique_workflow_suffix}", "source_id": source_connector_id, "destination_id": destination_connector_id, "workflow_type": WorkflowType.CUSTOM, "workflow_nodes": [parition_node, image_summarizer_node, table_summarizer_node] } } ) return response.workflow_information.id def run_workflow(unstructured_client, workflow_id): res = unstructured_client.workflows.run_workflow( request={ "workflow_id": workflow_id, } ) print(res.job_information) def get_latest_job_id(unstructured_client, workflow_id): response = unstructured_client.jobs.list_jobs( request={ "workflow_id": workflow_id } ) last_job = response.response_list_jobs[0] return last_job.id def poll_job_status(unstructured_client, job_id): while True: response = unstructured_client.jobs.get_job( request={ "job_id": job_id } ) job = response.job_information if job.status == "SCHEDULED": print("Job is scheduled, polling again in 10 seconds...") time.sleep(10) elif job.status == "IN_PROGRESS": print("Job is in progress, polling again in 10 seconds...") time.sleep(10) else: print("Job is completed") break return job def delete_workflow(unstructured_client, workflow_id): response = unstructured_client.workflows.delete_workflow( request=DeleteWorkflowRequest( workflow_id=workflow_id )) print(response.raw_response) def delete_source_connector(unstructured_client, source_connector_id): response = unstructured_client.sources.delete_source( request=DeleteSourceRequest( source_id=source_connector_id )) print(response.raw_response) def delete_destination_connector(unstructured_client, destination_connector_id): response = unstructured_client.destinations.delete_destination( request=DeleteDestinationRequest( destination_id=destination_connector_id ) ) print(response.raw_response) def empty_s3_bucket(bucket_name, aws_access_key, aws_secret_key, region_name="us-east-2"): """ Deletes all files from an S3 bucket and returns a list of deleted files. """ if aws_access_key and aws_secret_key: s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=region_name ) else: s3_client = boto3.client("s3", region_name=region_name) deleted_files = [] # List objects in the bucket response = s3_client.list_objects_v2(Bucket=bucket_name) if "Contents" not in response: print(f"No files found in bucket: {bucket_name}") return deleted_files # Delete each file for obj in response["Contents"]: file_key = obj["Key"] print(f"Deleting {file_key}...") s3_client.delete_object(Bucket=bucket_name, Key=file_key) deleted_files.append(file_key) # Handle pagination if there are more than 1000 objects while response.get('IsTruncated', False): response = s3_client.list_objects_v2( Bucket=bucket_name, ContinuationToken=response['NextContinuationToken'] ) if "Contents" in response: for obj in response["Contents"]: file_key = obj["Key"] print(f"Deleting {file_key}...") s3_client.delete_object(Bucket=bucket_name, Key=file_key) deleted_files.append(file_key) return deleted_files def json_to_text(file_path) -> str: with open(file_path, 'r') as file: elements = json.load(file) doc_texts = [] for element in elements: text = element.get("text", "").strip() element_type = element.get("type", "") metadata = element.get("metadata", {}) if element_type == "Title": doc_texts.append(f"<h1> {text}</h1><br>") elif element_type == "Header": doc_texts.append(f"<h2> {text}</h2><br/>") elif element_type == "NarrativeText" or element_type == "UncategorizedText": doc_texts.append(f"<p>{text}</p>") elif element_type == "ListItem": doc_texts.append(f"<li>{text}</li>") elif element_type == "PageNumber": doc_texts.append(f"Page number: {text}") elif element_type == "Table": table_html = metadata.get("text_as_html", "") doc_texts.append(table_html) # Keep the table as HTML else: doc_texts.append(text) return " ".join(doc_texts) @mcp.tool() async def get_processed_doc(filepath: str) -> str: """Get the text from the given document. Args: filepath: local file path to the document """ # Check is file exists in given path if not os.path.isfile(filepath): return "File does not exist" # Check is file extension is supported _, ext = os.path.splitext(filepath) supported_extensions = {".abw", ".bmp", ".csv", ".cwk", ".dbf", ".dif", ".doc", ".docm", ".docx", ".dot", ".dotm", ".eml", ".epub", ".et", ".eth", ".fods", ".gif", ".heic", ".htm", ".html", ".hwp", ".jpeg", ".jpg", ".md", ".mcw", ".mw", ".odt", ".org", ".p7s", ".pages", ".pbd", ".pdf", ".png", ".pot", ".potm", ".ppt", ".pptm", ".pptx", ".prn", ".rst", ".rtf", ".sdp", ".sgl", ".svg", ".sxg", ".tiff", ".txt", ".tsv", ".uof", ".uos1", ".uos2", ".web", ".webp", ".wk2", ".xls", ".xlsb", ".xlsm", ".xlsx", ".xlw", ".xml", ".zabw"} if ext.lower() not in supported_extensions: return "File extension not supported by Unstructured Platform" upload_to_s3(filepath, os.getenv("AWS_S3_SOURCE_BUCKET"), os.getenv("AWS_KEY"), os.getenv("AWS_SECRET")) unstructured_client = get_unstructured_client(os.getenv("UNSTRUCTURED_API_KEY")) source_connector_id = create_s3_source_connector(unstructured_client, os.getenv("AWS_S3_SOURCE_BUCKET"), os.getenv("AWS_KEY"), os.getenv("AWS_SECRET")) destination_connector_id = create_s3_destination_connector(unstructured_client, os.getenv("AWS_S3_DESTINATION_BUCKET"), os.getenv("AWS_KEY"), os.getenv("AWS_SECRET")) workflow_id = create_auto_workflow(unstructured_client, source_connector_id, destination_connector_id) run_workflow(unstructured_client, workflow_id) job_id = get_latest_job_id(unstructured_client, workflow_id) job = poll_job_status(unstructured_client, job_id) # At this point the job is complete # TODO: add a check to see if the job is successful print(f"Unstructured Platform Completed Processing Job: {job_id}") # download the file from the destination bucket local_dir = "processed_files" file_basename = os.path.basename(filepath) download_s3_file(os.getenv("AWS_S3_DESTINATION_BUCKET"), file_basename, local_dir, os.getenv("AWS_KEY"), os.getenv("AWS_SECRET")) # Cleanup: delete_workflow(unstructured_client, workflow_id) delete_source_connector(unstructured_client, source_connector_id) delete_destination_connector(unstructured_client, destination_connector_id) deleted_source_files = empty_s3_bucket(os.getenv("AWS_S3_SOURCE_BUCKET"), os.getenv("AWS_KEY"), os.getenv("AWS_SECRET")) print("Files deleted from the source S3 bucket: ", deleted_source_files) deleted_processed_files = empty_s3_bucket(os.getenv("AWS_S3_DESTINATION_BUCKET"), os.getenv("AWS_KEY"), os.getenv("AWS_SECRET")) print("Files deleted from the output S3 bucket: ", deleted_processed_files) output_json_file_path = os.path.join(local_dir, f"{file_basename}.json") document_text = json_to_text(output_json_file_path) return document_text if __name__ == "__main__": load_environment_variables() # Initialize and run the server mcp.run(transport='stdio')