"""Utility functions for file handling."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from crewai_files.core.sources import is_file_source
if TYPE_CHECKING:
from crewai_files.core.sources import FileSource, FileSourceInput
from crewai_files.core.types import FileInput
__all__ = ["is_file_source", "normalize_input_files", "wrap_file_source"]
def wrap_file_source(source: FileSource) -> FileInput:
"""Wrap a FileSource in the appropriate typed FileInput wrapper.
Args:
source: The file source to wrap.
Returns:
Typed FileInput wrapper based on content type.
"""
from crewai_files.core.types import (
AudioFile,
ImageFile,
PDFFile,
TextFile,
VideoFile,
)
content_type = source.content_type
if content_type.startswith("image/"):
return ImageFile(source=source)
if content_type.startswith("audio/"):
return AudioFile(source=source)
if content_type.startswith("video/"):
return VideoFile(source=source)
if content_type == "application/pdf":
return PDFFile(source=source)
return TextFile(source=source)
def normalize_input_files(
input_files: list[FileSourceInput | FileInput],
) -> dict[str, FileInput]:
"""Convert a list of file sources to a named dictionary of FileInputs.
Args:
input_files: List of file source inputs or File objects.
Returns:
Dictionary mapping names to FileInput wrappers.
"""
from crewai_files.core.sources import FileBytes, FilePath, FileStream, FileUrl
from crewai_files.core.types import BaseFile
result: dict[str, FileInput] = {}
for i, item in enumerate(input_files):
if isinstance(item, BaseFile):
name = item.filename or f"file_{i}"
if "." in name:
name = name.rsplit(".", 1)[0]
result[name] = item
continue
file_source: FilePath | FileBytes | FileStream | FileUrl
if isinstance(item, (FilePath, FileBytes, FileStream, FileUrl)):
file_source = item
elif isinstance(item, Path):
file_source = FilePath(path=item)
elif isinstance(item, str):
if item.startswith(("http://", "https://")):
file_source = FileUrl(url=item)
else:
file_source = FilePath(path=Path(item))
elif isinstance(item, (bytes, memoryview)):
file_source = FileBytes(data=bytes(item))
else:
continue
name = file_source.filename or f"file_{i}"
result[name] = wrap_file_source(file_source)
return result