
Server Configuration

Describes the environment variables used to configure the server. All variables are optional and have the defaults shown below.

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| FABRIC_SCOPES | No | OAuth scopes for Microsoft Fabric | https://api.fabric.microsoft.com/.default |
| MCP_LOG_LEVEL | No | Logging level for the MCP server | INFO |
| AZURE_CLIENT_ID | No | The client ID for Azure service principal authentication. | |
| AZURE_LOG_LEVEL | No | Azure SDK logging level | info |
| AZURE_TENANT_ID | No | The tenant ID for Azure service principal authentication. | |
| FABRIC_BASE_URL | No | Fabric API base URL | https://api.fabric.microsoft.com/v1 |
| MCP_SERVER_NAME | No | Server name for MCP | ms-fabric-mcp-server |
| FABRIC_MAX_RETRIES | No | Max retry attempts for API calls | 3 |
| LIVY_POLL_INTERVAL | No | Livy polling interval in seconds | 2.0 |
| AZURE_CLIENT_SECRET | No | The client secret for Azure service principal authentication. | |
| FABRIC_RETRY_BACKOFF | No | Backoff factor for retries | 2.0 |
| LIVY_API_CALL_TIMEOUT | No | Livy API timeout in seconds | 120 |
| FABRIC_API_CALL_TIMEOUT | No | Fabric API timeout in seconds | 30 |
| LIVY_SESSION_WAIT_TIMEOUT | No | Livy session wait timeout in seconds | 240 |
| LIVY_STATEMENT_WAIT_TIMEOUT | No | Livy statement wait timeout in seconds | 10 |
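
A minimal illustrative .env for service principal authentication. The defaults above apply unless overridden; the tenant ID, client ID, and secret values below are placeholders, not real credentials:

```
AZURE_TENANT_ID=<your-tenant-id>
AZURE_CLIENT_ID=<your-service-principal-client-id>
AZURE_CLIENT_SECRET=<your-service-principal-secret>
MCP_LOG_LEVEL=DEBUG
```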

Capabilities

Features and capabilities supported by this server

| Capability | Details |
| --- | --- |
| tools | `{"listChanged": true}` |
| prompts | `{"listChanged": false}` |
| resources | `{"subscribe": false, "listChanged": false}` |
| experimental | `{}` |

Tools

Functions exposed to the LLM to take actions

list_workspaces

List all accessible Fabric workspaces.

Returns a list of all workspaces the authenticated user has access to, including workspace ID, name, description, type, state, and capacity ID.

Parameters: None

Returns: Dictionary with status, workspace_count, and list of workspaces. Each workspace contains: id, display_name, description, type, state, capacity_id.

Example:

```python
result = list_workspaces()
```

list_items

List all items in a Fabric workspace, optionally filtered by type.

Returns all items in the specified workspace. If item_type is provided, only items of that type are returned. Supported types include: Notebook, Lakehouse, Warehouse, Pipeline, DataPipeline, Report, SemanticModel, Dashboard, Dataflow, Dataset, and 40+ other Fabric item types.

Parameters: workspace_name: The display name of the workspace. item_type: Optional item type filter (e.g., "Notebook", "Lakehouse"). If not provided, all items are returned.

Returns: Dictionary with status, workspace_name, item_type_filter, item_count, and list of items. Each item contains: id, display_name, type, description, created_date, modified_date.

Example:

```python
# List all items
result = list_items("My Workspace")

# List only notebooks
result = list_items("My Workspace", item_type="Notebook")
```

delete_item

Delete an item from a Fabric workspace.

Deletes the specified item from the workspace. The item is identified by its display name and type. Common item types include: Notebook, Lakehouse, Warehouse, Pipeline, Report, SemanticModel, Dashboard, etc.

Parameters: workspace_name: The display name of the workspace. item_display_name: Name of the item to delete. item_type: Type of the item to delete (e.g., "Notebook", "Lakehouse"). Supported types: Notebook, Lakehouse, Warehouse, Pipeline, DataPipeline, Report, SemanticModel, Dashboard, Dataflow, Dataset.

Returns: Dictionary with status and success/error message.

Example:

```python
result = delete_item(
    workspace_name="My Workspace",
    item_display_name="Old Notebook",
    item_type="Notebook"
)
```

import_notebook_to_fabric

Upload a local .ipynb into a Fabric workspace identified by name.

Imports a Jupyter notebook from the local filesystem into a Microsoft Fabric workspace. The notebook file must be in .ipynb format. The notebook can be organized into folders using forward slashes in the display name (e.g., "demos/hello_world").

Parameters: workspace_name: The display name of the target workspace (case-sensitive as shown in Fabric). notebook_display_name: Desired name (optionally with folders, e.g. "demos/hello_world") inside Fabric. local_notebook_path: Path to the notebook file (absolute or repo-relative). description: Optional description for the notebook.

Returns: Dictionary with status, message, and artifact_id if successful.

Example:

```python
result = import_notebook_to_fabric(
    workspace_name="My Workspace",
    notebook_display_name="analysis/customer_analysis",
    local_notebook_path="notebooks/customer_analysis.ipynb",
    description="Customer behavior analysis notebook"
)
```

get_notebook_content

Get the content and definition of a notebook.

Retrieves the full notebook definition including all cells, metadata, and configuration from a Fabric workspace. The content is returned as a dictionary matching the Jupyter notebook format.

Parameters: workspace_name: The display name of the workspace. notebook_display_name: The name of the notebook.

Returns: Dictionary with status, workspace_name, notebook_name, and notebook definition. The definition contains the full notebook structure including cells, metadata, etc.

Example:

```python
result = get_notebook_content(
    workspace_name="My Workspace",
    notebook_display_name="analysis/customer_analysis"
)

if result["status"] == "success":
    definition = result["definition"]
    # Access notebook cells, metadata, etc.
```

attach_lakehouse_to_notebook

Attach a default lakehouse to a notebook in Microsoft Fabric.

Updates the notebook definition to set a default lakehouse. This lakehouse will be automatically mounted when the notebook runs, providing seamless access to the lakehouse tables and files without additional configuration.

Use this tool when:

  • Setting up a new notebook with a lakehouse connection

  • Changing the default lakehouse for an existing notebook

  • Ensuring notebook code can access lakehouse tables via spark.read

Parameters: workspace_name: The display name of the workspace containing the notebook. notebook_name: Name of the notebook to update. lakehouse_name: Name of the lakehouse to attach as default. lakehouse_workspace_name: Optional workspace name for the lakehouse. If not provided, uses the same workspace as the notebook.

Returns: Dictionary with status, message, notebook_id, notebook_name, lakehouse_id, lakehouse_name, and workspace_id.

Example:

```python
# Attach lakehouse in same workspace
result = attach_lakehouse_to_notebook(
    workspace_name="Analytics Workspace",
    notebook_name="Data_Processing",
    lakehouse_name="Bronze_Lakehouse"
)

# Attach lakehouse from different workspace
result = attach_lakehouse_to_notebook(
    workspace_name="Analytics Workspace",
    notebook_name="Data_Processing",
    lakehouse_name="Shared_Lakehouse",
    lakehouse_workspace_name="Shared Resources"
)

if result["status"] == "success":
    print(f"Lakehouse {result['lakehouse_name']} attached successfully!")
```

get_notebook_execution_details

Get detailed execution information for a notebook run by job instance ID.

Retrieves execution metadata from the Fabric Notebook Livy Sessions API, which provides detailed timing, resource usage, and execution state information.

Use this tool when:

  • You want to check the status and timing of a completed notebook run

  • You need to verify resource allocation for a notebook execution

  • You want to analyze execution performance (queue time, run time)

Note: This method returns execution metadata (timing, state, resource usage). Cell-level outputs are only available for active sessions. Once a notebook job completes, individual cell outputs cannot be retrieved via the REST API. To capture cell outputs, use mssparkutils.notebook.exit() in your notebook and access the exitValue through Data Pipeline activities.
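
For example, a notebook that should report results back to a pipeline might end with a cell like the following (a minimal sketch; mssparkutils is assumed to be available in the Fabric notebook runtime, and the payload shape is illustrative):

```python
import json

# Return a small JSON payload; a Data Pipeline Notebook activity can read it
# from the activity's exitValue output after the run completes.
mssparkutils.notebook.exit(json.dumps({"rows_processed": 1234, "status": "ok"}))
```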

Parameters: workspace_name: The display name of the workspace containing the notebook. notebook_name: Name of the notebook. job_instance_id: The job instance ID from execute_notebook or run_on_demand_job result.

Returns: Dictionary with:
- status: "success" or "error"
- message: Description of the result
- session: Full Livy session details (state, timing, resources)
- execution_summary: Summarized execution information including:
  - state: Execution state (Success, Failed, Cancelled, etc.)
  - spark_application_id: Spark application identifier
  - queued_duration_seconds: Time spent in queue
  - running_duration_seconds: Actual execution time
  - total_duration_seconds: Total end-to-end time
  - driver_memory, driver_cores, executor_memory, etc.

Example:

```python
# After executing a notebook
exec_result = run_on_demand_job(
    workspace_name="Analytics",
    item_name="ETL_Pipeline",
    item_type="Notebook",
    job_type="RunNotebook"
)

# Get detailed execution information
details = get_notebook_execution_details(
    workspace_name="Analytics",
    notebook_name="ETL_Pipeline",
    job_instance_id=exec_result["job_instance_id"]
)

if details["status"] == "success":
    summary = details["execution_summary"]
    print(f"State: {summary['state']}")
    print(f"Duration: {summary['total_duration_seconds']}s")
    print(f"Spark App ID: {summary['spark_application_id']}")
```

list_notebook_executions

List all Livy sessions (execution history) for a notebook.

Retrieves a list of all Livy sessions associated with a notebook, providing an execution history with job instance IDs, states, and timing information.

Use this tool when:

  • You want to see the execution history of a notebook

  • You need to find a job instance ID for a past execution

  • You want to analyze execution patterns over time

Parameters: workspace_name: The display name of the workspace containing the notebook. notebook_name: Name of the notebook. limit: Optional maximum number of sessions to return.

Returns: Dictionary with:
- status: "success" or "error"
- message: Description of the result
- sessions: List of session summaries, each containing:
  - job_instance_id: Unique identifier for the job
  - livy_id: Livy session identifier
  - state: Execution state (Success, Failed, Cancelled, etc.)
  - operation_name: Type of operation (Notebook Scheduled Run, etc.)
  - spark_application_id: Spark application identifier
  - submitted_time_utc: When the job was submitted
  - start_time_utc: When execution started
  - end_time_utc: When execution ended
  - total_duration_seconds: Total execution time
- total_count: Total number of sessions found

Example:

```python
history = list_notebook_executions(
    workspace_name="Analytics",
    notebook_name="ETL_Pipeline",
    limit=10
)

if history["status"] == "success":
    print(f"Found {history['total_count']} executions")
    for session in history["sessions"]:
        print(f"{session['job_instance_id']}: {session['state']}")
```

get_notebook_driver_logs

Get Spark driver logs for a notebook execution.

Retrieves the driver logs (stdout or stderr) from a completed notebook run. This is particularly useful for getting detailed error messages and Python tracebacks when a notebook fails.

Important Notes:

  • Python exceptions and tracebacks appear in stdout, not stderr

  • stderr contains Spark/system logs (typically larger)

  • For failed notebooks, check stdout first for the Python error

  • Look for "Error", "Exception", "Traceback" in the output

Use this tool when:

  • A notebook execution failed and you need to see the Python error

  • You want to debug notebook issues by examining driver logs

  • You need to analyze Spark driver behavior (stderr)

Parameters: workspace_name: The display name of the workspace containing the notebook. notebook_name: Name of the notebook. job_instance_id: The job instance ID from execute_notebook or run_on_demand_job result. log_type: Type of log to retrieve - "stdout" (default) or "stderr". Use "stdout" for Python errors and print statements. Use "stderr" for Spark/system logs. max_lines: Maximum number of lines to return (default: 500, None for all). Returns the last N lines (most recent, where errors typically are).

Returns: Dictionary with:
- status: "success" or "error"
- message: Description of the result
- log_type: Type of log retrieved
- log_content: The actual log content as a string
- log_size_bytes: Total size of the log file
- truncated: Whether the log was truncated
- spark_application_id: The Spark application ID
- livy_id: The Livy session ID

Example:

```python
# Get Python error from a failed notebook
result = get_notebook_driver_logs(
    workspace_name="Analytics",
    notebook_name="ETL_Pipeline",
    job_instance_id="12345678-1234-1234-1234-123456789abc",
    log_type="stdout"  # Python errors are in stdout!
)

if result["status"] == "success":
    print(result["log_content"])
    # Output will include a Python traceback like:
    # ZeroDivisionError: division by zero
    # Traceback (most recent call last):
    # Cell In[11], line 2
    #   result = x / 0
```

run_on_demand_job

Run an on-demand job for a Fabric item.

Executes a job for the specified item. Common job types include:

  • RunNotebook: Execute a notebook

  • Pipeline: Run a data pipeline

  • DefaultJob: Default job type for the item

The job runs asynchronously. Use get_job_status or get_job_status_by_url to check the job's progress and result.

Parameters: workspace_name: The display name of the workspace. item_name: Name of the item to run job for. item_type: Type of the item (Notebook, Pipeline, Lakehouse, Warehouse, etc.). job_type: Type of job to run (RunNotebook, DefaultJob, Pipeline, etc.). execution_data: Optional execution data payload for the job (e.g., notebook parameters).

Returns: Dictionary with status, message, job_instance_id, location_url, and retry_after.

Example:

```python
# Run a notebook
result = run_on_demand_job(
    workspace_name="My Workspace",
    item_name="analysis_notebook",
    item_type="Notebook",
    job_type="RunNotebook",
    execution_data={"parameters": {"start_date": "2025-01-01"}}
)

# Use the location URL to check status
job_status = get_job_status_by_url(result["location_url"])
```

get_job_status

Get status of a specific job instance.

Retrieves the current status and details of a running or completed job. Possible job states include: NotStarted, InProgress, Completed, Failed, Cancelled.

Parameters: workspace_name: The display name of the workspace. item_name: Name of the item. item_type: Type of the item (Notebook, Pipeline, etc.). job_instance_id: ID of the job instance to check.

Returns: Dictionary with status, message, and job details including:
- job_instance_id, item_id, job_type, job_status
- invoke_type, root_activity_id, start_time_utc, end_time_utc
- failure_reason (if failed)
- is_terminal, is_successful, is_failed, is_running flags

Example:

```python
result = get_job_status(
    workspace_name="My Workspace",
    item_name="analysis_notebook",
    item_type="Notebook",
    job_instance_id="12345678-1234-1234-1234-123456789abc"
)

if result["job"]["is_terminal"]:
    if result["job"]["is_successful"]:
        print("Job completed successfully!")
    else:
        print(f"Job failed: {result['job']['failure_reason']}")
```

get_job_status_by_url

Get job status using the location URL from run_on_demand_job.

Retrieves job status using the location URL returned when the job was created. This is convenient when you have the location URL but not the individual workspace/item/job identifiers.

Parameters: location_url: The location URL returned from job creation.

Returns: Dictionary with status, message, and job details (same structure as get_job_status).

Example:

```python
# Start a job
start_result = run_on_demand_job(...)

# Check status using the location URL
status_result = get_job_status_by_url(start_result["location_url"])
```

get_operation_result

Get the result of a long-running operation.

Retrieves the result of an asynchronous operation using its operation ID. Operation IDs are typically returned in the x-ms-operation-id header from API calls that return 202 Accepted responses.

Parameters: operation_id: The operation ID (from x-ms-operation-id header).

Returns: Dictionary with status, operation_id, message, and operation result.

Example:

```python
result = get_operation_result("12345678-1234-1234-1234-123456789abc")

if result["status"] == "success":
    operation_result = result["result"]
    # Process operation result
```

livy_create_session

Create a new Livy session for Spark code execution.

Creates a Spark session for executing PySpark, Scala, or SparkR code. Session creation can take 6+ minutes on first startup as Spark initializes. It's recommended to keep with_wait=True to ensure the session is ready before use.

Parameters: workspace_id: Fabric workspace ID (use list_workspaces tool to find by name). lakehouse_id: Fabric lakehouse ID (use list_items tool with item_type="Lakehouse"). environment_id: Optional Fabric environment ID for pre-installed libraries. kind: Session kind - 'pyspark' (default), 'scala', or 'sparkr'. conf: Optional Spark configuration as key-value pairs (e.g., {"spark.executor.memory": "4g"}). with_wait: If True (default), wait for session to become available before returning. timeout_seconds: Maximum time to wait for session availability (default: from config).

Returns: Dictionary with session details including id, state, kind, appId, appInfo, and log.

Example:

```python
# Create a PySpark session
result = livy_create_session(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    kind="pyspark",
    with_wait=True
)

if result.get("state") == "idle":
    session_id = result["id"]
    # Session is ready to execute code
```

livy_list_sessions

List all Livy sessions in a workspace/lakehouse.

Retrieves all active Livy sessions for the specified workspace and lakehouse, including session IDs, states, and configuration details.

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID.

Returns: Dictionary with sessions list containing id, state, kind, appId, and other details.

Example:

```python
result = livy_list_sessions(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321"
)

for session in result.get("sessions", []):
    print(f"Session {session['id']}: {session['state']}")
```

livy_get_session_status

Get the current status and details of a Livy session.

Retrieves detailed information about a session including its state, Spark application details, and configuration. Use this to check session health and readiness.

Session States:

  • 'not_started': Session created but not yet started

  • 'starting': Session is initializing

  • 'idle': Session is ready to accept statements

  • 'busy': Session is currently executing a statement

  • 'shutting_down': Session is terminating

  • 'error': Session encountered an error

  • 'dead': Session has terminated

  • 'killed': Session was forcefully terminated

  • 'success': Session completed successfully

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID. session_id: Livy session ID to check.

Returns: Dictionary with session status including state, appId, appInfo, kind, and log.

Example:

```python
result = livy_get_session_status(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    session_id="0"
)

if result.get("state") == "idle":
    # Session is ready to execute code
    pass
elif result.get("state") == "busy":
    # Session is executing a statement
    pass
```

livy_close_session

Close (terminate) a Livy session.

Terminates the specified Livy session and releases its resources. Any running statements will be cancelled.

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID. session_id: Livy session ID to close.

Returns: Dictionary with success/error status and message.

Example:

```python
result = livy_close_session(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    session_id="0"
)
```

livy_run_statement

Execute code in a Livy session.

Executes PySpark, Scala, or SparkR code in an existing Livy session. The session must be in 'idle' state to accept new statements.

Important Notes:

  • Use df.show() or df.printSchema() to inspect DataFrames before accessing columns

  • In Fabric, SHOW TABLES returns a 'namespace' column, not 'database'

  • Avoid direct Row attribute access without schema verification

  • When with_wait=False, returns immediately with statement ID - check status separately

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID. session_id: Livy session ID (must be in 'idle' state). code: Code to execute (PySpark, Scala, or SparkR). kind: Statement kind - 'pyspark' (default), 'scala', or 'sparkr'. with_wait: If True (default), wait for statement completion before returning. timeout_seconds: Maximum time to wait for statement completion (default: from config).

Returns: Dictionary with statement details including id, state, output, and execution details.

Example:

```python
# Execute PySpark code
result = livy_run_statement(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    session_id="0",
    code="df = spark.range(10)\ndf.count()",
    kind="pyspark",
    with_wait=True
)

if result.get("state") == "available":
    output = result.get("output", {})
    if output.get("status") == "ok":
        print(f"Result: {output.get('data', {}).get('text/plain')}")
```

livy_get_statement_status

Get the current status and output of a Livy statement.

Retrieves the status, output, and execution details of a statement. Use this for manual status checking without auto-polling.

Statement States:

  • 'waiting': Statement is queued for execution

  • 'running': Statement is currently executing

  • 'available': Statement completed successfully

  • 'error': Statement encountered an error

  • 'cancelling': Statement is being cancelled

  • 'cancelled': Statement was cancelled

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID. session_id: Livy session ID. statement_id: Statement ID to check.

Returns: Dictionary with statement status including id, state, output, and code. Output field contains execution results when state is 'available'.

Example:

```python
result = livy_get_statement_status(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    session_id="0",
    statement_id="1"
)

if result.get("state") == "available":
    output = result.get("output", {})
    print(f"Status: {output.get('status')}")
    print(f"Result: {output.get('data', {}).get('text/plain')}")
```

livy_cancel_statement

Cancel a running Livy statement without killing the session.

Cancels a statement that is currently 'waiting' or 'running'. The statement will transition to 'cancelling' then 'cancelled' state. The session remains available for new statements.

Note: Only works on statements in 'waiting' or 'running' state.

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID. session_id: Livy session ID. statement_id: Statement ID to cancel.

Returns: Dictionary with cancellation result (typically {"msg": "canceled"}).

Example:

```python
result = livy_cancel_statement(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    session_id="0",
    statement_id="1"
)
```

livy_get_session_log

Fetch incremental Livy driver logs for a session.

Retrieves Spark driver logs for debugging session startup issues or statement problems. Supports incremental reads with start/size parameters for paging through logs.

Use Cases:

  • Debugging session startup issues

  • Troubleshooting failed statements

  • Investigating Spark driver problems

  • Monitoring session health

Note: Returns driver-side logs only, not executor logs.

Parameters: workspace_id: Fabric workspace ID. lakehouse_id: Fabric lakehouse ID. session_id: Livy session ID. start: Starting log line index (default: 0). size: Number of log lines to retrieve (default: 500).

Returns: Dictionary with log content and metadata: status, log_content, log_size_bytes, offset, and size.

Example:

```python
# Get first 100 log lines
result = livy_get_session_log(
    workspace_id="12345678-1234-1234-1234-123456789abc",
    lakehouse_id="87654321-4321-4321-4321-210987654321",
    session_id="0",
    start=0,
    size=100
)

for log_line in result.get("log", []):
    print(log_line)

# Get next 100 lines
result = livy_get_session_log(..., start=100, size=100)
```

create_blank_pipeline

Create a blank Fabric pipeline with no activities.

Creates a Data Pipeline in the specified workspace with an empty activities array, ready to be populated with activities later using the add_copy_activity_to_pipeline tool.

Parameters: workspace_name: The display name of the workspace where the pipeline will be created. pipeline_name: Name for the new pipeline (must be unique in workspace). description: Optional description for the pipeline.

Returns: Dictionary with status, pipeline_id, pipeline_name, workspace_name, and message.

Example:

```python
# Create a blank pipeline
result = create_blank_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Data_Integration_Pipeline",
    description="Pipeline for data integration workflows"
)

# Later, add activities to it
add_copy_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Data_Integration_Pipeline",
    source_type="AzurePostgreSqlSource",
    source_connection_id=conn_id,
    source_table_schema="public",
    source_table_name="customers",
    destination_lakehouse_id=lakehouse_id,
    destination_connection_id=lakehouse_conn_id,
    destination_table_name="customers"
)
```

add_copy_activity_to_pipeline

Add a Copy Activity to an existing Fabric pipeline.

Retrieves an existing pipeline, adds a Copy Activity to it, and updates the pipeline definition. The Copy Activity will be appended to any existing activities in the pipeline.

Use this tool when:

  • You have an existing pipeline and want to add a new Copy Activity

  • You're building complex pipelines with multiple data copy operations

  • You want to incrementally build a pipeline

Parameters:
- workspace_name: The display name of the workspace containing the pipeline.
- pipeline_name: Name of the existing pipeline to update.
- source_type: Type of source (e.g., "AzurePostgreSqlSource", "AzureSqlSource", "SqlServerSource").
- source_connection_id: Fabric workspace connection ID for source database.
- source_table_schema: Schema name of the source table (e.g., "public", "dbo").
- source_table_name: Name of the source table (e.g., "movie").
- destination_lakehouse_id: Workspace artifact ID of the destination Lakehouse.
- destination_connection_id: Fabric workspace connection ID for destination Lakehouse.
- destination_table_name: Name for the destination table in Lakehouse.
- activity_name: Optional custom name for the activity (default: auto-generated).
- source_access_mode: Source access mode ("direct" or "sql"). Default is "direct".
- source_sql_query: Optional SQL query for sql access mode.
- table_action_option: Table action option (default: "Append"; options: "Append", "Overwrite").
- apply_v_order: Apply V-Order optimization (default: True).
- timeout: Activity timeout (default: "0.12:00:00").
- retry: Number of retry attempts (default: 0).
- retry_interval_seconds: Retry interval in seconds (default: 30).

Returns: Dictionary with status, pipeline_id, pipeline_name, activity_name, workspace_name, and message.

Example:

```python
# First, get the lakehouse and connection IDs
lakehouses = list_items(workspace_name="Analytics", item_type="Lakehouse")
lakehouse_id = lakehouses["items"][0]["id"]
lakehouse_conn_id = "a216973e-47d7-4224-bb56-2c053bac6831"

# Add a Copy Activity to an existing pipeline
result = add_copy_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Existing_Pipeline",
    source_type="AzurePostgreSqlSource",
    source_connection_id="12345678-1234-1234-1234-123456789abc",
    source_table_schema="public",
    source_table_name="orders",
    destination_lakehouse_id=lakehouse_id,
    destination_connection_id=lakehouse_conn_id,
    destination_table_name="orders",
    activity_name="CopyOrdersData",
    table_action_option="Overwrite"
)

# Add another Copy Activity to the same pipeline
result = add_copy_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Existing_Pipeline",
    source_type="AzurePostgreSqlSource",
    source_connection_id="12345678-1234-1234-1234-123456789abc",
    source_table_schema="public",
    source_table_name="customers",
    destination_lakehouse_id=lakehouse_id,
    destination_connection_id=lakehouse_conn_id,
    destination_table_name="customers",
    activity_name="CopyCustomersData"
)

# SQL fallback mode (use when direct Lakehouse copy fails with
# "datasource type Lakehouse is invalid" error):
result = add_copy_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Existing_Pipeline",
    source_type="LakehouseTableSource",
    source_connection_id=sql_endpoint_conn_id,  # SQL analytics endpoint connection
    source_table_schema="dbo",
    source_table_name="fact_sale",
    destination_lakehouse_id=lakehouse_id,
    destination_connection_id=lakehouse_conn_id,
    destination_table_name="fact_sale_copy",
    source_access_mode="sql",
    source_sql_query="SELECT * FROM dbo.fact_sale"  # optional
)
```

add_notebook_activity_to_pipeline

Add a Notebook Activity to an existing Fabric pipeline.

Retrieves an existing pipeline, adds a Notebook Activity to it, and updates the pipeline definition. The Notebook Activity will be appended to any existing activities in the pipeline.

Use this tool when:

  • You have an existing pipeline and want to add a new Notebook Activity

  • You're building complex pipelines with multiple activities

  • You want to incrementally build a pipeline

Parameters:
- workspace_name: The display name of the workspace containing the pipeline.
- pipeline_name: Name of the existing pipeline to update.
- notebook_name: Name of the notebook to run.
- notebook_workspace_name: Optional name of the workspace containing the notebook.
- activity_name: Optional custom name for the activity (default: auto-generated).
- depends_on_activity_name: Optional name of an existing activity this one depends on.
- session_tag: Optional session tag for the notebook execution.
- parameters: Optional parameters to pass to the notebook.
- timeout: Activity timeout (default: "0.12:00:00").
- retry: Number of retry attempts (default: 0).
- retry_interval_seconds: Retry interval in seconds (default: 30).

Returns: Dictionary with status, pipeline_id, pipeline_name, activity_name, workspace_name, and message.
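
A usage sketch based on the parameters documented above; the workspace, pipeline, activity, and notebook names are illustrative, and the parameters payload is assumed to be a plain dict:

```python
# Run "Transform_Sales" after an existing "CopySalesData" activity.
result = add_notebook_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Data_Integration_Pipeline",
    notebook_name="Transform_Sales",
    activity_name="RunTransformSales",
    depends_on_activity_name="CopySalesData",
    parameters={"run_date": "2025-01-01"},  # assumed shape: plain key/value dict
)

if result["status"] == "success":
    print(f"Added {result['activity_name']} to {result['pipeline_name']}")
```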

add_dataflow_activity_to_pipeline

Add a Dataflow Activity to an existing Fabric pipeline.

Retrieves an existing pipeline, adds a Dataflow Activity to it, and updates the pipeline definition. The Dataflow Activity will be appended to any existing activities in the pipeline.

Use this tool when:

  • You have an existing pipeline and want to add a new Dataflow Activity

  • You're building complex pipelines with multiple activities

  • You want to incrementally build a pipeline

Parameters:
- workspace_name: The display name of the workspace containing the pipeline.
- pipeline_name: Name of the existing pipeline to update.
- dataflow_name: Name of the Dataflow to run.
- dataflow_workspace_name: Optional name of the workspace containing the Dataflow.
- activity_name: Optional custom name for the activity (default: auto-generated).
- depends_on_activity_name: Optional name of an existing activity this one depends on.
- timeout: Activity timeout (default: "0.12:00:00").
- retry: Number of retry attempts (default: 0).
- retry_interval_seconds: Retry interval in seconds (default: 30).

Returns: Dictionary with status, pipeline_id, pipeline_name, activity_name, workspace_name, and message.
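
A usage sketch based on the parameters documented above; all names are illustrative:

```python
# Run the "Customer_Cleanup" dataflow after the notebook step added earlier.
result = add_dataflow_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Data_Integration_Pipeline",
    dataflow_name="Customer_Cleanup",
    activity_name="RunCustomerCleanup",
    depends_on_activity_name="RunTransformSales",
)
```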

add_activity_to_pipeline

Add a generic activity to an existing Fabric pipeline from a JSON template.

Retrieves an existing pipeline, adds an activity from the provided JSON template, and updates the pipeline definition. This is a more general-purpose tool compared to add_copy_activity_to_pipeline, allowing you to add any type of Fabric pipeline activity by providing its complete JSON definition.

Use this tool when:

  • You have a custom activity JSON template to add

  • You want to add activity types beyond Copy (e.g., Notebook, Script, Web, etc.)

  • You need full control over the activity definition

  • You're working with complex activity configurations

Activity JSON Requirements:

  • Must be a valid dictionary/object

  • Must include a "name" field (string)

  • Must include a "type" field (e.g., "Copy", "Notebook", "Script", "Web", etc.)

  • Should include all required properties for the specific activity type

  • Common fields: "dependsOn", "policy", "typeProperties"

Parameters: workspace_name: The display name of the workspace containing the pipeline. pipeline_name: Name of the existing pipeline to update. activity_json: Complete JSON dictionary representing the activity definition. Must include "name", "type", and all required properties.

Returns: Dictionary with status, pipeline_id, pipeline_name, activity_name, activity_type, workspace_name, and message.

Example:

```python
# Example 1: Add a Copy Activity from JSON template
copy_activity = {
    "name": "CopyCustomData",
    "type": "Copy",
    "dependsOn": [],
    "policy": {
        "timeout": "0.12:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": False,
        "secureInput": False
    },
    "typeProperties": {
        "source": {
            "type": "AzurePostgreSqlSource",
            "partitionOption": "None",
            "queryTimeout": "02:00:00",
            "datasetSettings": {
                "type": "AzurePostgreSqlTable",
                "schema": [],
                "typeProperties": {
                    "schema": "public",
                    "table": "products"
                },
                "externalReferences": {
                    "connection": "12345678-1234-1234-1234-123456789abc"
                }
            }
        },
        "sink": {
            "type": "LakehouseTableSink",
            "tableActionOption": "Overwrite",
            "applyVOrder": True,
            "datasetSettings": {
                "type": "LakehouseTable",
                "typeProperties": {
                    "table": "products"
                }
            }
        }
    }
}

result = add_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Pipeline",
    activity_json=copy_activity
)

# Example 2: Add a Notebook Activity
notebook_activity = {
    "name": "RunTransformation",
    "type": "Notebook",
    "dependsOn": [
        {
            "activity": "CopyCustomData",
            "dependencyConditions": ["Succeeded"]
        }
    ],
    "policy": {
        "timeout": "1.00:00:00",
        "retry": 0
    },
    "typeProperties": {
        "notebookPath": "/Notebooks/TransformData",
        "parameters": {
            "table_name": "products"
        }
    }
}

result = add_activity_to_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Pipeline",
    activity_json=notebook_activity
)
```

delete_activity_from_pipeline

Delete an activity from an existing Fabric pipeline.

Removes the specified activity from the pipeline definition. This will fail if any other activity depends on it. Use remove_activity_dependency to remove dependencies first.

Parameters: workspace_name: The display name of the workspace containing the pipeline. pipeline_name: Name of the existing pipeline to update. activity_name: Name of the activity to delete.

Returns: Dictionary with status, pipeline_id, pipeline_name, activity_name, workspace_name, and message.
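
A usage sketch based on the parameters documented above; names are illustrative:

```python
# Remove an activity that is no longer needed.
result = delete_activity_from_pipeline(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Data_Integration_Pipeline",
    activity_name="CopyOrdersData",
)

if result["status"] != "success":
    # Another activity may still depend on it; clear the dependency with
    # remove_activity_dependency first, then retry the delete.
    print(result["message"])
```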

remove_activity_dependency

Remove dependsOn references to a target activity.

Removes dependsOn edges pointing to the target activity. If from_activity_name is provided, only removes edges from that activity.

Parameters: workspace_name: The display name of the workspace containing the pipeline. pipeline_name: Name of the existing pipeline to update. activity_name: Name of the activity being depended on. from_activity_name: Optional activity to remove dependencies from.

Returns: Dictionary with status, pipeline_id, pipeline_name, activity_name, removed_count, workspace_name, and message.
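
A usage sketch based on the parameters documented above; names are illustrative:

```python
# Drop the dependsOn edge from "RunTransformSales" to "CopyOrdersData"
# so that "CopyOrdersData" can then be deleted.
result = remove_activity_dependency(
    workspace_name="Analytics Workspace",
    pipeline_name="My_Data_Integration_Pipeline",
    activity_name="CopyOrdersData",
    from_activity_name="RunTransformSales",
)

print(f"Removed {result['removed_count']} dependency reference(s)")
```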

create_semantic_model

Create an empty Fabric semantic model.

add_table_to_semantic_model

Add a table from a lakehouse to an existing semantic model.

add_measures_to_semantic_model

Add measures to a table in an existing semantic model.

delete_measures_from_semantic_model

Delete measures from a table in an existing semantic model.

get_semantic_model_details

Get semantic model metadata by name or ID.

get_semantic_model_definition

Get semantic model definition parts in the requested format.

add_relationship_to_semantic_model

Add a relationship between two tables in an existing semantic model.

refresh_semantic_model

Refresh a semantic model and wait for completion.

execute_dax_query

Execute a DAX query and return the raw Power BI response.
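
The semantic-model tools above are listed without parameter documentation, so the sketch below is hypothetical: the tool names come from this listing, but every parameter name (workspace_name, model_name, dax_query, and so on) is an assumption modeled on the naming pattern of the other tools.

```python
# Hypothetical end-to-end flow; parameter names are assumed, not documented here.
create_semantic_model(workspace_name="Analytics Workspace", model_name="Sales_Model")

add_table_to_semantic_model(
    workspace_name="Analytics Workspace",
    model_name="Sales_Model",
    lakehouse_name="Bronze_Lakehouse",
    table_name="fact_sale",
)

refresh_semantic_model(workspace_name="Analytics Workspace", model_name="Sales_Model")

result = execute_dax_query(
    workspace_name="Analytics Workspace",
    model_name="Sales_Model",
    dax_query="EVALUATE SUMMARIZECOLUMNS('fact_sale'[Year])",
)
```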

Prompts

Interactive templates invoked by user choice

No prompts

Resources

Contextual data attached and managed by the client

No resources
