Linear MCP Server
- linear_mcp
from typing import Dict, List, Optional
import httpx
class LinearMCPClient:
API_URL = "https://api.linear.app/graphql"
def __init__(self, api_key: str):
"""Initialize the Linear client.
Args:
api_key: Linear API key
"""
self.api_key = api_key
self.client = httpx.Client(
headers={
"Authorization": api_key,
"Content-Type": "application/json",
}
)
@classmethod
def create(cls, api_key: str) -> "LinearMCPClient":
"""Create a new Linear client instance.
Args:
api_key: Linear API key
Returns:
LinearMCPClient instance
"""
return cls(api_key)
def _execute_query(self, query: str, variables: Optional[Dict] = None) -> Dict:
"""Execute a GraphQL query.
Args:
query: GraphQL query
variables: Optional query variables
Returns:
Query response
"""
response = self.client.post(
self.API_URL,
json={"query": query, "variables": variables or {}},
)
response.raise_for_status()
return response.json()
def list_issues(self, limit: int = 50) -> List[Dict]:
"""List recent issues.
Args:
limit: Maximum number of issues to return
Returns:
List of issues with basic details
"""
query = """
query ListIssues($first: Int!) {
issues(first: $first, orderBy: updatedAt) {
nodes {
id
identifier
title
description
priority
state {
id
name
}
assignee {
id
name
}
team {
id
name
}
url
}
}
}
"""
result = self._execute_query(query, {"first": limit})
issues = result.get("data", {}).get("issues", {}).get("nodes", [])
return [
{
"uri": f"linear-issue:///{issue['id']}",
"mimeType": "application/json",
"name": issue["title"],
"description": f"Linear issue {issue['identifier']}: {issue['title']}",
"metadata": {
"identifier": issue["identifier"],
"priority": issue.get("priority"),
"status": issue.get("state", {}).get("name"),
"assignee": issue.get("assignee", {}).get("name"),
"team": issue.get("team", {}).get("name"),
},
}
for issue in issues
]
def get_issue(self, issue_id: str) -> Dict:
"""Get details for a specific issue.
Args:
issue_id: Issue ID
Returns:
Issue details
"""
query = """
query GetIssue($id: ID!) {
issue(id: $id) {
id
identifier
title
description
priority
state {
name
}
assignee {
name
}
team {
name
}
url
}
}
"""
result = self._execute_query(query, {"id": issue_id})
issue = result.get("data", {}).get("issue", {})
if not issue:
raise ValueError(f"Issue {issue_id} not found")
return {
"id": issue["id"],
"identifier": issue["identifier"],
"title": issue["title"],
"description": issue["description"],
"priority": issue.get("priority"),
"status": issue.get("state", {}).get("name"),
"assignee": issue.get("assignee", {}).get("name"),
"team": issue.get("team", {}).get("name"),
"url": issue["url"],
}
def create_issue(
self,
title: str,
team_id: str,
description: Optional[str] = None,
priority: Optional[int] = None,
status: Optional[str] = None,
) -> Optional[Dict]:
"""Create a new Linear issue.
Args:
title: Issue title
team_id: Team ID
description: Optional issue description (markdown supported)
priority: Optional priority level (0-4)
status: Optional initial status name
Returns:
Created issue details if successful, None otherwise
"""
# First get the state ID if status is provided
state_id = None
if status:
state_query = """
query States($teamId: String!) {
team(id: $teamId) {
states {
nodes {
id
name
}
}
}
}
"""
state_result = self._execute_query(state_query, {"teamId": team_id})
states = (
state_result.get("data", {})
.get("team", {})
.get("states", {})
.get("nodes", [])
)
for state in states:
if state["name"].lower() == status.lower():
state_id = state["id"]
break
mutation = """
mutation CreateIssue($input: IssueCreateInput!) {
issueCreate(input: $input) {
success
issue {
id
identifier
title
description
priority
state {
id
name
}
team {
id
key
}
url
createdAt
}
}
}
"""
variables = {
"input": {
"title": title,
"teamId": team_id,
"description": description,
"priority": priority,
"stateId": state_id,
}
}
result = self._execute_query(mutation, variables)
issue_result = result.get("data", {}).get("issueCreate", {})
if not issue_result.get("success"):
return None
issue = issue_result.get("issue", {})
return {
"id": issue["id"],
"identifier": issue["identifier"],
"title": issue["title"],
"description": issue["description"],
"priority": issue.get("priority"),
"state": issue["state"]["name"] if issue.get("state") else None,
"team": issue["team"]["key"],
"url": issue["url"],
"created_at": issue["createdAt"],
}
def update_issue(
self,
issue_id: str,
title: Optional[str] = None,
description: Optional[str] = None,
priority: Optional[int] = None,
status: Optional[str] = None,
) -> Optional[Dict]:
"""Update an existing Linear issue.
Args:
issue_id: Issue ID to update
title: Optional new title
description: Optional new description
priority: Optional new priority (0-4)
status: Optional new status name
Returns:
Updated issue details if successful, None otherwise
"""
# First get the state ID if status is provided
state_id = None
if status:
# Get the team ID first
issue_query = """
query Issue($id: String!) {
issue(id: $id) {
team {
id
}
}
}
"""
issue_result = self._execute_query(issue_query, {"id": issue_id})
team_id = (
issue_result.get("data", {}).get("issue", {}).get("team", {}).get("id")
)
if team_id:
state_query = """
query States($teamId: String!) {
team(id: $teamId) {
states {
nodes {
id
name
}
}
}
}
"""
state_result = self._execute_query(state_query, {"teamId": team_id})
states = (
state_result.get("data", {})
.get("team", {})
.get("states", {})
.get("nodes", [])
)
for state in states:
if state["name"].lower() == status.lower():
state_id = state["id"]
break
mutation = """
mutation UpdateIssue($id: String!, $input: IssueUpdateInput!) {
issueUpdate(id: $id, input: $input) {
success
issue {
id
identifier
title
description
priority
state {
name
}
team {
key
}
url
updatedAt
}
}
}
"""
update_input = {}
if title is not None:
update_input["title"] = title
if description is not None:
update_input["description"] = description
if priority is not None:
update_input["priority"] = priority
if state_id is not None:
update_input["stateId"] = state_id
variables = {
"id": issue_id,
"input": update_input,
}
result = self._execute_query(mutation, variables)
issue_result = result.get("data", {}).get("issueUpdate", {})
if not issue_result.get("success"):
return None
issue = issue_result.get("issue", {})
return {
"id": issue["id"],
"identifier": issue["identifier"],
"title": issue["title"],
"description": issue["description"],
"priority": issue.get("priority"),
"state": issue["state"]["name"] if issue.get("state") else None,
"team": issue["team"]["key"],
"url": issue["url"],
"updated_at": issue["updatedAt"],
}
def search_issues(
self,
query: Optional[str] = None,
team_id: Optional[str] = None,
status: Optional[str] = None,
assignee_id: Optional[str] = None,
labels: Optional[List[str]] = None,
priority: Optional[int] = None,
limit: int = 10,
) -> List[Dict]:
"""Search issues with flexible filtering.
Args:
query: Optional text to search in title/description
team_id: Optional team ID or key filter
status: Optional status name filter
assignee_id: Optional assignee ID filter
labels: Optional list of label names to filter by
priority: Optional priority level filter (0-4)
limit: Maximum number of results to return
Returns:
List of matching issues
"""
# If team_id looks like a team key (string without dashes), get the actual team ID
if team_id and "-" not in team_id:
team_query = """
query Team($key: String!) {
team(key: $key) {
id
}
}
"""
team_result = self._execute_query(team_query, {"key": team_id})
team_data = team_result.get("data", {}).get("team", {})
if team_data:
team_id = team_data.get("id")
gql_query = """
query SearchIssues($first: Int!, $filter: IssueFilter) {
issues(first: $first, filter: $filter) {
nodes {
id
identifier
title
description
priority
state {
name
}
assignee {
id
name
}
team {
id
key
}
labels {
nodes {
name
}
}
url
createdAt
}
}
}
"""
filter_conditions = {}
if query:
filter_conditions["or"] = [
{"title": {"contains": query}},
{"description": {"contains": query}},
]
if team_id:
filter_conditions["team"] = {"id": {"eq": team_id}}
if status:
filter_conditions["state"] = {"name": {"eq": status}}
if assignee_id:
filter_conditions["assignee"] = {"id": {"eq": assignee_id}}
if labels:
filter_conditions["labels"] = {"some": {"name": {"in": labels}}}
if priority is not None:
filter_conditions["priority"] = {"eq": priority}
variables = {
"first": limit,
"filter": filter_conditions,
}
try:
result = self._execute_query(gql_query, variables)
data = result.get("data", {})
if not data:
error = result.get("errors", [{}])[0].get("message", "Unknown error")
raise Exception(error)
issues = data.get("issues", {}).get("nodes", [])
return [
{
"id": issue["id"],
"identifier": issue["identifier"],
"title": issue["title"],
"description": issue.get("description"),
"priority": issue.get("priority"),
"state": issue.get("state", {}).get("name"),
"assignee": {
"id": issue["assignee"]["id"],
"name": issue["assignee"]["name"],
}
if issue.get("assignee")
else None,
"team": {
"id": issue["team"]["id"],
"key": issue["team"]["key"],
}
if issue.get("team")
else None,
"labels": [
label["name"]
for label in issue.get("labels", {}).get("nodes", [])
]
if issue.get("labels")
else [],
"url": issue["url"],
"created_at": issue["createdAt"],
}
for issue in issues
]
except Exception as e:
raise Exception(f"Search failed: {str(e)}")
def get_user_issues(
self,
user_id: Optional[str] = None,
include_archived: bool = False,
limit: int = 50,
) -> List[Dict]:
"""Get issues assigned to a user.
Args:
user_id: Optional user ID (omit for authenticated user)
include_archived: Whether to include archived issues
limit: Maximum number of results to return
Returns:
List of issues assigned to the user
"""
if user_id:
query = """
query UserIssues($userId: String!, $first: Int!, $includeArchived: Boolean) {
user(id: $userId) {
assignedIssues(first: $first, includeArchived: $includeArchived) {
nodes {
id
identifier
title
description
priority
state {
name
}
team {
key
}
url
createdAt
archivedAt
}
}
}
}
"""
variables = {
"userId": user_id,
"first": limit,
"includeArchived": include_archived,
}
result = self._execute_query(query, variables)
issues = (
result.get("data", {})
.get("user", {})
.get("assignedIssues", {})
.get("nodes", [])
)
else:
query = """
query ViewerIssues($first: Int!, $includeArchived: Boolean) {
viewer {
assignedIssues(first: $first, includeArchived: $includeArchived) {
nodes {
id
identifier
title
description
priority
state {
name
}
team {
key
}
url
createdAt
archivedAt
}
}
}
}
"""
variables = {
"first": limit,
"includeArchived": include_archived,
}
result = self._execute_query(query, variables)
issues = (
result.get("data", {})
.get("viewer", {})
.get("assignedIssues", {})
.get("nodes", [])
)
return [
{
"id": issue["id"],
"identifier": issue["identifier"],
"title": issue["title"],
"description": issue.get("description"),
"priority": issue.get("priority"),
"state": issue["state"]["name"] if issue.get("state") else None,
"team": issue["team"]["key"],
"url": issue["url"],
"created_at": issue["createdAt"],
"archived_at": issue.get("archivedAt"),
}
for issue in issues
]
def add_comment(
self,
issue_id: str,
body: str,
create_as_user: Optional[str] = None,
display_icon_url: Optional[str] = None,
) -> Optional[Dict]:
"""Add a comment to an issue.
Args:
issue_id: Issue ID to comment on
body: Comment text (markdown supported)
create_as_user: Optional custom username
display_icon_url: Optional custom avatar URL
Returns:
Created comment details if successful, None otherwise
"""
mutation = """
mutation CreateComment($input: CommentCreateInput!) {
commentCreate(input: $input) {
success
comment {
id
body
user {
name
}
createdAt
}
}
}
"""
variables = {
"input": {
"issueId": issue_id,
"body": body,
}
}
if create_as_user:
variables["input"]["createAsUser"] = create_as_user
if display_icon_url:
variables["input"]["displayIconUrl"] = display_icon_url
result = self._execute_query(mutation, variables)
comment_result = result.get("data", {}).get("commentCreate", {})
if not comment_result.get("success"):
return None
comment = comment_result.get("comment", {})
return {
"id": comment["id"],
"body": comment["body"],
"user": comment["user"]["name"] if comment.get("user") else None,
"created_at": comment["createdAt"],
}
def get_team_issues(self, team_id: str) -> List[Dict]:
"""Get issues for a specific team.
Args:
team_id: Team ID
Returns:
List of team issues
"""
query = """
query GetTeamIssues($teamId: ID!) {
team(id: $teamId) {
issues {
nodes {
id
identifier
title
description
priority
state {
name
}
assignee {
name
}
url
}
}
}
}
"""
result = self._execute_query(query, {"teamId": team_id})
if not result.get("data", {}).get("team"):
raise ValueError(f"Team {team_id} not found")
issues = (
result.get("data", {}).get("team", {}).get("issues", {}).get("nodes", [])
)
return [
{
"id": issue["id"],
"identifier": issue["identifier"],
"title": issue["title"],
"description": issue.get("description"),
"priority": issue.get("priority"),
"status": issue.get("state", {}).get("name"),
"assignee": issue.get("assignee", {}).get("name"),
"url": issue["url"],
}
for issue in issues
]
def get_viewer(self) -> Dict:
"""Get information about the authenticated user.
Returns:
User information including teams and organization
"""
query = """
query {
viewer {
id
name
email
admin
teams {
nodes {
id
name
key
}
}
}
organization {
id
name
urlKey
}
}
"""
result = self._execute_query(query)
viewer = result.get("data", {}).get("viewer", {})
organization = result.get("data", {}).get("organization", {})
return {
"id": viewer["id"],
"name": viewer["name"],
"email": viewer.get("email"),
"admin": viewer.get("admin"),
"teams": [
{"id": team["id"], "name": team["name"], "key": team["key"]}
for team in viewer.get("teams", {}).get("nodes", [])
],
"organization": {
"id": organization["id"],
"name": organization["name"],
"urlKey": organization["urlKey"],
},
}
def get_organization(self) -> Dict:
"""Get information about the organization.
Returns:
Organization information including teams and users
"""
query = """
query {
organization {
id
name
urlKey
teams {
nodes {
id
name
key
}
}
users {
nodes {
id
name
email
admin
active
}
}
}
}
"""
result = self._execute_query(query)
organization = result.get("data", {}).get("organization", {})
return {
"id": organization["id"],
"name": organization["name"],
"urlKey": organization["urlKey"],
"teams": [
{"id": team["id"], "name": team["name"], "key": team["key"]}
for team in organization.get("teams", {}).get("nodes", [])
],
"users": [
{
"id": user["id"],
"name": user["name"],
"email": user.get("email"),
"admin": user.get("admin"),
"active": user.get("active"),
}
for user in organization.get("users", {}).get("nodes", [])
],
}