Skip to main content
Glama

Redmine MCP Server

by snowild
redmine_client.py (29.8 kB)
""" Redmine API 客戶端 負責與 Redmine 系統的 HTTP 通訊 """ import requests from typing import Dict, List, Optional, Any, Union from dataclasses import dataclass from datetime import datetime import json import os from pathlib import Path from .config import get_config from .validators import RedmineValidator, validate_and_clean_data, RedmineValidationError @dataclass class RedmineIssue: """Redmine 議題數據結構""" id: int subject: str description: str status: Dict[str, Any] priority: Dict[str, Any] project: Dict[str, Any] tracker: Dict[str, Any] author: Dict[str, Any] assigned_to: Optional[Dict[str, Any]] = None created_on: Optional[str] = None updated_on: Optional[str] = None done_ratio: int = 0 @dataclass class RedmineProject: """Redmine 專案數據結構""" id: int name: str identifier: str description: str status: int created_on: Optional[str] = None updated_on: Optional[str] = None @dataclass class RedmineUser: """Redmine 用戶數據結構""" id: int login: str firstname: str lastname: str mail: str status: int created_on: Optional[str] = None last_login_on: Optional[str] = None class RedmineAPIError(Exception): """Redmine API 錯誤""" def __init__(self, message: str, status_code: Optional[int] = None, response_data: Optional[Dict] = None): super().__init__(message) self.status_code = status_code self.response_data = response_data class RedmineClient: """Redmine API 客戶端""" def __init__(self): self.config = get_config() self.session = requests.Session() self.session.headers.update(self.config.api_headers) self.session.timeout = self.config.redmine_timeout # 快取相關設定 self.cache_dir = Path.home() / ".redmine_mcp" self.cache_dir.mkdir(exist_ok=True) # 根據 domain 建立唯一的快取檔案名稱 domain_hash = hash(self.config.redmine_domain) safe_domain = self.config.redmine_domain.replace('://', '_').replace('/', '_').replace(':', '_') self._cache_file = self.cache_dir / f"cache_{safe_domain}_{abs(domain_hash)}.json" self._enum_cache: Optional[Dict[str, Any]] = None def _make_request(self, method: str, endpoint: str, **kwargs) -> 
Dict[str, Any]: """執行 HTTP 請求""" url = f"{self.config.redmine_domain}/{endpoint.lstrip('/')}" try: response = self.session.request(method, url, **kwargs) response.raise_for_status() if response.content: return response.json() return {} except requests.exceptions.Timeout: friendly_msg = RedmineValidator.get_friendly_error_message( Exception("timeout"), "request" ) raise RedmineAPIError(friendly_msg) except requests.exceptions.ConnectionError as e: friendly_msg = RedmineValidator.get_friendly_error_message(e, "connection") raise RedmineAPIError(friendly_msg) except requests.exceptions.HTTPError as e: status_code = e.response.status_code if e.response else None error_data = None try: if e.response and e.response.content: error_data = e.response.json() except: pass # 使用友好的錯誤訊息 context = "issue" if "/issues" in url else "project" if "/projects" in url else "request" friendly_msg = RedmineValidator.get_friendly_error_message(e, context) raise RedmineAPIError(friendly_msg, status_code, error_data) except requests.exceptions.RequestException as e: friendly_msg = RedmineValidator.get_friendly_error_message(e, "request") raise RedmineAPIError(friendly_msg) except json.JSONDecodeError as e: friendly_msg = RedmineValidator.get_friendly_error_message(e, "response") raise RedmineAPIError(friendly_msg) def get_issue(self, issue_id: int, include: Optional[List[str]] = None) -> RedmineIssue: """取得單一議題""" params = {} if include: params['include'] = ','.join(include) response = self._make_request('GET', f'/issues/{issue_id}.json', params=params) if 'issue' not in response: raise RedmineAPIError(f"議題 {issue_id} 不存在") issue_data = response['issue'] return RedmineIssue( id=issue_data['id'], subject=issue_data['subject'], description=issue_data.get('description', ''), status=issue_data['status'], priority=issue_data['priority'], project=issue_data['project'], tracker=issue_data['tracker'], author=issue_data['author'], assigned_to=issue_data.get('assigned_to'), 
created_on=issue_data.get('created_on'), updated_on=issue_data.get('updated_on'), done_ratio=issue_data.get('done_ratio', 0) ) def get_issue_raw(self, issue_id: int, include: Optional[List[str]] = None) -> Dict[str, Any]: """取得單一議題的原始 API 資料(包含 journals 和 attachments)""" params = {} if include: params['include'] = ','.join(include) response = self._make_request('GET', f'/issues/{issue_id}.json', params=params) if 'issue' not in response: raise RedmineAPIError(f"議題 {issue_id} 不存在") return response['issue'] def list_issues(self, project_id: Optional[int] = None, status_id: Optional[int] = None, assigned_to_id: Optional[int] = None, tracker_id: Optional[int] = None, priority_id: Optional[int] = None, author_id: Optional[int] = None, created_on: Optional[str] = None, updated_on: Optional[str] = None, limit: int = 100, offset: int = 0, sort: Optional[str] = None, include: Optional[List[str]] = None) -> List[RedmineIssue]: """列出議題""" # 驗證查詢參數 query_params = { 'project_id': project_id, 'status_id': status_id, 'assigned_to_id': assigned_to_id, 'tracker_id': tracker_id, 'priority_id': priority_id, 'author_id': author_id, 'created_on': created_on, 'updated_on': updated_on, 'limit': limit, 'offset': offset, 'sort': sort } try: validated_params = validate_and_clean_data(query_params, "query") except RedmineValidationError as e: raise RedmineAPIError(f"查詢參數驗證失敗:{e}") params = validated_params # 加入額外參數 if include: params['include'] = ','.join(include) response = self._make_request('GET', '/issues.json', params=params) issues = [] for issue_data in response.get('issues', []): issues.append(RedmineIssue( id=issue_data['id'], subject=issue_data['subject'], description=issue_data.get('description', ''), status=issue_data['status'], priority=issue_data['priority'], project=issue_data['project'], tracker=issue_data['tracker'], author=issue_data['author'], assigned_to=issue_data.get('assigned_to'), created_on=issue_data.get('created_on'), updated_on=issue_data.get('updated_on'), 
done_ratio=issue_data.get('done_ratio', 0) )) return issues def create_issue(self, project_id: int, subject: str, description: str = "", tracker_id: Optional[int] = None, status_id: Optional[int] = None, priority_id: Optional[int] = None, assigned_to_id: Optional[int] = None, parent_issue_id: Optional[int] = None, custom_fields: Optional[List[Dict]] = None) -> int: """建立新議題,回傳議題 ID""" # 準備驗證資料 validation_data = { 'project_id': project_id, 'subject': subject, 'description': description, 'tracker_id': tracker_id, 'status_id': status_id, 'priority_id': priority_id, 'assigned_to_id': assigned_to_id, 'parent_issue_id': parent_issue_id, 'custom_fields': custom_fields } # 驗證資料 try: validated_data = validate_and_clean_data(validation_data, "issue") except RedmineValidationError as e: raise RedmineAPIError(f"議題資料驗證失敗:{e}") issue_data = {'issue': validated_data} response = self._make_request('POST', '/issues.json', json=issue_data) if 'issue' not in response: raise RedmineAPIError("建立議題失敗:回應中沒有議題資料") return response['issue']['id'] def update_issue(self, issue_id: int, **kwargs) -> bool: """更新議題""" update_data = {'issue': {}} # 支援的更新欄位 if 'subject' in kwargs: update_data['issue']['subject'] = kwargs['subject'] if 'description' in kwargs: update_data['issue']['description'] = kwargs['description'] if 'status_id' in kwargs: update_data['issue']['status_id'] = kwargs['status_id'] if 'priority_id' in kwargs: update_data['issue']['priority_id'] = kwargs['priority_id'] if 'assigned_to_id' in kwargs: update_data['issue']['assigned_to_id'] = kwargs['assigned_to_id'] if 'done_ratio' in kwargs: update_data['issue']['done_ratio'] = kwargs['done_ratio'] if 'tracker_id' in kwargs: update_data['issue']['tracker_id'] = kwargs['tracker_id'] if 'parent_issue_id' in kwargs: # 如果 parent_issue_id 為 None,則移除父議題關係 if kwargs['parent_issue_id'] is None: update_data['issue']['parent_issue_id'] = "" else: update_data['issue']['parent_issue_id'] = kwargs['parent_issue_id'] if 'start_date' in kwargs: 
update_data['issue']['start_date'] = kwargs['start_date'] if 'due_date' in kwargs: update_data['issue']['due_date'] = kwargs['due_date'] if 'estimated_hours' in kwargs: update_data['issue']['estimated_hours'] = kwargs['estimated_hours'] if 'notes' in kwargs: update_data['issue']['notes'] = kwargs['notes'] if not update_data['issue']: raise RedmineAPIError("沒有提供要更新的欄位") self._make_request('PUT', f'/issues/{issue_id}.json', json=update_data) return True def delete_issue(self, issue_id: int) -> bool: """刪除議題""" self._make_request('DELETE', f'/issues/{issue_id}.json') return True def add_watcher(self, issue_id: int, user_id: int) -> bool: """新增議題觀察者""" watcher_data = {'user_id': user_id} self._make_request('POST', f'/issues/{issue_id}/watchers.json', json=watcher_data) return True def remove_watcher(self, issue_id: int, user_id: int) -> bool: """移除議題觀察者""" self._make_request('DELETE', f'/issues/{issue_id}/watchers/{user_id}.json') return True def get_project(self, project_id: Union[int, str], include: Optional[List[str]] = None) -> RedmineProject: """取得專案資訊""" params = {} if include: params['include'] = ','.join(include) response = self._make_request('GET', f'/projects/{project_id}.json', params=params) if 'project' not in response: raise RedmineAPIError(f"專案 {project_id} 不存在") project_data = response['project'] return RedmineProject( id=project_data['id'], name=project_data['name'], identifier=project_data['identifier'], description=project_data.get('description', ''), status=project_data['status'], created_on=project_data.get('created_on'), updated_on=project_data.get('updated_on') ) def list_projects(self, limit: int = 100, offset: int = 0) -> List[RedmineProject]: """列出專案""" params = { 'limit': limit, 'offset': offset } response = self._make_request('GET', '/projects.json', params=params) projects = [] for project_data in response.get('projects', []): projects.append(RedmineProject( id=project_data['id'], name=project_data['name'], 
identifier=project_data['identifier'], description=project_data.get('description', ''), status=project_data['status'], created_on=project_data.get('created_on'), updated_on=project_data.get('updated_on') )) return projects def create_project(self, name: str, identifier: str, description: str = "", homepage: str = "", is_public: bool = True, parent_id: Optional[int] = None, inherit_members: bool = False, tracker_ids: Optional[List[int]] = None, enabled_module_names: Optional[List[str]] = None) -> int: """建立新專案,回傳專案 ID""" # 準備驗證資料 validation_data = { 'name': name, 'identifier': identifier, 'description': description, 'homepage': homepage, 'is_public': is_public, 'parent_id': parent_id, 'inherit_members': inherit_members, 'tracker_ids': tracker_ids, 'enabled_module_names': enabled_module_names } # 驗證資料 try: validated_data = validate_and_clean_data(validation_data, "project") except RedmineValidationError as e: raise RedmineAPIError(f"專案資料驗證失敗:{e}") project_data = {'project': validated_data} response = self._make_request('POST', '/projects.json', json=project_data) if 'project' not in response: raise RedmineAPIError("建立專案失敗:回應中沒有專案資料") return response['project']['id'] def update_project(self, project_id: Union[int, str], **kwargs) -> bool: """更新專案""" update_data = {'project': {}} # 支援的更新欄位 for field in ['name', 'description', 'homepage', 'is_public', 'parent_id', 'inherit_members', 'tracker_ids', 'enabled_module_names']: if field in kwargs: update_data['project'][field] = kwargs[field] if not update_data['project']: raise RedmineAPIError("沒有提供要更新的欄位") self._make_request('PUT', f'/projects/{project_id}.json', json=update_data) return True def delete_project(self, project_id: Union[int, str]) -> bool: """刪除專案""" self._make_request('DELETE', f'/projects/{project_id}.json') return True def archive_project(self, project_id: Union[int, str]) -> bool: """封存專案""" self._make_request('PUT', f'/projects/{project_id}/archive.json') return True def unarchive_project(self, 
project_id: Union[int, str]) -> bool: """解除封存專案""" self._make_request('PUT', f'/projects/{project_id}/unarchive.json') return True def get_issue_statuses(self) -> List[Dict[str, Any]]: """取得議題狀態列表""" response = self._make_request('GET', '/issue_statuses.json') return response.get('issue_statuses', []) def get_priorities(self) -> List[Dict[str, Any]]: """取得優先級列表""" response = self._make_request('GET', '/enumerations/issue_priorities.json') return response.get('issue_priorities', []) def get_trackers(self) -> List[Dict[str, Any]]: """取得追蹤器列表""" response = self._make_request('GET', '/trackers.json') return response.get('trackers', []) def get_time_entry_activities(self) -> List[Dict[str, Any]]: """取得時間追蹤活動列表""" response = self._make_request('GET', '/enumerations/time_entry_activities.json') return response.get('time_entry_activities', []) def get_document_categories(self) -> List[Dict[str, Any]]: """取得文件分類列表""" response = self._make_request('GET', '/enumerations/document_categories.json') return response.get('document_categories', []) def get_users(self, status: Optional[int] = None, name: Optional[str] = None, group_id: Optional[int] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]: """取得用戶列表""" params = { 'limit': limit, 'offset': offset } if status: params['status'] = status if name: params['name'] = name if group_id: params['group_id'] = group_id response = self._make_request('GET', '/users.json', params=params) return response.get('users', []) def get_user(self, user_id: Union[int, str], include: Optional[List[str]] = None) -> Dict[str, Any]: """取得單一用戶資訊""" params = {} if include: params['include'] = ','.join(include) response = self._make_request('GET', f'/users/{user_id}.json', params=params) if 'user' not in response: raise RedmineAPIError(f"用戶 {user_id} 不存在") return response['user'] def get_current_user(self) -> Dict[str, Any]: """取得當前用戶資訊""" response = self._make_request('GET', '/my/account.json') if 'user' not in response: raise 
RedmineAPIError("無法取得當前用戶資訊") return response['user'] def list_users(self, limit: int = 20, offset: int = 0, status: int = None) -> List[RedmineUser]: """列出用戶""" params = { 'limit': min(max(limit, 1), 100), 'offset': max(offset, 0) } if status is not None: params['status'] = status response = self._make_request('GET', '/users.json', params=params) users = [] for user_data in response.get('users', []): users.append(RedmineUser( id=user_data['id'], login=user_data['login'], firstname=user_data.get('firstname', ''), lastname=user_data.get('lastname', ''), mail=user_data.get('mail', ''), status=user_data.get('status', 1), created_on=user_data.get('created_on'), last_login_on=user_data.get('last_login_on') )) return users def search_users(self, query: str, limit: int = 10) -> List[RedmineUser]: """搜尋用戶(依姓名或登入名)""" if not query.strip(): return [] params = { 'name': query.strip(), 'limit': min(max(limit, 1), 50) } response = self._make_request('GET', '/users.json', params=params) users = [] for user_data in response.get('users', []): users.append(RedmineUser( id=user_data['id'], login=user_data['login'], firstname=user_data.get('firstname', ''), lastname=user_data.get('lastname', ''), mail=user_data.get('mail', ''), status=user_data.get('status', 1), created_on=user_data.get('created_on'), last_login_on=user_data.get('last_login_on') )) return users def get_user(self, user_id: int) -> Dict[str, Any]: """取得特定用戶詳情""" response = self._make_request('GET', f'/users/{user_id}.json') if 'user' not in response: raise RedmineAPIError(f"找不到用戶 ID {user_id}") return response['user'] def _load_enum_cache(self) -> Dict[str, Any]: """載入列舉值快取""" if self._enum_cache is not None: return self._enum_cache try: if self._cache_file.exists(): with open(self._cache_file, 'r', encoding='utf-8') as f: self._enum_cache = json.load(f) # 檢查 domain 是否匹配 cached_domain = self._enum_cache.get('domain') if cached_domain != self.config.redmine_domain: # Domain 不匹配,重新建立快取 self._refresh_enum_cache() return 
self._enum_cache or {} # 檢查快取是否需要更新(超過24小時) cache_time = self._enum_cache.get('cache_time', 0) current_time = datetime.now().timestamp() if current_time - cache_time > 86400: # 24小時 self._refresh_enum_cache() else: self._refresh_enum_cache() except Exception: # 快取讀取失敗,重新建立 self._refresh_enum_cache() return self._enum_cache or {} def _refresh_enum_cache(self): """刷新列舉值快取""" try: # 取得所有列舉值 priorities = self.get_priorities() statuses = self.get_issue_statuses() trackers = self.get_trackers() time_entry_activities = self.get_time_entry_activities() # 取得用戶列表(限制100個避免太大) users = self.list_users(limit=100) # 建立名稱到ID的對應 user_by_name = {} user_by_login = {} for user in users: full_name = f"{user.firstname} {user.lastname}".strip() if full_name: user_by_name[full_name] = user.id user_by_login[user.login] = user.id self._enum_cache = { 'cache_time': datetime.now().timestamp(), 'domain': self.config.redmine_domain, 'priorities': {item['name']: item['id'] for item in priorities}, 'statuses': {item['name']: item['id'] for item in statuses}, 'trackers': {item['name']: item['id'] for item in trackers}, 'time_entry_activities': {item['name']: item['id'] for item in time_entry_activities}, 'users_by_name': user_by_name, 'users_by_login': user_by_login } # 儲存到檔案 with open(self._cache_file, 'w', encoding='utf-8') as f: json.dump(self._enum_cache, f, ensure_ascii=False, indent=2) except Exception as e: # 快取刷新失敗,使用空快取 self._enum_cache = { 'cache_time': 0, 'domain': self.config.redmine_domain, 'priorities': {}, 'statuses': {}, 'trackers': {}, 'time_entry_activities': {}, 'users_by_name': {}, 'users_by_login': {} } def find_priority_id_by_name(self, name: str) -> Optional[int]: """根據優先權名稱找到對應的 ID""" cache = self._load_enum_cache() return cache.get('priorities', {}).get(name) def find_status_id_by_name(self, name: str) -> Optional[int]: """根據狀態名稱找到對應的 ID""" cache = self._load_enum_cache() return cache.get('statuses', {}).get(name) def find_tracker_id_by_name(self, name: str) -> 
Optional[int]: """根據追蹤器名稱找到對應的 ID""" cache = self._load_enum_cache() return cache.get('trackers', {}).get(name) def get_available_priorities(self) -> Dict[str, int]: """取得所有可用的優先權選項(名稱到ID的對應)""" cache = self._load_enum_cache() return cache.get('priorities', {}) def get_available_statuses(self) -> Dict[str, int]: """取得所有可用的狀態選項(名稱到ID的對應)""" cache = self._load_enum_cache() return cache.get('statuses', {}) def get_available_trackers(self) -> Dict[str, int]: """取得所有可用的追蹤器選項(名稱到ID的對應)""" cache = self._load_enum_cache() return cache.get('trackers', {}) def find_user_id_by_name(self, name: str) -> Optional[int]: """根據用戶姓名找到對應的 ID""" cache = self._load_enum_cache() return cache.get('users_by_name', {}).get(name) def find_user_id_by_login(self, login: str) -> Optional[int]: """根據用戶登入名找到對應的 ID""" cache = self._load_enum_cache() return cache.get('users_by_login', {}).get(login) def find_user_id(self, identifier: str) -> Optional[int]: """根據用戶姓名或登入名找到對應的 ID(智慧查詢)""" cache = self._load_enum_cache() # 先嘗試姓名查詢 user_id = cache.get('users_by_name', {}).get(identifier) if user_id: return user_id # 再嘗試登入名查詢 return cache.get('users_by_login', {}).get(identifier) def get_available_users(self) -> Dict[str, Dict[str, int]]: """取得所有可用的用戶選項""" cache = self._load_enum_cache() return { 'by_name': cache.get('users_by_name', {}), 'by_login': cache.get('users_by_login', {}) } def find_time_entry_activity_id_by_name(self, name: str) -> Optional[int]: """根據時間追蹤活動名稱找到對應的 ID""" cache = self._load_enum_cache() return cache.get('time_entry_activities', {}).get(name) def get_available_time_entry_activities(self) -> Dict[str, int]: """取得所有可用的時間追蹤活動選項(名稱到ID的對應)""" cache = self._load_enum_cache() return cache.get('time_entry_activities', {}) def refresh_cache(self): """手動刷新快取""" self._refresh_enum_cache() def create_time_entry(self, issue_id: int, hours: float, activity_id: int, comments: str = "", spent_on: Optional[str] = None, user_id: Optional[int] = None) -> int: """建立時間記錄,回傳時間記錄 ID Args: issue_id: 
議題 ID hours: 耗用工時 activity_id: 活動 ID comments: 備註(可選) spent_on: 記錄日期 YYYY-MM-DD 格式(可選,預設今日) user_id: 用戶 ID(可選,預設當前用戶) Returns: 時間記錄 ID """ from datetime import date # 準備時間記錄資料 time_entry_data = { 'issue_id': issue_id, 'hours': hours, 'activity_id': activity_id } if comments: time_entry_data['comments'] = comments if spent_on: time_entry_data['spent_on'] = spent_on else: time_entry_data['spent_on'] = date.today().strftime('%Y-%m-%d') if user_id: time_entry_data['user_id'] = user_id # 發送請求 response = self._make_request('POST', '/time_entries.json', json={'time_entry': time_entry_data}) if 'time_entry' not in response: raise RedmineAPIError("建立時間記錄失敗:回應中沒有時間記錄資料") return response['time_entry']['id'] def test_connection(self) -> bool: """測試連線""" try: response = self._make_request('GET', '/my/account.json') return 'user' in response except RedmineAPIError: return False # 全域客戶端實例 _client: Optional[RedmineClient] = None def get_client() -> RedmineClient: """取得全域客戶端實例(單例模式)""" global _client if _client is None: _client = RedmineClient() return _client def reload_client() -> RedmineClient: """重新載入客戶端(主要用於測試)""" global _client _client = None return get_client()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/snowild/redmine-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server