#!/usr/bin/env python3
'''
MCP Server for Pipefy API.
This server provides tools to interact with the Pipefy GraphQL API,
enabling LLMs to query pipes, cards, phases, and database tables,
and to create cards and tables.
'''
import os
import json
from typing import Optional, List, Any
from enum import Enum
import httpx
from pydantic import BaseModel, Field, ConfigDict
from mcp.server.fastmcp import FastMCP
# Initialize the MCP server; the @mcp.tool decorators below register tools on this instance
mcp = FastMCP("pipefy_mcp")
# Constants
# GraphQL endpoint that _make_graphql_request POSTs every query and mutation to
PIPEFY_API_URL = "https://api.pipefy.com/graphql"
# Default character cap applied by _truncate_response to tool output
CHARACTER_LIMIT = 25000
# Enums
class ResponseFormat(str, Enum):
    '''Renderings a tool can produce for its textual reply.

    Members are also plain strings, so they compare equal to their values.
    '''

    # Human-readable Markdown document.
    MARKDOWN = "markdown"
    # Machine-readable JSON text.
    JSON = "json"
# Shared utility functions
def _get_api_token() -> str:
    '''Read the Pipefy API token from the PIPEFY_API_TOKEN environment variable.

    Returns:
        The token string.

    Raises:
        ValueError: If the variable is unset or empty.
    '''
    api_token = os.environ.get("PIPEFY_API_TOKEN")
    if api_token:
        return api_token
    # Unset and empty are treated alike: neither can authenticate a request.
    raise ValueError(
        "PIPEFY_API_TOKEN environment variable is required. "
        "Get your token from Pipefy Settings > Personal Access Tokens."
    )
async def _make_graphql_request(query: str, variables: dict | None = None) -> dict:
    '''Execute a GraphQL request against the Pipefy API.

    Args:
        query: GraphQL query or mutation document.
        variables: Variables for the document; an empty mapping is sent when omitted.

    Returns:
        The "data" member of the response. An empty dict is returned when the
        member is missing or null, so callers can always use ``.get`` on it.

    Raises:
        ValueError: If PIPEFY_API_TOKEN is not set, or the response carries a
            non-empty "errors" list.
        httpx.HTTPStatusError: If the API answers with an error status.
    '''
    token = _get_api_token()
    async with httpx.AsyncClient() as client:
        response = await client.post(
            PIPEFY_API_URL,
            json={"query": query, "variables": variables or {}},
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            timeout=30.0,
        )
        response.raise_for_status()
        data = response.json()

    # Only a non-empty errors list is a failure; an empty or null entry is not.
    errors = data.get("errors")
    if errors:
        error_messages = [
            str(err.get("message", err)) if isinstance(err, dict) else str(err)
            for err in errors
        ]
        raise ValueError(f"GraphQL errors: {'; '.join(error_messages)}")

    # "data" may be present but null; .get's default would not apply in that case.
    return data.get("data") or {}
def _handle_api_error(e: Exception) -> str:
    '''Turn an exception raised inside a tool into a user-facing error string.

    HTTP status errors get a dedicated message for 401, 403 and 429 and a
    generic status message otherwise; timeouts and ValueErrors have their own
    wording; anything else is reported with its type name and text.

    Args:
        e: The exception caught by the tool.

    Returns:
        A message starting with "Error:".
    '''
    if isinstance(e, httpx.HTTPStatusError):
        status_code = e.response.status_code
        known_statuses = {
            401: "Error: Invalid API token. Check your PIPEFY_API_TOKEN environment variable.",
            403: "Error: Permission denied. You don't have access to this resource.",
            429: "Error: Rate limit exceeded. Please wait before making more requests.",
        }
        if status_code in known_statuses:
            return known_statuses[status_code]
        return f"Error: API request failed with status {status_code}"

    if isinstance(e, httpx.TimeoutException):
        return "Error: Request timed out. Please try again."

    if isinstance(e, ValueError):
        return f"Error: {str(e)}"

    return f"Error: Unexpected error occurred: {type(e).__name__}: {str(e)}"
def _truncate_response(text: str, limit: int = CHARACTER_LIMIT) -> str:
    '''Cut *text* down to *limit* characters and append a truncation notice.

    Args:
        text: The response text.
        limit: Maximum number of characters of *text* to keep.

    Returns:
        *text* unchanged when it fits; otherwise its first *limit* characters
        followed by a note stating how many characters were omitted (the note
        itself is not counted against the limit).
    '''
    overflow = len(text) - limit
    if overflow <= 0:
        return text
    notice = f"\n\n... (truncated, {overflow} characters omitted)"
    return text[:limit] + notice
# Pydantic Models for Input Validation
class GetPipeInput(BaseModel):
    '''Arguments accepted by the pipefy_get_pipe tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty pipe identifier.
    pipe_id: str = Field(
        ...,
        min_length=1,
        description="The unique ID of the pipe to retrieve (e.g., '301234567')",
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class ListPipesInput(BaseModel):
    '''Arguments accepted by the pipefy_list_pipes tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Optional organization filter; None means no filtering.
    organization_id: Optional[str] = Field(
        description="Optional: The organization ID to list pipes from. If omitted, lists pipes from all organizations.",
        default=None,
    )
    # Opt-in flag for extra detail.
    include_details: bool = Field(
        description="If true, fetches additional details like role and members count (slower)",
        default=False,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class CreateCardField(BaseModel):
    '''One field-id/value pair supplied when creating a card or table record.'''

    # Identifier of the field being populated.
    field_id: str = Field(
        ...,
        description="The ID of the field (e.g., 'email', 'custom_field_123')",
    )
    # Value is left untyped; it is forwarded as given.
    field_value: Any = Field(
        ...,
        description="The value for the field",
    )
class CreateCardInput(BaseModel):
    '''Arguments accepted by the pipefy_create_card tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty identifier of the target pipe.
    pipe_id: str = Field(
        ...,
        min_length=1,
        description="The ID of the pipe where the card will be created",
    )
    # Optional card title.
    title: Optional[str] = Field(
        description="The title of the card (if the pipe has a title field)",
        default=None,
    )
    # Field values for the new card; defaults to a fresh empty list.
    fields: List[CreateCardField] = Field(
        description="List of fields and their values to populate the card",
        default_factory=list,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class GetCardInput(BaseModel):
    '''Arguments accepted by the pipefy_get_card tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty card identifier.
    card_id: str = Field(
        ...,
        min_length=1,
        description="The unique ID of the card to retrieve",
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class ListCardsInput(BaseModel):
    '''Arguments accepted by the pipefy_list_cards tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty identifier of the pipe to read from.
    pipe_id: str = Field(
        ...,
        min_length=1,
        description="The pipe ID to list cards from",
    )
    # Page size, validated to lie within 1..50.
    limit: int = Field(
        description="Maximum number of cards to return",
        default=20,
        le=50,
        ge=1,
    )
    # Optional title filter; None disables it.
    search: Optional[str] = Field(
        description="Optional search term to filter cards by title",
        default=None,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class SearchCardsInput(BaseModel):
    '''Arguments accepted by the pipefy_search_cards tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty identifier of the pipe to search.
    pipe_id: str = Field(
        ...,
        min_length=1,
        description="The pipe ID to search cards in",
    )
    # Required, non-empty identifier of the field to match on.
    field_id: str = Field(
        ...,
        min_length=1,
        description="The field ID to search by (e.g., 'title', 'email', custom field ID)",
    )
    # Required, non-empty value to look for.
    field_value: str = Field(
        ...,
        min_length=1,
        description="The value to search for in the specified field",
    )
    # Page size, validated to lie within 1..50.
    limit: int = Field(
        description="Maximum number of cards to return",
        default=20,
        le=50,
        ge=1,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class GetPhaseInput(BaseModel):
    '''Arguments accepted by the pipefy_get_phase tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty phase identifier.
    phase_id: str = Field(
        ...,
        min_length=1,
        description="The unique ID of the phase to retrieve",
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
# ============================================================================
# TABLE MODELS (Database Tables)
# ============================================================================
class ListTablesInput(BaseModel):
    '''Arguments accepted by the pipefy_list_tables tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty identifier of the organization to inspect.
    organization_id: str = Field(
        ...,
        min_length=1,
        description="The organization ID to list tables from",
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class GetTableInput(BaseModel):
    '''Arguments accepted by the pipefy_get_table tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty table identifier.
    table_id: str = Field(
        ...,
        min_length=1,
        description="The alphanumeric ID of the table to retrieve (e.g., 'ZtEdWh')",
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class CreateTableInput(BaseModel):
    '''Arguments accepted by the pipefy_create_table tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty identifier of the owning organization.
    organization_id: str = Field(
        ...,
        min_length=1,
        description="The organization ID where the table will be created",
    )
    # Required, non-empty display name.
    name: str = Field(
        ...,
        min_length=1,
        description="The name for the new table",
    )
    # Optional color; None leaves it unspecified.
    color: Optional[str] = Field(
        description="Optional color for the table (e.g., 'blue', 'red', 'green', 'lime', 'yellow')",
        default=None,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
# ============================================================================
# TABLE RECORD MODELS
# ============================================================================
class ListTableRecordsInput(BaseModel):
    '''Arguments accepted by the pipefy_list_table_records tool.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty table identifier.
    table_id: str = Field(
        ...,
        min_length=1,
        description="The alphanumeric ID of the table",
    )
    # Page size, validated to lie within 1..50.
    limit: int = Field(
        description="Maximum number of records to return",
        default=20,
        le=50,
        ge=1,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class GetTableRecordInput(BaseModel):
    '''Validated arguments for fetching a single table record.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required record identifier; must be a positive integer.
    record_id: int = Field(
        ...,
        gt=0,
        description="The numeric ID of the record to retrieve",
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
class CreateTableRecordInput(BaseModel):
    '''Validated arguments for creating a new table record.'''

    # Trim leading/trailing whitespace on string inputs.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty table identifier.
    table_id: str = Field(
        ...,
        min_length=1,
        description="The alphanumeric ID of the table",
    )
    # Required, non-empty record title.
    title: str = Field(
        ...,
        min_length=1,
        description="The title for the new record",
    )
    # Field values for the record; defaults to a fresh empty list.
    fields: List[CreateCardField] = Field(
        description="List of fields and their values for the record",
        default_factory=list,
    )
    # Reply rendering; Markdown unless JSON is requested.
    response_format: ResponseFormat = Field(
        description="Output format: 'markdown' for human-readable or 'json' for machine-readable",
        default=ResponseFormat.MARKDOWN,
    )
# Tool definitions
@mcp.tool(
    name="pipefy_get_pipe",
    annotations={
        "title": "Get Pipefy Pipe Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_get_pipe(params: GetPipeInput) -> str:
    '''Get detailed information about a specific Pipefy pipe.

    Retrieves pipe details including name, phases, fields, members, and card counts.
    Use this when you need comprehensive information about a workflow/process.

    Args:
        params: GetPipeInput with pipe_id and response_format

    Returns:
        Pipe details in the requested format (markdown or json), or a string
        starting with "Error:" when the pipe is missing or the request fails.

    Examples:
        - Get pipe details: pipe_id="301234567"
        - Get pipe as JSON: pipe_id="301234567", response_format="json"
    '''
    query = """
    query GetPipe($id: ID!) {
      pipe(id: $id) {
        id
        name
        description
        cards_count
        opened_cards_count
        done_cards_count
        members {
          user {
            id
            name
            email
          }
          role_name
        }
        phases {
          id
          name
          cards_count
          done
        }
        start_form_fields {
          id
          label
          type
          required
        }
        labels {
          id
          name
          color
        }
      }
    }
    """
    try:
        data = await _make_graphql_request(query, {"id": params.pipe_id})
        pipe = data.get("pipe")
        if not pipe:
            return f"Error: Pipe with ID '{params.pipe_id}' not found."

        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(pipe, indent=2))

        # Markdown format.
        # GraphQL returns explicit nulls for empty nullable fields, so every
        # fallback below uses `or` rather than relying on dict.get's default.
        lines = [
            f"# {pipe['name']}",
            f"**ID:** {pipe['id']}",
            "",
        ]
        if pipe.get("description"):
            lines.extend([pipe["description"], ""])

        lines.extend([
            "## Card Statistics",
            f"- **Total Cards:** {pipe.get('cards_count') or 0}",
            f"- **Open Cards:** {pipe.get('opened_cards_count') or 0}",
            f"- **Done Cards:** {pipe.get('done_cards_count') or 0}",
            "",
        ])

        # Phases
        phases = pipe.get("phases") or []
        if phases:
            lines.append("## Phases")
            for phase in phases:
                done_marker = " (Done)" if phase.get("done") else ""
                lines.append(
                    f"- **{phase['name']}** (ID: {phase['id']}) - "
                    f"{phase.get('cards_count') or 0} cards{done_marker}"
                )
            lines.append("")

        # Members
        members = pipe.get("members") or []
        if members:
            lines.append("## Members")
            for member in members:
                user = member.get("user") or {}
                lines.append(
                    f"- {user.get('name') or 'Unknown'} ({user.get('email') or 'N/A'}) - "
                    f"{member.get('role_name') or 'N/A'}"
                )
            lines.append("")

        # Labels
        labels = pipe.get("labels") or []
        if labels:
            lines.append("## Labels")
            for label in labels:
                lines.append(f"- {label['name']} (ID: {label['id']})")
            lines.append("")

        # Start form fields
        fields = pipe.get("start_form_fields") or []
        if fields:
            lines.append("## Start Form Fields")
            for field in fields:
                required = " (required)" if field.get("required") else ""
                lines.append(f"- **{field['label']}** ({field['type']}){required} - ID: {field['id']}")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_list_cards",
    annotations={
        "title": "List Pipefy Cards",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_list_cards(params: ListCardsInput) -> str:
    '''List cards from a specific Pipefy pipe.

    Retrieves cards with their basic information, current phase, and assignees.
    Supports optional search filtering by card title.

    Args:
        params: ListCardsInput with pipe_id, limit, optional search, and response_format

    Returns:
        List of cards in the requested format, a "No cards found" message when
        nothing matches, or a string starting with "Error:" on failure.

    Examples:
        - List first 20 cards: pipe_id="301234567"
        - Search cards: pipe_id="301234567", search="urgent"
        - Get more cards: pipe_id="301234567", limit=50
    '''
    query = """
    query ListCards($pipeId: ID!, $first: Int!, $search: String) {
      cards(pipe_id: $pipeId, first: $first, search: {title: $search}) {
        edges {
          node {
            id
            title
            due_date
            created_at
            updated_at
            current_phase {
              id
              name
            }
            assignees {
              id
              name
              email
            }
            labels {
              id
              name
            }
          }
        }
        pageInfo {
          hasNextPage
          endCursor
        }
      }
    }
    """
    try:
        variables = {
            "pipeId": params.pipe_id,
            "first": params.limit,
            "search": params.search,
        }
        data = await _make_graphql_request(query, variables)

        # Nullable GraphQL fields arrive as explicit nulls, so fall back with `or`.
        cards_data = data.get("cards") or {}
        edges = cards_data.get("edges") or []
        page_info = cards_data.get("pageInfo") or {}
        cards = [edge["node"] for edge in edges if edge and edge.get("node")]
        has_more = bool(page_info.get("hasNextPage"))

        if not cards:
            search_msg = f" matching '{params.search}'" if params.search else ""
            return f"No cards found in pipe '{params.pipe_id}'{search_msg}."

        if params.response_format == ResponseFormat.JSON:
            result = {
                "cards": cards,
                "count": len(cards),
                "has_more": has_more,
            }
            return _truncate_response(json.dumps(result, indent=2))

        # Markdown format
        lines = ["# Cards in Pipe", ""]
        if params.search:
            lines.append(f"*Search: '{params.search}'*")
            lines.append("")
        lines.append(f"Found {len(cards)} cards" + (" (more available)" if has_more else ""))
        lines.append("")

        for card in cards:
            phase_name = (card.get("current_phase") or {}).get("name") or "Unknown"
            lines.append(f"## {card['title']}")
            lines.append(f"- **ID:** {card['id']}")
            lines.append(f"- **Phase:** {phase_name}")
            if card.get("due_date"):
                lines.append(f"- **Due:** {card['due_date']}")
            assignees = card.get("assignees") or []
            if assignees:
                names = ", ".join(a.get("name") or "Unknown" for a in assignees)
                lines.append(f"- **Assignees:** {names}")
            labels = card.get("labels") or []
            if labels:
                label_names = ", ".join(label.get("name") or "" for label in labels)
                lines.append(f"- **Labels:** {label_names}")
            lines.append("")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_get_card",
    annotations={
        "title": "Get Pipefy Card Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_get_card(params: GetCardInput) -> str:
    '''Get detailed information about a specific Pipefy card.

    Retrieves the card's status, pipe, current phase, dates, assignees, labels,
    all field values, and the number of comments and attachments (counts only;
    comment and attachment contents are not fetched).

    Args:
        params: GetCardInput with card_id and response_format

    Returns:
        Card details in the requested format, or a string starting with
        "Error:" when the card is missing or the request fails.

    Examples:
        - Get card details: card_id="123456789"
    '''
    query = """
    query GetCard($id: ID!) {
      card(id: $id) {
        id
        title
        due_date
        created_at
        updated_at
        done
        expired
        current_phase {
          id
          name
        }
        pipe {
          id
          name
        }
        assignees {
          id
          name
          email
        }
        labels {
          id
          name
          color
        }
        fields {
          name
          value
          field {
            id
            type
          }
        }
        comments_count
        attachments_count
      }
    }
    """
    try:
        data = await _make_graphql_request(query, {"id": params.card_id})
        card = data.get("card")
        if not card:
            return f"Error: Card with ID '{params.card_id}' not found."

        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(card, indent=2))

        # Markdown format.
        # Nullable GraphQL fields arrive as explicit nulls, so fall back with `or`.
        lines = [
            f"# {card['title']}",
            f"**ID:** {card['id']}",
            "",
        ]

        # Status
        status_parts = []
        if card.get("done"):
            status_parts.append("Done")
        if card.get("expired"):
            status_parts.append("Expired")
        if not status_parts:
            status_parts.append("In Progress")
        lines.append(f"**Status:** {', '.join(status_parts)}")

        # Pipe and Phase
        pipe = card.get("pipe") or {}
        phase = card.get("current_phase") or {}
        lines.append(f"**Pipe:** {pipe.get('name') or 'Unknown'} (ID: {pipe.get('id') or 'N/A'})")
        lines.append(f"**Phase:** {phase.get('name') or 'Unknown'}")

        # Dates
        if card.get("due_date"):
            lines.append(f"**Due Date:** {card['due_date']}")
        lines.append(f"**Created:** {card.get('created_at') or 'N/A'}")
        lines.append(f"**Updated:** {card.get('updated_at') or 'N/A'}")
        lines.append("")

        # Assignees
        assignees = card.get("assignees") or []
        if assignees:
            lines.append("## Assignees")
            for a in assignees:
                lines.append(f"- {a.get('name') or 'Unknown'} ({a.get('email') or 'N/A'})")
            lines.append("")

        # Labels
        labels = card.get("labels") or []
        if labels:
            lines.append("## Labels")
            for label in labels:
                lines.append(f"- {label['name']}")
            lines.append("")

        # Fields
        fields = card.get("fields") or []
        if fields:
            lines.append("## Fields")
            for field in fields:
                value = field.get("value") or "(empty)"
                # Handle array values (like checkboxes, connections)
                if isinstance(value, list):
                    value = ", ".join(str(v) for v in value)
                lines.append(f"- **{field['name']}:** {value}")
            lines.append("")

        # Counts
        lines.append("## Activity")
        lines.append(f"- **Comments:** {card.get('comments_count') or 0}")
        lines.append(f"- **Attachments:** {card.get('attachments_count') or 0}")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_search_cards",
    annotations={
        "title": "Search Pipefy Cards by Field",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_search_cards(params: SearchCardsInput) -> str:
    '''Search for cards in a pipe by a specific field value.

    Finds cards where a specific field matches the given value.
    Useful for finding cards by email, custom ID, or any other field.

    Args:
        params: SearchCardsInput with pipe_id, field_id, field_value, limit, and response_format

    Returns:
        Matching cards in the requested format, a "No cards found" message when
        nothing matches, or a string starting with "Error:" on failure.

    Examples:
        - Search by email: pipe_id="301234567", field_id="email", field_value="john@example.com"
        - Search by custom field: pipe_id="301234567", field_id="customer_id", field_value="CUST-001"
    '''
    query = """
    query FindCards($pipeId: ID!, $fieldId: String!, $fieldValue: String!, $first: Int!) {
      findCards(pipeId: $pipeId, search: {fieldId: $fieldId, fieldValue: $fieldValue}, first: $first) {
        edges {
          node {
            id
            title
            due_date
            current_phase {
              id
              name
            }
            assignees {
              id
              name
            }
          }
        }
        pageInfo {
          hasNextPage
        }
      }
    }
    """
    try:
        variables = {
            "pipeId": params.pipe_id,
            "fieldId": params.field_id,
            "fieldValue": params.field_value,
            "first": params.limit,
        }
        data = await _make_graphql_request(query, variables)

        # Nullable GraphQL fields arrive as explicit nulls, so fall back with `or`.
        cards_data = data.get("findCards") or {}
        edges = cards_data.get("edges") or []
        page_info = cards_data.get("pageInfo") or {}
        cards = [edge["node"] for edge in edges if edge and edge.get("node")]
        has_more = bool(page_info.get("hasNextPage"))

        if not cards:
            return f"No cards found with {params.field_id}='{params.field_value}' in pipe '{params.pipe_id}'."

        if params.response_format == ResponseFormat.JSON:
            result = {
                "cards": cards,
                "count": len(cards),
                "has_more": has_more,
                "search": {
                    "field_id": params.field_id,
                    "field_value": params.field_value,
                },
            }
            return _truncate_response(json.dumps(result, indent=2))

        # Markdown format
        lines = [
            "# Search Results",
            f"*{params.field_id} = '{params.field_value}'*",
            "",
            f"Found {len(cards)} cards" + (" (more available)" if has_more else ""),
            "",
        ]
        for card in cards:
            phase_name = (card.get("current_phase") or {}).get("name") or "Unknown"
            lines.append(f"## {card['title']}")
            lines.append(f"- **ID:** {card['id']}")
            lines.append(f"- **Phase:** {phase_name}")
            if card.get("due_date"):
                lines.append(f"- **Due:** {card['due_date']}")
            assignees = card.get("assignees") or []
            if assignees:
                names = ", ".join(a.get("name") or "Unknown" for a in assignees)
                lines.append(f"- **Assignees:** {names}")
            lines.append("")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_get_phase",
    annotations={
        "title": "Get Pipefy Phase Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_get_phase(params: GetPhaseInput) -> str:
    '''Get detailed information about a specific Pipefy phase.

    Fetches the phase's name, description, done flag, card count and its
    field definitions.

    Args:
        params: GetPhaseInput with phase_id and response_format

    Returns:
        Phase details in the requested format, or an "Error:" string.

    Examples:
        - Get phase details: phase_id="12345678"
    '''
    query = """
query GetPhase($id: ID!) {
phase(id: $id) {
id
name
description
done
cards_count
sequentialId
fields {
id
label
type
required
description
}
}
}
"""
    try:
        payload = await _make_graphql_request(query, {"id": params.phase_id})
        phase = payload.get("phase")
        if not phase:
            return f"Error: Phase with ID '{params.phase_id}' not found."

        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(phase, indent=2))

        # Markdown rendering: heading, optional description, card count, fields.
        heading_suffix = " (Final Phase)" if phase.get("done") else ""
        out = [f"# {phase['name']}{heading_suffix}", f"**ID:** {phase['id']}", ""]

        description = phase.get("description")
        if description:
            out += [description, ""]

        out += [f"**Cards in Phase:** {phase.get('cards_count', 0)}", ""]

        phase_fields = phase.get("fields", [])
        if phase_fields:
            out.append("## Phase Fields")
            for spec in phase_fields:
                required_tag = " (required)" if spec.get("required") else ""
                out.append(f"- **{spec['label']}** ({spec['type']}){required_tag}")
                out.append(f" - ID: {spec['id']}")
                field_note = spec.get("description")
                if field_note:
                    out.append(f" - {field_note}")
            out.append("")

        return _truncate_response("\n".join(out))
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_list_pipes",
    annotations={
        "title": "List Pipefy Pipes",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_list_pipes(params: ListPipesInput) -> str:
    '''List Pipefy pipes available to the user.

    Can list pipes from all organizations or a specific one. All organizations
    are fetched and the optional organization_id filter is applied locally.

    NOTE: params.include_details is accepted but not consulted by this
    implementation; the same fields are returned either way.

    Args:
        params: ListPipesInput with optional organization_id

    Returns:
        List of pipes with IDs and names grouped by organization, or a string
        starting with "Error:" when the organization is unknown or the request fails.
    '''
    query = """
    query GetOrganizations {
      organizations {
        id
        name
        pipes {
          id
          name
          cards_count
          color
        }
      }
    }
    """
    try:
        data = await _make_graphql_request(query)
        # "organizations" may be an explicit null; dict.get's default would not apply.
        orgs = data.get("organizations") or []

        if params.organization_id:
            # Filter for specific org
            orgs = [o for o in orgs if str(o["id"]) == params.organization_id]
            if not orgs:
                return f"Error: Organization with ID '{params.organization_id}' not found."

        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(orgs, indent=2))

        lines = ["# Your Pipes", ""]
        for org in orgs:
            lines.append(f"## Organization: {org['name']} (ID: {org['id']})")
            pipes = org.get("pipes") or []
            if not pipes:
                lines.append("- No pipes found.")
            else:
                for pipe in pipes:
                    lines.append(
                        f"- **{pipe['name']}** (ID: {pipe['id']}) - {pipe.get('cards_count') or 0} cards"
                    )
            lines.append("")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_create_card",
    annotations={
        "title": "Create Pipefy Card",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def pipefy_create_card(params: CreateCardInput) -> str:
    '''Create a new card in a specific pipe.

    Args:
        params: CreateCardInput with pipe_id, optional title, and fields

    Returns:
        Details of the created card (ID, title, URL) in the requested format,
        or a string starting with "Error:" when creation fails.
    '''
    query = """
    mutation CreateCard($input: CreateCardInput!) {
      createCard(input: $input) {
        card {
          id
          title
          url
        }
      }
    }
    """
    try:
        fields_attributes = [
            {"field_id": f.field_id, "field_value": f.field_value}
            for f in params.fields
        ]
        variables_input = {
            "pipe_id": params.pipe_id,
            "fields_attributes": fields_attributes,
        }
        # If title is provided separately, add it (some pipes use it, usually it's a field too)
        if params.title:
            variables_input["title"] = params.title

        data = await _make_graphql_request(query, {"input": variables_input})
        # The mutation payload may be an explicit null; guard before reading "card".
        card = (data.get("createCard") or {}).get("card")
        if not card:
            return "Error: Failed to create card (no card returned)."

        if params.response_format == ResponseFormat.JSON:
            return json.dumps(card, indent=2)

        return "\n".join([
            "# Card Created Successfully",
            f"- **ID:** {card['id']}",
            f"- **Title:** {card['title']}",
            f"- **URL:** {card['url']}",
            "",
        ])
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
# ============================================================================
# DATABASE TABLES TOOLS
# ============================================================================
@mcp.tool(
    name="pipefy_list_tables",
    annotations={
        "title": "List Pipefy Database Tables",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def pipefy_list_tables(params: ListTablesInput) -> str:
    '''List database tables from a specific organization.

    Database tables are Pipefy's information storage system for structured data
    like customer registrations, supplier lists, product catalogs, etc.

    Args:
        params: ListTablesInput with organization_id

    Returns:
        List of tables with IDs, names, visibility, record and member counts,
        or a string starting with "Error:" when the organization is missing or
        the request fails.
    '''
    query = """
    query GetOrgTables($orgId: ID!) {
      organization(id: $orgId) {
        id
        name
        tables {
          edges {
            node {
              id
              name
              public
              table_records_count
              members {
                user {
                  id
                  email
                }
              }
            }
          }
        }
      }
    }
    """
    try:
        data = await _make_graphql_request(query, {"orgId": params.organization_id})
        org = data.get("organization")
        if not org:
            return f"Error: Organization with ID '{params.organization_id}' not found."

        # Nullable GraphQL fields arrive as explicit nulls, so fall back with `or`.
        tables_data = (org.get("tables") or {}).get("edges") or []
        tables = [edge["node"] for edge in tables_data if edge and edge.get("node")]

        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(tables, indent=2))

        lines = [
            f"# Tables in Organization: {org['name']}",
            f"**Organization ID:** {org['id']}",
            "",
            f"**Total Tables:** {len(tables)}",
            "",
        ]
        if not tables:
            lines.append("No tables found in this organization.")
        else:
            lines.append("## Available Tables")
            for table in tables:
                visibility = "Public" if table.get("public") else "Private"
                lines.append(f"\n### {table['name']}")
                lines.append(f"- **ID:** `{table['id']}`")
                lines.append(f"- **Visibility:** {visibility}")
                lines.append(f"- **Records:** {table.get('table_records_count') or 0}")
                lines.append(f"- **Members:** {len(table.get('members') or [])}")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_get_table",
    annotations={
        "title": "Get Pipefy Database Table Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        # Talks to the external Pipefy API like every other tool in this server.
        "openWorldHint": True,
    },
)
async def pipefy_get_table(params: GetTableInput) -> str:
    '''Get detailed information about a specific database table.

    Retrieves comprehensive table details including fields, members, and webhooks.
    In markdown output at most 10 members are listed, followed by a count of the rest.

    Args:
        params: GetTableInput with table_id (alphanumeric, e.g., 'ZtEdWh')

    Returns:
        Detailed table information with fields and configuration, or a string
        starting with "Error:" when the table is missing or the request fails.
    '''
    query = """
    query GetTable($id: ID!) {
      table(id: $id) {
        id
        name
        public
        description
        table_records_count
        organization {
          id
          name
        }
        members {
          user {
            id
            name
            email
          }
        }
        table_fields {
          id
          label
          type
          description
          required
          options
        }
        webhooks {
          id
          name
          url
          actions
        }
      }
    }
    """
    max_members_shown = 10
    try:
        data = await _make_graphql_request(query, {"id": params.table_id})
        table = data.get("table")
        if not table:
            return f"Error: Table with ID '{params.table_id}' not found."

        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(table, indent=2))

        # Nullable GraphQL fields arrive as explicit nulls, so fall back with `or`.
        visibility = "Public" if table.get("public") else "Private"
        lines = [
            f"# {table['name']} ({visibility})",
            f"**Table ID:** `{table['id']}`",
            "",
        ]
        if table.get("description"):
            lines.extend([table["description"], ""])

        org = table.get("organization") or {}
        lines.extend([
            f"**Organization:** {org.get('name')} (ID: {org.get('id')})",
            f"**Records:** {table.get('table_records_count') or 0}",
            "",
        ])

        # Table Fields
        fields = table.get("table_fields") or []
        if fields:
            lines.append("## Table Fields")
            for field in fields:
                required = " **(required)**" if field.get("required") else ""
                lines.append(f"\n### {field['label']}{required}")
                lines.append(f"- **Field ID:** `{field['id']}`")
                lines.append(f"- **Type:** {field['type']}")
                if field.get("description"):
                    lines.append(f"- **Description:** {field['description']}")
                if field.get("options"):
                    # str() keeps join from failing on non-string option values.
                    options = ", ".join(str(option) for option in field["options"])
                    lines.append(f"- **Options:** {options}")
            lines.append("")

        # Members
        members = table.get("members") or []
        if members:
            lines.append("## Table Members")
            for member in members[:max_members_shown]:
                user = member.get("user") or {}
                lines.append(f"- {user.get('name') or 'Unknown'} ({user.get('email') or 'N/A'})")
            if len(members) > max_members_shown:
                lines.append(f"- ... and {len(members) - max_members_shown} more")
            lines.append("")

        # Webhooks
        webhooks = table.get("webhooks") or []
        if webhooks:
            lines.append("## Webhooks")
            for webhook in webhooks:
                actions = ", ".join(str(action) for action in webhook.get("actions") or [])
                lines.append(f"- **{webhook['name']}** (ID: {webhook['id']})")
                lines.append(f" - URL: {webhook['url']}")
                lines.append(f" - Actions: {actions}")
            lines.append("")

        return _truncate_response("\n".join(lines))
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_create_table",
    annotations={
        "title": "Create New Pipefy Database Table",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        # Talks to the external Pipefy API like every other tool in this server.
        "openWorldHint": True,
    },
)
async def pipefy_create_table(params: CreateTableInput) -> str:
    '''Create a new database table in an organization.

    Creates a new table for storing structured data like customer records,
    product catalogs, supplier lists, etc.

    Args:
        params: CreateTableInput with organization_id, name, and optional color.
            organization_id must be numeric because it is sent as an integer.

    Returns:
        Details of the created table in the requested format, or a string
        starting with "Error:" when the input is invalid or creation fails.
    '''
    query = """
    mutation CreateTable($input: CreateTableInput!) {
      createTable(input: $input) {
        table {
          id
          name
          public
        }
      }
    }
    """
    try:
        # Validate up front so the caller gets a clear message instead of a raw
        # "invalid literal for int()" conversion error.
        try:
            organization_id = int(params.organization_id)
        except ValueError:
            return f"Error: organization_id must be a numeric ID, got '{params.organization_id}'."

        variables_input = {
            "organization_id": organization_id,
            "name": params.name,
        }
        if params.color:
            variables_input["color"] = params.color

        data = await _make_graphql_request(query, {"input": variables_input})
        # The mutation payload may be an explicit null; guard before reading "table".
        table = (data.get("createTable") or {}).get("table")
        if not table:
            return "Error: Failed to create table (no table returned)."

        if params.response_format == ResponseFormat.JSON:
            return json.dumps(table, indent=2)

        visibility = "Public" if table.get("public") else "Private"
        return "\n".join([
            "# Table Created Successfully ✓",
            f"- **Table ID:** `{table['id']}`",
            f"- **Name:** {table['name']}",
            f"- **Visibility:** {visibility}",
            "You can now add fields to this table using the field management tools.",
            "",
        ])
    except Exception as e:
        # Tool boundary: report every failure as text instead of raising.
        return _handle_api_error(e)
# ============================================================================
# TABLE RECORDS TOOLS
# ============================================================================
def _summarize_table_record(record: dict) -> list[str]:
    '''Render one table record as the Markdown lines shown in the record list.'''
    max_fields = 10        # fields listed per record before summarizing the rest
    max_value_chars = 100  # longer values are cut and suffixed with "..."

    status = "Done ✓" if record.get("done") else "Active"
    lines = [
        f"\n## {record['title']}",
        f"- **Record ID:** {record['id']}",
        f"- **Status:** {status}",
        # `or` (not a .get default) so an explicit null also falls back.
        f"- **Created:** {record.get('created_at') or 'N/A'}",
    ]
    creator = record.get("created_by")
    if creator:
        lines.append(f"- **Created By:** {creator.get('name') or 'Unknown'}")

    record_fields = record.get("record_fields") or []
    if record_fields:
        lines.append("\n**Field Values:**")
        for rf in record_fields[:max_fields]:
            value = rf.get("value")
            if value is None:
                # Null values would otherwise be printed as the text "None".
                value = "N/A"
            elif len(str(value)) > max_value_chars:
                value = str(value)[:max_value_chars] + "..."
            lines.append(f" - {rf.get('name') or 'Unknown'}: {value}")
        if len(record_fields) > max_fields:
            lines.append(f" - ... and {len(record_fields) - max_fields} more fields")
    return lines


@mcp.tool(
    name="pipefy_list_table_records",
    annotations={
        "title": "List Table Records",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True
    }
)
async def pipefy_list_table_records(params: ListTableRecordsInput) -> str:
    '''List records from a database table.

    Queries the table's ``table_records`` connection for the first
    ``params.limit`` records and renders each one with its status, creation
    info and up to ten field values.

    Args:
        params: ListTableRecordsInput with table_id and limit

    Returns:
        The records as JSON when ``params.response_format`` is JSON, otherwise
        a Markdown listing; both pass through ``_truncate_response``. A missing
        table or any failure is returned as an error string rather than raised.
    '''
    query = """
query GetTableRecords($tableId: ID!, $first: Int!) {
table(id: $tableId) {
id
name
table_records(first: $first) {
edges {
node {
id
title
done
created_at
created_by {
id
name
}
record_fields {
name
value
field {
id
label
type
}
}
}
}
}
}
}
"""
    try:
        data = await _make_graphql_request(
            query,
            {"tableId": params.table_id, "first": params.limit}
        )
        table = (data or {}).get("table")
        if not table:
            return f"Error: Table with ID '{params.table_id}' not found."
        # The connection, its edges or a node may each be an explicit null;
        # dict.get defaults only cover missing keys, so fall back with `or`.
        edges = (table.get("table_records") or {}).get("edges") or []
        records = [edge["node"] for edge in edges if edge and edge.get("node")]
        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(records, indent=2))
        lines = [
            f"# Records in Table: {table['name']}",
            f"**Table ID:** `{table['id']}`",
            "",
            f"**Showing:** {len(records)} record(s)",
            ""
        ]
        if not records:
            lines.append("No records found in this table.")
        for record in records:
            lines.extend(_summarize_table_record(record))
        return _truncate_response("\n".join(lines))
    except Exception as e:
        return _handle_api_error(e)
def _describe_record_field(rf: dict) -> list[str]:
    '''Render one ``record_fields`` entry as the Markdown lines of its section.'''
    # `field` may be an explicit null; a .get default would not cover that.
    field = rf.get("field") or {}
    required = " **(required)**" if rf.get("required") else ""
    lines = [
        f"\n### {field.get('label') or 'Unknown Field'}{required}",
        f"- **Field ID:** `{field.get('id')}`",
        f"- **Type:** {field.get('type') or 'unknown'}",
    ]
    # Prefer the typed value when one is present, else the raw string value.
    if rf.get("date_value"):
        value = rf["date_value"]
    elif rf.get("datetime_value"):
        value = rf["datetime_value"]
    elif rf.get("float_value") is not None:
        # `is not None` so a legitimate 0.0 is still shown.
        value = rf["float_value"]
    else:
        value = rf.get("value")
        if value is None:
            # Null values would otherwise be printed as the text "None".
            value = "N/A"
    lines.append(f"- **Value:** {value}")
    # NOTE(review): the label says "Last Updated" but the value shown is
    # filled_at; the queried updated_at is not used here - confirm intent.
    if rf.get("filled_at"):
        lines.append(f"- **Last Updated:** {rf['filled_at']}")
    return lines


@mcp.tool(
    name="pipefy_get_table_record",
    annotations={
        "title": "Get Table Record Details",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def pipefy_get_table_record(params: GetTableRecordInput) -> str:
    '''Get detailed information about a specific table record.

    Fetches the record together with its table, creator and every field
    value, then renders one section per field.

    Args:
        params: GetTableRecordInput with record_id (numeric)

    Returns:
        The record as JSON when ``params.response_format`` is JSON, otherwise
        a Markdown report; both pass through ``_truncate_response``. A missing
        record or any failure is returned as an error string rather than
        raised.
    '''
    query = """
query GetTableRecord($id: ID!) {
table_record(id: $id) {
id
title
done
created_at
updated_at
created_by {
id
name
email
}
table {
id
name
}
record_fields {
name
value
date_value
datetime_value
float_value
filled_at
updated_at
required
field {
id
label
type
description
}
}
}
}
"""
    try:
        data = await _make_graphql_request(query, {"id": params.record_id})
        record = (data or {}).get("table_record")
        if not record:
            return f"Error: Record with ID '{params.record_id}' not found."
        if params.response_format == ResponseFormat.JSON:
            return _truncate_response(json.dumps(record, indent=2))
        status = "Done ✓" if record.get("done") else "Active"
        # An explicit null `table` must not break the header rendering.
        table = record.get("table") or {}
        lines = [
            f"# {record['title']}",
            f"**Record ID:** {record['id']}",
            f"**Status:** {status}",
            "",
            f"**Table:** {table.get('name')} (ID: `{table.get('id')}`)",
            f"**Created:** {record.get('created_at') or 'N/A'}",
            f"**Updated:** {record.get('updated_at') or 'N/A'}",
            "",
        ]
        creator = record.get("created_by")
        if creator:
            lines.extend([
                f"**Created By:** {creator.get('name')} ({creator.get('email')})",
                ""
            ])
        record_fields = record.get("record_fields") or []
        if record_fields:
            lines.append("## Field Values")
            for rf in record_fields:
                lines.extend(_describe_record_field(rf))
        return _truncate_response("\n".join(lines))
    except Exception as e:
        return _handle_api_error(e)
@mcp.tool(
    name="pipefy_create_table_record",
    annotations={
        "title": "Create New Table Record",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def pipefy_create_table_record(params: CreateTableRecordInput) -> str:
    '''Create a new record in a database table.

    Sends the ``createTableRecord`` mutation with the table ID, the title and
    one ``field_id``/``field_value`` pair per entry in ``params.fields``.

    Args:
        params: CreateTableRecordInput with table_id, title, and fields

    Returns:
        The created record's id and title, as JSON when
        ``params.response_format`` is JSON and as Markdown otherwise. Any
        failure is returned as an error string rather than raised.
    '''
    query = """
mutation CreateTableRecord($input: CreateTableRecordInput!) {
createTableRecord(input: $input) {
table_record {
id
title
}
}
}
"""
    try:
        # A None field list is sent as an empty list instead of raising.
        fields_attributes = [
            {"field_id": f.field_id, "field_value": f.field_value}
            for f in (params.fields or [])
        ]
        variables_input = {
            "table_id": params.table_id,
            "title": params.title,
            "fields_attributes": fields_attributes
        }
        data = await _make_graphql_request(query, {"input": variables_input})
        # A JSON null arrives as a present key holding None, which dict.get's
        # default does not cover; `or {}` keeps the chain from raising so the
        # dedicated error below is reported instead.
        payload = (data or {}).get("createTableRecord") or {}
        record = payload.get("table_record")
        if not record:
            return "Error: Failed to create record (no record returned)."
        if params.response_format == ResponseFormat.JSON:
            return json.dumps(record, indent=2)
        return (
            "# Record Created Successfully ✓\n"
            f"- **Record ID:** {record['id']}\n"
            f"- **Title:** {record['title']}\n"
            "The record has been created in the table.\n"
        )
    except Exception as e:
        return _handle_api_error(e)
if __name__ == "__main__":
    # Start the MCP server only when this file is executed as a script.
    # run() is called with no arguments, i.e. with the default transport
    # (stdio, the usual choice for local MCP servers).
    mcp.run()