azure_devops_client.py•30.2 kB
import os
from msrest.authentication import BasicAuthentication
from azure.devops.connection import Connection
from azure.devops.v7_1.work_item_tracking.models import JsonPatchOperation, Wiql
from azure.devops.v7_1.wiki.models import WikiCreateParametersV2, WikiPagesBatchRequest
from azure.devops.v7_1.graph.graph_client import GraphClient
class AzureDevOpsClient:
def __init__(self):
self.org_url = os.getenv("AZURE_DEVOPS_ORG_URL")
self.pat = os.getenv("AZURE_DEVOPS_PAT")
self.project_context = None
if not self.org_url or not self.pat:
raise ValueError("AZURE_DEVOPS_ORG_URL and AZURE_DEVOPS_PAT environment variables must be set.")
self.credentials = BasicAuthentication('', self.pat)
self.connection = Connection(base_url=self.org_url, creds=self.credentials)
# Initialize clients lazily to avoid connection issues during server startup
self._core_client = None
self._work_item_tracking_client = None
self._wiki_client = None
self._git_client = None
self._graph_client = None
@property
def core_client(self):
if self._core_client is None:
self._core_client = self.connection.clients.get_core_client()
return self._core_client
@property
def work_item_tracking_client(self):
if self._work_item_tracking_client is None:
self._work_item_tracking_client = self.connection.clients.get_work_item_tracking_client()
return self._work_item_tracking_client
@property
def wiki_client(self):
if self._wiki_client is None:
self._wiki_client = self.connection.clients.get_wiki_client()
return self._wiki_client
@property
def git_client(self):
if self._git_client is None:
self._git_client = self.connection.clients.get_git_client()
return self._git_client
@property
def graph_client(self):
if self._graph_client is None:
self._graph_client = self.connection.clients.get_graph_client()
return self._graph_client
def list_users(self):
return self.graph_client.list_users()
def set_project_context(self, project):
self.project_context = project
return {"message": f"Project context set to '{project}'."}
def clear_project_context(self):
self.project_context = None
return {"message": "Project context cleared."}
def get_projects(self):
return self.core_client.get_projects()
def create_work_item(self, project, work_item_type, title, description, relations=None):
patch_document = [
JsonPatchOperation(
op="add",
path="/fields/System.Title",
value=title
),
JsonPatchOperation(
op="add",
path="/fields/System.Description",
value=description
)
]
if relations:
for relation in relations:
patch_document.append(
JsonPatchOperation(
op="add",
path="/relations/-",
value={
"rel": relation["rel"],
"url": relation["url"]
}
)
)
return self.work_item_tracking_client.create_work_item(
document=patch_document,
project=project,
type=work_item_type
)
def get_work_item(self, work_item_id, expand=None):
work_item = self.work_item_tracking_client.get_work_item(id=work_item_id, expand=expand)
result = {
"id": work_item.id,
"url": work_item.url,
"fields": work_item.fields
}
if work_item.relations:
result["relations"] = [
{
"rel": r.rel,
"url": r.url,
"attributes": r.attributes
} for r in work_item.relations
]
return result
def update_work_item(self, work_item_id, updates, relations=None):
patch_document = [
JsonPatchOperation(
op="add",
path=f"/fields/{field}",
value=value
) for field, value in updates.items()
]
if relations:
for relation in relations:
patch_document.append(
JsonPatchOperation(
op="add",
path="/relations/-",
value={
"rel": relation["rel"],
"url": relation["url"]
}
)
)
return self.work_item_tracking_client.update_work_item(
document=patch_document,
id=work_item_id
)
def delete_work_item(self, work_item_id):
return self.work_item_tracking_client.delete_work_item(id=work_item_id)
def search_work_items(self, project, wiql_query):
# Add project filter to the WIQL query if not already present
if "[System.TeamProject]" not in wiql_query and "WHERE" in wiql_query.upper():
# Insert project filter into existing WHERE clause
wiql_query = wiql_query.replace(" WHERE ", f" WHERE [System.TeamProject] = '{project}' AND ")
elif "[System.TeamProject]" not in wiql_query:
# Add WHERE clause with project filter
wiql_query += f" WHERE [System.TeamProject] = '{project}'"
wiql = Wiql(query=wiql_query)
# Call query_by_wiql without the project parameter
query_result = self.work_item_tracking_client.query_by_wiql(wiql)
if query_result.work_items:
work_item_ids = [item.id for item in query_result.work_items]
work_items = self.work_item_tracking_client.get_work_items(ids=work_item_ids)
return [
{
"id": wi.id,
"title": wi.fields.get("System.Title"),
"state": wi.fields.get("System.State"),
"url": wi.url,
}
for wi in work_items
]
else:
return []
def get_work_item_comments(self, work_item_id, project=None, top=None, continuation_token=None, include_deleted=False, expand=None, order=None):
"""
Get comments for a specific work item.
Args:
work_item_id (int): The ID of the work item to get comments for
project (str, optional): Project name or ID. If not provided, uses project_context
top (int, optional): Maximum number of comments to return (pagination)
continuation_token (str, optional): Token for getting next page of results
include_deleted (bool): Whether to include deleted comments (default: False)
expand (str, optional): Additional data retrieval options for work item comments
order (str, optional): Order in which comments should be returned
Returns:
dict: Contains comments list and pagination info
"""
# Use provided project or fallback to context
project_name = project or self.project_context
if not project_name:
raise ValueError("Project must be specified either as parameter or set in project context")
# Get comments from Azure DevOps API
comment_list = self.work_item_tracking_client.get_comments(
project=project_name,
work_item_id=work_item_id,
top=top,
continuation_token=continuation_token,
include_deleted=include_deleted,
expand=expand,
order=order
)
# Format the response
result = {
"total_count": comment_list.total_count if hasattr(comment_list, 'total_count') else None,
"continuation_token": comment_list.continuation_token if hasattr(comment_list, 'continuation_token') else None,
"comments": []
}
if comment_list.comments:
for comment in comment_list.comments:
formatted_comment = {
"id": comment.id,
"text": comment.text,
"created_by": {
"id": comment.created_by.id if comment.created_by else None,
"display_name": comment.created_by.display_name if comment.created_by else None,
"unique_name": comment.created_by.unique_name if comment.created_by else None,
"image_url": comment.created_by.image_url if comment.created_by else None
} if comment.created_by else None,
"created_date": comment.created_date.isoformat() if comment.created_date else None,
"modified_by": {
"id": comment.modified_by.id if comment.modified_by else None,
"display_name": comment.modified_by.display_name if comment.modified_by else None,
"unique_name": comment.modified_by.unique_name if comment.modified_by else None,
"image_url": comment.modified_by.image_url if comment.modified_by else None
} if comment.modified_by else None,
"modified_date": comment.modified_date.isoformat() if comment.modified_date else None,
"url": comment.url if hasattr(comment, 'url') else None,
"version": comment.version if hasattr(comment, 'version') else None
}
result["comments"].append(formatted_comment)
return result
def create_wiki_page(self, project, wiki_identifier, path, content):
parameters = {
"content": content
}
return self.wiki_client.create_or_update_page(
project=project,
wiki_identifier=wiki_identifier,
path=path,
parameters=parameters,
version=None
)
def get_wiki_page(self, project, wiki_identifier, path):
return self.wiki_client.get_page(
project=project,
wiki_identifier=wiki_identifier,
path=path,
include_content=True
)
def update_wiki_page(self, project, wiki_identifier, path, content):
page = self.wiki_client.get_page(
project=project,
wiki_identifier=wiki_identifier,
path=path
)
# Try to get ETag from various possible locations
etag = None
if hasattr(page, 'eTag'):
etag = page.eTag
elif hasattr(page, 'etag'):
etag = page.etag
elif hasattr(page, 'e_tag'):
etag = page.e_tag
elif hasattr(page, '_etag'):
etag = page._etag
elif hasattr(page, 'page') and hasattr(page.page, 'eTag'):
etag = page.page.eTag
elif hasattr(page, 'page') and hasattr(page.page, 'etag'):
etag = page.page.etag
elif hasattr(page, 'page') and hasattr(page.page, 'e_tag'):
etag = page.page.e_tag
parameters = {
"content": content
}
return self.wiki_client.create_or_update_page(
project=project,
wiki_identifier=wiki_identifier,
path=path,
parameters=parameters,
version=etag
)
def update_wiki_page_safe(self, project, wiki_identifier, path, content, max_retries=3):
"""
Safely updates a wiki page with automatic retry on version conflicts.
"""
for attempt in range(max_retries):
try:
# Get the latest version of the page
page = self.wiki_client.get_page(
project=project,
wiki_identifier=wiki_identifier,
path=path
)
# Try to get ETag from various possible locations
etag = None
if hasattr(page, 'eTag'):
etag = page.eTag
elif hasattr(page, 'etag'):
etag = page.etag
elif hasattr(page, 'e_tag'):
etag = page.e_tag
elif hasattr(page, '_etag'):
etag = page._etag
elif hasattr(page, 'page') and hasattr(page.page, 'eTag'):
etag = page.page.eTag
elif hasattr(page, 'page') and hasattr(page.page, 'etag'):
etag = page.page.etag
elif hasattr(page, 'page') and hasattr(page.page, 'e_tag'):
etag = page.page.e_tag
parameters = {
"content": content
}
return self.wiki_client.create_or_update_page(
project=project,
wiki_identifier=wiki_identifier,
path=path,
parameters=parameters,
version=etag
)
except Exception as e:
if "version" in str(e).lower() and attempt < max_retries - 1:
# Version conflict, retry with fresh version
continue
else:
raise e
raise Exception(f"Failed to update wiki page after {max_retries} attempts due to version conflicts")
def create_or_update_wiki_page_smart(self, project, wiki_identifier, path, content):
"""
Creates a new wiki page or updates existing one intelligently.
"""
try:
# Try to update first
return self.update_wiki_page_safe(project, wiki_identifier, path, content)
except Exception as e:
if "not found" in str(e).lower() or "404" in str(e):
# Page doesn't exist, create it
return self.create_wiki_page(project, wiki_identifier, path, content)
else:
raise e
def search_wiki_pages(self, project, wiki_identifier, search_term):
"""
Search for wiki pages by title or content.
"""
pages = self.list_wiki_pages(project, wiki_identifier)
matching_pages = []
for page_info in pages:
try:
# Get page content to search in
page = self.wiki_client.get_page(
project=project,
wiki_identifier=wiki_identifier,
path=page_info["path"],
include_content=True
)
# Search in path (title) and content
if (search_term.lower() in page_info["path"].lower() or
(page.page.content and search_term.lower() in page.page.content.lower())):
matching_pages.append({
"path": page_info["path"],
"url": page_info["url"],
"content_preview": page.page.content[:200] + "..." if page.page.content and len(page.page.content) > 200 else page.page.content
})
except Exception:
# Skip pages that can't be accessed
continue
return matching_pages
def get_wiki_page_tree(self, project, wiki_identifier):
"""
Get hierarchical structure of wiki pages.
"""
pages = self.list_wiki_pages(project, wiki_identifier)
# Organize pages into a tree structure
tree = {}
for page in pages:
path_parts = page["path"].strip("/").split("/")
current_level = tree
for i, part in enumerate(path_parts):
if part not in current_level:
current_level[part] = {
"children": {},
"info": None
}
if i == len(path_parts) - 1:
# This is the final part, store page info
current_level[part]["info"] = page
current_level = current_level[part]["children"]
return tree
def find_wiki_by_name(self, project, partial_name):
"""
Find wikis by partial name match.
"""
wikis = self.get_wikis(project)
matching_wikis = []
for wiki in wikis:
if partial_name.lower() in wiki.name.lower():
matching_wikis.append({
"id": wiki.id,
"name": wiki.name,
"url": wiki.url,
"remote_url": wiki.remote_url,
})
return matching_wikis
def get_wiki_page_by_title(self, project, wiki_identifier, title):
"""
Find wiki page by title instead of exact path.
"""
pages = self.list_wiki_pages(project, wiki_identifier)
for page in pages:
# Extract title from path (last part after /)
page_title = page["path"].split("/")[-1].replace("-", " ").replace("_", " ")
if title.lower() in page_title.lower() or page_title.lower() in title.lower():
try:
full_page = self.get_wiki_page(project, wiki_identifier, page["path"])
return full_page
except Exception:
continue
return None
def list_all_wikis_in_organization(self):
"""
List all wikis across all projects in the organization.
"""
projects = self.get_projects()
all_wikis = []
for project in projects:
try:
wikis = self.get_wikis(project.name)
for wiki in wikis:
all_wikis.append({
"project": project.name,
"id": wiki.id,
"name": wiki.name,
"url": wiki.url,
"remote_url": wiki.remote_url,
})
except Exception:
# Skip projects where we can't access wikis
continue
return all_wikis
def get_recent_wiki_pages(self, project, wiki_identifier, limit=10):
"""
Get recently modified wiki pages.
"""
pages = self.list_wiki_pages(project, wiki_identifier)
# Sort by view stats if available (proxy for recent activity)
pages_with_activity = []
for page in pages:
if page.get("view_stats"):
latest_activity = max(page["view_stats"], key=lambda x: x["date"]) if page["view_stats"] else None
pages_with_activity.append({
**page,
"latest_activity": latest_activity
})
else:
pages_with_activity.append({
**page,
"latest_activity": None
})
# Sort by latest activity date
pages_with_activity.sort(
key=lambda x: x["latest_activity"]["date"] if x["latest_activity"] else "1900-01-01",
reverse=True
)
return pages_with_activity[:limit]
def get_wiki_page_suggestions(self, project, wiki_identifier, partial_input):
"""
Get page suggestions based on partial input.
"""
pages = self.list_wiki_pages(project, wiki_identifier)
suggestions = []
for page in pages:
path_lower = page["path"].lower()
input_lower = partial_input.lower()
# Score based on how well the input matches
score = 0
if path_lower.startswith(input_lower):
score = 100 # Exact prefix match
elif input_lower in path_lower:
score = 50 # Contains match
elif any(part.startswith(input_lower) for part in path_lower.split("/")):
score = 25 # Part starts with input
if score > 0:
suggestions.append({
**page,
"match_score": score
})
# Sort by score and return top suggestions
suggestions.sort(key=lambda x: x["match_score"], reverse=True)
return suggestions[:10]
def create_wiki_pages_batch(self, project, wiki_identifier, pages_data):
"""
Create multiple wiki pages at once.
pages_data: list of {"path": str, "content": str}
"""
results = []
for page_data in pages_data:
try:
result = self.create_wiki_page(
project=project,
wiki_identifier=wiki_identifier,
path=page_data["path"],
content=page_data["content"]
)
results.append({
"path": page_data["path"],
"status": "success",
"result": result
})
except Exception as e:
results.append({
"path": page_data["path"],
"status": "error",
"error": str(e)
})
return results
def delete_wiki_page(self, project, wiki_identifier, path):
return self.wiki_client.delete_page(
project=project,
wiki_identifier=wiki_identifier,
path=path
)
def move_wiki_page(self, project, wiki_identifier, from_path, to_path):
"""
Move a wiki page from one location to another atomically.
This involves getting the source content, creating at target, and deleting original.
"""
try:
# Step 1: Get the source page content
source_page = self.wiki_client.get_page(
project=project,
wiki_identifier=wiki_identifier,
path=from_path,
include_content=True
)
if not source_page or not source_page.page:
raise Exception(f"Source page '{from_path}' not found")
source_content = source_page.page.content or ""
# Step 2: Create the page at the target location
try:
target_page = self.create_wiki_page(
project=project,
wiki_identifier=wiki_identifier,
path=to_path,
content=source_content
)
except Exception as create_error:
raise Exception(f"Failed to create page at target location '{to_path}': {str(create_error)}")
# Step 3: Delete the original page (only if creation succeeded)
try:
self.delete_wiki_page(
project=project,
wiki_identifier=wiki_identifier,
path=from_path
)
except Exception as delete_error:
# If deletion fails, we should warn but not fail the whole operation
# since the content is now at the target location
return {
"status": "partial_success",
"message": f"Page moved to '{to_path}' but failed to delete original at '{from_path}': {str(delete_error)}",
"from_path": from_path,
"to_path": to_path,
"target_page": {
"path": target_page.page.path,
"url": target_page.page.url
},
"warning": f"Original page at '{from_path}' still exists and may need manual deletion"
}
# Success - both operations completed
return {
"status": "success",
"message": f"Page successfully moved from '{from_path}' to '{to_path}'",
"from_path": from_path,
"to_path": to_path,
"target_page": {
"path": target_page.page.path,
"url": target_page.page.url
}
}
except Exception as e:
# Complete failure - operation couldn't proceed
raise Exception(f"Failed to move wiki page from '{from_path}' to '{to_path}': {str(e)}")
def list_wiki_pages(self, project, wiki_identifier):
pages_batch_request = WikiPagesBatchRequest(
top=100 # Retrieve up to 100 pages
)
pages = self.wiki_client.get_pages_batch(
project=project,
wiki_identifier=wiki_identifier,
pages_batch_request=pages_batch_request
)
return [
{
"path": page.path,
"url": getattr(page, 'url', ''), # Handle missing url attribute
"view_stats": [
{"date": stat.date.isoformat(), "count": stat.count}
for stat in page.view_stats
] if page.view_stats else []
}
for page in pages
]
def get_wikis(self, project):
return self.wiki_client.get_all_wikis(project=project)
def create_wiki(self, project, name):
project_object = self.core_client.get_project(project)
wiki_params = WikiCreateParametersV2(name=name, type='projectWiki', project_id=project_object.id)
return self.wiki_client.create_wiki(wiki_create_params=wiki_params, project=project)
def list_repositories(self, project):
return self.git_client.get_repositories(project=project)
def list_files(self, project, repository_id, path):
return self.git_client.get_items(
project=project,
repository_id=repository_id,
scope_path=path,
recursion_level='full'
)
def get_file_content(self, project, repository_id, path):
return self.git_client.get_item_text(
project=project,
repository_id=repository_id,
path=path
)
def get_work_item_types(self, project):
"""
Get all work item types available in a project.
"""
return self.work_item_tracking_client.get_work_item_types(project=project)
def get_work_item_states(self, project, work_item_type):
"""
Get all possible states for a specific work item type.
"""
work_item_type_obj = self.work_item_tracking_client.get_work_item_type(
project=project,
type=work_item_type
)
# Extract states from the work item type definition
states = []
if hasattr(work_item_type_obj, 'states') and work_item_type_obj.states:
states = [
{
"name": state.name,
"color": getattr(state, 'color', None),
"category": getattr(state, 'category', None)
}
for state in work_item_type_obj.states
]
return states
def get_work_item_fields(self, project):
"""
Get all work item fields available in a project.
"""
fields = self.work_item_tracking_client.get_fields(project=project)
return [
{
"name": field.name,
"reference_name": field.reference_name,
"type": getattr(field, 'type', None),
"description": getattr(field, 'description', None),
"read_only": getattr(field, 'read_only', False),
"can_sort_by": getattr(field, 'can_sort_by', False)
}
for field in fields
]
def get_work_item_transitions(self, project, work_item_type, from_state):
"""
Get valid state transitions for a work item type from a specific state.
"""
try:
# This requires calling the process configuration API
# which might not be directly available in the Python SDK
# We'll use the work item type to get transition info
work_item_type_obj = self.work_item_tracking_client.get_work_item_type(
project=project,
type=work_item_type
)
# Extract transition rules if available
transitions = []
if hasattr(work_item_type_obj, 'transitions') and work_item_type_obj.transitions:
transitions = [
{
"to": getattr(transition, 'to', None),
"actions": getattr(transition, 'actions', [])
}
for transition in work_item_type_obj.transitions
if hasattr(transition, 'from') and getattr(transition, 'from', None) == from_state
]
else:
# Fallback: return all available states as potential transitions
if hasattr(work_item_type_obj, 'states') and work_item_type_obj.states:
transitions = [
{
"to": state.name,
"actions": []
}
for state in work_item_type_obj.states
if state.name != from_state
]
return transitions
except Exception as e:
# Fallback: return empty transitions with error info
return {"error": str(e), "transitions": []}