name: Integration Test
# This workflow tests the integration with Intercom's real API
# Uses time-based sync (default: 5 minutes) for predictable CI runtime
# Performance requirement: >=10 conversations/second (the sync step fails below this rate)
# Timeout set to 15 minutes for fast feedback
# Triggers: manual dispatch (with tunable inputs), a weekly schedule, and
# version tags.
on:
  workflow_dispatch:
    inputs:
      max_sync_minutes:
        description: 'Maximum sync duration in minutes (default: 5)'
        required: false
        default: '5'
        type: string
      run_full_test:
        description: 'Run full integration test with all verifications'
        required: false
        default: true
        type: boolean
  schedule:
    # Weekly run on Monday at 6AM UTC
    - cron: '0 6 * * 1'
  push:
    tags:
      - 'v*'  # Pre-release trigger for version tags
# Workflow-wide environment, visible to every step of every job.
env:
  # Lets the checked-out repository be imported without installation.
  PYTHONPATH: ${{ github.workspace }}
  # Environment values are strings; quoted so the type is explicit.
  FORCE_COLOR: '1'
jobs:
  # Single job: installs the package, syncs from the real Intercom API for a
  # bounded time, enforces the throughput requirement, and publishes a summary
  # plus debugging artifacts.
  integration-test:
    name: Real API Integration Test
    runs-on: ubuntu-latest
    # Hard ceiling for the whole job. Must stay above max_sync_minutes plus
    # setup time, otherwise the runner kills the job before results are saved.
    timeout-minutes: 15
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Full history for better context

      - name: Set up Python 3.11
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install system dependencies
        run: |
          sudo apt-get update
          sudo apt-get install -y sqlite3

      - name: Install Python dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e .
          # Quoted so the shell never treats [http2] as a glob pattern.
          pip install pytest pytest-asyncio pytest-cov 'httpx[http2]'

      - name: Verify package installation
        run: |
          python -c 'import fast_intercom_mcp; print("✅ Package imported successfully:", getattr(fast_intercom_mcp, "__version__", "dev"))'
          python -m fast_intercom_mcp --help

      - name: Create test environment
        env:
          # Passed through the environment instead of being spliced into the
          # script text, so characters such as $ or ` in the token are never
          # interpreted by the shell.
          INTERCOM_ACCESS_TOKEN: ${{ secrets.INTERCOM_ACCESS_TOKEN }}
        run: |
          mkdir -p integration_test_data
          cd integration_test_data
          # Create minimal .env file for testing
          cat > .env << EOF
          INTERCOM_ACCESS_TOKEN=${INTERCOM_ACCESS_TOKEN}
          DATABASE_PATH=./test_integration.db
          LOG_LEVEL=INFO
          API_RATE_LIMIT=10
          EOF
          echo "✅ Test environment created"

      - name: Run integration test
        working-directory: integration_test_data
        env:
          INTERCOM_ACCESS_TOKEN: ${{ secrets.INTERCOM_ACCESS_TOKEN }}
          # Dispatch inputs reach the scripts only as environment variables, so
          # a crafted input value cannot inject shell or Python code. Both fall
          # back to their defaults on schedule/tag runs, where inputs are empty.
          MAX_SYNC_MINUTES: ${{ github.event.inputs.max_sync_minutes || '5' }}
          RUN_FULL_TEST: ${{ github.event.inputs.run_full_test || 'true' }}
          # Flush Python output immediately so progress lines show up live.
          PYTHONUNBUFFERED: '1'
        run: |
          set -e
          echo "🚀 Starting Integration Test"
          echo "Test environment: $(python --version)"
          echo "Max sync minutes: ${MAX_SYNC_MINUTES}"
          echo "Full test: ${RUN_FULL_TEST}"

          # Initialize test start time
          TEST_START_TIME=$(date +%s)

          # All multi-line Python below is fed through quoted heredocs
          # (<<'PY'): the shell performs no expansion inside them, so the
          # Python source needs no quote escaping.

          # Test 1: Package import and CLI availability
          echo "📦 Testing package import and CLI..."
          python -c "import fast_intercom_mcp.cli; print('✅ CLI module imported')"

          # Test 2: Database initialization
          echo "🗄️ Testing database initialization..."
          python - <<'PY'
          import asyncio
          from fast_intercom_mcp.database import DatabaseManager

          async def test_db():
              # Constructing the manager is the test; the instance is not used further.
              db = DatabaseManager('./test_integration.db')
              print('✅ Database initialized successfully')

          asyncio.run(test_db())
          PY

          # Test 3: API connection test
          echo "🔌 Testing Intercom API connection..."
          python - <<'PY'
          import asyncio
          import os
          from fast_intercom_mcp.intercom_client import IntercomClient

          async def test_connection():
              client = IntercomClient(os.getenv('INTERCOM_ACCESS_TOKEN'))
              try:
                  result = await client.test_connection()
                  print(f'✅ API connection test: {result}')
                  return result
              except Exception as e:
                  print(f'❌ API connection failed: {e}')
                  raise

          asyncio.run(test_connection())
          PY

          # Test 4: Sync service initialization and status
          echo "⚙️ Testing sync service initialization..."
          python - <<'PY'
          import asyncio
          import os
          from fast_intercom_mcp.sync_service import SyncService
          from fast_intercom_mcp.database import DatabaseManager
          from fast_intercom_mcp.intercom_client import IntercomClient

          async def test_sync_service():
              db = DatabaseManager('./test_integration.db')
              client = IntercomClient(os.getenv('INTERCOM_ACCESS_TOKEN'))
              sync_service = SyncService(db, client)
              status = sync_service.get_status()
              print(f'✅ Sync service status: {status}')

          asyncio.run(test_sync_service())
          PY

          # Test 5: Actual sync test with real API data
          echo "🔄 Running real API sync test..."
          # Time-based sync; the duration is read from MAX_SYNC_MINUTES.
          python - <<'PY'
          import asyncio
          import json
          import os
          import sqlite3
          import threading
          import time
          from contextlib import closing
          from datetime import datetime, timedelta, UTC

          from fast_intercom_mcp.sync_service import SyncService
          from fast_intercom_mcp.database import DatabaseManager
          from fast_intercom_mcp.intercom_client import IntercomClient

          DB_PATH = './test_integration.db'
          # Minimum sustained rate (conversations/second) for the run to pass.
          MIN_CONV_PER_SEC = 10


          def count_rows(db_path):
              '''Return (conversation_count, message_count) currently in the database.'''
              # closing() releases the connection; sqlite3's own context manager
              # only manages the transaction.
              with closing(sqlite3.connect(db_path)) as conn:
                  conversations = conn.execute('SELECT COUNT(*) FROM conversations').fetchone()[0]
                  messages = conn.execute('SELECT COUNT(*) FROM messages').fetchone()[0]
              return conversations, messages


          def progress_monitor(db_path, sync_start_time, max_minutes, stop_event):
              '''Print row counts and throughput every 10 seconds until stop_event is set.'''
              last_count = 0
              last_msg_count = 0
              while not stop_event.is_set():
                  try:
                      current_count, current_msg_count = count_rows(db_path)
                      elapsed = time.time() - sync_start_time
                      elapsed_minutes = elapsed / 60
                      # Only report when something changed since the last line.
                      if current_count > last_count or current_msg_count > last_msg_count:
                          conv_rate = current_count / max(elapsed, 1)
                          msg_rate = current_msg_count / max(elapsed, 1)
                          print(f'📊 Progress: {current_count:,} conversations, {current_msg_count:,} messages | '
                                f'Rate: {conv_rate:.1f} conv/sec, {msg_rate:.1f} msg/sec | '
                                f'Elapsed: {elapsed_minutes:.1f}/{max_minutes} min')
                          last_count = current_count
                          last_msg_count = current_msg_count
                      delay = 10
                  except Exception as e:
                      # Best effort: a failed poll must never abort the sync itself.
                      print(f'Monitor error: {e}')
                      delay = 5
                  # wait() returns as soon as the event is set, so the monitor
                  # stops promptly instead of sleeping out the full interval.
                  stop_event.wait(delay)


          async def run_time_limited_sync(sync_service, max_minutes):
              '''Sync the last 90 days, cancelling after max_minutes.

              Returns the stats from sync_period() when it finishes in time;
              otherwise stats rebuilt from what is already in the database.
              '''
              # Use the last 90 days of data to ensure there is enough to sync.
              end_date = datetime.now(UTC)
              start_date = end_date - timedelta(days=90)
              print(f'📅 Time-limited sync configuration:')
              print(f' - Max duration: {max_minutes} minutes')
              print(f' - Data window: {start_date.date()} to {end_date.date()} (90 days available)')
              print(f' - Performance target: ≥{MIN_CONV_PER_SEC} conversations/second')
              print(f' - Will sync as much as possible within time limit')
              print('')
              sync_start = time.time()
              max_seconds = max_minutes * 60
              try:
                  total_stats = await asyncio.wait_for(
                      sync_service.sync_period(start_date, end_date),
                      timeout=max_seconds
                  )
                  print(f'✅ Sync completed within time limit')
              except asyncio.TimeoutError:
                  print(f'⏰ Time limit reached ({max_minutes} minutes) - stopping sync')
                  # Get partial results from database
                  conv_count, msg_count = count_rows(DB_PATH)
                  sync_duration = time.time() - sync_start
                  # Create stats for partial sync
                  from fast_intercom_mcp.models import SyncStats
                  total_stats = SyncStats(
                      total_conversations=conv_count,
                      new_conversations=conv_count,  # Approximate
                      updated_conversations=0,
                      total_messages=msg_count,
                      duration_seconds=sync_duration,
                      api_calls_made=int(sync_duration / 3)  # Rough estimate: ~1 call per 3 seconds
                  )
              return total_stats


          async def run_integration_test():
              '''Run the sync, write performance_metrics.json, and enforce the rate.

              On any failure error_report.json is written and the exception is
              re-raised so the step fails.
              '''
              # Initialize components
              db = DatabaseManager(DB_PATH)
              client = IntercomClient(os.getenv('INTERCOM_ACCESS_TOKEN'))
              sync_service = SyncService(db, client)

              # Read from the environment rather than from text substituted
              # into this source, so the value can never be executed as code.
              max_minutes = int(os.environ['MAX_SYNC_MINUTES'])
              if max_minutes < 1:
                  raise ValueError(f'max_sync_minutes must be a positive integer, got {max_minutes}')
              print(f'🚀 Starting time-based integration test')
              print(f'⏱️ Maximum sync duration: {max_minutes} minutes')
              print('')

              # Start progress monitoring in background
              sync_start = time.time()
              stop_event = threading.Event()
              monitor_thread = threading.Thread(
                  target=progress_monitor,
                  args=(DB_PATH, sync_start, max_minutes, stop_event),
                  daemon=True
              )
              monitor_thread.start()
              try:
                  # Run time-limited sync
                  stats = await run_time_limited_sync(sync_service, max_minutes)
                  sync_duration = stats.duration_seconds
                  # Stop the monitor
                  stop_event.set()

                  # Calculate performance metrics
                  conv_per_sec = stats.total_conversations / max(sync_duration, 0.1) if stats.total_conversations > 0 else 0
                  msg_per_sec = stats.total_messages / max(sync_duration, 0.1) if stats.total_messages > 0 else 0
                  performance_passed = conv_per_sec >= MIN_CONV_PER_SEC

                  metrics = {
                      'test_timestamp': datetime.now(UTC).isoformat(),
                      'max_sync_minutes': max_minutes,
                      'actual_sync_minutes': round(sync_duration / 60, 2),
                      'total_conversations': stats.total_conversations,
                      'new_conversations': stats.new_conversations,
                      'updated_conversations': stats.updated_conversations,
                      'total_messages': stats.total_messages,
                      'api_calls_made': stats.api_calls_made,
                      'sync_duration_seconds': round(sync_duration, 2),
                      'conversations_per_second': round(conv_per_sec, 2),
                      'messages_per_second': round(msg_per_sec, 2),
                      'performance_passed': performance_passed  # Performance requirement
                  }
                  # Saved before the performance gate on purpose: the numbers
                  # stay available as an artifact even when the gate fails.
                  with open('performance_metrics.json', 'w') as f:
                      json.dump(metrics, f, indent=2)

                  # Print formatted results
                  verdict = '✅ PASS' if performance_passed else '❌ FAIL (need ≥10)'
                  print()
                  print('📊 Integration Test Results')
                  print('=' * 50)
                  print(f'✅ Test completed successfully')
                  print(f'✅ Sync duration: {metrics["actual_sync_minutes"]} minutes (limit: {max_minutes})')
                  print(f'✅ Conversations synced: {stats.total_conversations:,}')
                  print(f'✅ Messages synced: {stats.total_messages:,}')
                  print(f'✅ New conversations: {stats.new_conversations:,}')
                  print(f'✅ Updated conversations: {stats.updated_conversations:,}')
                  print(f'✅ API calls made: {stats.api_calls_made:,}')
                  print(f'✅ Sync speed: {conv_per_sec:.1f} conv/sec {verdict}')
                  print(f'✅ Message speed: {msg_per_sec:.1f} msg/sec')

                  # Verify data integrity
                  conv_count, msg_count = count_rows(DB_PATH)
                  print(f'✅ Database integrity: {conv_count:,} conversations, {msg_count:,} messages')

                  # Performance check
                  if not performance_passed:
                      raise Exception(f'Performance requirement not met: {conv_per_sec:.1f} conv/sec < 10 conv/sec')
                  print()
                  print('✅ All checks passed!')
              except Exception as e:
                  stop_event.set()  # Stop monitor on error
                  print(f'❌ Test failed: {e}')
                  # Save error info for debugging
                  with open('error_report.json', 'w') as f:
                      json.dump({
                          'error': str(e),
                          'error_type': type(e).__name__,
                          'timestamp': datetime.now(UTC).isoformat(),
                          'max_sync_minutes': max_minutes
                      }, f, indent=2)
                  raise


          asyncio.run(run_integration_test())
          PY

          # Calculate total test time
          TEST_END_TIME=$(date +%s)
          TOTAL_TEST_TIME=$((TEST_END_TIME - TEST_START_TIME))
          echo ""
          echo "🎉 Integration test completed successfully!"
          echo "Total test execution time: ${TOTAL_TEST_TIME}s"

      - name: Generate test summary
        if: always()
        run: |
          # The directory is missing when an early setup step failed; create it
          # so the summary can still be written and uploaded.
          mkdir -p integration_test_data
          cd integration_test_data
          {
            echo "## Integration Test Summary"
            echo ""
            echo "**Timestamp:** $(date -u '+%Y-%m-%d %H:%M:%S UTC')"
            echo "**Workflow:** ${GITHUB_WORKFLOW}"
            echo "**Run ID:** ${GITHUB_RUN_ID}"
            echo "**Commit:** ${GITHUB_SHA}"
            echo ""
            # performance_metrics.json is written before the performance gate,
            # so its presence alone does not mean success: an error report
            # always wins.
            if [ -f error_report.json ] || [ ! -f performance_metrics.json ]; then
              echo "**Test Result:** ❌ FAILED"
            else
              echo "**Test Result:** ✅ SUCCESS"
            fi
            if [ -f performance_metrics.json ]; then
              echo ""
              echo "### Performance Metrics"
              echo '```json'
              cat performance_metrics.json
              echo '```'
            fi
            if [ -f error_report.json ]; then
              echo ""
              echo "### Error Report"
              echo '```json'
              cat error_report.json
              echo '```'
            fi
          } > test_summary.md
          # Display summary
          echo ""
          echo "📋 TEST SUMMARY"
          echo "==============="
          cat test_summary.md

      - name: Create database snapshot
        if: failure()
        working-directory: integration_test_data
        run: |
          if [ -f test_integration.db ]; then
            echo "📸 Creating database snapshot for debugging..."
            sqlite3 test_integration.db ".dump" > database_snapshot.sql
            echo "Database snapshot created: database_snapshot.sql"
          fi

      # NOTE(review): test_integration.db and database_snapshot.sql contain
      # data synced from the real Intercom workspace. Confirm that keeping
      # them as downloadable artifacts for 30 days is acceptable.
      - name: Upload test artifacts
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: integration-test-results-${{ github.run_id }}
          path: |
            integration_test_data/performance_metrics.json
            integration_test_data/error_report.json
            integration_test_data/test_summary.md
            integration_test_data/database_snapshot.sql
            integration_test_data/test_integration.db
          retention-days: 30

      # NOTE(review): this workflow declares no pull_request trigger, so this
      # step never runs as configured. If a trigger is added, the job token
      # will also need permission to write PR comments.
      - name: Comment on PR (if applicable)
        if: github.event_name == 'pull_request' && always()
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            try {
              const summary = fs.readFileSync('integration_test_data/test_summary.md', 'utf8');
              await github.rest.issues.createComment({
                issue_number: context.issue.number,
                owner: context.repo.owner,
                repo: context.repo.repo,
                body: `## 🧪 Integration Test Results\n\n${summary}`
              });
            } catch (error) {
              // Best effort: a failed comment must not change the job result.
              console.log('Could not post PR comment:', error.message);
            }

      # The job is already failed at this point; this step only adds a pointer
      # to the artifacts in the log.
      - name: Fail job on test failure
        if: failure()
        run: |
          echo "❌ Integration test failed. Check the artifacts for detailed error information."
          exit 1