schemas.py•10.5 kB
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class FieldSchema:
"""Schema for a field in a tool"""
type: str
description: str
required: bool = False
items: Optional[Dict[str, Any]] = None
properties: Optional[Dict[str, Any]] = None
enum: Optional[List[str]] = None
class DynamicSchemaBuilder:
"""Build dynamic schemas for ClickUp tools"""
@staticmethod
def get_base_task_fields() -> Dict[str, FieldSchema]:
"""Get base task fields that are always available"""
return {
"name": FieldSchema(
type="string",
description="The name/title of the task",
required=False
),
"markdown_content": FieldSchema(
type="string",
description="The markdown content of the task description"
),
"status": FieldSchema(
type="string",
description="Status of the task"
),
"priority": FieldSchema(
type="integer",
description="Priority level (1=urgent, 2=high, 3=normal, 4=low)"
),
"due_date": FieldSchema(
type="string",
description="Due date in Unix timestamp milliseconds"
),
"assignees": FieldSchema(
type="array",
description="Array of user IDs to assign the task to",
items={"type": "string"}
),
"tags": FieldSchema(
type="array",
description="Array of tag names",
items={"type": "string"}
),
"parent": FieldSchema(
type="string",
description="You can create a subtask by including an existing task ID. The parent task ID you include can be a subtask, but must be in the same List specified in the path parameter."
)
}
@staticmethod
def get_task_filter_fields() -> Dict[str, FieldSchema]:
"""Get fields available for task filtering"""
return {
"archived": FieldSchema(
type="boolean",
description="Include archived tasks"
),
"page": FieldSchema(
type="integer",
description="Page number for pagination"
),
"order_by": FieldSchema(
type="string",
description="Field to order by",
enum=["id", "created", "updated", "due_date"]
),
"reverse": FieldSchema(
type="boolean",
description="Reverse the order"
),
"subtasks": FieldSchema(
type="boolean",
description="Include subtasks"
),
"statuses": FieldSchema(
type="array",
description="Filter by specific statuses",
items={"type": "string"}
),
"include_closed": FieldSchema(
type="boolean",
description="Include closed tasks"
),
"assignees": FieldSchema(
type="array",
description="Filter by assignee user IDs",
items={"type": "string"}
),
"tags": FieldSchema(
type="array",
description="Filter by tag names",
items={"type": "string"}
),
"due_date_gt": FieldSchema(
type="string",
description="Filter tasks with due date greater than (Unix timestamp)"
),
"due_date_lt": FieldSchema(
type="string",
description="Filter tasks with due date less than (Unix timestamp)"
),
"date_created_gt": FieldSchema(
type="string",
description="Filter tasks created after (Unix timestamp)"
),
"date_created_lt": FieldSchema(
type="string",
description="Filter tasks created before (Unix timestamp)"
),
"date_updated_gt": FieldSchema(
type="string",
description="Filter tasks updated after (Unix timestamp)"
),
"date_updated_lt": FieldSchema(
type="string",
description="Filter tasks updated before (Unix timestamp)"
)
}
@staticmethod
def build_schema(fields: Dict[str, FieldSchema], required_fields: List[str] = None) -> Dict[str, Any]:
"""Build a JSON schema from field definitions"""
if required_fields is None:
required_fields = []
properties = {}
for name, field in fields.items():
prop = {
"type": field.type,
"description": field.description
}
if field.items:
prop["items"] = field.items
if field.properties:
prop["properties"] = field.properties
if field.enum:
prop["enum"] = field.enum
properties[name] = prop
schema = {
"type": "object",
"properties": properties
}
if required_fields:
schema["required"] = required_fields
return schema
@classmethod
def get_create_task_schema(cls, list_id_required: bool = True) -> Dict[str, Any]:
"""Get schema for creating tasks"""
fields = {
"list_id": FieldSchema(
type="string",
description="The ID of the list to create the task in",
required=list_id_required
)
}
fields.update(cls.get_base_task_fields())
required = ["list_id", "name"] if list_id_required else ["name"]
return cls.build_schema(fields, required)
@classmethod
def get_update_task_schema(cls) -> Dict[str, Any]:
"""Get schema for updating tasks"""
fields = {
"task_id": FieldSchema(
type="string",
description="The ID of the task to update",
required=True
)
}
# For updates, all task fields are optional except task_id
base_fields = cls.get_base_task_fields()
fields.update(base_fields)
# Add special assignees field for updates
fields["assignees"] = FieldSchema(
type="object",
description="Assignees to add or remove",
properties={
"add": {
"type": "array",
"items": {"type": "string"},
"description": "User IDs to add as assignees"
},
"rem": {
"type": "array",
"items": {"type": "string"},
"description": "User IDs to remove as assignees"
}
}
)
return cls.build_schema(fields, ["task_id"])
@classmethod
def get_list_tasks_schema(cls) -> Dict[str, Any]:
"""Get schema for listing tasks"""
fields = {
"list_id": FieldSchema(
type="string",
description="The ID of the list to get tasks from",
required=True
)
}
fields.update(cls.get_task_filter_fields())
return cls.build_schema(fields, ["list_id"])
@classmethod
def get_search_tasks_schema(cls) -> Dict[str, Any]:
"""Get schema for searching tasks"""
fields = {
"workspace_id": FieldSchema(
type="string",
description="The ID of the workspace to search in",
required=True
),
"query": FieldSchema(
type="string",
description="Search query text",
required=True
),
"space_ids": FieldSchema(
type="array",
description="Filter by specific space IDs",
items={"type": "string"}
),
"list_ids": FieldSchema(
type="array",
description="Filter by specific list IDs",
items={"type": "string"}
)
}
# Add common filter fields
filter_fields = cls.get_task_filter_fields()
# Remove list-specific filters that don't apply to workspace search
exclude_fields = {"archived", "include_closed"}
for field_name in filter_fields:
if field_name not in exclude_fields:
fields[field_name] = filter_fields[field_name]
return cls.build_schema(fields, ["workspace_id", "query"])
@classmethod
def add_custom_fields(cls, base_schema: Dict[str, Any], custom_fields: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Add custom fields from ClickUp to the schema"""
if not custom_fields:
return base_schema
for custom_field in custom_fields:
field_name = f"custom_field_{custom_field.get('id', 'unknown')}"
field_type = cls._map_clickup_field_type(custom_field.get('type', 'text'))
base_schema["properties"][field_name] = {
"type": field_type,
"description": f"Custom field: {custom_field.get('name', 'Unknown')} ({custom_field.get('type', 'text')})"
}
# Add enum for dropdown fields
if custom_field.get('type') == 'drop_down' and custom_field.get('type_config', {}).get('options'):
options = custom_field['type_config']['options']
base_schema["properties"][field_name]["enum"] = [opt.get('name') for opt in options]
return base_schema
@staticmethod
def _map_clickup_field_type(clickup_type: str) -> str:
"""Map ClickUp field types to JSON schema types"""
type_mapping = {
'text': 'string',
'textarea': 'string',
'number': 'number',
'currency': 'number',
'date': 'string',
'drop_down': 'string',
'checkbox': 'boolean',
'url': 'string',
'email': 'string',
'phone': 'string',
'labels': 'array',
'users': 'array'
}
return type_mapping.get(clickup_type, 'string')