"""WebDAV client for Nextcloud file operations."""
import logging
import mimetypes
import xml.etree.ElementTree as ET
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote
from httpx import HTTPStatusError
from .base import BaseNextcloudClient
logger = logging.getLogger(__name__)
class WebDAVClient(BaseNextcloudClient):
"""Client for Nextcloud WebDAV operations."""
app_name = "webdav"
async def delete_resource(self, path: str) -> Dict[str, Any]:
"""Delete a resource (file or directory) via WebDAV DELETE."""
# Ensure path ends with a slash if it's a directory
if not path.endswith("/"):
path_with_slash = f"{path}/"
else:
path_with_slash = path
webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}"
logger.debug(f"Deleting WebDAV resource: {webdav_path}")
headers = {"OCS-APIRequest": "true"}
try:
# First try a PROPFIND to verify resource exists
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
try:
propfind_resp = await self._make_request(
"PROPFIND", webdav_path, headers=propfind_headers
)
logger.debug(
f"Resource exists check status: {propfind_resp.status_code}"
)
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"Resource '{path}' doesn't exist, no deletion needed")
return {"status_code": 404}
# For other errors, continue with deletion attempt
# Proceed with deletion
response = await self._make_request("DELETE", webdav_path, headers=headers)
logger.debug(f"Successfully deleted WebDAV resource '{path}'")
return {"status_code": response.status_code}
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"Resource '{path}' not found, no deletion needed")
return {"status_code": 404}
else:
logger.error(f"HTTP error deleting WebDAV resource '{path}': {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error deleting WebDAV resource '{path}': {e}")
raise e
async def cleanup_old_attachment_directory(
self, note_id: int, old_category: str
) -> Dict[str, Any]:
"""Clean up the attachment directory for a note in its old category location."""
old_category_path_part = f"{old_category}/" if old_category else ""
old_attachment_dir_path = (
f"Notes/{old_category_path_part}.attachments.{note_id}/"
)
logger.debug(f"Cleaning up old attachment directory: {old_attachment_dir_path}")
try:
delete_result = await self.delete_resource(path=old_attachment_dir_path)
logger.debug(f"Cleanup result: {delete_result}")
return delete_result
except Exception as e:
logger.error(f"Error during cleanup of old attachment directory: {e}")
raise e
async def cleanup_note_attachments(
self, note_id: int, category: str
) -> Dict[str, Any]:
"""Clean up attachment directory for a specific note and category."""
cat_path_part = f"{category}/" if category else ""
attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/"
logger.debug(
f"Cleaning up attachments for note {note_id} in category '{category}'"
)
try:
delete_result = await self.delete_resource(path=attachment_dir_path)
logger.debug(f"Cleanup result for note {note_id}: {delete_result}")
return delete_result
except Exception as e:
logger.error(f"Failed cleaning up attachments for note {note_id}: {e}")
raise e
async def add_note_attachment(
self,
note_id: int,
filename: str,
content: bytes,
category: Optional[str] = None,
mime_type: Optional[str] = None,
) -> Dict[str, Any]:
"""Add/Update an attachment to a note via WebDAV PUT."""
# Construct paths based on provided category
webdav_base = self._get_webdav_base_path()
category_path_part = f"{category}/" if category else ""
attachment_dir_segment = f".attachments.{note_id}"
parent_dir_webdav_rel_path = (
f"Notes/{category_path_part}{attachment_dir_segment}"
)
parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}"
attachment_path = f"{parent_dir_path}/{filename}"
logger.debug(f"Uploading attachment '{filename}' for note {note_id}")
if not mime_type:
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = "application/octet-stream"
headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"}
try:
# First check if we can access WebDAV at all
notes_dir_path = f"{webdav_base}/Notes"
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
notes_dir_response = await self._make_request(
"PROPFIND", notes_dir_path, headers=propfind_headers
)
if notes_dir_response.status_code == 401:
logger.error("WebDAV authentication failed for Notes directory")
raise HTTPStatusError(
f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}",
request=notes_dir_response.request,
response=notes_dir_response,
)
elif notes_dir_response.status_code >= 400:
logger.error(
f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}"
)
notes_dir_response.raise_for_status()
# Ensure the parent directory exists using MKCOL
mkcol_headers = {"OCS-APIRequest": "true"}
mkcol_response = await self._make_request(
"MKCOL", parent_dir_path, headers=mkcol_headers
)
# MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists)
if mkcol_response.status_code not in [201, 405]:
logger.error(
f"Unexpected status code {mkcol_response.status_code} when creating attachments directory"
)
mkcol_response.raise_for_status()
# Proceed with the PUT request
response = await self._make_request(
"PUT", attachment_path, content=content, headers=headers
)
response.raise_for_status()
logger.debug(
f"Successfully uploaded attachment '{filename}' to note {note_id}"
)
return {"status_code": response.status_code}
except HTTPStatusError as e:
logger.error(
f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}"
)
raise e
except Exception as e:
logger.error(
f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}"
)
raise e
async def get_note_attachment(
self, note_id: int, filename: str, category: Optional[str] = None
) -> Tuple[bytes, str]:
"""Fetch a specific attachment from a note via WebDAV GET."""
webdav_base = self._get_webdav_base_path()
category_path_part = f"{category}/" if category else ""
attachment_dir_segment = f".attachments.{note_id}"
attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}"
logger.debug(f"Fetching attachment '{filename}' for note {note_id}")
try:
response = await self._make_request("GET", attachment_path)
response.raise_for_status()
content = response.content
mime_type = response.headers.get("content-type", "application/octet-stream")
logger.debug(
f"Successfully fetched attachment '{filename}' ({len(content)} bytes)"
)
return content, mime_type
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"Attachment '{filename}' not found for note {note_id}")
else:
logger.error(
f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}"
)
raise e
except Exception as e:
logger.error(
f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}"
)
raise e
async def list_directory(self, path: str = "") -> List[Dict[str, Any]]:
"""List files and directories in the specified path via WebDAV PROPFIND."""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
if not webdav_path.endswith("/"):
webdav_path += "/"
logger.debug(f"Listing directory: {path}")
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:">
<d:prop>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:resourcetype/>
</d:prop>
</d:propfind>"""
headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
try:
response = await self._make_request(
"PROPFIND", webdav_path, content=propfind_body, headers=headers
)
response.raise_for_status()
# Parse the XML response
root = ET.fromstring(response.content)
items = []
# Skip the first response (the directory itself)
responses = root.findall(".//{DAV:}response")[1:]
for response_elem in responses:
href = response_elem.find(".//{DAV:}href")
if href is None:
continue
# Extract file/directory name from href
href_text = href.text or ""
name = href_text.rstrip("/").split("/")[-1]
if not name:
continue
# Get properties
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
continue
prop = propstat.find(".//{DAV:}prop")
if prop is None:
continue
# Determine if it's a directory
resourcetype = prop.find(".//{DAV:}resourcetype")
is_directory = (
resourcetype is not None
and resourcetype.find(".//{DAV:}collection") is not None
)
# Get other properties
size_elem = prop.find(".//{DAV:}getcontentlength")
size = (
int(size_elem.text)
if size_elem is not None and size_elem.text
else 0
)
content_type_elem = prop.find(".//{DAV:}getcontenttype")
content_type = (
content_type_elem.text if content_type_elem is not None else None
)
modified_elem = prop.find(".//{DAV:}getlastmodified")
modified = modified_elem.text if modified_elem is not None else None
items.append(
{
"name": name,
"path": f"{path.rstrip('/')}/{name}" if path else name,
"is_directory": is_directory,
"size": size if not is_directory else None,
"content_type": content_type,
"last_modified": modified,
}
)
logger.debug(f"Found {len(items)} items in directory: {path}")
return items
except HTTPStatusError as e:
logger.error(f"HTTP error listing directory '{webdav_path}': {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error listing directory '{webdav_path}': {e}")
raise e
async def read_file(self, path: str) -> Tuple[bytes, str]:
"""Read a file's content via WebDAV GET."""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
logger.debug(f"Reading file: {path}")
try:
response = await self._make_request("GET", webdav_path)
response.raise_for_status()
content = response.content
content_type = response.headers.get(
"content-type", "application/octet-stream"
)
logger.debug(f"Successfully read file '{path}' ({len(content)} bytes)")
return content, content_type
except HTTPStatusError as e:
logger.error(f"HTTP error reading file '{path}': {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error reading file '{path}': {e}")
raise e
async def write_file(
self, path: str, content: bytes, content_type: Optional[str] = None
) -> Dict[str, Any]:
"""Write content to a file via WebDAV PUT."""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
logger.debug(f"Writing file: {path}")
if not content_type:
content_type, _ = mimetypes.guess_type(path)
if not content_type:
content_type = "application/octet-stream"
headers = {"Content-Type": content_type, "OCS-APIRequest": "true"}
try:
response = await self._make_request(
"PUT", webdav_path, content=content, headers=headers
)
response.raise_for_status()
logger.debug(f"Successfully wrote file '{path}'")
return {"status_code": response.status_code}
except HTTPStatusError as e:
logger.error(f"HTTP error writing file '{path}': {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error writing file '{path}': {e}")
raise e
async def create_directory(
self, path: str, recursive: bool = False
) -> Dict[str, Any]:
"""Create a directory via WebDAV MKCOL."""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
if not webdav_path.endswith("/"):
webdav_path += "/"
logger.debug(f"Creating directory: {path}")
headers = {"OCS-APIRequest": "true"}
try:
response = await self._make_request("MKCOL", webdav_path, headers=headers)
response.raise_for_status()
logger.debug(f"Successfully created directory '{path}'")
return {"status_code": response.status_code}
except HTTPStatusError as e:
# Method Not Allowed - directory already exists
if e.response.status_code == 405:
logger.debug(f"Directory '{path}' already exists")
return {"status_code": 405, "message": "Directory already exists"}
# File Conflict - parent directory does not exist
if e.response.status_code == 409 and recursive:
# Extract parent directory path
path_parts = path.strip("/").split("/")
if len(path_parts) > 1:
parent_dir = "/".join(path_parts[:-1])
logger.debug(
f"Parent directory '{parent_dir}' doesn't exist, creating recursively"
)
await self.create_directory(parent_dir, recursive)
# Now try to create the original directory again
return await self.create_directory(path, recursive)
else:
# This shouldn't happen for single-level directories under root
logger.error(f"409 conflict for single-level directory '{path}'")
raise e
logger.error(f"HTTP error creating directory '{path}': {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error creating directory '{path}': {e}")
raise e
async def move_resource(
self, source_path: str, destination_path: str, overwrite: bool = False
) -> Dict[str, Any]:
"""Move or rename a resource (file or directory) via WebDAV MOVE.
Args:
source_path: The path of the file or directory to move
destination_path: The new path for the file or directory
overwrite: Whether to overwrite the destination if it exists
Returns:
Dict with status_code and optional message
"""
source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}"
destination_webdav_path = (
f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}"
)
# Ensure paths have consistent trailing slashes for directories
if source_path.endswith("/") and not destination_path.endswith("/"):
destination_webdav_path += "/"
elif not source_path.endswith("/") and destination_path.endswith("/"):
source_webdav_path += "/"
logger.debug(f"Moving resource from '{source_path}' to '{destination_path}'")
headers = {
"OCS-APIRequest": "true",
"Destination": destination_webdav_path,
"Overwrite": "T" if overwrite else "F",
}
try:
response = await self._make_request(
"MOVE", source_webdav_path, headers=headers
)
response.raise_for_status()
logger.debug(
f"Successfully moved resource from '{source_path}' to '{destination_path}'"
)
return {"status_code": response.status_code}
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"Source resource '{source_path}' not found")
return {"status_code": 404, "message": "Source resource not found"}
elif e.response.status_code == 412:
logger.debug(
f"Destination '{destination_path}' already exists and overwrite is false"
)
return {
"status_code": 412,
"message": "Destination already exists and overwrite is false",
}
elif e.response.status_code == 409:
logger.debug(
f"Parent directory of destination '{destination_path}' doesn't exist"
)
return {
"status_code": 409,
"message": "Parent directory of destination doesn't exist",
}
logger.debug(
f"Parent directory of destination '{destination_path}' doesn't exist"
)
return {
"status_code": 409,
"message": "Parent directory of destination doesn't exist",
}
else:
logger.error(
f"HTTP error moving resource from '{source_path}' to '{destination_path}': {e}"
)
raise e
except Exception as e:
logger.error(
f"Unexpected error moving resource from '{source_path}' to '{destination_path}': {e}"
)
raise e
async def copy_resource(
self, source_path: str, destination_path: str, overwrite: bool = False
) -> Dict[str, Any]:
"""Copy a resource (file or directory) via WebDAV COPY.
Args:
source_path: The path of the file or directory to copy
destination_path: The destination path for the copy
overwrite: Whether to overwrite the destination if it exists
Returns:
Dict with status_code and optional message
"""
source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}"
destination_webdav_path = (
f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}"
)
# Ensure paths have consistent trailing slashes for directories
if source_path.endswith("/") and not destination_path.endswith("/"):
destination_webdav_path += "/"
elif not source_path.endswith("/") and destination_path.endswith("/"):
source_webdav_path += "/"
logger.debug(f"Copying resource from '{source_path}' to '{destination_path}'")
headers = {
"OCS-APIRequest": "true",
"Destination": destination_webdav_path,
"Overwrite": "T" if overwrite else "F",
}
try:
response = await self._make_request(
"COPY", source_webdav_path, headers=headers
)
response.raise_for_status()
logger.debug(
f"Successfully copied resource from '{source_path}' to '{destination_path}'"
)
return {"status_code": response.status_code}
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"Source resource '{source_path}' not found")
return {"status_code": 404, "message": "Source resource not found"}
elif e.response.status_code == 412:
logger.debug(
f"Destination '{destination_path}' already exists and overwrite is false"
)
return {
"status_code": 412,
"message": "Destination already exists and overwrite is false",
}
elif e.response.status_code == 409:
logger.debug(
f"Parent directory of destination '{destination_path}' doesn't exist"
)
return {
"status_code": 409,
"message": "Parent directory of destination doesn't exist",
}
else:
logger.error(
f"HTTP error copying resource from '{source_path}' to '{destination_path}': {e}"
)
raise e
except Exception as e:
logger.error(
f"Unexpected error copying resource from '{source_path}' to '{destination_path}': {e}"
)
raise e
async def search_files(
self,
scope: str = "",
where_conditions: Optional[str] = None,
properties: Optional[List[str]] = None,
order_by: Optional[List[Tuple[str, str]]] = None,
limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Search for files using WebDAV SEARCH method (RFC 5323).
Args:
scope: Directory path to search in (empty string for user root)
where_conditions: XML string for where clause conditions
properties: List of property names to retrieve (defaults to basic set)
order_by: List of (property, direction) tuples for sorting, e.g. [("getlastmodified", "descending")]
limit: Maximum number of results to return
Returns:
List of file/directory dictionaries with requested properties
"""
# Default properties if not specified
if properties is None:
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
]
# Build the SEARCH request XML
search_body = self._build_search_xml(
scope=scope,
where_conditions=where_conditions,
properties=properties,
order_by=order_by,
limit=limit,
)
# The SEARCH endpoint is at the dav root
search_path = "/remote.php/dav/"
headers = {"Content-Type": "text/xml", "OCS-APIRequest": "true"}
logger.debug(f"Searching files in scope: {scope}")
try:
response = await self._make_request(
"SEARCH", search_path, content=search_body, headers=headers
)
response.raise_for_status()
# Parse the XML response
results = self._parse_search_response(response.content, scope)
logger.debug(f"Search returned {len(results)} results")
return results
except HTTPStatusError as e:
logger.error(f"HTTP error during search: {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error during search: {e}")
raise e
def _build_search_xml(
self,
scope: str,
where_conditions: Optional[str],
properties: List[str],
order_by: Optional[List[Tuple[str, str]]],
limit: Optional[int],
) -> str:
"""Build the XML body for a SEARCH request."""
# Construct the scope path
username = self.username
scope_path = f"/files/{username}"
if scope:
scope_path = f"{scope_path}/{scope.lstrip('/')}"
# Build property list
prop_xml = "\n".join([self._property_to_xml(prop) for prop in properties])
# Build where clause
where_xml = where_conditions if where_conditions else ""
# Build order by clause
orderby_xml = ""
if order_by:
order_elements = []
for prop, direction in order_by:
prop_element = self._property_to_xml(prop)
dir_element = (
"<d:ascending/>"
if direction.lower() == "ascending"
else "<d:descending/>"
)
order_elements.append(f"<d:order>{prop_element}{dir_element}</d:order>")
orderby_xml = "\n".join(order_elements)
else:
orderby_xml = ""
# Build limit clause
limit_xml = (
f"<d:limit><d:nresults>{limit}</d:nresults></d:limit>" if limit else ""
)
# Construct the full SEARCH XML
search_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<d:searchrequest xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:basicsearch>
<d:select>
<d:prop>
{prop_xml}
</d:prop>
</d:select>
<d:from>
<d:scope>
<d:href>{scope_path}</d:href>
<d:depth>infinity</d:depth>
</d:scope>
</d:from>
<d:where>
{where_xml}
</d:where>
<d:orderby>
{orderby_xml}
</d:orderby>
{limit_xml}
</d:basicsearch>
</d:searchrequest>"""
return search_xml
def _property_to_xml(self, prop: str) -> str:
"""Convert a property name to its XML element."""
# Handle properties with namespace prefixes
if prop.startswith("{"):
# Already a full namespace
namespace_end = prop.index("}")
namespace = prop[1:namespace_end]
local_name = prop[namespace_end + 1 :]
# Map namespace URIs to prefixes
ns_map = {
"DAV:": "d",
"http://owncloud.org/ns": "oc",
"http://nextcloud.org/ns": "nc",
}
prefix = ns_map.get(namespace, "d")
return f"<{prefix}:{local_name}/>"
else:
# Guess namespace based on common properties
if prop in [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"quota-available-bytes",
"quota-used-bytes",
]:
return f"<d:{prop}/>"
elif prop in [
"fileid",
"size",
"permissions",
"favorite",
"tags",
"owner-id",
"owner-display-name",
"share-types",
"checksums",
"comments-count",
"comments-unread",
]:
return f"<oc:{prop}/>"
else:
# Assume nc namespace for newer properties
return f"<nc:{prop}/>"
def _parse_search_response(
self, xml_content: bytes, scope: str
) -> List[Dict[str, Any]]:
"""Parse the XML response from a SEARCH request."""
root = ET.fromstring(xml_content)
items = []
# Process each response element
responses = root.findall(".//{DAV:}response")
for response_elem in responses:
href = response_elem.find(".//{DAV:}href")
if href is None:
continue
# Extract file/directory path from href
href_text = href.text or ""
# Remove the /remote.php/dav/files/username/ prefix to get relative path
path_parts = href_text.split("/files/")
if len(path_parts) > 1:
# Get the path after username
path_after_user = "/".join(path_parts[1].split("/")[1:])
relative_path = path_after_user.rstrip("/")
else:
relative_path = href_text.rstrip("/").split("/")[-1]
# Get properties
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
continue
prop = propstat.find(".//{DAV:}prop")
if prop is None:
continue
# Build item dictionary
item = {"path": relative_path, "href": href_text}
# Extract all properties
for child in prop:
tag = child.tag
value = child.text
# Remove namespace from tag
if "}" in tag:
tag = tag.split("}", 1)[1]
# Handle special properties
if tag == "resourcetype":
item["is_directory"] = child.find(".//{DAV:}collection") is not None
elif tag == "getcontentlength":
item["size"] = int(value) if value else 0
elif tag == "displayname":
item["name"] = value
elif tag == "getcontenttype":
item["content_type"] = value
elif tag == "getlastmodified":
item["last_modified"] = value
elif tag == "getetag":
item["etag"] = value.strip('"') if value else None
elif tag == "fileid":
item["file_id"] = int(value) if value else None
elif tag == "favorite":
item["is_favorite"] = value == "1"
elif tag == "tags":
# Tags can be comma-separated or have multiple child elements
if value:
# Handle comma-separated tags
item["tags"] = [
t.strip() for t in value.split(",") if t.strip()
]
else:
# Check for child tag elements (alternative format)
tag_elements = child.findall(".//{http://owncloud.org/ns}tag")
if tag_elements:
item["tags"] = [t.text for t in tag_elements if t.text]
else:
item["tags"] = []
elif tag == "permissions":
item["permissions"] = value
elif tag == "size":
# oc:size includes folder sizes
item["total_size"] = int(value) if value else 0
else:
# Store other properties as-is
item[tag] = value
items.append(item)
return items
async def find_by_name(
self, pattern: str, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Find files by name pattern using LIKE matching.
Args:
pattern: Name pattern to search for (supports % wildcard)
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of matching files/directories
Examples:
# Find all .txt files
results = await find_by_name("%.txt")
# Find files starting with "report"
results = await find_by_name("report%")
"""
where_conditions = f"""
<d:like>
<d:prop>
<d:displayname/>
</d:prop>
<d:literal>{pattern}</d:literal>
</d:like>
"""
return await self.search_files(
scope=scope, where_conditions=where_conditions, limit=limit
)
async def find_by_type(
self, mime_type: str, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Find files by MIME type.
Args:
mime_type: MIME type to search for (supports % wildcard, e.g., "image/%")
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of matching files
Examples:
# Find all images
results = await find_by_type("image/%")
# Find all PDFs
results = await find_by_type("application/pdf")
"""
where_conditions = f"""
<d:like>
<d:prop>
<d:getcontenttype/>
</d:prop>
<d:literal>{mime_type}</d:literal>
</d:like>
"""
return await self.search_files(
scope=scope, where_conditions=where_conditions, limit=limit
)
async def list_favorites(
self, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""List all favorite files.
Args:
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of favorite files/directories
Examples:
# List all favorites
results = await list_favorites()
# List favorites in a specific folder
results = await list_favorites(scope="Documents")
"""
# Use REPORT method for favorites as it's more efficient
# But we can also use SEARCH as fallback
where_conditions = """
<d:eq>
<d:prop>
<oc:favorite/>
</d:prop>
<d:literal>1</d:literal>
</d:eq>
"""
# Request favorite property
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"fileid",
"favorite",
]
return await self.search_files(
scope=scope,
where_conditions=where_conditions,
properties=properties,
limit=limit,
)
async def find_by_tag(
self, tag_name: str, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Find files by tag name.
DEPRECATED: Use NextcloudClient.find_files_by_tag() instead, which uses
the proper OCS Tags API rather than WebDAV SEARCH.
Args:
tag_name: Tag to filter by (e.g., "vector-index")
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of files/directories with the specified tag
Examples:
# Find all files tagged with "vector-index"
results = await find_by_tag("vector-index")
# Find tagged files in a specific folder
results = await find_by_tag("vector-index", scope="Documents")
"""
# Use LIKE for tag matching since tags can be comma-separated
where_conditions = f"""
<d:like>
<d:prop>
<oc:tags/>
</d:prop>
<d:literal>%{tag_name}%</d:literal>
</d:like>
"""
# Request tag property along with standard properties
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"fileid",
"tags",
]
return await self.search_files(
scope=scope,
where_conditions=where_conditions,
properties=properties,
limit=limit,
)
async def _get_file_info_by_id(self, file_id: int) -> Dict[str, Any]:
"""Get file information by Nextcloud file ID using WebDAV.
Args:
file_id: Nextcloud internal file ID
Returns:
File information dictionary with path, size, content_type, etc.
Raises:
HTTPStatusError: If file not found or request fails
"""
# Nextcloud allows accessing files by ID via special meta endpoint
meta_path = f"/remote.php/dav/meta/{file_id}/"
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:resourcetype/>
<d:getetag/>
<oc:fileid/>
</d:prop>
</d:propfind>"""
headers = {"Depth": "0", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
response = await self._make_request(
"PROPFIND", meta_path, content=propfind_body, headers=headers
)
response.raise_for_status()
# Parse the XML response
root = ET.fromstring(response.content)
responses = root.findall(".//{DAV:}response")
if not responses:
raise RuntimeError(f"File ID {file_id} not found")
response_elem = responses[0]
href = response_elem.find(".//{DAV:}href")
if href is None:
raise RuntimeError(f"No href in response for file ID {file_id}")
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
raise RuntimeError(f"No propstat for file ID {file_id}")
prop = propstat.find(".//{DAV:}prop")
if prop is None:
raise RuntimeError(f"No prop for file ID {file_id}")
# Extract file path from displayname or construct from file ID
displayname_elem = prop.find(".//{DAV:}displayname")
name = (
displayname_elem.text if displayname_elem is not None else f"file_{file_id}"
)
# Get file properties
size_elem = prop.find(".//{DAV:}getcontentlength")
size = int(size_elem.text) if size_elem is not None and size_elem.text else 0
content_type_elem = prop.find(".//{DAV:}getcontenttype")
content_type = content_type_elem.text if content_type_elem is not None else None
modified_elem = prop.find(".//{DAV:}getlastmodified")
modified = modified_elem.text if modified_elem is not None else None
etag_elem = prop.find(".//{DAV:}getetag")
etag = (
etag_elem.text.strip('"')
if etag_elem is not None and etag_elem.text
else None
)
# Check if it's a directory
resourcetype = prop.find(".//{DAV:}resourcetype")
is_directory = (
resourcetype is not None
and resourcetype.find(".//{DAV:}collection") is not None
)
# Try to get actual file path - meta endpoint doesn't give us the real path
# so we'll construct a reasonable path from the name
# The calling code in NextcloudClient will have the context to determine the actual path
file_info = {
"name": name,
"path": f"/{name}", # Placeholder - caller should use WebDAV to get real path if needed
"size": size,
"content_type": content_type,
"last_modified": modified,
"etag": etag,
"is_directory": is_directory,
"file_id": file_id,
}
logger.debug(f"Retrieved file info for ID {file_id}: {name}")
return file_info
async def get_tag_by_name(self, tag_name: str) -> dict[str, Any] | None:
"""Get a system tag by its name via WebDAV.
Args:
tag_name: Name of the tag to find (case-sensitive)
Returns:
Tag dictionary if found, None otherwise
"""
# Use WebDAV PROPFIND to list all systemtags
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<oc:id/>
<oc:display-name/>
<oc:user-visible/>
<oc:user-assignable/>
</d:prop>
</d:propfind>"""
response = await self._client.request(
"PROPFIND",
"/remote.php/dav/systemtags/",
headers={"Depth": "1"},
content=propfind_body,
)
response.raise_for_status()
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
for response_elem in root.findall("d:response", ns):
href = response_elem.find("d:href", ns)
if href is None or href.text == "/remote.php/dav/systemtags/":
# Skip the collection itself
continue
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
continue
prop = propstat.find("d:prop", ns)
if prop is None:
continue
# Extract tag properties
tag_id_elem = prop.find("oc:id", ns)
display_name_elem = prop.find("oc:display-name", ns)
user_visible_elem = prop.find("oc:user-visible", ns)
user_assignable_elem = prop.find("oc:user-assignable", ns)
if display_name_elem is not None and display_name_elem.text == tag_name:
tag_info = {
"id": int(tag_id_elem.text)
if tag_id_elem is not None and tag_id_elem.text is not None
else None,
"name": display_name_elem.text,
"userVisible": user_visible_elem.text.lower() == "true"
if user_visible_elem is not None
and user_visible_elem.text is not None
else True,
"userAssignable": user_assignable_elem.text.lower() == "true"
if user_assignable_elem is not None
and user_assignable_elem.text is not None
else True,
}
logger.debug(f"Found tag '{tag_name}' with ID {tag_info['id']}")
return tag_info
logger.debug(f"Tag '{tag_name}' not found")
return None
async def get_files_by_tag(self, tag_id: int) -> list[dict[str, Any]]:
"""Get all files tagged with a specific system tag via WebDAV REPORT.
Args:
tag_id: Numeric ID of the tag
Returns:
List of file info dictionaries with path, size, content_type, etc.
"""
# Use WebDAV REPORT method with systemtag filter, requesting all properties
report_body = f"""<?xml version="1.0"?>
<oc:filter-files xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<oc:fileid/>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:getetag/>
</d:prop>
<oc:filter-rules>
<oc:systemtag>{tag_id}</oc:systemtag>
</oc:filter-rules>
</oc:filter-files>"""
response = await self._client.request(
"REPORT",
f"{self._get_webdav_base_path()}/",
content=report_body,
)
response.raise_for_status()
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
files = []
for response_elem in root.findall("d:response", ns):
# Extract href (file path)
href_elem = response_elem.find("d:href", ns)
if href_elem is None or not href_elem.text:
continue
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
continue
prop = propstat.find("d:prop", ns)
if prop is None:
continue
# Extract all properties
fileid_elem = prop.find("oc:fileid", ns)
displayname_elem = prop.find("d:displayname", ns)
contentlength_elem = prop.find("d:getcontentlength", ns)
contenttype_elem = prop.find("d:getcontenttype", ns)
lastmodified_elem = prop.find("d:getlastmodified", ns)
etag_elem = prop.find("d:getetag", ns)
if fileid_elem is None or not fileid_elem.text:
continue
# Decode href path and extract the file path
href_path = unquote(href_elem.text)
# Remove WebDAV prefix to get user-relative path
webdav_prefix = f"/remote.php/dav/files/{self.username}/"
file_path = href_path.replace(webdav_prefix, "/")
# Parse last modified timestamp
last_modified_timestamp = None
if lastmodified_elem is not None and lastmodified_elem.text:
try:
dt = parsedate_to_datetime(lastmodified_elem.text)
last_modified_timestamp = int(dt.timestamp())
except Exception:
pass
file_info = {
"id": int(fileid_elem.text),
"path": file_path,
"name": displayname_elem.text
if displayname_elem is not None
else file_path.split("/")[-1],
"size": int(contentlength_elem.text)
if contentlength_elem is not None and contentlength_elem.text
else 0,
"content_type": contenttype_elem.text
if contenttype_elem is not None
else "",
"last_modified": lastmodified_elem.text
if lastmodified_elem is not None
else None,
"last_modified_timestamp": last_modified_timestamp,
"etag": etag_elem.text if etag_elem is not None else None,
}
files.append(file_info)
logger.debug(f"Found {len(files)} files with tag ID {tag_id}")
return files
async def get_file_info(self, path: str) -> dict[str, Any] | None:
"""Get file info including file ID via WebDAV PROPFIND.
Args:
path: Path to the file (relative to user's files directory)
Returns:
File info dictionary with id, name, size, content_type, etc.
Returns None if file not found.
"""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<oc:fileid/>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:getetag/>
<d:resourcetype/>
</d:prop>
</d:propfind>"""
try:
response = await self._client.request(
"PROPFIND",
webdav_path,
headers={"Depth": "0"},
content=propfind_body,
)
response.raise_for_status()
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"File not found: {path}")
return None
raise
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
response_elem = root.find("d:response", ns)
if response_elem is None:
return None
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
return None
prop = propstat.find("d:prop", ns)
if prop is None:
return None
# Extract properties
fileid_elem = prop.find("oc:fileid", ns)
displayname_elem = prop.find("d:displayname", ns)
contentlength_elem = prop.find("d:getcontentlength", ns)
contenttype_elem = prop.find("d:getcontenttype", ns)
lastmodified_elem = prop.find("d:getlastmodified", ns)
etag_elem = prop.find("d:getetag", ns)
resourcetype_elem = prop.find("d:resourcetype", ns)
is_directory = (
resourcetype_elem is not None
and resourcetype_elem.find("d:collection", ns) is not None
)
file_info = {
"id": int(fileid_elem.text)
if fileid_elem is not None and fileid_elem.text is not None
else None,
"path": path,
"name": displayname_elem.text
if displayname_elem is not None
else path.split("/")[-1],
"size": int(contentlength_elem.text)
if contentlength_elem is not None and contentlength_elem.text
else 0,
"content_type": contenttype_elem.text
if contenttype_elem is not None
else "",
"last_modified": lastmodified_elem.text
if lastmodified_elem is not None
else None,
"etag": etag_elem.text.strip('"')
if etag_elem is not None and etag_elem.text
else None,
"is_directory": is_directory,
}
logger.debug(f"Got file info for '{path}': id={file_info['id']}")
return file_info
async def create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Create a system tag via WebDAV.
Args:
name: Name of the tag to create
user_visible: Whether the tag is visible to users
user_assignable: Whether users can assign this tag
Returns:
Tag dictionary with id, name, userVisible, userAssignable
Raises:
HTTPStatusError: If tag creation fails (409 if already exists)
"""
# Use WebDAV POST with JSON body to create tag
response = await self._client.post(
"/remote.php/dav/systemtags/",
headers={"Content-Type": "application/json"},
json={
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
},
)
response.raise_for_status()
# Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42)
content_location = response.headers.get("Content-Location", "")
tag_id = None
if content_location:
# Extract the numeric ID from the path
try:
tag_id = int(content_location.rstrip("/").split("/")[-1])
except (ValueError, IndexError):
pass
tag_info = {
"id": tag_id,
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
}
logger.info(f"Created tag '{name}' with ID {tag_info['id']}")
return tag_info
async def get_or_create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Get a tag by name, creating it if it doesn't exist.
Args:
name: Name of the tag
user_visible: Whether the tag is visible to users (for creation)
user_assignable: Whether users can assign this tag (for creation)
Returns:
Tag dictionary with id, name, userVisible, userAssignable
"""
# First try to get existing tag
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}")
return existing_tag
# Create new tag
try:
return await self.create_tag(name, user_visible, user_assignable)
except HTTPStatusError as e:
if e.response.status_code == 409:
# Tag was created between our check and creation, fetch it
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
return existing_tag
raise
async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool:
"""Assign a system tag to a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was assigned successfully (or already assigned)
Raises:
HTTPStatusError: If tag assignment fails
"""
response = await self._client.request(
"PUT",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
headers={"Content-Length": "0"},
content=b"",
)
# 201 = Created (new assignment), 409 = Conflict (already assigned)
if response.status_code in (201, 409):
logger.info(f"Tagged file {file_id} with tag {tag_id}")
return True
response.raise_for_status()
return True
async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool:
"""Remove a system tag from a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was removed successfully (or wasn't assigned)
Raises:
HTTPStatusError: If tag removal fails
"""
response = await self._client.request(
"DELETE",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
)
# 204 = No Content (removed), 404 = Not Found (wasn't assigned)
if response.status_code in (204, 404):
logger.info(f"Removed tag {tag_id} from file {file_id}")
return True
response.raise_for_status()
return True