Skip to main content
Glama
token.ts8.48 kB
/** * Access Token Management Tools * * 访问令牌管理相关的 MCP 工具实现 */ import type { GiteaConfig } from '../config.js'; import type { GiteaAccessToken, CreateAccessTokenOptions, } from '../types/gitea.js'; import { createLogger } from '../logger.js'; const logger = createLogger('tools:token'); export interface TokenToolsContext { config: GiteaConfig; } /** * 通过账号密码创建访问令牌 * * 此功能使用 Basic Authentication(账号密码)来创建新的 API Token * 创建成功后,返回的 token 可以用于后续的 API 调用 * * 注意:token 的完整值(sha1)只在创建时返回一次,请妥善保存 */ export async function createTokenWithPassword( ctx: TokenToolsContext, args: { username: string; password: string; token_name: string; scopes?: string[]; } ) { logger.debug({ username: args.username, token_name: args.token_name }, 'Creating access token with password'); const baseUrl = ctx.config.baseUrl.replace(/\/$/, ''); const url = `${baseUrl}/api/v1/users/${args.username}/tokens`; // 构建 Basic Authentication header const credentials = Buffer.from(`${args.username}:${args.password}`).toString('base64'); const headers = { 'Authorization': `Basic ${credentials}`, 'Content-Type': 'application/json', 'Accept': 'application/json', }; // 构建请求体 const body: CreateAccessTokenOptions = { name: args.token_name, scopes: args.scopes, }; try { const response = await fetch(url, { method: 'POST', headers, body: JSON.stringify(body), signal: AbortSignal.timeout(ctx.config.timeout || 30000), }); // 解析响应 let data: any; const contentType = response.headers.get('content-type'); if (contentType?.includes('application/json')) { data = await response.json(); } else { data = await response.text(); } // 检查响应状态 if (!response.ok) { logger.error( { status: response.status, statusText: response.statusText, data, }, 'Failed to create access token' ); // 处理常见错误 if (response.status === 401) { throw new Error('Authentication failed: Invalid username or password'); } if (response.status === 422) { throw new Error(`Token creation failed: ${data.message || 'Invalid parameters'}`); } throw new Error(`Failed to create token: ${response.status} ${response.statusText}`); } const token = data as GiteaAccessToken; logger.info( { username: args.username, token_id: token.id, token_name: token.name, }, 'Access token created successfully' ); return { success: true, token: { id: token.id, name: token.name, token: token.sha1, // The full token value (only returned once!) token_last_eight: token.token_last_eight, scopes: token.scopes, }, message: 'Token created successfully. Please save the token value as it will not be shown again.', }; } catch (error) { if (error instanceof Error) { logger.error({ error: error.message }, 'Failed to create access token'); throw error; } throw new Error('Failed to create access token: Unknown error'); } } /** * 列出用户的访问令牌 * * 注意:此接口需要已有的 token 或 Basic Auth 认证 * 返回的 token 列表不包含完整的 token 值,只有最后 8 位 */ export async function listTokens( ctx: TokenToolsContext, args: { username: string; password?: string; // 可选:如果提供,使用 Basic Auth;否则使用配置的认证 } ) { logger.debug({ username: args.username }, 'Listing access tokens'); const baseUrl = ctx.config.baseUrl.replace(/\/$/, ''); const url = `${baseUrl}/api/v1/users/${args.username}/tokens`; // 构建认证头 let authHeader: string; if (args.password) { // 使用提供的密码进行 Basic Authentication const credentials = Buffer.from(`${args.username}:${args.password}`).toString('base64'); authHeader = `Basic ${credentials}`; } else { // 使用配置的认证方式 if (ctx.config.apiToken) { authHeader = `token ${ctx.config.apiToken}`; } else if (ctx.config.username && ctx.config.password) { const credentials = Buffer.from(`${ctx.config.username}:${ctx.config.password}`).toString('base64'); authHeader = `Basic ${credentials}`; } else { throw new Error('No authentication available. Please provide password or configure GITEA_API_TOKEN'); } } const headers = { 'Authorization': authHeader, 'Accept': 'application/json', }; try { const response = await fetch(url, { method: 'GET', headers, signal: AbortSignal.timeout(ctx.config.timeout || 30000), }); if (!response.ok) { const data = await response.text(); logger.error( { status: response.status, statusText: response.statusText, data, }, 'Failed to list access tokens' ); if (response.status === 401) { throw new Error('Authentication failed: Invalid credentials'); } throw new Error(`Failed to list tokens: ${response.status} ${response.statusText}`); } const tokens = await response.json() as GiteaAccessToken[]; logger.debug({ count: tokens.length }, 'Access tokens listed'); return { success: true, tokens: tokens.map((token) => ({ id: token.id, name: token.name, token_last_eight: token.token_last_eight, scopes: token.scopes, })), }; } catch (error) { if (error instanceof Error) { logger.error({ error: error.message }, 'Failed to list access tokens'); throw error; } throw new Error('Failed to list access tokens: Unknown error'); } } /** * 删除访问令牌 * * 注意:此接口需要已有的 token 或 Basic Auth 认证 */ export async function deleteToken( ctx: TokenToolsContext, args: { username: string; token_id: number; password?: string; // 可选:如果提供,使用 Basic Auth;否则使用配置的认证 } ) { logger.debug({ username: args.username, token_id: args.token_id }, 'Deleting access token'); const baseUrl = ctx.config.baseUrl.replace(/\/$/, ''); const url = `${baseUrl}/api/v1/users/${args.username}/tokens/${args.token_id}`; // 构建认证头 let authHeader: string; if (args.password) { const credentials = Buffer.from(`${args.username}:${args.password}`).toString('base64'); authHeader = `Basic ${credentials}`; } else { if (ctx.config.apiToken) { authHeader = `token ${ctx.config.apiToken}`; } else if (ctx.config.username && ctx.config.password) { const credentials = Buffer.from(`${ctx.config.username}:${ctx.config.password}`).toString('base64'); authHeader = `Basic ${credentials}`; } else { throw new Error('No authentication available. Please provide password or configure GITEA_API_TOKEN'); } } const headers = { 'Authorization': authHeader, 'Accept': 'application/json', }; try { const response = await fetch(url, { method: 'DELETE', headers, signal: AbortSignal.timeout(ctx.config.timeout || 30000), }); if (!response.ok && response.status !== 204) { const data = await response.text(); logger.error( { status: response.status, statusText: response.statusText, data, }, 'Failed to delete access token' ); if (response.status === 401) { throw new Error('Authentication failed: Invalid credentials'); } if (response.status === 404) { throw new Error(`Token not found: #${args.token_id}`); } throw new Error(`Failed to delete token: ${response.status} ${response.statusText}`); } logger.info({ username: args.username, token_id: args.token_id }, 'Access token deleted successfully'); return { success: true, message: `Token #${args.token_id} has been deleted`, }; } catch (error) { if (error instanceof Error) { logger.error({ error: error.message }, 'Failed to delete access token'); throw error; } throw new Error('Failed to delete access token: Unknown error'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SupenBysz/gitea-mcp-tool'

If you have feedback or need assistance with the MCP directory API, please join our Discord server