"""Clink messaging tools: send_clink, get_clinks, check_inbox."""
from typing import Optional
from mcp.server.fastmcp import FastMCP
from .. import client
from ..types import Clink, ClinkApiError
def _format_clink(clink: Clink) -> str:
"""Format a clink for display."""
time = clink.created_at[11:16] if len(clink.created_at) > 16 else clink.created_at
group = clink.group_slug or clink.group_name or clink.group_id
preview = clink.content[:100]
truncated = "..." if len(clink.content) > 100 else ""
output = f"[{time}] {clink.sender_name} in {group}:\n{preview}{truncated}"
output += f"\nStatus: {clink.status}"
if clink.for_:
output += f" | For: {clink.for_}"
return output
def _format_clink_inbox(clink: Clink) -> str:
"""Format a clink for inbox display."""
time = clink.created_at[11:19] if len(clink.created_at) > 19 else clink.created_at
group = clink.group_slug or clink.group_name or clink.group_id
preview = clink.content[:100] + ("..." if len(clink.content) > 100 else "")
line = f"[{time}] {clink.sender_name} in {group}: {preview}"
if clink.status == "claimed":
line += f" [CLAIMED by {clink.claimed_by}]"
elif clink.status == "completed":
line += " [COMPLETED]"
line += f"\n ID: {clink.clink_id}"
return line
def register(mcp: FastMCP) -> None:
"""Register clink messaging tools with the MCP server."""
@mcp.tool()
async def send_clink(
group: str,
content: str,
for_recipient: Optional[str] = None,
) -> str:
"""Send a clink to a Clink group. Clinks are visible to all group members. Optionally address the clink to a specific recipient.
Args:
group: The group slug (e.g., "backend-team") or group ID
content: The clink content (max 4096 characters)
for_recipient: Optional: specific recipient (user_id or agent_profile_id). If set, only they see it in their inbox.
"""
try:
if not group or not content:
return "Please provide both a group and content for the clink."
if len(content) > 4096:
return f"Clink too long ({len(content)} chars). Maximum is 4096 characters."
# Resolve group slug to ID if needed
group_id = await client.resolve_group(group)
clink = await client.send_clink(
group_id,
content,
for_recipient=for_recipient,
)
timestamp = clink.created_at[:19].replace("T", " ")
output = f"Clink sent successfully!\n\nSent at: {timestamp}\nTo group: {group}"
if for_recipient:
output += f"\nFor: {for_recipient}"
output += f"\nStatus: {clink.status}"
preview = content[:200] + ("..." if len(content) > 200 else "")
output += f"\nContent: {preview}"
return output
except ClinkApiError as e:
if e.status_code == 403:
return "You don't have permission to send clinks to this group. Make sure you're a member."
if e.status_code == 404:
return f'Group not found: "{group}". Use list_groups to see available groups.'
return f"Error: {e}"
@mcp.tool()
async def get_clinks(
group: Optional[str] = None,
since: Optional[str] = None,
limit: Optional[int] = None,
unread_only: Optional[bool] = None,
mark_read: Optional[bool] = None,
) -> str:
"""Get clinks from Clink groups. Can filter by group, time range, and limit. Use check_inbox for a quick unread check.
Args:
group: Filter clinks to a specific group by slug (e.g., "backend-team") or ID. If not provided, returns clinks from all your groups.
since: ISO timestamp. Only return clinks after this time. Example: 2024-01-15T10:00:00Z
limit: Maximum number of clinks to return. Default 50, max 100.
unread_only: If true, only return unread clinks. Default: false.
mark_read: If true, mark returned clinks as read. Default: false.
"""
try:
# Resolve group slug to ID if provided
group_id = None
if group:
group_id = await client.resolve_group(group)
clinks = await client.get_clinks(
group_id=group_id,
since=since,
limit=limit,
unread_only=unread_only,
mark_read=mark_read,
)
if not clinks:
msg = "No clinks found"
if group:
msg += f' in group "{group}"'
if since:
msg += f" since {since}"
if unread_only:
msg += " (unread only)"
return msg + "."
# Group clinks by group for display
by_group: dict[str, list[Clink]] = {}
for clink in clinks:
key = clink.group_slug or clink.group_name or clink.group_id
if key not in by_group:
by_group[key] = []
by_group[key].append(clink)
output = f"{len(clinks)} clink(s) found"
if mark_read:
output += " (marked as read)"
output += ":\n\n"
for grp, group_clinks in by_group.items():
output += f"--- {grp} ({len(group_clinks)}) ---\n\n"
for clink in group_clinks:
output += _format_clink(clink) + "\n\n"
return output.strip()
except ClinkApiError as e:
if e.status_code == 404:
return f'Group not found: "{group}". Use list_groups to see available groups.'
return f"Error: {e}"
@mcp.tool()
async def check_inbox(
status: Optional[str] = None,
for_me: Optional[bool] = None,
claim: Optional[bool] = None,
limit: Optional[int] = None,
) -> str:
"""Check your inbox for pending clinks. By default shows clinks addressed to you or unaddressed. Can auto-claim clinks for processing to prevent duplicate work by multiple agents.
Args:
status: Filter by clink status. Default: pending (actionable items)
for_me: Only show clinks addressed to you or unaddressed (default: true)
claim: Auto-claim pending clinks returned. Use this when you intend to process the clinks immediately.
limit: Maximum clinks to return (default: 10)
"""
try:
response = await client.check_inbox(
status=status or "pending",
for_me=for_me if for_me is not None else True,
claim=claim or False,
limit=limit or 10,
)
clinks = response["clinks"]
count = response["count"]
claimed_count = response.get("claimed_count")
if count == 0:
status_text = status or "pending"
if status_text == "pending":
return "No pending clinks in your inbox."
return f"No {status_text} clinks found."
# Group by sender and group for summary
by_sender: dict[str, int] = {}
by_group: dict[str, int] = {}
for clink in clinks:
by_sender[clink.sender_name] = by_sender.get(clink.sender_name, 0) + 1
group_key = clink.group_slug or clink.group_name or clink.group_id
by_group[group_key] = by_group.get(group_key, 0) + 1
sender_summary = ", ".join(f"{name} ({c})" for name, c in by_sender.items())
group_summary = ", ".join(f"{name}: {c}" for name, c in by_group.items())
# Format clinks
formatted_clinks = [_format_clink_inbox(c) for c in clinks]
header = f"{count} clink(s)"
if claim and claimed_count and claimed_count > 0:
header += f" ({claimed_count} claimed for processing)"
output = f"{header}\n\nFrom: {sender_summary}\nIn: {group_summary}\n\nClinks:\n"
output += "\n\n".join(formatted_clinks)
if claim and claimed_count and claimed_count > 0:
output += f"\n\nClaimed {claimed_count} clink(s). Use complete_clink when done processing."
elif not claim and any(c.status == "pending" for c in clinks):
output += "\n\nTip: Use claim=true to claim clinks, or use claim_clink on specific clink IDs."
return output
except ClinkApiError as e:
return f"Error: {e}"