Skip to main content
Glama
cbcoutinho

Nextcloud MCP Server

by cbcoutinho
webdav.py55.9 kB
"""WebDAV client for Nextcloud file operations.""" import logging import mimetypes import xml.etree.ElementTree as ET from typing import Any, Dict, List, Optional, Tuple from httpx import HTTPStatusError from .base import BaseNextcloudClient logger = logging.getLogger(__name__) class WebDAVClient(BaseNextcloudClient): """Client for Nextcloud WebDAV operations.""" app_name = "webdav" async def delete_resource(self, path: str) -> Dict[str, Any]: """Delete a resource (file or directory) via WebDAV DELETE.""" # Ensure path ends with a slash if it's a directory if not path.endswith("/"): path_with_slash = f"{path}/" else: path_with_slash = path webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}" logger.debug(f"Deleting WebDAV resource: {webdav_path}") headers = {"OCS-APIRequest": "true"} try: # First try a PROPFIND to verify resource exists propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} try: propfind_resp = await self._make_request( "PROPFIND", webdav_path, headers=propfind_headers ) logger.debug( f"Resource exists check status: {propfind_resp.status_code}" ) except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Resource '{path}' doesn't exist, no deletion needed") return {"status_code": 404} # For other errors, continue with deletion attempt # Proceed with deletion response = await self._make_request("DELETE", webdav_path, headers=headers) logger.debug(f"Successfully deleted WebDAV resource '{path}'") return {"status_code": response.status_code} except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Resource '{path}' not found, no deletion needed") return {"status_code": 404} else: logger.error(f"HTTP error deleting WebDAV resource '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error deleting WebDAV resource '{path}': {e}") raise e async def cleanup_old_attachment_directory( self, note_id: int, old_category: str ) -> Dict[str, Any]: """Clean up the attachment directory for a note in its old category location.""" old_category_path_part = f"{old_category}/" if old_category else "" old_attachment_dir_path = ( f"Notes/{old_category_path_part}.attachments.{note_id}/" ) logger.debug(f"Cleaning up old attachment directory: {old_attachment_dir_path}") try: delete_result = await self.delete_resource(path=old_attachment_dir_path) logger.debug(f"Cleanup result: {delete_result}") return delete_result except Exception as e: logger.error(f"Error during cleanup of old attachment directory: {e}") raise e async def cleanup_note_attachments( self, note_id: int, category: str ) -> Dict[str, Any]: """Clean up attachment directory for a specific note and category.""" cat_path_part = f"{category}/" if category else "" attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/" logger.debug( f"Cleaning up attachments for note {note_id} in category '{category}'" ) try: delete_result = await self.delete_resource(path=attachment_dir_path) logger.debug(f"Cleanup result for note {note_id}: {delete_result}") return delete_result except Exception as e: logger.error(f"Failed cleaning up attachments for note {note_id}: {e}") raise e async def add_note_attachment( self, note_id: int, filename: str, content: bytes, category: Optional[str] = None, mime_type: Optional[str] = None, ) -> Dict[str, Any]: """Add/Update an attachment to a note via WebDAV PUT.""" # Construct paths based on provided category webdav_base = self._get_webdav_base_path() category_path_part = f"{category}/" if category else "" attachment_dir_segment = f".attachments.{note_id}" parent_dir_webdav_rel_path = ( f"Notes/{category_path_part}{attachment_dir_segment}" ) parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}" attachment_path = f"{parent_dir_path}/{filename}" logger.debug(f"Uploading attachment '{filename}' for note {note_id}") if not mime_type: mime_type, _ = mimetypes.guess_type(filename) if not mime_type: mime_type = "application/octet-stream" headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"} try: # First check if we can access WebDAV at all notes_dir_path = f"{webdav_base}/Notes" propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} notes_dir_response = await self._make_request( "PROPFIND", notes_dir_path, headers=propfind_headers ) if notes_dir_response.status_code == 401: logger.error("WebDAV authentication failed for Notes directory") raise HTTPStatusError( f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}", request=notes_dir_response.request, response=notes_dir_response, ) elif notes_dir_response.status_code >= 400: logger.error( f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}" ) notes_dir_response.raise_for_status() # Ensure the parent directory exists using MKCOL mkcol_headers = {"OCS-APIRequest": "true"} mkcol_response = await self._make_request( "MKCOL", parent_dir_path, headers=mkcol_headers ) # MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists) if mkcol_response.status_code not in [201, 405]: logger.error( f"Unexpected status code {mkcol_response.status_code} when creating attachments directory" ) mkcol_response.raise_for_status() # Proceed with the PUT request response = await self._make_request( "PUT", attachment_path, content=content, headers=headers ) response.raise_for_status() logger.debug( f"Successfully uploaded attachment '{filename}' to note {note_id}" ) return {"status_code": response.status_code} except HTTPStatusError as e: logger.error( f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}" ) raise e except Exception as e: logger.error( f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}" ) raise e async def get_note_attachment( self, note_id: int, filename: str, category: Optional[str] = None ) -> Tuple[bytes, str]: """Fetch a specific attachment from a note via WebDAV GET.""" webdav_base = self._get_webdav_base_path() category_path_part = f"{category}/" if category else "" attachment_dir_segment = f".attachments.{note_id}" attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}" logger.debug(f"Fetching attachment '{filename}' for note {note_id}") try: response = await self._make_request("GET", attachment_path) response.raise_for_status() content = response.content mime_type = response.headers.get("content-type", "application/octet-stream") logger.debug( f"Successfully fetched attachment '{filename}' ({len(content)} bytes)" ) return content, mime_type except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Attachment '{filename}' not found for note {note_id}") else: logger.error( f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}" ) raise e except Exception as e: logger.error( f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}" ) raise e async def list_directory(self, path: str = "") -> List[Dict[str, Any]]: """List files and directories in the specified path via WebDAV PROPFIND.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" if not webdav_path.endswith("/"): webdav_path += "/" logger.debug(f"Listing directory: {path}") propfind_body = """<?xml version="1.0"?> <d:propfind xmlns:d="DAV:"> <d:prop> <d:displayname/> <d:getcontentlength/> <d:getcontenttype/> <d:getlastmodified/> <d:resourcetype/> </d:prop> </d:propfind>""" headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"} try: response = await self._make_request( "PROPFIND", webdav_path, content=propfind_body, headers=headers ) response.raise_for_status() # Parse the XML response root = ET.fromstring(response.content) items = [] # Skip the first response (the directory itself) responses = root.findall(".//{DAV:}response")[1:] for response_elem in responses: href = response_elem.find(".//{DAV:}href") if href is None: continue # Extract file/directory name from href href_text = href.text or "" name = href_text.rstrip("/").split("/")[-1] if not name: continue # Get properties propstat = response_elem.find(".//{DAV:}propstat") if propstat is None: continue prop = propstat.find(".//{DAV:}prop") if prop is None: continue # Determine if it's a directory resourcetype = prop.find(".//{DAV:}resourcetype") is_directory = ( resourcetype is not None and resourcetype.find(".//{DAV:}collection") is not None ) # Get other properties size_elem = prop.find(".//{DAV:}getcontentlength") size = ( int(size_elem.text) if size_elem is not None and size_elem.text else 0 ) content_type_elem = prop.find(".//{DAV:}getcontenttype") content_type = ( content_type_elem.text if content_type_elem is not None else None ) modified_elem = prop.find(".//{DAV:}getlastmodified") modified = modified_elem.text if modified_elem is not None else None items.append( { "name": name, "path": f"{path.rstrip('/')}/{name}" if path else name, "is_directory": is_directory, "size": size if not is_directory else None, "content_type": content_type, "last_modified": modified, } ) logger.debug(f"Found {len(items)} items in directory: {path}") return items except HTTPStatusError as e: logger.error(f"HTTP error listing directory '{webdav_path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error listing directory '{webdav_path}': {e}") raise e async def read_file(self, path: str) -> Tuple[bytes, str]: """Read a file's content via WebDAV GET.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" logger.debug(f"Reading file: {path}") try: response = await self._make_request("GET", webdav_path) response.raise_for_status() content = response.content content_type = response.headers.get( "content-type", "application/octet-stream" ) logger.debug(f"Successfully read file '{path}' ({len(content)} bytes)") return content, content_type except HTTPStatusError as e: logger.error(f"HTTP error reading file '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error reading file '{path}': {e}") raise e async def write_file( self, path: str, content: bytes, content_type: Optional[str] = None ) -> Dict[str, Any]: """Write content to a file via WebDAV PUT.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" logger.debug(f"Writing file: {path}") if not content_type: content_type, _ = mimetypes.guess_type(path) if not content_type: content_type = "application/octet-stream" headers = {"Content-Type": content_type, "OCS-APIRequest": "true"} try: response = await self._make_request( "PUT", webdav_path, content=content, headers=headers ) response.raise_for_status() logger.debug(f"Successfully wrote file '{path}'") return {"status_code": response.status_code} except HTTPStatusError as e: logger.error(f"HTTP error writing file '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error writing file '{path}': {e}") raise e async def create_directory( self, path: str, recursive: bool = False ) -> Dict[str, Any]: """Create a directory via WebDAV MKCOL.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" if not webdav_path.endswith("/"): webdav_path += "/" logger.debug(f"Creating directory: {path}") headers = {"OCS-APIRequest": "true"} try: response = await self._make_request("MKCOL", webdav_path, headers=headers) response.raise_for_status() logger.debug(f"Successfully created directory '{path}'") return {"status_code": response.status_code} except HTTPStatusError as e: # Method Not Allowed - directory already exists if e.response.status_code == 405: logger.debug(f"Directory '{path}' already exists") return {"status_code": 405, "message": "Directory already exists"} # File Conflict - parent directory does not exist if e.response.status_code == 409 and recursive: # Extract parent directory path path_parts = path.strip("/").split("/") if len(path_parts) > 1: parent_dir = "/".join(path_parts[:-1]) logger.debug( f"Parent directory '{parent_dir}' doesn't exist, creating recursively" ) await self.create_directory(parent_dir, recursive) # Now try to create the original directory again return await self.create_directory(path, recursive) else: # This shouldn't happen for single-level directories under root logger.error(f"409 conflict for single-level directory '{path}'") raise e logger.error(f"HTTP error creating directory '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error creating directory '{path}': {e}") raise e async def move_resource( self, source_path: str, destination_path: str, overwrite: bool = False ) -> Dict[str, Any]: """Move or rename a resource (file or directory) via WebDAV MOVE. Args: source_path: The path of the file or directory to move destination_path: The new path for the file or directory overwrite: Whether to overwrite the destination if it exists Returns: Dict with status_code and optional message """ source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}" destination_webdav_path = ( f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}" ) # Ensure paths have consistent trailing slashes for directories if source_path.endswith("/") and not destination_path.endswith("/"): destination_webdav_path += "/" elif not source_path.endswith("/") and destination_path.endswith("/"): source_webdav_path += "/" logger.debug(f"Moving resource from '{source_path}' to '{destination_path}'") headers = { "OCS-APIRequest": "true", "Destination": destination_webdav_path, "Overwrite": "T" if overwrite else "F", } try: response = await self._make_request( "MOVE", source_webdav_path, headers=headers ) response.raise_for_status() logger.debug( f"Successfully moved resource from '{source_path}' to '{destination_path}'" ) return {"status_code": response.status_code} except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Source resource '{source_path}' not found") return {"status_code": 404, "message": "Source resource not found"} elif e.response.status_code == 412: logger.debug( f"Destination '{destination_path}' already exists and overwrite is false" ) return { "status_code": 412, "message": "Destination already exists and overwrite is false", } elif e.response.status_code == 409: logger.debug( f"Parent directory of destination '{destination_path}' doesn't exist" ) return { "status_code": 409, "message": "Parent directory of destination doesn't exist", } logger.debug( f"Parent directory of destination '{destination_path}' doesn't exist" ) return { "status_code": 409, "message": "Parent directory of destination doesn't exist", } else: logger.error( f"HTTP error moving resource from '{source_path}' to '{destination_path}': {e}" ) raise e except Exception as e: logger.error( f"Unexpected error moving resource from '{source_path}' to '{destination_path}': {e}" ) raise e async def copy_resource( self, source_path: str, destination_path: str, overwrite: bool = False ) -> Dict[str, Any]: """Copy a resource (file or directory) via WebDAV COPY. Args: source_path: The path of the file or directory to copy destination_path: The destination path for the copy overwrite: Whether to overwrite the destination if it exists Returns: Dict with status_code and optional message """ source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}" destination_webdav_path = ( f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}" ) # Ensure paths have consistent trailing slashes for directories if source_path.endswith("/") and not destination_path.endswith("/"): destination_webdav_path += "/" elif not source_path.endswith("/") and destination_path.endswith("/"): source_webdav_path += "/" logger.debug(f"Copying resource from '{source_path}' to '{destination_path}'") headers = { "OCS-APIRequest": "true", "Destination": destination_webdav_path, "Overwrite": "T" if overwrite else "F", } try: response = await self._make_request( "COPY", source_webdav_path, headers=headers ) response.raise_for_status() logger.debug( f"Successfully copied resource from '{source_path}' to '{destination_path}'" ) return {"status_code": response.status_code} except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Source resource '{source_path}' not found") return {"status_code": 404, "message": "Source resource not found"} elif e.response.status_code == 412: logger.debug( f"Destination '{destination_path}' already exists and overwrite is false" ) return { "status_code": 412, "message": "Destination already exists and overwrite is false", } elif e.response.status_code == 409: logger.debug( f"Parent directory of destination '{destination_path}' doesn't exist" ) return { "status_code": 409, "message": "Parent directory of destination doesn't exist", } else: logger.error( f"HTTP error copying resource from '{source_path}' to '{destination_path}': {e}" ) raise e except Exception as e: logger.error( f"Unexpected error copying resource from '{source_path}' to '{destination_path}': {e}" ) raise e async def search_files( self, scope: str = "", where_conditions: Optional[str] = None, properties: Optional[List[str]] = None, order_by: Optional[List[Tuple[str, str]]] = None, limit: Optional[int] = None, ) -> List[Dict[str, Any]]: """Search for files using WebDAV SEARCH method (RFC 5323). Args: scope: Directory path to search in (empty string for user root) where_conditions: XML string for where clause conditions properties: List of property names to retrieve (defaults to basic set) order_by: List of (property, direction) tuples for sorting, e.g. [("getlastmodified", "descending")] limit: Maximum number of results to return Returns: List of file/directory dictionaries with requested properties """ # Default properties if not specified if properties is None: properties = [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", ] # Build the SEARCH request XML search_body = self._build_search_xml( scope=scope, where_conditions=where_conditions, properties=properties, order_by=order_by, limit=limit, ) # The SEARCH endpoint is at the dav root search_path = "/remote.php/dav/" headers = {"Content-Type": "text/xml", "OCS-APIRequest": "true"} logger.debug(f"Searching files in scope: {scope}") try: response = await self._make_request( "SEARCH", search_path, content=search_body, headers=headers ) response.raise_for_status() # Parse the XML response results = self._parse_search_response(response.content, scope) logger.debug(f"Search returned {len(results)} results") return results except HTTPStatusError as e: logger.error(f"HTTP error during search: {e}") raise e except Exception as e: logger.error(f"Unexpected error during search: {e}") raise e def _build_search_xml( self, scope: str, where_conditions: Optional[str], properties: List[str], order_by: Optional[List[Tuple[str, str]]], limit: Optional[int], ) -> str: """Build the XML body for a SEARCH request.""" # Construct the scope path username = self.username scope_path = f"/files/{username}" if scope: scope_path = f"{scope_path}/{scope.lstrip('/')}" # Build property list prop_xml = "\n".join([self._property_to_xml(prop) for prop in properties]) # Build where clause where_xml = where_conditions if where_conditions else "" # Build order by clause orderby_xml = "" if order_by: order_elements = [] for prop, direction in order_by: prop_element = self._property_to_xml(prop) dir_element = ( "<d:ascending/>" if direction.lower() == "ascending" else "<d:descending/>" ) order_elements.append(f"<d:order>{prop_element}{dir_element}</d:order>") orderby_xml = "\n".join(order_elements) else: orderby_xml = "" # Build limit clause limit_xml = ( f"<d:limit><d:nresults>{limit}</d:nresults></d:limit>" if limit else "" ) # Construct the full SEARCH XML search_xml = f"""<?xml version="1.0" encoding="UTF-8"?> <d:searchrequest xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns"> <d:basicsearch> <d:select> <d:prop> {prop_xml} </d:prop> </d:select> <d:from> <d:scope> <d:href>{scope_path}</d:href> <d:depth>infinity</d:depth> </d:scope> </d:from> <d:where> {where_xml} </d:where> <d:orderby> {orderby_xml} </d:orderby> {limit_xml} </d:basicsearch> </d:searchrequest>""" return search_xml def _property_to_xml(self, prop: str) -> str: """Convert a property name to its XML element.""" # Handle properties with namespace prefixes if prop.startswith("{"): # Already a full namespace namespace_end = prop.index("}") namespace = prop[1:namespace_end] local_name = prop[namespace_end + 1 :] # Map namespace URIs to prefixes ns_map = { "DAV:": "d", "http://owncloud.org/ns": "oc", "http://nextcloud.org/ns": "nc", } prefix = ns_map.get(namespace, "d") return f"<{prefix}:{local_name}/>" else: # Guess namespace based on common properties if prop in [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", "quota-available-bytes", "quota-used-bytes", ]: return f"<d:{prop}/>" elif prop in [ "fileid", "size", "permissions", "favorite", "tags", "owner-id", "owner-display-name", "share-types", "checksums", "comments-count", "comments-unread", ]: return f"<oc:{prop}/>" else: # Assume nc namespace for newer properties return f"<nc:{prop}/>" def _parse_search_response( self, xml_content: bytes, scope: str ) -> List[Dict[str, Any]]: """Parse the XML response from a SEARCH request.""" root = ET.fromstring(xml_content) items = [] # Process each response element responses = root.findall(".//{DAV:}response") for response_elem in responses: href = response_elem.find(".//{DAV:}href") if href is None: continue # Extract file/directory path from href href_text = href.text or "" # Remove the /remote.php/dav/files/username/ prefix to get relative path path_parts = href_text.split("/files/") if len(path_parts) > 1: # Get the path after username path_after_user = "/".join(path_parts[1].split("/")[1:]) relative_path = path_after_user.rstrip("/") else: relative_path = href_text.rstrip("/").split("/")[-1] # Get properties propstat = response_elem.find(".//{DAV:}propstat") if propstat is None: continue prop = propstat.find(".//{DAV:}prop") if prop is None: continue # Build item dictionary item = {"path": relative_path, "href": href_text} # Extract all properties for child in prop: tag = child.tag value = child.text # Remove namespace from tag if "}" in tag: tag = tag.split("}", 1)[1] # Handle special properties if tag == "resourcetype": item["is_directory"] = child.find(".//{DAV:}collection") is not None elif tag == "getcontentlength": item["size"] = int(value) if value else 0 elif tag == "displayname": item["name"] = value elif tag == "getcontenttype": item["content_type"] = value elif tag == "getlastmodified": item["last_modified"] = value elif tag == "getetag": item["etag"] = value.strip('"') if value else None elif tag == "fileid": item["file_id"] = int(value) if value else None elif tag == "favorite": item["is_favorite"] = value == "1" elif tag == "tags": # Tags can be comma-separated or have multiple child elements if value: # Handle comma-separated tags item["tags"] = [ t.strip() for t in value.split(",") if t.strip() ] else: # Check for child tag elements (alternative format) tag_elements = child.findall(".//{http://owncloud.org/ns}tag") if tag_elements: item["tags"] = [t.text for t in tag_elements if t.text] else: item["tags"] = [] elif tag == "permissions": item["permissions"] = value elif tag == "size": # oc:size includes folder sizes item["total_size"] = int(value) if value else 0 else: # Store other properties as-is item[tag] = value items.append(item) return items async def find_by_name( self, pattern: str, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """Find files by name pattern using LIKE matching. Args: pattern: Name pattern to search for (supports % wildcard) scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of matching files/directories Examples: # Find all .txt files results = await find_by_name("%.txt") # Find files starting with "report" results = await find_by_name("report%") """ where_conditions = f""" <d:like> <d:prop> <d:displayname/> </d:prop> <d:literal>{pattern}</d:literal> </d:like> """ return await self.search_files( scope=scope, where_conditions=where_conditions, limit=limit ) async def find_by_type( self, mime_type: str, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """Find files by MIME type. Args: mime_type: MIME type to search for (supports % wildcard, e.g., "image/%") scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of matching files Examples: # Find all images results = await find_by_type("image/%") # Find all PDFs results = await find_by_type("application/pdf") """ where_conditions = f""" <d:like> <d:prop> <d:getcontenttype/> </d:prop> <d:literal>{mime_type}</d:literal> </d:like> """ return await self.search_files( scope=scope, where_conditions=where_conditions, limit=limit ) async def list_favorites( self, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """List all favorite files. Args: scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of favorite files/directories Examples: # List all favorites results = await list_favorites() # List favorites in a specific folder results = await list_favorites(scope="Documents") """ # Use REPORT method for favorites as it's more efficient # But we can also use SEARCH as fallback where_conditions = """ <d:eq> <d:prop> <oc:favorite/> </d:prop> <d:literal>1</d:literal> </d:eq> """ # Request favorite property properties = [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", "fileid", "favorite", ] return await self.search_files( scope=scope, where_conditions=where_conditions, properties=properties, limit=limit, ) async def find_by_tag( self, tag_name: str, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """Find files by tag name. DEPRECATED: Use NextcloudClient.find_files_by_tag() instead, which uses the proper OCS Tags API rather than WebDAV SEARCH. Args: tag_name: Tag to filter by (e.g., "vector-index") scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of files/directories with the specified tag Examples: # Find all files tagged with "vector-index" results = await find_by_tag("vector-index") # Find tagged files in a specific folder results = await find_by_tag("vector-index", scope="Documents") """ # Use LIKE for tag matching since tags can be comma-separated where_conditions = f""" <d:like> <d:prop> <oc:tags/> </d:prop> <d:literal>%{tag_name}%</d:literal> </d:like> """ # Request tag property along with standard properties properties = [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", "fileid", "tags", ] return await self.search_files( scope=scope, where_conditions=where_conditions, properties=properties, limit=limit, ) async def _get_file_info_by_id(self, file_id: int) -> Dict[str, Any]: """Get file information by Nextcloud file ID using WebDAV. Args: file_id: Nextcloud internal file ID Returns: File information dictionary with path, size, content_type, etc. Raises: HTTPStatusError: If file not found or request fails """ # Nextcloud allows accessing files by ID via special meta endpoint meta_path = f"/remote.php/dav/meta/{file_id}/" propfind_body = """<?xml version="1.0"?> <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"> <d:prop> <d:displayname/> <d:getcontentlength/> <d:getcontenttype/> <d:getlastmodified/> <d:resourcetype/> <d:getetag/> <oc:fileid/> </d:prop> </d:propfind>""" headers = {"Depth": "0", "Content-Type": "text/xml", "OCS-APIRequest": "true"} response = await self._make_request( "PROPFIND", meta_path, content=propfind_body, headers=headers ) response.raise_for_status() # Parse the XML response root = ET.fromstring(response.content) responses = root.findall(".//{DAV:}response") if not responses: raise RuntimeError(f"File ID {file_id} not found") response_elem = responses[0] href = response_elem.find(".//{DAV:}href") if href is None: raise RuntimeError(f"No href in response for file ID {file_id}") propstat = response_elem.find(".//{DAV:}propstat") if propstat is None: raise RuntimeError(f"No propstat for file ID {file_id}") prop = propstat.find(".//{DAV:}prop") if prop is None: raise RuntimeError(f"No prop for file ID {file_id}") # Extract file path from displayname or construct from file ID displayname_elem = prop.find(".//{DAV:}displayname") name = ( displayname_elem.text if displayname_elem is not None else f"file_{file_id}" ) # Get file properties size_elem = prop.find(".//{DAV:}getcontentlength") size = int(size_elem.text) if size_elem is not None and size_elem.text else 0 content_type_elem = prop.find(".//{DAV:}getcontenttype") content_type = content_type_elem.text if content_type_elem is not None else None modified_elem = prop.find(".//{DAV:}getlastmodified") modified = modified_elem.text if modified_elem is not None else None etag_elem = prop.find(".//{DAV:}getetag") etag = ( etag_elem.text.strip('"') if etag_elem is not None and etag_elem.text else None ) # Check if it's a directory resourcetype = prop.find(".//{DAV:}resourcetype") is_directory = ( resourcetype is not None and resourcetype.find(".//{DAV:}collection") is not None ) # Try to get actual file path - meta endpoint doesn't give us the real path # so we'll construct a reasonable path from the name # The calling code in NextcloudClient will have the context to determine the actual path file_info = { "name": name, "path": f"/{name}", # Placeholder - caller should use WebDAV to get real path if needed "size": size, "content_type": content_type, "last_modified": modified, "etag": etag, "is_directory": is_directory, "file_id": file_id, } logger.debug(f"Retrieved file info for ID {file_id}: {name}") return file_info async def get_tag_by_name(self, tag_name: str) -> dict[str, Any] | None: """Get a system tag by its name via WebDAV. Args: tag_name: Name of the tag to find (case-sensitive) Returns: Tag dictionary if found, None otherwise """ # Use WebDAV PROPFIND to list all systemtags propfind_body = """<?xml version="1.0"?> <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"> <d:prop> <oc:id/> <oc:display-name/> <oc:user-visible/> <oc:user-assignable/> </d:prop> </d:propfind>""" response = await self._client.request( "PROPFIND", "/remote.php/dav/systemtags/", headers={"Depth": "1"}, content=propfind_body, ) response.raise_for_status() # Parse XML response root = ET.fromstring(response.content) ns = { "d": "DAV:", "oc": "http://owncloud.org/ns", } for response_elem in root.findall("d:response", ns): href = response_elem.find("d:href", ns) if href is None or href.text == "/remote.php/dav/systemtags/": # Skip the collection itself continue propstat = response_elem.find("d:propstat", ns) if propstat is None: continue prop = propstat.find("d:prop", ns) if prop is None: continue # Extract tag properties tag_id_elem = prop.find("oc:id", ns) display_name_elem = prop.find("oc:display-name", ns) user_visible_elem = prop.find("oc:user-visible", ns) user_assignable_elem = prop.find("oc:user-assignable", ns) if display_name_elem is not None and display_name_elem.text == tag_name: tag_info = { "id": int(tag_id_elem.text) if tag_id_elem is not None and tag_id_elem.text is not None else None, "name": display_name_elem.text, "userVisible": user_visible_elem.text.lower() == "true" if user_visible_elem is not None and user_visible_elem.text is not None else True, "userAssignable": user_assignable_elem.text.lower() == "true" if user_assignable_elem is not None and user_assignable_elem.text is not None else True, } logger.debug(f"Found tag '{tag_name}' with ID {tag_info['id']}") return tag_info logger.debug(f"Tag '{tag_name}' not found") return None async def get_files_by_tag(self, tag_id: int) -> list[dict[str, Any]]: """Get all files tagged with a specific system tag via WebDAV REPORT. Args: tag_id: Numeric ID of the tag Returns: List of file info dictionaries with path, size, content_type, etc. """ # Use WebDAV REPORT method with systemtag filter, requesting all properties report_body = f"""<?xml version="1.0"?> <oc:filter-files xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns"> <d:prop> <oc:fileid/> <d:displayname/> <d:getcontentlength/> <d:getcontenttype/> <d:getlastmodified/> <d:getetag/> </d:prop> <oc:filter-rules> <oc:systemtag>{tag_id}</oc:systemtag> </oc:filter-rules> </oc:filter-files>""" response = await self._client.request( "REPORT", f"{self._get_webdav_base_path()}/", content=report_body, ) response.raise_for_status() # Parse XML response root = ET.fromstring(response.content) ns = { "d": "DAV:", "oc": "http://owncloud.org/ns", } files = [] for response_elem in root.findall("d:response", ns): # Extract href (file path) href_elem = response_elem.find("d:href", ns) if href_elem is None or not href_elem.text: continue propstat = response_elem.find("d:propstat", ns) if propstat is None: continue prop = propstat.find("d:prop", ns) if prop is None: continue # Extract all properties fileid_elem = prop.find("oc:fileid", ns) displayname_elem = prop.find("d:displayname", ns) contentlength_elem = prop.find("d:getcontentlength", ns) contenttype_elem = prop.find("d:getcontenttype", ns) lastmodified_elem = prop.find("d:getlastmodified", ns) etag_elem = prop.find("d:getetag", ns) if fileid_elem is None or not fileid_elem.text: continue # Decode href path and extract the file path from urllib.parse import unquote href_path = unquote(href_elem.text) # Remove WebDAV prefix to get user-relative path webdav_prefix = f"/remote.php/dav/files/{self.username}/" file_path = href_path.replace(webdav_prefix, "/") # Parse last modified timestamp last_modified_timestamp = None if lastmodified_elem is not None and lastmodified_elem.text: from email.utils import parsedate_to_datetime try: dt = parsedate_to_datetime(lastmodified_elem.text) last_modified_timestamp = int(dt.timestamp()) except Exception: pass file_info = { "id": int(fileid_elem.text), "path": file_path, "name": displayname_elem.text if displayname_elem is not None else file_path.split("/")[-1], "size": int(contentlength_elem.text) if contentlength_elem is not None and contentlength_elem.text else 0, "content_type": contenttype_elem.text if contenttype_elem is not None else "", "last_modified": lastmodified_elem.text if lastmodified_elem is not None else None, "last_modified_timestamp": last_modified_timestamp, "etag": etag_elem.text if etag_elem is not None else None, } files.append(file_info) logger.debug(f"Found {len(files)} files with tag ID {tag_id}") return files async def get_file_info(self, path: str) -> dict[str, Any] | None: """Get file info including file ID via WebDAV PROPFIND. Args: path: Path to the file (relative to user's files directory) Returns: File info dictionary with id, name, size, content_type, etc. Returns None if file not found. """ webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" propfind_body = """<?xml version="1.0"?> <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"> <d:prop> <oc:fileid/> <d:displayname/> <d:getcontentlength/> <d:getcontenttype/> <d:getlastmodified/> <d:getetag/> <d:resourcetype/> </d:prop> </d:propfind>""" try: response = await self._client.request( "PROPFIND", webdav_path, headers={"Depth": "0"}, content=propfind_body, ) response.raise_for_status() except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"File not found: {path}") return None raise # Parse XML response root = ET.fromstring(response.content) ns = { "d": "DAV:", "oc": "http://owncloud.org/ns", } response_elem = root.find("d:response", ns) if response_elem is None: return None propstat = response_elem.find("d:propstat", ns) if propstat is None: return None prop = propstat.find("d:prop", ns) if prop is None: return None # Extract properties fileid_elem = prop.find("oc:fileid", ns) displayname_elem = prop.find("d:displayname", ns) contentlength_elem = prop.find("d:getcontentlength", ns) contenttype_elem = prop.find("d:getcontenttype", ns) lastmodified_elem = prop.find("d:getlastmodified", ns) etag_elem = prop.find("d:getetag", ns) resourcetype_elem = prop.find("d:resourcetype", ns) is_directory = ( resourcetype_elem is not None and resourcetype_elem.find("d:collection", ns) is not None ) file_info = { "id": int(fileid_elem.text) if fileid_elem is not None and fileid_elem.text is not None else None, "path": path, "name": displayname_elem.text if displayname_elem is not None else path.split("/")[-1], "size": int(contentlength_elem.text) if contentlength_elem is not None and contentlength_elem.text else 0, "content_type": contenttype_elem.text if contenttype_elem is not None else "", "last_modified": lastmodified_elem.text if lastmodified_elem is not None else None, "etag": etag_elem.text.strip('"') if etag_elem is not None and etag_elem.text else None, "is_directory": is_directory, } logger.debug(f"Got file info for '{path}': id={file_info['id']}") return file_info async def create_tag( self, name: str, user_visible: bool = True, user_assignable: bool = True, ) -> dict[str, Any]: """Create a system tag via WebDAV. Args: name: Name of the tag to create user_visible: Whether the tag is visible to users user_assignable: Whether users can assign this tag Returns: Tag dictionary with id, name, userVisible, userAssignable Raises: HTTPStatusError: If tag creation fails (409 if already exists) """ # Use WebDAV POST with JSON body to create tag response = await self._client.post( "/remote.php/dav/systemtags/", headers={"Content-Type": "application/json"}, json={ "name": name, "userVisible": user_visible, "userAssignable": user_assignable, }, ) response.raise_for_status() # Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42) content_location = response.headers.get("Content-Location", "") tag_id = None if content_location: # Extract the numeric ID from the path try: tag_id = int(content_location.rstrip("/").split("/")[-1]) except (ValueError, IndexError): pass tag_info = { "id": tag_id, "name": name, "userVisible": user_visible, "userAssignable": user_assignable, } logger.info(f"Created tag '{name}' with ID {tag_info['id']}") return tag_info async def get_or_create_tag( self, name: str, user_visible: bool = True, user_assignable: bool = True, ) -> dict[str, Any]: """Get a tag by name, creating it if it doesn't exist. Args: name: Name of the tag user_visible: Whether the tag is visible to users (for creation) user_assignable: Whether users can assign this tag (for creation) Returns: Tag dictionary with id, name, userVisible, userAssignable """ # First try to get existing tag existing_tag = await self.get_tag_by_name(name) if existing_tag: logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}") return existing_tag # Create new tag try: return await self.create_tag(name, user_visible, user_assignable) except HTTPStatusError as e: if e.response.status_code == 409: # Tag was created between our check and creation, fetch it existing_tag = await self.get_tag_by_name(name) if existing_tag: return existing_tag raise async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool: """Assign a system tag to a file. Args: file_id: Numeric file ID tag_id: Numeric tag ID Returns: True if tag was assigned successfully (or already assigned) Raises: HTTPStatusError: If tag assignment fails """ response = await self._client.request( "PUT", f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}", headers={"Content-Length": "0"}, content=b"", ) # 201 = Created (new assignment), 409 = Conflict (already assigned) if response.status_code in (201, 409): logger.info(f"Tagged file {file_id} with tag {tag_id}") return True response.raise_for_status() return True async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool: """Remove a system tag from a file. Args: file_id: Numeric file ID tag_id: Numeric tag ID Returns: True if tag was removed successfully (or wasn't assigned) Raises: HTTPStatusError: If tag removal fails """ response = await self._client.request( "DELETE", f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}", ) # 204 = No Content (removed), 404 = Not Found (wasn't assigned) if response.status_code in (204, 404): logger.info(f"Removed tag {tag_id} from file {file_id}") return True response.raise_for_status() return True

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cbcoutinho/nextcloud-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server