# PROJECT TIMELINE: AI AGENT FOR BIZFLY CLOUD REVENUE ANALYSIS
## CURRENT SYSTEM ANALYSIS
### 1. Data Pipeline Architecture
```
Raw Data (Billing DB)
├── Source tables: bills, bill_lines, v4_subscriptions, plans, invoices, v4_invoices
├── Accounts: dim_accounts, customer_types, sale_cares
└── Services: 19 service types (cloud_server, dbaas, vod, ddos, cdn, kubernetes_engine, ...)
↓
Transform Layer (dbt)
├── Dimensions: dim_invoices, dim_accounts, dim_first_cycle
├── Facts: fact_usages, fact_growth, fact_payment_requests
└── Reports:
    ├── report_new_revenue (base: aggregated by account, service, billing_cycle)
    ├── report_ibs_sales_revenue (customer analysis + sales info)
    ├── report_cloud_server (cloud server detail + on_demand/subscription)
    └── report_non_cloud_server (other services + on_demand/subscription)
```
### 2. Current Billing Plan Classification Logic
**Location:**
- `transform/models/sale_assistant/reports/report_cloud_server.sql:43-45`
- `transform/models/sale_assistant/reports/report_non_cloud_server.sql:21-24`
**Logic:**
```sql
case
when p.billing_model_id = '9e6036a3-d537-4f0a-9ef1-1082895d2a14' then 'on_demand'
else 'subscription'
end as billing_plan
```
**Problems:**
- The logic is duplicated in two SQL files
- It is applied only in the cloud_server and non_cloud_server reports
- It is missing from report_new_revenue (the base report)
- There is no centralized mapping table
### 3. Target Architecture (per system_design_mermaid.md)
```
Interface Layer
├── Telegram Bot (primary)
├── Claude Desktop
└── Cursor IDE
↓ (MCP Protocol)
AI Agent Layer - MCP Server
├── Tools: query_revenue, analyze_trend, get_customer_details, send_alert
├── Resources: Revenue Schema, Service Mapping, Business Rules, SQL Templates
├── Prompts: analyze_revenue_trend, detect_churn_risk, generate_report
└── Core Logic: SQL Query Builder (OLD REPORT LOGIC), Trend Analyzer, Response Formatter
↓
Orchestration Layer (Optional)
├── Airflow (existing - DAGs)
└── N8N / Cron / Python scripts
↓
Data Layer
└── postgres-ana (Analytics DB)
```
---
## DETAILED TIMELINE
### 📅 PHASE 1: TRANSFORM DATA (20/01/2026, 1 DAY)
**Goal:** Create a centralized dimension table that maps each plan to its billing plan type (on_demand/subscription)
#### Task 1.1: Create Dimension Table - dim_billing_plan_mapping
**Time:** 2-3 hours
**Details:**
1. Create the file `transform/models/sale_assistant/dimensions/dim_billing_plan_mapping.sql`
2. Logic:
```sql
-- Mapping table: plan_id -> billing_plan_type (on_demand/subscription)
with plans_data as (
select
p.id as plan_id,
p.name as plan_name,
p.summary,
p.billing_model_id,
s.name as service_type,
case
when p.billing_model_id = '9e6036a3-d537-4f0a-9ef1-1082895d2a14' then 'on_demand'
else 'subscription'
end as billing_plan_type,
p._created as created_at,
p._updated as updated_at
from {{ source('billing', 'plans') }} p
left join {{ source('billing', 'services') }} s on s.id = p.service_id
)
select * from plans_data
```
3. Configure in `dbt_project.yml`:
```yaml
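models:
  # The first key under `models:` must be the dbt project name; nest
  # `sale_assistant` beneath it if that is a folder rather than the project.
  sale_assistant:
    dimensions:
      dim_billing_plan_mapping:
        materialized: table
        tags: ['dimension', 'billing_plan']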
```
**Deliverable:**
- New SQL file: `dim_billing_plan_mapping.sql`
- New table in postgres-ana: `dim_billing_plan_mapping`
#### Task 1.2: Refactor report_cloud_server.sql
**Time:** 1-2 hours
**Details:**
1. Replace the hardcoded logic with a JOIN on dim_billing_plan_mapping
2. Before:
```sql
case
when p.billing_model_id = '9e6036a3-d537-4f0a-9ef1-1082895d2a14' then 'on_demand'
else 'subscription'
end as billing_plan
```
3. After:
```sql
-- Add to the WITH clauses
billing_plan_mapping as (
select plan_id, billing_plan_type
from {{ ref('dim_billing_plan_mapping') }}
)
-- In the main query
left join billing_plan_mapping bpm on p.id = bpm.plan_id
-- Replace the billing_plan column
bpm.billing_plan_type as billing_plan
```
**Deliverable:**
- File updated: `report_cloud_server.sql`
#### Task 1.3: Refactor report_non_cloud_server.sql
**Time:** 1-2 hours
**Details:** Same as Task 1.2
**Deliverable:**
- File updated: `report_non_cloud_server.sql`
#### Task 1.4: Add billing_plan to report_new_revenue.sql
**Time:** 2-3 hours
**Details:**
1. report_new_revenue currently has no billing_plan column
2. It needs to JOIN through:
- `bill_lines` -> `v4_subscriptions` -> `plans` -> `dim_billing_plan_mapping`
3. Aggregate by billing_plan:
- Group by: account_id, service_type, billing_cycle, **billing_plan_type**
- Compute total, total_paid, total_open for each (account, service, cycle, billing_plan)
**Notes:**
- This is a BREAKING CHANGE if downstream reports depend on the old structure
- Consider creating a new view, `report_new_revenue_v2`, for backward compatibility
- OR: create a separate report, `report_revenue_by_billing_plan`
**Deliverable:**
- Updated or new file: `report_new_revenue.sql` or `report_revenue_by_billing_plan.sql`
#### Task 1.5: Testing & Validation
**Time:** 2 hours
**Details:**
1. Run dbt:
```bash
dbt run --select dim_billing_plan_mapping
dbt run --select report_cloud_server report_non_cloud_server
dbt run --select report_new_revenue
```
2. Validation queries:
```sql
-- Check dim_billing_plan_mapping coverage
SELECT billing_plan_type, COUNT(*)
FROM dim_billing_plan_mapping
GROUP BY billing_plan_type;
-- Verify report_cloud_server consistency
SELECT billing_plan, COUNT(DISTINCT email)
FROM report_cloud_server
WHERE billing_cycle = '01-12-2025'
GROUP BY billing_plan;
-- Compare old vs new logic
-- (if backup data is available)
```
3. Data quality checks:
- No NULL values in billing_plan_type
- Total revenue is unchanged after the refactor
- Record counts are unchanged
**Deliverable:**
- Test report document
- Sign-off from the team lead
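The revenue and record-count checks can be automated by diffing per-cycle aggregates captured before and after the refactor. The following is a minimal TypeScript sketch, assuming each snapshot has already been queried into an array; `CycleTotal` and `diffCycleTotals` are hypothetical names, not part of the dbt project.

```typescript
// Hypothetical snapshot row: one aggregate per billing cycle.
interface CycleTotal {
  billing_cycle: string; // "DD-MM-YYYY"
  total: number;         // SUM(total) for the cycle
  row_count: number;     // COUNT(*) for the cycle
}

// Returns a human-readable list of differences; an empty list means the
// refactor preserved totals and record counts for every cycle in `before`.
function diffCycleTotals(
  before: CycleTotal[],
  after: CycleTotal[],
  tolerance = 0.01,
): string[] {
  const problems: string[] = [];
  const afterByCycle = new Map<string, CycleTotal>();
  for (const row of after) {
    afterByCycle.set(row.billing_cycle, row);
  }
  for (const old of before) {
    const current = afterByCycle.get(old.billing_cycle);
    if (!current) {
      problems.push(`${old.billing_cycle}: missing after refactor`);
      continue;
    }
    if (Math.abs(current.total - old.total) > tolerance) {
      problems.push(`${old.billing_cycle}: total ${old.total} -> ${current.total}`);
    }
    if (current.row_count !== old.row_count) {
      problems.push(`${old.billing_cycle}: rows ${old.row_count} -> ${current.row_count}`);
    }
  }
  return problems;
}
```

The row-count comparison only fits Tasks 1.2 and 1.3. Task 1.4 adds a grouping column, so for report_new_revenue only the totals are expected to match.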
---
### 📅 PHASE 2: IMPLEMENT MCP SERVER (21/01 - 23/01, 3 DAYS)
**Goal:** Build an MCP Server that exposes tools for querying revenue data over MCP
#### Task 2.1: Setup MCP Server Project
**Time:** 3-4 hours
**Details:**
1. Create the folder structure:
```
/opt/bizflycloud-analytics/mcp-server/
├── package.json
├── tsconfig.json
├── src/
│ ├── index.ts # Main MCP server entry
│ ├── config/
│ │ ├── database.ts # DB connection config
│ │ └── mcp-config.ts # MCP server config
│ ├── tools/ # MCP Tools implementation
│ │ ├── query-revenue.ts
│ │ ├── analyze-trend.ts
│ │ ├── get-customer-details.ts
│ │ └── index.ts
│ ├── resources/ # MCP Resources
│ │ ├── revenue-schema.ts
│ │ ├── service-mapping.ts
│ │ └── index.ts
│ ├── prompts/ # MCP Prompts
│ │ ├── analyze-revenue-trend.ts
│ │ └── index.ts
│ └── utils/
│ ├── sql-builder.ts # Query builder
│ ├── response-formatter.ts
│ └── db-client.ts # Postgres client
└── .env
```
2. Install dependencies:
```bash
npm init -y
npm install @modelcontextprotocol/sdk pg dotenv typescript @types/node ts-node
npm install --save-dev @types/pg
```
3. Setup tsconfig.json:
```json
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true
}
}
```
**Deliverable:**
- MCP server project skeleton
- package.json with dependencies
#### Task 2.2: Implement Database Client
**Time:** 2 hours
**Details:**
1. File: `src/utils/db-client.ts`
```typescript
import { Pool } from 'pg';
export class DatabaseClient {
private pool: Pool;
constructor() {
this.pool = new Pool({
host: process.env.POSTGRES_ANA_HOST,
port: parseInt(process.env.POSTGRES_ANA_PORT || '5432'),
database: process.env.POSTGRES_ANA_DB,
user: process.env.POSTGRES_ANA_USER,
password: process.env.POSTGRES_ANA_PASSWORD,
max: 20,
idleTimeoutMillis: 30000,
});
}
async query<T>(sql: string, params?: any[]): Promise<T[]> {
const client = await this.pool.connect();
try {
const result = await client.query(sql, params);
return result.rows;
} finally {
client.release();
}
}
}
```
2. File: `.env`
```
POSTGRES_ANA_HOST=localhost
POSTGRES_ANA_PORT=5432
POSTGRES_ANA_DB=analytics
POSTGRES_ANA_USER=analytics_user
POSTGRES_ANA_PASSWORD=xxx
```
**Deliverable:**
- Database client with connection pooling
- Environment configuration
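Because the pool reads its settings straight from environment variables, a missing variable only surfaces at the first query. One option is to validate the environment once at startup. The following is a minimal sketch; `DbConfig` and `loadDbConfig` are hypothetical names that do not appear in the plan.

```typescript
// Hypothetical shape of the validated configuration.
interface DbConfig {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
}

// Variables from the .env file above that have no sensible default.
const REQUIRED_VARS: string[] = [
  'POSTGRES_ANA_HOST',
  'POSTGRES_ANA_DB',
  'POSTGRES_ANA_USER',
  'POSTGRES_ANA_PASSWORD',
];

// Fails fast with one message listing every missing variable.
function loadDbConfig(env: Record<string, string | undefined>): DbConfig {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  const rawPort = env['POSTGRES_ANA_PORT'] ?? '5432';
  const port = Number(rawPort);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid POSTGRES_ANA_PORT: ${rawPort}`);
  }
  return {
    host: env['POSTGRES_ANA_HOST'] ?? '',
    port,
    database: env['POSTGRES_ANA_DB'] ?? '',
    user: env['POSTGRES_ANA_USER'] ?? '',
    password: env['POSTGRES_ANA_PASSWORD'] ?? '',
  };
}
```

In the server this would be called once with `process.env`, and the result passed to the `Pool` constructor.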
#### Task 2.3: Implement MCP Resources
**Time:** 3-4 hours
**Details:**
1. **Resource 1: Revenue Schema** (`src/resources/revenue-schema.ts`)
```typescript
export const revenueSchemaResource = {
uri: "bizfly://schema/revenue",
name: "Revenue Schema",
description: "Schema of revenue tables",
mimeType: "application/json",
content: JSON.stringify({
tables: {
report_new_revenue: {
columns: {
email: "string - customer email",
billing_cycle: "string - format DD-MM-YYYY",
service_type: "string - service name",
total: "numeric - total revenue",
total_paid: "numeric - paid amount",
total_open: "numeric - unpaid amount",
first_cycle: "string - first billing cycle"
}
},
report_ibs_sales_revenue: {
columns: {
mail_tai_khoan: "string",
billing_cycle: "string",
phan_loai_kh: "string - KH mới/KH cũ/KH quay lại",
tong_chi_phi_su_dung: "numeric",
ten_sale_chinh: "string",
ten_sale_phu: "string"
}
},
dim_billing_plan_mapping: {
columns: {
plan_id: "uuid",
plan_name: "string",
service_type: "string",
billing_plan_type: "string - on_demand/subscription"
}
}
}
})
};
```
2. **Resource 2: Service Mapping** (`src/resources/service-mapping.ts`)
```typescript
export const serviceMappingResource = {
uri: "bizfly://data/service-mapping",
name: "Service Type Mapping",
description: "Mapping of service types and categories",
mimeType: "application/json",
content: JSON.stringify({
service_types: [
"cloud_server", "dbaas", "vod", "ddos", "call_center",
"traffic_manager", "auto_scaling", "cloud_watcher",
"mail_inbox", "simple_storage", "container_registry",
"kas", "kubernetes_engine", "mps", "load_balancer",
"cdn", "lms", "vpn", "cloud_storage"
],
billing_plans: {
on_demand: "Pay-as-you-go model",
subscription: "Fixed monthly/yearly plan"
}
})
};
```
**Deliverable:**
- MCP Resources providing static context to AI
#### Task 2.4: Implement MCP Tool - query_revenue
**Time:** 4-5 hours
**Details:**
1. File: `src/tools/query-revenue.ts`
```typescript
import { Tool } from '@modelcontextprotocol/sdk/types.js';
import { DatabaseClient } from '../utils/db-client';
interface QueryRevenueParams {
billing_cycle?: string; // "01-12-2025"
service_type?: string; // "cloud_server"
billing_plan?: string; // "on_demand" | "subscription"
email?: string; // customer email
sale_name?: string; // sale person name
customer_type?: string; // "KH mới" | "KH cũ" | "KH quay lại"
aggregation?: 'account' | 'service' | 'billing_plan' | 'sale';
}
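// Plain class rather than `implements Tool`: the SDK's Tool type has no execute()
// and requires inputSchema.type to be the literal "object".
export class QueryRevenueTool {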
name = "query_revenue";
description = `
Query revenue data with flexible filters.
Supports filtering by:
- billing_cycle (DD-MM-YYYY format)
- service_type (cloud_server, dbaas, cdn, ...)
- billing_plan (on_demand, subscription)
- email (customer email)
- sale_name (sales person)
- customer_type (KH mới, KH cũ, KH quay lại)
Returns aggregated revenue data.
`;
inputSchema = {
type: "object",
properties: {
billing_cycle: { type: "string", description: "DD-MM-YYYY" },
service_type: { type: "string" },
billing_plan: { type: "string", enum: ["on_demand", "subscription"] },
email: { type: "string" },
sale_name: { type: "string" },
customer_type: { type: "string" },
aggregation: {
type: "string",
enum: ["account", "service", "billing_plan", "sale"],
default: "account"
}
}
};
async execute(params: QueryRevenueParams) {
const db = new DatabaseClient();
// Build dynamic SQL based on params
let sql = `
SELECT
r.email,
r.billing_cycle,
r.service_type,
SUM(r.total) as total_revenue,
SUM(r.total_paid) as paid_revenue,
SUM(r.total_open) as open_revenue
FROM report_new_revenue r
WHERE 1=1
`;
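const whereClauses: string[] = [];
const queryParams: any[] = [];
if (params.billing_cycle) {
whereClauses.push(`r.billing_cycle = $${queryParams.length + 1}`);
queryParams.push(params.billing_cycle);
}
if (params.service_type) {
whereClauses.push(`r.service_type = $${queryParams.length + 1}`);
queryParams.push(params.service_type);
}
if (params.email) {
whereClauses.push(`r.email = $${queryParams.length + 1}`);
queryParams.push(params.email);
}
// TODO: sale_name, customer_type and aggregation are declared in the schema but
// not applied yet; the first two live in report_ibs_sales_revenue.
// billing_plan filter. Until Task 1.4 adds billing_plan to the base report, this
// checks report_cloud_server, so it only matches cloud_server rows. EXISTS is used
// instead of JOIN so that several matching rows cannot inflate the sums.
if (params.billing_plan) {
whereClauses.push(`EXISTS (
SELECT 1 FROM report_cloud_server cs
WHERE cs.email = r.email
AND cs.billing_cycle = r.billing_cycle
AND cs.service_type = r.service_type
AND cs.billing_plan = $${queryParams.length + 1}
)`);
queryParams.push(params.billing_plan);
}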
if (whereClauses.length > 0) {
sql += ' AND ' + whereClauses.join(' AND ');
}
sql += `
GROUP BY r.email, r.billing_cycle, r.service_type
ORDER BY total_revenue DESC
LIMIT 100
`;
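// pg returns NUMERIC aggregates as strings, hence the parseFloat calls below.
type RevenueRow = {
email: string; billing_cycle: string; service_type: string;
total_revenue: string; paid_revenue: string; open_revenue: string;
};
const results = await db.query<RevenueRow>(sql, queryParams);
return {
total_records: results.length,
data: results,
// The summary covers only the returned rows (at most 100 because of LIMIT).
summary: {
total_revenue: results.reduce((sum, r) => sum + parseFloat(r.total_revenue), 0),
total_paid: results.reduce((sum, r) => sum + parseFloat(r.paid_revenue), 0),
total_open: results.reduce((sum, r) => sum + parseFloat(r.open_revenue), 0)
}
};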
}
}
```
**Deliverable:**
- query_revenue tool with a dynamic SQL builder
- Support for multiple filter dimensions
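The dynamic SQL builder can be kept as a pure function that returns the query text and its positional parameters, which makes it testable without a database. The following is a sketch under assumptions: `RevenueFilters`, `BuiltQuery`, `FILTER_COLUMNS` and `buildRevenueQuery` are hypothetical names, and only filters that map directly to report_new_revenue columns are covered.

```typescript
interface RevenueFilters {
  billing_cycle?: string;
  service_type?: string;
  email?: string;
}

interface BuiltQuery {
  text: string;     // SQL with $1, $2, ... placeholders
  values: string[]; // parameters in placeholder order
}

// Whitelist of filter name -> column. Column names never come from user input.
const FILTER_COLUMNS: Record<keyof RevenueFilters, string> = {
  billing_cycle: 'r.billing_cycle',
  service_type: 'r.service_type',
  email: 'r.email',
};

function buildRevenueQuery(filters: RevenueFilters, limit = 100): BuiltQuery {
  const clauses: string[] = [];
  const values: string[] = [];
  for (const key of Object.keys(FILTER_COLUMNS) as (keyof RevenueFilters)[]) {
    const value = filters[key];
    if (value === undefined || value === '') continue;
    values.push(value);
    // The placeholder index is the position of the value just pushed.
    clauses.push(`${FILTER_COLUMNS[key]} = $${values.length}`);
  }
  const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 100;
  const lines = [
    'SELECT r.email, r.billing_cycle, r.service_type, SUM(r.total) AS total_revenue',
    'FROM report_new_revenue r',
    clauses.length > 0 ? `WHERE ${clauses.join(' AND ')}` : '',
    'GROUP BY r.email, r.billing_cycle, r.service_type',
    'ORDER BY total_revenue DESC',
    `LIMIT ${safeLimit}`,
  ];
  return { text: lines.filter((line) => line !== '').join('\n'), values };
}
```

Values are always sent as parameters, and filter names are checked against a fixed whitelist, so user input is never concatenated into the SQL text.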
#### Task 2.5: Implement MCP Tool - analyze_trend
**Time:** 4-5 hours
**Details:**
1. File: `src/tools/analyze-trend.ts`
```typescript
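import { DatabaseClient } from '../utils/db-client';

interface AnalyzeTrendParams {
  current_cycle: string;       // "01-01-2026"
  comparison_cycles?: number;  // default: 3 (compare with the 3 previous cycles)
  threshold_percent?: number;  // default: 20 (flag changes of 20% or more)
  service_type?: string;
  billing_plan?: string;       // declared but not applied yet
}

// pg returns NUMERIC aggregates as strings.
interface CurrentRow { email: string; service_type: string; current_total: string; }
interface PreviousRow { email: string; service_type: string; avg_total: string; }

// One shared pool per process instead of a new pool on every call.
const db = new DatabaseClient();

export class AnalyzeTrendTool {
  name = "analyze_trend";
  description = `
Analyze revenue trend by comparing current cycle with previous cycles.
Detects anomalies (growth > threshold or churn risk).
Returns:
- Customers with significant revenue increase
- Customers with significant revenue decrease (churn risk)
- Trend statistics
`;

  async execute(params: AnalyzeTrendParams) {
    const threshold = params.threshold_percent ?? 20;
    const comparisonCycles = params.comparison_cycles ?? 3;

    // Query current cycle revenue
    const currentRevenue = await db.query<CurrentRow>(`
      SELECT email, service_type, SUM(total) AS current_total
      FROM report_new_revenue
      WHERE billing_cycle = $1
      ${params.service_type ? 'AND service_type = $2' : ''}
      GROUP BY email, service_type
    `, params.service_type
      ? [params.current_cycle, params.service_type]
      : [params.current_cycle]);

    // Query previous N cycles. billing_cycle is a DD-MM-YYYY string, so it is
    // converted with to_date() before comparing or sorting. The average is taken
    // over per-cycle totals rather than over raw rows.
    const previousRevenue = await db.query<PreviousRow>(`
      WITH previous_cycles AS (
        SELECT billing_cycle
        FROM report_new_revenue
        WHERE to_date(billing_cycle, 'DD-MM-YYYY') < to_date($1, 'DD-MM-YYYY')
        GROUP BY billing_cycle
        ORDER BY to_date(billing_cycle, 'DD-MM-YYYY') DESC
        LIMIT $2
      ),
      per_cycle AS (
        SELECT email, service_type, billing_cycle, SUM(total) AS cycle_total
        FROM report_new_revenue
        WHERE billing_cycle IN (SELECT billing_cycle FROM previous_cycles)
        ${params.service_type ? 'AND service_type = $3' : ''}
        GROUP BY email, service_type, billing_cycle
      )
      SELECT email, service_type, AVG(cycle_total) AS avg_total
      FROM per_cycle
      GROUP BY email, service_type
    `, params.service_type
      ? [params.current_cycle, comparisonCycles, params.service_type]
      : [params.current_cycle, comparisonCycles]);

    // Calculate change percentage.
    // Customers with no row in the current cycle (fully churned) are not covered here.
    const previousByKey = new Map<string, PreviousRow>();
    for (const p of previousRevenue) {
      previousByKey.set(`${p.email}|${p.service_type}`, p);
    }
    const analysis = currentRevenue.flatMap(curr => {
      const prev = previousByKey.get(`${curr.email}|${curr.service_type}`);
      const avgPrevious = prev ? parseFloat(prev.avg_total) : 0;
      if (avgPrevious === 0) return []; // no baseline: skip instead of dividing by zero
      const current = parseFloat(curr.current_total);
      const changePercent = ((current - avgPrevious) / avgPrevious) * 100;
      if (Math.abs(changePercent) < threshold) return [];
      return [{
        email: curr.email,
        service_type: curr.service_type,
        current_revenue: current,
        avg_previous_revenue: avgPrevious,
        change_percent: Number(changePercent.toFixed(2)),
        is_anomaly: true,
        trend: changePercent > 0 ? 'increase' : 'decrease'
      }];
    });

    // Largest changes first, so the slices below really are the top 10.
    const increases = analysis.filter(a => a.trend === 'increase')
      .sort((a, b) => b.change_percent - a.change_percent);
    const decreases = analysis.filter(a => a.trend === 'decrease')
      .sort((a, b) => a.change_percent - b.change_percent);

    return {
      summary: {
        total_anomalies: analysis.length,
        significant_increases: increases.length,
        churn_risks: decreases.length,
        threshold_percent: threshold
      },
      increases: increases.slice(0, 10), // Top 10
      decreases: decreases.slice(0, 10),
      alert_required: decreases.length > 0
    };
  }
}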
```
**Deliverable:**
- analyze_trend tool with trend calculation logic
- Anomaly detection
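Two details of the trend calculation are easy to get wrong: billing cycles are `DD-MM-YYYY` strings, so they do not sort chronologically as text, and a zero baseline makes the percentage undefined. The sketch below isolates both as pure functions; `cycleToSortKey`, `previousCycles` and `percentChange` are hypothetical helper names used for illustration only.

```typescript
// Converts "DD-MM-YYYY" into a number that sorts chronologically (YYYYMMDD).
function cycleToSortKey(cycle: string): number {
  const match = /^(\d{2})-(\d{2})-(\d{4})$/.exec(cycle);
  if (!match) {
    throw new Error(`Invalid billing cycle: ${cycle}`);
  }
  const day = Number(match[1]);
  const month = Number(match[2]);
  const year = Number(match[3]);
  return year * 10000 + month * 100 + day;
}

// Returns up to `count` distinct cycles strictly before `current`, newest first.
function previousCycles(all: string[], current: string, count: number): string[] {
  const currentKey = cycleToSortKey(current);
  return Array.from(new Set(all))
    .filter((cycle) => cycleToSortKey(cycle) < currentKey)
    .sort((a, b) => cycleToSortKey(b) - cycleToSortKey(a))
    .slice(0, count);
}

// Percentage change against a baseline; null when the baseline is zero.
function percentChange(current: number, baseline: number): number | null {
  if (baseline === 0) return null;
  return ((current - baseline) / baseline) * 100;
}
```

As plain strings, `'01-12-2025' < '01-01-2026'` is false, so a text comparison would drop December 2025 from the baseline of January 2026. The numeric key avoids that.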
#### Task 2.6: Implement MCP Server Main Entry
**Time:** 3 hours
**Details:**
1. File: `src/index.ts`
```typescript
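import 'dotenv/config';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { QueryRevenueTool } from './tools/query-revenue';
import { AnalyzeTrendTool } from './tools/analyze-trend';
import { revenueSchemaResource, serviceMappingResource } from './resources';

const server = new Server(
  { name: 'bizfly-analytics-mcp', version: '1.0.0' },
  // Advertise only capabilities that have handlers below (no prompts yet)
  { capabilities: { tools: {}, resources: {} } }
);

const queryRevenueTool = new QueryRevenueTool();
const analyzeTrendTool = new AnalyzeTrendTool();
const resources = [revenueSchemaResource, serviceMappingResource];

// Register tools. Handlers are keyed by request schema, not by method-name string.
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: queryRevenueTool.name,
      description: queryRevenueTool.description,
      inputSchema: queryRevenueTool.inputSchema as { type: 'object' },
    },
    {
      name: analyzeTrendTool.name,
      description: analyzeTrendTool.description,
      // analyze-trend.ts declares no inputSchema, so the required argument is described here
      inputSchema: {
        type: 'object' as const,
        properties: { current_cycle: { type: 'string', description: 'DD-MM-YYYY' } },
        required: ['current_cycle'],
      },
    },
  ],
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args = {} } = request.params;
  let result: unknown;
  switch (name) {
    case 'query_revenue':
      // Arguments are untyped here; validate them against inputSchema in production
      result = await queryRevenueTool.execute(args as any);
      break;
    case 'analyze_trend':
      result = await analyzeTrendTool.execute(args as any);
      break;
    default:
      throw new Error(`Unknown tool: ${name}`);
  }
  // MCP tool results carry their payload in a content array
  return { content: [{ type: 'text', text: JSON.stringify(result) }] };
});

// Register resources
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
  resources: resources.map(({ uri, name, description, mimeType }) => ({
    uri, name, description, mimeType,
  })),
}));

server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const resource = resources.find((r) => r.uri === request.params.uri);
  if (!resource) {
    throw new Error(`Unknown resource: ${request.params.uri}`);
  }
  return {
    contents: [{ uri: resource.uri, mimeType: resource.mimeType, text: resource.content }],
  };
});

// Start server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  // stdout is reserved for protocol messages, so log to stderr
  console.error('BizFly Analytics MCP Server running on stdio');
}
main().catch(console.error);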
```
**Deliverable:**
- Working MCP server with stdio transport
- Tools and Resources registered
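An MCP tool call returns its payload inside a `content` array, with `isError` marking failures, so both the server and its clients need a small wrapping and unwrapping step. The following is a self-contained sketch of that convention; the local `ToolResult` type mirrors only the fields used here, and `toToolResult`, `toToolError` and `parseToolResult` are hypothetical helper names, not SDK functions.

```typescript
// Local mirror of the parts of an MCP tool result used in this sketch.
interface TextContent {
  type: 'text';
  text: string;
}

interface ToolResult {
  content: TextContent[];
  isError?: boolean;
}

// Server side: wrap any JSON-serializable payload as a single text item.
function toToolResult(payload: unknown): ToolResult {
  return { content: [{ type: 'text', text: JSON.stringify(payload) }] };
}

// Server side: report a failure to the caller instead of crashing the server.
function toToolError(error: unknown): ToolResult {
  const message = error instanceof Error ? error.message : String(error);
  return { content: [{ type: 'text', text: message }], isError: true };
}

// Client side: unwrap the JSON payload, or throw if the tool reported an error.
function parseToolResult<T>(result: ToolResult): T {
  const first = result.content[0];
  if (result.isError) {
    throw new Error(first ? first.text : 'Tool call failed');
  }
  if (!first) {
    throw new Error('Tool returned no content');
  }
  return JSON.parse(first.text) as T;
}
```

With this convention, a Telegram handler or the Airflow script can read fields such as `alert_required` from the parsed payload rather than from the protocol envelope.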
#### Task 2.7: Testing MCP Server
**Time:** 3-4 hours
**Details:**
1. Manual test with Claude Desktop:
- Add to `claude_desktop_config.json` (macOS: `~/Library/Application Support/Claude/`, Windows: `%APPDATA%\Claude\`):
```json
{
"mcpServers": {
"bizfly-analytics": {
"command": "node",
"args": ["/opt/bizflycloud-analytics/mcp-server/dist/index.js"],
"env": {
"POSTGRES_ANA_HOST": "localhost",
"POSTGRES_ANA_PORT": "5432",
"POSTGRES_ANA_DB": "analytics",
"POSTGRES_ANA_USER": "user",
"POSTGRES_ANA_PASSWORD": "password"
}
}
}
}
```
2. Test cases:
- Query revenue for billing_cycle = "01-12-2025"
- Query revenue with filter service_type = "cloud_server"
- Query revenue with billing_plan = "on_demand"
- Analyze trend for current_cycle = "01-01-2026"
- Verify anomaly detection
3. Unit tests (optional):
```bash
npm install --save-dev jest @types/jest ts-jest
```
**Deliverable:**
- Test report
- MCP server config for Claude Desktop
---
### 📅 PHASE 3: IMPLEMENT CHAT AGENT (24/01 - 28/01, 3 DAYS)
**Goal:** Build a Telegram Bot or chat interface that lets users interact with the MCP Server
#### Option A: Telegram Bot (Recommended)
##### Task 3.1: Setup Telegram Bot Project
**Time:** 2 hours
**Details:**
1. Create Telegram Bot via BotFather
- Get BOT_TOKEN
- Choose webhook or polling mode
2. Project structure:
```
/opt/bizflycloud-analytics/telegram-bot/
├── package.json
├── src/
│ ├── index.ts
│ ├── bot.ts
│ ├── handlers/
│ │ ├── revenue-query.ts
│ │ ├── trend-analysis.ts
│ │ └── alert-approval.ts
│ ├── mcp-client/
│ │ └── client.ts # MCP client to call MCP server
│ └── utils/
│ └── formatter.ts # Format responses for Telegram
└── .env
```
3. Install dependencies:
```bash
npm install telegraf @modelcontextprotocol/sdk dotenv
```
**Deliverable:**
- Telegram bot token
- Project skeleton
##### Task 3.2: Implement MCP Client
**Time:** 3-4 hours
**Details:**
1. File: `src/mcp-client/client.ts`
```typescript
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

export class MCPClient {
  private client?: Client;

  async connect() {
    // Forward the current environment (POSTGRES_ANA_*, PATH, ...) to the server
    const env: Record<string, string> = {};
    for (const [key, value] of Object.entries(process.env)) {
      if (value !== undefined) env[key] = value;
    }
    // StdioClientTransport spawns the MCP server process itself from command/args;
    // it does not take the streams of an already spawned child process.
    const transport = new StdioClientTransport({
      command: 'node',
      args: ['/opt/bizflycloud-analytics/mcp-server/dist/index.js'],
      env
    });
    const client = new Client({
      name: 'telegram-bot-client',
      version: '1.0.0'
    }, {
      capabilities: {}
    });
    await client.connect(transport);
    this.client = client;
  }

  async callTool(name: string, args: Record<string, unknown>) {
    if (!this.client) throw new Error('MCP client is not connected');
    const result = await this.client.callTool({ name, arguments: args });
    // Tools return their payload as JSON text in content[0]; unwrap it for the handlers
    const first = (result.content as Array<{ type: string; text?: string }> | undefined)?.[0];
    if (result.isError) throw new Error(first?.text ?? 'Tool call failed');
    return first?.type === 'text' && first.text ? JSON.parse(first.text) : result;
  }

  async listTools() {
    if (!this.client) throw new Error('MCP client is not connected');
    return await this.client.listTools();
  }
}
```
**Deliverable:**
- MCP client wrapper for the Telegram bot
##### Task 3.3: Implement Revenue Query Handler
**Time:** 4 hours
**Details:**
1. File: `src/handlers/revenue-query.ts`
```typescript
import { Context } from 'telegraf';
import { MCPClient } from '../mcp-client/client';
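export async function handleRevenueQuery(ctx: Context, mcpClient: MCPClient) {
// Not every update carries a text message, so narrow the type first
if (!ctx.message || !('text' in ctx.message)) return;
// Lowercase so the keyword checks in parseQuery are case-insensitive
const message = ctx.message.text.toLowerCase();
// Parse natural language query
// Example: "Có bao nhiêu KH active tháng 12?" ("How many active customers in December?")
// Extract: billing_cycle = "01-12-2025"
const params = parseQuery(message);
try {
const result = await mcpClient.callTool('query_revenue', params);
// Format response
const response = formatRevenueResponse(result);
await ctx.reply(response, { parse_mode: 'Markdown' });
} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
await ctx.reply(`❌ Lỗi: ${reason}`);
}
}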
function parseQuery(message: string): any {
// Simple regex-based parsing
// Production: use NLP or LLM-based parsing
const params: any = {};
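// Extract billing_cycle. A month later than the current one is read as last year's,
// so "tháng 12" asked in January maps to the previous December.
const cycleMatch = message.match(/tháng (\d+)/);
if (cycleMatch) {
const now = new Date();
const monthNumber = parseInt(cycleMatch[1], 10);
const year = monthNumber > now.getMonth() + 1 ? now.getFullYear() - 1 : now.getFullYear();
params.billing_cycle = `01-${String(monthNumber).padStart(2, '0')}-${year}`;
}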
// Extract service_type
if (message.includes('cloud server')) {
params.service_type = 'cloud_server';
}
// Extract billing_plan
if (message.includes('on-demand') || message.includes('ondemand')) {
params.billing_plan = 'on_demand';
} else if (message.includes('subscription')) {
params.billing_plan = 'subscription';
}
return params;
}
function formatRevenueResponse(result: any): string {
const { summary, total_records, data } = result;
// Telegram's legacy Markdown uses *single asterisks* for bold. The characters
// _ * ` [ in dynamic values must be escaped or the message is rejected.
const escapeMd = (value: string) => value.replace(/[_*`\[]/g, '\\$&');
let response = `📊 *Kết quả Revenue Query*\n\n`;
response += `📈 *Tổng quan:*\n`;
response += `- Tổng revenue: ${summary.total_revenue.toLocaleString()} VND\n`;
response += `- Đã thanh toán: ${summary.total_paid.toLocaleString()} VND\n`;
response += `- Chưa thanh toán: ${summary.total_open.toLocaleString()} VND\n`;
// total_records counts result rows (one per email, cycle and service), not distinct customers
response += `- Số khách hàng: ${total_records}\n\n`;
response += `🔝 *Top 5 khách hàng:*\n`;
data.slice(0, 5).forEach((item: any, idx: number) => {
response += `${idx + 1}. ${escapeMd(item.email)}: ${parseFloat(item.total_revenue).toLocaleString()} VND\n`;
});
return response;
}
```
**Deliverable:**
- Revenue query handler with natural language parsing
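A question such as "tháng 12" names a month but not a year, so the parser has to decide which billing cycle is meant. One reasonable rule is to pick the most recent occurrence of that month that is not in the future. The following is a sketch of that rule; `extractMonth` and `resolveBillingCycle` are hypothetical names, and the rule itself is an assumption rather than a requirement stated in the plan.

```typescript
// Pulls the month number out of a message such as "doanh thu tháng 12".
// Returns null when no valid month (1-12) is mentioned.
function extractMonth(message: string): number | null {
  const match = /tháng\s*(\d{1,2})/i.exec(message);
  if (!match) return null;
  const month = Number(match[1]);
  return month >= 1 && month <= 12 ? month : null;
}

// Maps a month to "01-MM-YYYY", choosing the latest year in which that month
// has already started relative to `today`.
function resolveBillingCycle(month: number, today: Date): string {
  if (!Number.isInteger(month) || month < 1 || month > 12) {
    throw new Error(`Invalid month: ${month}`);
  }
  const currentMonth = today.getMonth() + 1; // getMonth() is zero-based
  const year = month > currentMonth ? today.getFullYear() - 1 : today.getFullYear();
  return `01-${String(month).padStart(2, '0')}-${year}`;
}
```

Passing `today` as a parameter keeps the function deterministic in tests. The handler would call it with `new Date()`.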
##### Task 3.4: Implement Trend Analysis Handler
**Time:** 3-4 hours
**Details:**
1. File: `src/handlers/trend-analysis.ts`
```typescript
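import { Context } from 'telegraf';
import { MCPClient } from '../mcp-client/client';
import { storePendingAlert } from './alert-approval';
export async function handleTrendAnalysis(ctx: Context, mcpClient: MCPClient) {
if (!ctx.message || !('text' in ctx.message)) return;
const message = ctx.message.text;
// Parse: "Phân tích xu hướng tháng 1" ("Analyze the trend for January")
const params = parseTrendQuery(message);
const result = await mcpClient.callTool('analyze_trend', params);
const response = formatTrendResponse(result);
await ctx.reply(response, { parse_mode: 'Markdown' });
// If alert required, keep the result for the approval callback, then ask for approval
if (result.alert_required && ctx.chat) {
storePendingAlert(ctx.chat.id, result);
await askAlertApproval(ctx, result);
}
}
// Minimal placeholder (not specified in this plan): reads "tháng N" and assumes
// a month later than the current one refers to last year.
function parseTrendQuery(message: string) {
const match = message.match(/tháng (\d+)/i);
const now = new Date();
const month = match ? parseInt(match[1], 10) : now.getMonth() + 1;
const year = month > now.getMonth() + 1 ? now.getFullYear() - 1 : now.getFullYear();
return { current_cycle: `01-${String(month).padStart(2, '0')}-${year}` };
}
function formatTrendResponse(result: any): string {
const { summary, increases, decreases } = result;
// Legacy Markdown: *bold* uses single asterisks; escape _ * ` [ in dynamic values
const escapeMd = (value: string) => value.replace(/[_*`\[]/g, '\\$&');
let response = `📊 *Phân tích xu hướng*\n\n`;
response += `⚠️ *Cảnh báo:*\n`;
response += `- Tổng bất thường: ${summary.total_anomalies}\n`;
response += `- Tăng trưởng cao: ${summary.significant_increases}\n`;
response += `- Nguy cơ churn: ${summary.churn_risks}\n\n`;
if (decreases.length > 0) {
response += `🚨 *Khách hàng có nguy cơ rời bỏ:*\n`;
decreases.forEach((d: any) => {
response += `- ${escapeMd(d.email)}: ${d.change_percent}%\n`;
});
}
return response;
}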
async function askAlertApproval(ctx: Context, result: any) {
await ctx.reply(
`🔔 Phát hiện ${result.summary.churn_risks} KH có nguy cơ churn.\n` +
`Bạn có muốn gửi alert đến Sales team không?`,
{
reply_markup: {
inline_keyboard: [
[
{ text: '✅ Gửi Alert', callback_data: 'approve_alert' },
{ text: '❌ Không', callback_data: 'reject_alert' }
]
]
}
}
);
}
```
**Deliverable:**
- Trend analysis handler with alert approval flow
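The handlers send replies with `parse_mode: 'Markdown'`, and customer emails often contain underscores, which Telegram's legacy Markdown treats as the start of italic text. In that mode the characters `_`, `*`, `` ` `` and `[` need a preceding backslash when they appear in dynamic values. The following is a minimal sketch; `escapeLegacyMarkdown` and `formatChurnLine` are hypothetical names.

```typescript
// Escapes the four characters that are special in Telegram's legacy Markdown mode.
function escapeLegacyMarkdown(value: string): string {
  return value.replace(/[_*`\[]/g, '\\$&');
}

// Builds one bullet of a churn list with the email safely escaped.
function formatChurnLine(email: string, changePercent: number): string {
  return `- ${escapeLegacyMarkdown(email)}: ${changePercent.toFixed(2)}%`;
}
```

An alternative is to send plain text without `parse_mode`, which avoids escaping entirely at the cost of bold headings.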
##### Task 3.5: Implement Alert Approval System
**Time:** 3 hours
**Details:**
1. File: `src/handlers/alert-approval.ts`
```typescript
import { Context } from 'telegraf';
// Store pending alerts in memory (production: use Redis)
const pendingAlerts = new Map();
export function storePendingAlert(chatId: number, alertData: any) {
pendingAlerts.set(chatId, alertData);
}
export async function handleAlertApproval(ctx: Context) {
const chatId = ctx.chat?.id;
const alertData = chatId !== undefined ? pendingAlerts.get(chatId) : undefined;
if (chatId === undefined || !alertData) {
await ctx.answerCbQuery('Alert không tồn tại');
return;
}
// Send alert to Sales group
const SALES_GROUP_CHAT_ID = process.env.SALES_GROUP_CHAT_ID;
if (!SALES_GROUP_CHAT_ID) {
throw new Error('SALES_GROUP_CHAT_ID is not set');
}
// Legacy Markdown: *bold* uses single asterisks; escape _ * ` [ in emails
const escapeMd = (value: string) => value.replace(/[_*`\[]/g, '\\$&');
let alertMessage = `🚨 *ALERT: Churn Risk*\n\n`;
alertMessage += `⚠️ Các khách hàng có nguy cơ rời bỏ:\n\n`;
alertData.decreases.forEach((d: any) => {
alertMessage += `- *${escapeMd(d.email)}*\n`;
alertMessage += ` Revenue giảm: ${d.change_percent}%\n`;
alertMessage += ` Hiện tại: ${d.current_revenue.toLocaleString()} VND\n\n`;
});
alertMessage += `👉 *Action:* Contact ASAP để retain khách hàng`;
await ctx.telegram.sendMessage(
SALES_GROUP_CHAT_ID,
alertMessage,
{ parse_mode: 'Markdown' }
);
await ctx.answerCbQuery('✅ Đã gửi alert đến Sales team');
pendingAlerts.delete(chatId);
}
export async function handleAlertRejection(ctx: Context) {
const chatId = ctx.chat?.id;
if (chatId !== undefined) {
pendingAlerts.delete(chatId);
}
await ctx.answerCbQuery('❌ Đã hủy alert');
}
```
**Deliverable:**
- Human-in-the-loop alert approval system
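Keying pending alerts by chat ID means a second analysis in the same chat overwrites the first, and an old approval button can approve newer data. One alternative is to give each alert its own ID with an expiry and carry that ID in the button's `callback_data`. The following is a sketch of such a store; `PendingAlertStore` is a hypothetical class, and the in-memory `Map` is still only suitable for a single process.

```typescript
interface PendingEntry<T> {
  data: T;
  expiresAt: number; // epoch milliseconds
}

// In-memory store of alerts awaiting approval, keyed by a generated ID.
class PendingAlertStore<T> {
  private readonly entries = new Map<string, PendingEntry<T>>();
  private readonly ttlMs: number;
  private counter = 0;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Stores the alert and returns a short ID suitable for callback_data.
  add(data: T, now: number = Date.now()): string {
    this.counter += 1;
    const id = `${now.toString(36)}-${this.counter}`;
    this.entries.set(id, { data, expiresAt: now + this.ttlMs });
    return id;
  }

  // Removes and returns the alert; undefined if unknown, already used or expired.
  take(id: string, now: number = Date.now()): T | undefined {
    const entry = this.entries.get(id);
    this.entries.delete(id);
    if (!entry || entry.expiresAt <= now) return undefined;
    return entry.data;
  }
}
```

The buttons would then carry values such as `approve_alert:<id>`, which stay well within Telegram's 64-byte limit for `callback_data`, and the bot would match them with a pattern instead of a fixed string.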
##### Task 3.6: Main Bot Integration
**Time:** 2-3 hours
**Details:**
1. File: `src/index.ts`
```typescript
import 'dotenv/config'; // load BOT_TOKEN and the other settings from .env
import { Telegraf } from 'telegraf';
import { MCPClient } from './mcp-client/client';
import { handleRevenueQuery } from './handlers/revenue-query';
import { handleTrendAnalysis } from './handlers/trend-analysis';
import { handleAlertApproval, handleAlertRejection } from './handlers/alert-approval';
const bot = new Telegraf(process.env.BOT_TOKEN!);
const mcpClient = new MCPClient();
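// Connect to MCP server; exit if the connection cannot be established
mcpClient.connect().then(() => {
console.log('Connected to MCP server');
}).catch((err) => {
console.error('Failed to connect to MCP server:', err);
process.exit(1);
});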
// Command handlers
bot.start((ctx) => {
ctx.reply(
'Chào mừng đến BizFly Analytics Bot!\n\n' +
'Bạn có thể hỏi:\n' +
'- "Có bao nhiêu KH active tháng 12?"\n' +
'- "Phân tích xu hướng tháng 1"\n' +
'- "Doanh thu cloud server on-demand tháng 12"\n'
);
});
bot.on('text', async (ctx) => {
const message = ctx.message.text.toLowerCase();
if (message.includes('phân tích') || message.includes('xu hướng')) {
await handleTrendAnalysis(ctx, mcpClient);
} else {
await handleRevenueQuery(ctx, mcpClient);
}
});
// Callback query handlers (for alert approval)
bot.action('approve_alert', handleAlertApproval);
bot.action('reject_alert', handleAlertRejection);
// Start bot
bot.launch();
process.once('SIGINT', () => bot.stop('SIGINT'));
process.once('SIGTERM', () => bot.stop('SIGTERM'));
```
2. File: `.env`
```
BOT_TOKEN=your_telegram_bot_token
SALES_GROUP_CHAT_ID=-1001234567890
POSTGRES_ANA_HOST=localhost
POSTGRES_ANA_PORT=5432
POSTGRES_ANA_DB=analytics
POSTGRES_ANA_USER=user
POSTGRES_ANA_PASSWORD=password
```
**Deliverable:**
- Working Telegram bot integrated with the MCP server
##### Task 3.7: Testing & Deployment
**Time:** 4 hours
**Details:**
1. Manual testing:
- Test revenue query with different filters
- Test trend analysis
- Test alert approval flow
- Test error handling
2. Deploy:
```bash
# Build
cd /opt/bizflycloud-analytics/telegram-bot
npm run build
# Run with PM2
pm2 start dist/index.js --name bizfly-telegram-bot
pm2 save
pm2 startup
```
3. Monitor:
```bash
pm2 logs bizfly-telegram-bot
pm2 monit
```
**Deliverable:**
- Deployed Telegram bot
- Monitoring setup
---
#### Option B: N8N Workflow (Alternative)
##### Task 3.1B: Setup N8N Workflow
**Time:** 2-3 hours
**Details:**
1. Install N8N:
```bash
npm install -g n8n
n8n start
```
2. Create workflow:
- Trigger: Telegram Bot webhook
- Node 1: Parse message
- Node 2: HTTP Request to MCP Server API (need to add HTTP server to MCP)
- Node 3: Format response
- Node 4: Send Telegram message
**Note:** Requires implementing an HTTP API wrapper for the MCP server (adds 1 day)
---
### 📅 PHASE 4: ORCHESTRATION & AUTOMATION (28/01 - 29/01, 2 DAYS)
**Goal:** Set up scheduled jobs for automated analysis and alerts
#### Task 4.1: Airflow DAG for Scheduled Trend Analysis
**Time:** 3-4 hours
**Details:**
1. File: `orchestrate/dags/revenue_trend_analysis.py`
```python
import json
import os
import subprocess
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 20),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'revenue_trend_analysis_daily',
    default_args=default_args,
    description='Daily revenue trend analysis and alert',
    schedule_interval='0 9 * * *',  # 9 AM daily
    catchup=False
)


def call_mcp_analyze_trend(**context):
    """Call MCP server analyze_trend tool"""
    # Get current billing_cycle
    current_date = datetime.now()
    current_cycle = f"01-{current_date.strftime('%m-%Y')}"
    # Call MCP server via subprocess
    mcp_client_script = '/opt/bizflycloud-analytics/scripts/mcp_client.py'
    result = subprocess.run([
        'python3', mcp_client_script,
        'analyze_trend',
        '--current_cycle', current_cycle,
        '--threshold_percent', '20'
    ], capture_output=True, text=True)
    if result.returncode != 0:
        raise Exception(f"MCP call failed: {result.stderr}")
    trend_data = json.loads(result.stdout)
    # Store in XCom
    context['ti'].xcom_push(key='trend_data', value=trend_data)
    return trend_data


def check_alert_required(**context):
    """Check if alert should be sent"""
    trend_data = context['ti'].xcom_pull(task_ids='analyze_trend', key='trend_data')
    if trend_data['alert_required']:
        return 'send_telegram_alert'
    return 'log_no_alert'


def format_alert_message(trend_data):
    """Build the alert text from the analyze_trend result"""
    lines = [f"🚨 Churn risk: {trend_data['summary']['churn_risks']} customers"]
    for item in trend_data['decreases']:
        lines.append(f"- {item['email']}: {item['change_percent']}%")
    return '\n'.join(lines)


def send_telegram_alert(**context):
    """Send alert to Telegram"""
    trend_data = context['ti'].xcom_pull(task_ids='analyze_trend', key='trend_data')
    # Call Telegram bot API
    bot_token = os.getenv('BOT_TOKEN')
    manager_chat_id = os.getenv('MANAGER_CHAT_ID')
    message = format_alert_message(trend_data)
    # Sent as plain text: emails may contain characters that Markdown mode rejects
    response = requests.post(
        f'https://api.telegram.org/bot{bot_token}/sendMessage',
        json={
            'chat_id': manager_chat_id,
            'text': message,
            'reply_markup': {
                'inline_keyboard': [[
                    {'text': '✅ Approve', 'callback_data': 'approve_alert'},
                    {'text': '❌ Reject', 'callback_data': 'reject_alert'}
                ]]
            }
        },
        timeout=10
    )
    response.raise_for_status()


# Tasks
analyze_task = PythonOperator(
    task_id='analyze_trend',
    python_callable=call_mcp_analyze_trend,
    dag=dag
)
check_task = BranchPythonOperator(
    task_id='check_alert',
    python_callable=check_alert_required,
    dag=dag
)
alert_task = PythonOperator(
    task_id='send_telegram_alert',
    python_callable=send_telegram_alert,
    dag=dag
)
log_task = PythonOperator(
    task_id='log_no_alert',
    python_callable=lambda: print("No alert required"),
    dag=dag
)
analyze_task >> check_task >> [alert_task, log_task]
```
**Deliverable:**
- Airflow DAG for scheduled trend analysis
#### Task 4.2: MCP Client Script for Airflow
**Time:** 2 hours
**Details:**
1. File: `scripts/mcp_client.py`
```python
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess

MCP_SERVER_DIR = '/opt/bizflycloud-analytics/mcp-server'

CLIENT_JS = """
const { Client } = require('@modelcontextprotocol/sdk/client/index.js');
const { StdioClientTransport } = require('@modelcontextprotocol/sdk/client/stdio.js');

async function main() {
  // The transport spawns the MCP server itself from command/args
  const transport = new StdioClientTransport({
    command: 'node',
    args: ['/opt/bizflycloud-analytics/mcp-server/dist/index.js'],
    env: process.env
  });
  const client = new Client({
    name: 'airflow-client',
    version: '1.0.0'
  }, {
    capabilities: {}
  });
  await client.connect(transport);
  const result = await client.callTool({
    name: process.argv[2],
    arguments: JSON.parse(process.argv[3])
  });
  // Print the tool's JSON payload (content[0].text), not the MCP envelope
  const first = (result.content || [])[0];
  console.log(first && first.type === 'text' ? first.text : JSON.stringify(result));
  await client.close();
  process.exit(0);
}

main().catch(err => {
  console.error(err);
  process.exit(1);
});
"""


def call_mcp_tool(tool_name, **kwargs):
    """Call MCP server tool via Node.js client"""
    # Write temp client script
    with open('/tmp/mcp_client_temp.js', 'w') as f:
        f.write(CLIENT_JS)
    # The script lives in /tmp, so point Node at the server's node_modules
    env = {**os.environ, 'NODE_PATH': os.path.join(MCP_SERVER_DIR, 'node_modules')}
    # Drop arguments that were not provided on the command line
    arguments = {key: value for key, value in kwargs.items() if value is not None}
    # Call via Node.js
    result = subprocess.run([
        'node',
        '/tmp/mcp_client_temp.js',
        tool_name,
        json.dumps(arguments)
    ], capture_output=True, text=True, env=env)
    if result.returncode != 0:
        raise Exception(f"MCP call failed: {result.stderr}")
    return json.loads(result.stdout)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('tool_name')
    parser.add_argument('--current_cycle')
    parser.add_argument('--threshold_percent', type=int, default=20)
    args = parser.parse_args()
    result = call_mcp_tool(
        args.tool_name,
        current_cycle=args.current_cycle,
        threshold_percent=args.threshold_percent
    )
    print(json.dumps(result))
```
**Deliverable:**
- Python script that lets Airflow call the MCP server
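
The script prints the raw MCP tool result, so the Airflow task still has to unwrap it. The sketch below shows one way to do that, assuming the tool returns its data as a JSON string inside a `text` content item (the usual shape of an MCP tool result). `extract_tool_payload` and the sample payload are hypothetical and not part of the plan above.

```python
import json


def extract_tool_payload(result: dict):
    """Decode the JSON payload carried in an MCP tool result (hypothetical helper)."""
    # MCP tool results flag tool-level failures with `isError`.
    if result.get("isError"):
        raise RuntimeError(f"MCP tool reported an error: {result.get('content')}")
    # Take the first text item and parse it as JSON (assumed payload format).
    for item in result.get("content", []):
        if item.get("type") == "text":
            return json.loads(item["text"])
    raise ValueError("MCP tool result contains no text content")


# Illustrative result only; the real fields depend on the analyze_trend tool.
sample = {
    "content": [
        {"type": "text", "text": '{"alert_required": true, "decreases": []}'}
    ]
}
payload = extract_tool_payload(sample)
print(payload["alert_required"])
```

Keeping this step separate from `call_mcp_tool` leaves the transport code unaware of any particular tool's output format.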
---
## SUMMARY TIMELINE
| Phase | Tasks | Duration | Deadline | Dependencies |
|-------|-------|----------|----------|--------------|
| **Phase 1: Transform Data** | Create dim_billing_plan_mapping, refactor reports | **1 day** | **Mon 20/01** | - |
| **Phase 2: Implement MCP** | MCP Server with tools + resources | **3 days** | **Thu 23/01** | Phase 1 |
| **Phase 3: Chat Agent** | Telegram Bot + MCP Client | **3 days** | **Tue 28/01** | Phase 2 |
| **Phase 4: Orchestration** | Airflow DAG scheduled jobs | **2 days** | **Wed 29/01** | Phase 2, 3 |
**Total: 9 working days (Mon 20/01 → Wed 29/01)**
---
## RISKS & MITIGATION
### Risk 1: Refactor report_new_revenue Breaking Changes
**Impact:** HIGH - Downstream dashboards and reports may be affected
**Mitigation:**
- Option 1: Create a new report `report_revenue_by_billing_plan` instead of modifying report_new_revenue
- Option 2: Create a backward-compatible view
- Option 3: Inform stakeholders before deploying
### Risk 2: MCP Server Performance
**Impact:** MEDIUM - Queries slow down on large data volumes
**Mitigation:**
- Add indexes on billing_cycle, service_type, email
- Implement query result caching (Redis)
- Limit the result set (pagination)
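
The caching and pagination mitigations can be sketched as follows. This is a minimal illustration, not the planned implementation: an in-memory dictionary stands in for Redis, and `TTLCache`, `paginate`, `fake_query`, and the `2025-01` cycle value are all hypothetical names and data.

```python
import time
from typing import Any, Callable, Dict, Hashable, List, Tuple


class TTLCache:
    """Tiny in-memory cache with per-entry expiry (a stand-in for Redis)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        hit = self._store.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]  # entry is still fresh
        value = compute()  # cache miss or expired entry: run the query
        self._store[key] = (now + self._ttl, value)
        return value


def paginate(rows: List[Any], page: int, page_size: int) -> List[Any]:
    """Return one page of rows; `page` is 1-based."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    start = (page - 1) * page_size
    return rows[start:start + page_size]


calls = {"count": 0}


def fake_query():
    """Pretend database query that counts how often it runs."""
    calls["count"] += 1
    return [{"service_type": "cloud_server", "revenue": i} for i in range(5)]


cache = TTLCache(ttl_seconds=300)
key = ("query_revenue", "2025-01", "cloud_server")
rows = cache.get_or_compute(key, fake_query)
rows_again = cache.get_or_compute(key, fake_query)  # served from the cache
first_page = paginate(rows_again, page=1, page_size=2)
print(calls["count"], len(first_page))
```

Building the cache key from the tool name and its filter values means that two identical questions asked within the TTL hit the database only once.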
### Risk 3: Natural Language Parsing Accuracy
**Impact:** MEDIUM - The bot misinterprets questions
**Mitigation:**
- Phase 1: Regex-based parsing with predefined patterns
- Phase 2: Integrate an LLM (GPT/Claude) to parse queries
- Provide example queries in the bot's /help command
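
A rough sketch of the regex-based first phase is shown below. It maps a free-text message onto the `query_revenue` filters by matching known keywords. The `YYYY-MM` billing cycle format, the function name `parse_query`, and the sample message are assumptions made for illustration.

```python
import re

# Service types from Appendix A.
SERVICE_TYPES = {
    "cloud_server", "dbaas", "vod", "ddos", "call_center", "traffic_manager",
    "auto_scaling", "cloud_watcher", "mail_inbox", "simple_storage",
    "container_registry", "kas", "kubernetes_engine", "mps", "load_balancer",
    "cdn", "lms", "vpn", "cloud_storage",
}
BILLING_PLANS = {"on_demand", "subscription"}

# Assumed billing cycle format: YYYY-MM (for example 2025-01).
CYCLE_RE = re.compile(r"\b20\d{2}-(?:0[1-9]|1[0-2])\b")


def parse_query(text: str) -> dict:
    """Extract query_revenue filters from a free-text message."""
    filters = {}
    cycle = CYCLE_RE.search(text)
    if cycle:
        filters["billing_cycle"] = cycle.group(0)
    # Match whole lowercase tokens against the known vocabularies;
    # the first match for each filter wins.
    for token in re.findall(r"[a-z_]+", text.lower()):
        if token in SERVICE_TYPES:
            filters.setdefault("service_type", token)
        elif token in BILLING_PLANS:
            filters.setdefault("billing_plan", token)
    return filters


print(parse_query("revenue cloud_server on_demand 2025-01"))
```

A message that matches nothing yields an empty dictionary, which the bot can use as the signal to reply with the /help examples instead of running a query.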
### Risk 4: Telegram Bot Availability
**Impact:** LOW - Bot downtime
**Mitigation:**
- Deploy with PM2 auto-restart
- Set up monitoring (Uptime Robot)
- Implement a health check endpoint
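
As an illustration of the health check idea, the sketch below serves a `/health` route and probes it the way an uptime monitor would. It uses only the Python standard library for consistency with the other examples; the route name, the response body, and the loopback port are assumptions, and the actual bot may expose this from its own runtime instead.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Answer GET /health with a small JSON status document."""

    def do_GET(self):
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        pass  # keep the demo quiet: no per-request access log


def check_health(url: str, timeout: float = 2.0) -> bool:
    """Probe the endpoint the way an external monitor would."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.status == 200 and json.load(resp).get("status") == "ok"


# Demo: bind to a free loopback port, probe once, then shut down.
server = HTTPServer(("127.0.0.1", 0), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
healthy = check_health(f"http://127.0.0.1:{server.server_port}/health")
server.shutdown()
server.server_close()
print(healthy)
```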
---
## APPENDIX
### A. Service Types Complete List (19 services)
```
cloud_server, dbaas, vod, ddos, call_center, traffic_manager,
auto_scaling, cloud_watcher, mail_inbox, simple_storage,
container_registry, kas, kubernetes_engine, mps, load_balancer,
cdn, lms, vpn, cloud_storage
```
### B. Key Tables Reference
- **dim_billing_plan_mapping**: NEW - mapping plan_id -> billing_plan_type
- **report_new_revenue**: Base revenue report (aggregate by account, service, cycle)
- **report_ibs_sales_revenue**: Customer analysis with sales info
- **report_cloud_server**: Detailed cloud server report with billing_plan
- **dim_accounts**: Account master data
- **dim_first_cycle**: First billing cycle per account
### C. MCP Tools Summary
| Tool | Purpose | Input | Output |
|------|---------|-------|--------|
| **query_revenue** | Query revenue with filters | billing_cycle, service_type, billing_plan, email, sale_name | Aggregated revenue data |
| **analyze_trend** | Analyze trends and detect anomalies | current_cycle, threshold_percent | Increases, decreases, churn risks |
| **get_customer_details** | (Future) Customer details | email | Customer profile + revenue history |
| **send_alert** | (Future) Send an alert | message, recipients | Alert sent status |
### D. Environment Variables Required
```
# Database
POSTGRES_ANA_HOST=localhost
POSTGRES_ANA_PORT=5432
POSTGRES_ANA_DB=analytics
POSTGRES_ANA_USER=user
POSTGRES_ANA_PASSWORD=xxx
# Telegram
BOT_TOKEN=xxx
SALES_GROUP_CHAT_ID=-1001234567890
MANAGER_CHAT_ID=123456789
```
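
A startup check for these variables might look like the sketch below, so that a missing value fails fast with a clear message rather than surfacing later as a connection or Telegram error. `load_settings` and the demo values are hypothetical.

```python
import os

# Variable names taken from the list above.
REQUIRED_VARS = [
    "POSTGRES_ANA_HOST",
    "POSTGRES_ANA_PORT",
    "POSTGRES_ANA_DB",
    "POSTGRES_ANA_USER",
    "POSTGRES_ANA_PASSWORD",
    "BOT_TOKEN",
    "SALES_GROUP_CHAT_ID",
    "MANAGER_CHAT_ID",
]


def load_settings(env=os.environ) -> dict:
    """Return the required settings, or raise listing every missing variable."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED_VARS}
    # The port is the only value converted here; the rest stay as strings.
    settings["POSTGRES_ANA_PORT"] = int(settings["POSTGRES_ANA_PORT"])
    return settings


# Demo with placeholder values instead of the real process environment.
demo_env = {name: "placeholder" for name in REQUIRED_VARS}
demo_env["POSTGRES_ANA_PORT"] = "5432"
settings = load_settings(demo_env)
print(settings["POSTGRES_ANA_PORT"])
```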
---
**Prepared by:** Claude Sonnet 4.5
**Date:** 17/01/2026
**Version:** 1.0