"""Claim/ack tools: claim_clink, complete_clink, release_clink."""
from typing import Optional
from mcp.server.fastmcp import FastMCP
from .. import client
from ..types import ClinkApiError
# Maximum number of content characters echoed back after a successful claim.
_CONTENT_PREVIEW_CHARS = 500


def _is_blank(clink_id: Optional[str]) -> bool:
    """Return True when *clink_id* is None, empty, or only whitespace."""
    # A whitespace-only string is truthy, so `not clink_id` alone would let
    # it through to the API call.
    return not clink_id or not clink_id.strip()


def register(mcp: FastMCP) -> None:
    """Register claim/ack tools with the MCP server.

    Defines ``claim_clink``, ``complete_clink`` and ``release_clink`` as
    coroutines decorated with ``mcp.tool()``. Every tool returns a
    human-readable string; a ``ClinkApiError`` raised by the client call is
    turned into a message instead of propagating.

    Args:
        mcp: The server instance whose ``tool()`` decorator is applied.
    """

    @mcp.tool()
    async def claim_clink(
        clink_id: str,
        timeout_seconds: Optional[int] = None,
    ) -> str:
        """Claim a clink to indicate you are processing it. This prevents other workers/agents from processing the same clink. The claim expires after a timeout (default 5 minutes) if not completed or released.

        Args:
            clink_id: The clink ID to claim
            timeout_seconds: How long to hold the claim in seconds (default: 300 = 5 minutes)
        """
        if _is_blank(clink_id):
            return "Please provide a clink_id to claim."

        # Only the client call sits inside the try, so the handler below
        # deals exclusively with API failures.
        try:
            clink = await client.claim_clink(
                clink_id,
                timeout_seconds=timeout_seconds,
            )
        except ClinkApiError as e:
            if e.status_code == 404:
                return f"Clink not found: {clink_id}"
            if e.status_code == 409:
                return "Clink is already claimed by another agent. Try a different clink or wait."
            return f"Error: {e}"

        parts = [
            "Clink claimed successfully!\n\n",
            f"ID: {clink.clink_id}\n",
            f"Status: {clink.status}\n",
            f"From: {clink.sender_name}\n",
        ]
        if clink.claim_expires_at:
            parts.append(f"Expires: {clink.claim_expires_at}\n")

        # Show a bounded preview; the ellipsis marks that content was cut.
        content = clink.content
        parts.append(f"\nContent:\n{content[:_CONTENT_PREVIEW_CHARS]}")
        if len(content) > _CONTENT_PREVIEW_CHARS:
            parts.append("...")

        parts.append(
            "\n\nUse complete_clink when done, or release_clink to let others process it."
        )
        return "".join(parts)

    @mcp.tool()
    async def complete_clink(
        clink_id: str,
        response: Optional[str] = None,
    ) -> str:
        """Mark a claimed clink as completed. You must have claimed the clink first. Optionally send a reply to the original sender.

        Args:
            clink_id: The clink ID to complete
            response: Optional reply clink to send to the original sender
        """
        if _is_blank(clink_id):
            return "Please provide a clink_id to complete."

        try:
            result = await client.complete_clink(
                clink_id,
                response_text=response,
            )
        except ClinkApiError as e:
            if e.status_code == 404:
                return f"Clink not found: {clink_id}"
            if e.status_code == 403:
                return "You can only complete clinks you have claimed."
            if e.status_code == 409:
                return "Clink is not in a state that can be completed. It may already be completed or not claimed."
            return f"Error: {e}"

        clink = result.clink
        parts = [
            "Clink completed!\n\n",
            f"ID: {clink.clink_id}\n",
            f"Status: {clink.status}\n",
        ]
        # The reply line is added only when the result carries a reply ID.
        if result.response_clink_id:
            parts.append(f"\nReply sent (ID: {result.response_clink_id})")
        return "".join(parts)

    @mcp.tool()
    async def release_clink(clink_id: str) -> str:
        """Release a claimed clink without completing it. The clink returns to the pending queue for another worker to process. Use this when you cannot complete the task.

        Args:
            clink_id: The clink ID to release
        """
        if _is_blank(clink_id):
            return "Please provide a clink_id to release."

        try:
            clink = await client.release_clink(clink_id)
        except ClinkApiError as e:
            if e.status_code == 404:
                return f"Clink not found: {clink_id}"
            if e.status_code == 403:
                return "You can only release clinks you have claimed."
            return f"Error: {e}"

        return (
            "Clink released.\n\n"
            f"ID: {clink.clink_id}\n"
            f"Status: {clink.status}\n"
            "\nThe clink is now available for other agents to claim."
        )