"""MCP server for Hadoop Hue REST API.
This server exposes HueClientRest functionality as MCP tools for:
- Executing SQL queries (Hive, SparkSQL, Impala)
- Managing HDFS files (list, upload, download)
- Exporting query results to CSV
"""
import os
from typing import Any, List, Optional, Union
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
# Create the MCP server.
# NOTE: the previous version of the instructions string contained leaked
# auto-generated commentary ("The code you provided is not complete...") in the
# middle of the environment-variable list; it has been removed so clients see
# only the intended usage notes.
mcp = FastMCP(
    "Hue MCP Server",
    instructions="""MCP server for interacting with Hadoop Hue REST API.
This server allows you to:
- Execute SQL queries using Hive, SparkSQL, or Impala dialects
- List, upload, and download files from HDFS
- Export query results to CSV files
- Check directory existence and manage file operations
Required environment variables:
- HUE_HOST: Hue server URL (e.g., https://hue.example.com)
- HUE_USERNAME: Username for Hue authentication
- HUE_PASSWORD: Password for Hue authentication
""",
)

# Global client instance; None until get_client() lazily constructs it.
_client = None
def get_client():
    """Return the process-wide HueClientREST instance, building it on first use.

    Reads HUE_HOST, HUE_USERNAME, and HUE_PASSWORD from the environment and
    caches the constructed client in the module-level ``_client`` global.

    Raises:
        ValueError: If any of the three required environment variables is
            unset or empty.
    """
    global _client

    # Deferred import: the server can still start when hueclientrest
    # is not installed, failing only when a tool actually needs it.
    from hueclientrest import HueClientREST

    if _client is None:
        env = os.environ
        host = env.get("HUE_HOST")
        username = env.get("HUE_USERNAME")
        password = env.get("HUE_PASSWORD")
        if not (host and username and password):
            raise ValueError(
                "Missing required environment variables. "
                "Please set HUE_HOST, HUE_USERNAME, and HUE_PASSWORD."
            )
        _client = HueClientREST(
            host=host,
            username=username,
            password=password,
            # Both flags are opt-in strings; comparison is case-insensitive.
            verify_ssl=env.get("HUE_VERIFY_SSL", "true").lower() == "true",
            ssl_warnings=env.get("HUE_SSL_WARNINGS", "false").lower() == "true",
        )
    return _client
def ensure_logged_in(client) -> None:
    """Authenticate *client* if needed by delegating to its ``login()`` method.

    HueClientREST manages its own session token internally, so we simply
    call login() and let the client decide whether re-authentication is
    required.
    """
    client.login()
# =============================================================================
# Pydantic Models for structured responses
# =============================================================================
class QueryResult(BaseModel):
    """Result of a SQL query execution."""
    # Column names in the order the query engine returned them.
    headers: List[str] = Field(description="Column headers from the query result")
    # Each row is a list of JSON-compatible scalar cell values.
    rows: List[List[Union[str, int, float, bool, None]]] = Field(description="Query result rows as nested lists")
    # Convenience count; callers in this module set it to len(rows).
    row_count: int = Field(description="Total number of rows returned")
class FileInfo(BaseModel):
    """Information about a file or directory in HDFS."""
    # Entry name as reported by Hue's directory listing.
    name: str = Field(description="File or directory name")
    # Full HDFS path to the entry.
    path: str = Field(description="Full path in HDFS")
    # Hue reports the kind as a string; 'file' or 'directory' expected,
    # 'unknown' is substituted when the listing omits it.
    type: str = Field(description="Type: 'file' or 'directory'")
    # None when Hue does not report a size (e.g., for directories).
    size: Optional[int] = Field(default=None, description="File size in bytes (if file)")
    # Raw modification-time string from Hue; format is not normalized here.
    mtime: Optional[str] = Field(default=None, description="Last modification time")
class DirectoryListing(BaseModel):
    """Result of listing a directory."""
    # Echo of the HDFS path that was requested.
    path: str = Field(description="The directory path that was listed")
    # One FileInfo per entry in the directory.
    items: List[FileInfo] = Field(description="List of files and directories")
    # Convenience count; callers in this module set it to len(items).
    total_count: int = Field(description="Total number of items")
class OperationResult(BaseModel):
    """Result of a file operation."""
    # In this module success is always True; failures raise RuntimeError instead.
    success: bool = Field(description="Whether the operation succeeded")
    # Human-readable summary of what happened.
    message: str = Field(description="Status message")
    # Local or HDFS path most relevant to the operation, when applicable.
    path: Optional[str] = Field(default=None, description="Relevant path for the operation")
# =============================================================================
# MCP Tools - SQL Query Execution
# =============================================================================
@mcp.tool()
def hue_execute_query(
    statement: str,
    dialect: str = "hive",
    timeout: int = 300,
    batch_size: int = 1000,
) -> QueryResult:
    """Run a SQL statement on Hue and return the complete result set.

    Submits the statement, blocks until the operation completes, then pages
    through every result row. Intended for SELECT-style queries where the
    data itself is wanted.

    Args:
        statement: SQL text to run (e.g., "SELECT * FROM table LIMIT 100")
        dialect: SQL dialect to use - 'hive', 'sparksql', or 'impala' (default: 'hive')
        timeout: Maximum time to wait for query completion in seconds (default: 300)
        batch_size: Number of rows to fetch per batch for pagination (default: 1000)

    Returns:
        QueryResult with headers, rows, and row_count

    Raises:
        RuntimeError: If execution, waiting, or fetching fails.
    """
    try:
        hue = get_client()
        ensure_logged_in(hue)
        # Submit, then block until the operation finishes (or times out).
        op = hue.execute(statement, dialect=dialect)
        hue.wait(op, timeout=timeout)
        # Page through the full result set.
        columns, records = hue.fetch_all(op, batch_size=batch_size)
        return QueryResult(headers=columns, rows=records, row_count=len(records))
    except Exception as e:
        raise RuntimeError(f"Query execution failed: {e}") from e
@mcp.tool()
def hue_run_query_to_csv(
    statement: str,
    filename: str = "results.csv",
    dialect: str = "hive",
    batch_size: int = 1000,
) -> OperationResult:
    """Execute a SQL query and save results directly to a CSV file.

    This is a convenience method that combines query execution with CSV export.
    Ideal for exporting large result sets to files.

    Args:
        statement: The SQL statement to execute
        filename: Output CSV filename (default: 'results.csv')
        dialect: SQL dialect - 'hive', 'sparksql', or 'impala' (default: 'hive')
        batch_size: Number of rows to fetch per batch (default: 1000)

    Returns:
        OperationResult indicating success and the output filename

    Raises:
        RuntimeError: If the query or the CSV export fails.
    """
    try:
        client = get_client()
        ensure_logged_in(client)
        # run() handles execute, wait, fetch, and save in one call.
        client.run(
            statement=statement,
            dialect=dialect,
            filename=filename,
            batch_size=batch_size,
        )
        return OperationResult(
            success=True,
            # BUG FIX: message previously read "saved to (unknown)" because the
            # f-string never interpolated the filename.
            message=f"Query results saved to {filename}",
            path=filename,
        )
    except Exception as e:
        raise RuntimeError(f"Query to CSV failed: {e}") from e
@mcp.tool()
def hue_export_and_download(
    statement: str,
    hdfs_directory: str,
    local_directory: str = ".",
    dialect: str = "hive",
    file_pattern: Optional[str] = None,
    timeout: int = 300,
) -> OperationResult:
    """Run an INSERT OVERWRITE DIRECTORY query, then pull its output files locally.

    For statements that write their output into HDFS rather than returning
    rows: after the query completes, every matching file in the target HDFS
    directory is downloaded to the local filesystem.

    Args:
        statement: SQL statement with INSERT OVERWRITE DIRECTORY
        hdfs_directory: The HDFS directory where results are written
        local_directory: Local directory to download files to (default: '.')
        dialect: SQL dialect - 'hive', 'sparksql', or 'impala' (default: 'hive')
        file_pattern: Optional regex pattern to filter files to download
        timeout: Maximum wait time in seconds (default: 300)

    Returns:
        OperationResult with list of downloaded files in message

    Raises:
        RuntimeError: If the query or any download step fails.
    """
    try:
        hue = get_client()
        ensure_logged_in(hue)
        fetched = hue.run_and_download(
            statement=statement,
            directory_path=hdfs_directory,
            local_dir=local_directory,
            dialect=dialect,
            file_pattern=file_pattern,
            timeout=timeout,
        )
        summary = f"Downloaded {len(fetched)} files: {', '.join(fetched)}"
        return OperationResult(success=True, message=summary, path=local_directory)
    except Exception as e:
        raise RuntimeError(f"Export and download failed: {e}") from e
# =============================================================================
# MCP Tools - HDFS File Operations
# =============================================================================
@mcp.tool()
def hue_list_directory(
    directory_path: str,
    page_size: int = 1000,
) -> DirectoryListing:
    """Browse the contents of an HDFS directory.

    Args:
        directory_path: The HDFS directory path (e.g., '/user/data', '/tmp')
        page_size: Maximum number of items to return (default: 1000)

    Returns:
        DirectoryListing with path, items, and total_count

    Raises:
        RuntimeError: If the listing request fails.
    """
    try:
        hue = get_client()
        ensure_logged_in(hue)
        entries = []
        for raw in hue.list_directory(directory_path, pagesize=page_size):
            # Hue returns plain dicts; missing keys fall back to safe defaults.
            entries.append(
                FileInfo(
                    name=raw.get("name", ""),
                    path=raw.get("path", ""),
                    type=raw.get("type", "unknown"),
                    size=raw.get("size"),
                    mtime=raw.get("mtime"),
                )
            )
        return DirectoryListing(
            path=directory_path,
            items=entries,
            total_count=len(entries),
        )
    except Exception as e:
        raise RuntimeError(f"Failed to list directory: {e}") from e
@mcp.tool()
def hue_check_directory_exists(directory_path: str) -> bool:
    """Report whether an HDFS directory exists.

    Args:
        directory_path: The HDFS directory path to check

    Returns:
        True if the directory exists, False otherwise

    Raises:
        RuntimeError: If the check itself fails (e.g., auth or network error).
    """
    try:
        hue = get_client()
        ensure_logged_in(hue)
        exists = hue.check_directory_exists(directory_path)
    except Exception as e:
        raise RuntimeError(f"Failed to check directory: {e}") from e
    return exists
@mcp.tool()
def hue_download_file(
    remote_path: str,
    local_filename: Optional[str] = None,
) -> OperationResult:
    """Download a single file from HDFS.

    Args:
        remote_path: The full path to the file in HDFS
        local_filename: Local filename to save as (optional, defaults to original name)

    Returns:
        OperationResult with the local filename where file was saved

    Raises:
        RuntimeError: If the download fails.
    """
    try:
        client = get_client()
        ensure_logged_in(client)
        # download_file returns the name the file was actually saved under.
        saved_filename = client.download_file(remote_path, local_filename)
        return OperationResult(
            success=True,
            # Plain string: the previous f-prefix had no placeholders (ruff F541).
            message="File downloaded successfully",
            path=saved_filename,
        )
    except Exception as e:
        raise RuntimeError(f"Failed to download file: {e}") from e
@mcp.tool()
def hue_download_directory(
    directory_path: str,
    local_directory: str = ".",
    file_pattern: Optional[str] = None,
) -> OperationResult:
    """Download every (optionally filtered) file from an HDFS directory.

    Args:
        directory_path: The HDFS directory path to download from
        local_directory: Local directory to save files to (default: '.')
        file_pattern: Optional regex pattern to filter files (e.g., '.*\\.csv')

    Returns:
        OperationResult with list of downloaded files

    Raises:
        RuntimeError: If any part of the download fails.
    """
    try:
        hue = get_client()
        ensure_logged_in(hue)
        fetched = hue.download_directory_files(
            directory_path=directory_path,
            local_dir=local_directory,
            file_pattern=file_pattern,
        )
        summary = f"Downloaded {len(fetched)} files: {', '.join(fetched)}"
        return OperationResult(success=True, message=summary, path=local_directory)
    except Exception as e:
        raise RuntimeError(f"Failed to download directory: {e}") from e
@mcp.tool()
def hue_upload_file(
    local_file_path: str,
    hdfs_destination: str,
) -> OperationResult:
    """Upload a local file to HDFS.

    Args:
        local_file_path: Path to the local file to upload
        hdfs_destination: Destination directory in HDFS

    Returns:
        OperationResult indicating success

    Raises:
        RuntimeError: If the upload fails.
    """
    try:
        client = get_client()
        ensure_logged_in(client)
        # Called for its side effect; the return value was previously bound
        # to an unused local.
        client.upload_file(hdfs_destination, local_file_path)
        return OperationResult(
            success=True,
            message=f"File uploaded successfully to {hdfs_destination}",
            path=hdfs_destination,
        )
    except Exception as e:
        raise RuntimeError(f"Failed to upload file: {e}") from e
# =============================================================================
# Main entry point
# =============================================================================
def main():
    """Run the MCP server."""
    # mcp.run() with no arguments uses the FastMCP library defaults;
    # presumably it blocks until the server shuts down — standard FastMCP behavior.
    mcp.run()
# Standard script entry guard: only start the server when executed directly.
if __name__ == "__main__":
    main()