Skip to main content
Glama
index.js8.99 kB
/** * 数据库连接池管理 * MySQL数据库连接和查询封装 */ import mysql from 'mysql2/promise'; import { logger } from '../utils/logger.js'; import { configManager } from '../config/config-manager.js'; /** * 数据库连接池配置 */ const poolConfig = { host: configManager.get('DB_HOST', 'localhost'), port: configManager.get('DB_PORT', 3306), user: configManager.get('DB_USER', 'root'), password: configManager.get('DB_PASSWORD', ''), database: configManager.get('DB_NAME', 'xiaohongshu_mcp'), charset: 'utf8mb4', timezone: '+00:00', // 连接池配置 connectionLimit: configManager.get('DB_CONNECTION_LIMIT', 10), acquireTimeout: configManager.get('DB_ACQUIRE_TIMEOUT', 60000), timeout: configManager.get('DB_TIMEOUT', 60000), reconnect: true, // 性能优化配置 enableKeepAlive: true, keepAliveInitialDelay: 0, // 调试配置 debug: configManager.get('DB_DEBUG', false), trace: configManager.get('DB_TRACE', false) }; /** * 数据库连接池 */ class DatabasePool { constructor() { this.pool = null; this.isInitialized = false; this.connectionStats = { totalConnections: 0, activeConnections: 0, idleConnections: 0, queuedQueries: 0, errorCount: 0 }; } /** * 初始化连接池 */ async initialize() { if (this.isInitialized) { return; } try { logger.info('正在初始化数据库连接池...', { host: poolConfig.host, port: poolConfig.port, database: poolConfig.database, connectionLimit: poolConfig.connectionLimit }); this.pool = mysql.createPool(poolConfig); // 测试连接 await this.testConnection(); // 启动连接池监控 this.startPoolMonitoring(); this.isInitialized = true; logger.info('数据库连接池初始化成功'); } catch (error) { logger.error('数据库连接池初始化失败:', error); throw error; } } /** * 测试数据库连接 */ async testConnection() { try { const connection = await this.pool.getConnection(); await connection.ping(); connection.release(); logger.info('数据库连接测试成功'); } catch (error) { logger.error('数据库连接测试失败:', error); throw error; } } /** * 启动连接池监控 */ startPoolMonitoring() { // 每30秒更新一次连接池统计信息 setInterval(() => { this.updateConnectionStats(); }, 30000); // 监听连接池事件 this.pool.on('connection', (connection) => { logger.debug('新建数据库连接', { threadId: connection.threadId, connectionId: connection.connectionId }); }); this.pool.on('acquire', (connection) => { logger.debug('获取数据库连接', { threadId: connection.threadId, connectionId: connection.connectionId }); }); this.pool.on('release', (connection) => { logger.debug('释放数据库连接', { threadId: connection.threadId, connectionId: connection.connectionId }); }); this.pool.on('error', (error) => { logger.error('数据库连接池错误:', error); this.connectionStats.errorCount++; }); } /** * 更新连接池统计信息 */ updateConnectionStats() { if (!this.pool) return; try { const stats = this.pool._allConnections.length; const active = this.pool._acquiringConnections.length; const idle = this.pool._freeConnections.length; const queued = this.pool._connectionQueue.length; this.connectionStats = { totalConnections: stats, activeConnections: active, idleConnections: idle, queuedQueries: queued, errorCount: this.connectionStats.errorCount }; logger.debug('数据库连接池统计:', this.connectionStats); } catch (error) { logger.error('更新连接池统计失败:', error); } } /** * 执行查询 * @param {string} sql - SQL语句 * @param {Array} params - 参数数组 * @returns {Promise} 查询结果 */ async query(sql, params = []) { if (!this.isInitialized) { await this.initialize(); } const startTime = Date.now(); try { logger.debug('执行数据库查询:', { sql, params }); const [rows] = await this.pool.execute(sql, params); const duration = Date.now() - startTime; logger.debug(`查询完成 (${duration}ms):`, { sql: sql.substring(0, 100) + (sql.length > 100 ? '...' : ''), affectedRows: rows.affectedRows, changedRows: rows.changedRows, insertId: rows.insertId }); return rows; } catch (error) { const duration = Date.now() - startTime; logger.error(`查询失败 (${duration}ms):`, { sql, params, error: error.message }); // 特殊错误处理 if (error.code === 'ER_NO_SUCH_TABLE') { throw new Error(`数据表不存在: ${error.sqlMessage}`); } if (error.code === 'ER_DUP_ENTRY') { throw new Error(`数据重复: ${error.sqlMessage}`); } if (error.code === 'ER_NO_REFERENCED_ROW_2') { throw new Error(`外键约束错误: ${error.sqlMessage}`); } throw error; } } /** * 执行事务 * @param {Function} callback - 事务回调函数 * @returns {Promise} 事务结果 */ async transaction(callback) { if (!this.isInitialized) { await this.initialize(); } const connection = await this.pool.getConnection(); try { await connection.beginTransaction(); logger.debug('开始数据库事务'); const result = await callback(connection); await connection.commit(); logger.debug('提交数据库事务'); return result; } catch (error) { await connection.rollback(); logger.error('回滚数据库事务:', error); throw error; } finally { connection.release(); } } /** * 批量插入 * @param {string} table - 表名 * @param {Array<Object>} data - 数据数组 * @returns {Promise} 插入结果 */ async batchInsert(table, data) { if (!Array.isArray(data) || data.length === 0) { throw new Error('数据必须是包含对象的数组'); } if (!this.isInitialized) { await this.initialize(); } const keys = Object.keys(data[0]); const placeholders = keys.map(() => '?').join(','); const sql = `INSERT INTO ${table} (${keys.join(',')}) VALUES (${placeholders})`; const values = data.map(item => keys.map(key => item[key])); try { logger.debug(`批量插入 ${data.length} 条记录到 ${table}`); const [result] = await this.pool.query(`INSERT INTO ${table} (${keys.join(',')}) VALUES ?`, [values]); logger.debug(`批量插入完成: ${result.affectedRows} 条记录`); return result; } catch (error) { logger.error(`批量插入失败:`, error); throw error; } } /** * 获取连接池统计信息 * @returns {Object} 统计信息 */ getStats() { return { ...this.connectionStats, isInitialized: this.isInitialized, config: { host: poolConfig.host, port: poolConfig.port, database: poolConfig.database, connectionLimit: poolConfig.connectionLimit } }; } /** * 关闭连接池 */ async close() { if (this.pool) { try { await this.pool.end(); logger.info('数据库连接池已关闭'); } catch (error) { logger.error('关闭数据库连接池失败:', error); throw error; } finally { this.pool = null; this.isInitialized = false; } } } } // 创建全局连接池实例 const databasePool = new DatabasePool(); /** * 获取数据库连接池 * @returns {DatabasePool} 连接池实例 */ export function getDatabasePool() { return databasePool; } /** * 执行查询(快捷函数) * @param {string} sql - SQL语句 * @param {Array} params - 参数数组 * @returns {Promise} 查询结果 */ export async function query(sql, params = []) { return databasePool.query(sql, params); } /** * 执行事务(快捷函数) * @param {Function} callback - 事务回调函数 * @returns {Promise} 事务结果 */ export async function transaction(callback) { return databasePool.transaction(callback); } /** * 初始化数据库 */ export async function initializeDatabase() { await databasePool.initialize(); } /** * 关闭数据库连接 */ export async function closeDatabase() { await databasePool.close(); } // 导出连接池实例 export const pool = databasePool;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyangbc/xiaohongshu-mcp-nodejs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server