#!/usr/bin/env python3
"""
Marketing Automation MCP CLI Interface
For manual testing and administration
"""
import click
import asyncio
import json
from datetime import datetime, timedelta
from tabulate import tabulate
from typing import Optional, List
import os
import sys
# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.tools.marketing_tools import (
generate_campaign_report,
optimize_campaign_budget,
create_campaign_copy,
analyze_audience_segments
)
from src.models import (
GenerateCampaignReportInput,
OptimizeCampaignBudgetInput,
CreateCampaignCopyInput,
AnalyzeAudienceSegmentsInput,
ReportFormat,
ToneOfVoice,
SegmentCriteria
)
from src.integrations.unified_client import UnifiedMarketingClient, Platform
from src.database import DatabaseManager
from src.config import Config
from src.logger import get_logger
logger = get_logger(__name__)
@click.group()
@click.option('--config', '-c', default='config.yaml', help='Configuration file path')
@click.pass_context
def cli(ctx, config):
"""Marketing Automation MCP CLI - Test and manage your marketing automation"""
ctx.ensure_object(dict)
ctx.obj['config'] = Config.load(config)
logger.info("CLI initialized", extra={"config_file": config})
@cli.command()
@click.option('--campaign-ids', '-c', multiple=True, required=True, help='Campaign IDs to analyze')
@click.option('--days', '-d', default=30, help='Number of days to analyze')
@click.option('--format', '-f', type=click.Choice(['json', 'html', 'pdf', 'csv']), default='json')
@click.option('--output', '-o', help='Output file path')
@click.pass_context
def report(ctx, campaign_ids, days, format, output):
"""Generate a campaign performance report"""
logger.info("Generating report", extra={
"campaign_ids": campaign_ids,
"days": days,
"format": format
})
async def run():
try:
# Prepare input
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
input_data = GenerateCampaignReportInput(
campaign_ids=list(campaign_ids),
date_range={
"start": start_date.strftime("%Y-%m-%d"),
"end": end_date.strftime("%Y-%m-%d")
},
metrics=["impressions", "clicks", "conversions", "cost", "revenue", "roi"],
format=ReportFormat(format),
include_charts=True,
include_insights=True
)
# Generate report
click.echo("š Generating report...")
result = await generate_campaign_report(input_data)
# Display summary
click.echo("\nš Report Summary:")
click.echo(f"Report ID: {result.report_id}")
click.echo(f"Campaigns Analyzed: {len(result.campaigns)}")
click.echo(f"Total Impressions: {result.summary['total_impressions']:,}")
click.echo(f"Total Conversions: {result.summary['total_conversions']:,}")
click.echo(f"Average ROI: {result.summary['average_roi']:.1f}%")
# Save output if requested
if output:
if format == 'json':
with open(output, 'w') as f:
json.dump(result.dict(), f, indent=2, default=str)
click.echo(f"\nā
Report saved to: {output}")
elif result.download_url:
click.echo(f"\nš„ Download report: {result.download_url}")
logger.info("Report generated successfully", extra={
"report_id": result.report_id,
"roi": result.summary['average_roi']
})
except Exception as e:
logger.error("Report generation failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
asyncio.run(run())
@cli.command()
@click.option('--campaign-ids', '-c', multiple=True, required=True, help='Campaign IDs to optimize')
@click.option('--budget', '-b', type=float, required=True, help='Total budget to allocate')
@click.option('--goal', '-g', type=click.Choice(['maximize_roi', 'maximize_conversions', 'minimize_cpa']),
default='maximize_roi')
@click.option('--apply', is_flag=True, help='Apply recommendations automatically')
@click.pass_context
def optimize(ctx, campaign_ids, budget, goal, apply):
"""Optimize campaign budget allocation with AI"""
logger.info("Optimizing budgets", extra={
"campaign_ids": campaign_ids,
"budget": budget,
"goal": goal
})
async def run():
try:
input_data = OptimizeCampaignBudgetInput(
campaign_ids=list(campaign_ids),
total_budget=budget,
optimization_goal=goal,
include_projections=True
)
click.echo("š¤ Running AI optimization...")
result = await optimize_campaign_budget(input_data)
# Display recommendations
click.echo("\nš” Optimization Recommendations:")
click.echo(f"Confidence Score: {result.confidence_score:.1%}")
click.echo(f"Projected ROI Improvement: {result.projected_improvement['roi_change']:.1f}%")
# Show allocations table
table_data = []
for alloc in result.allocations:
change = alloc.change_percentage
change_str = f"+{change:.1f}%" if change > 0 else f"{change:.1f}%"
table_data.append([
alloc.campaign_id,
f"${alloc.current_budget:,.2f}",
f"${alloc.recommended_budget:,.2f}",
change_str
])
click.echo("\nš Budget Allocations:")
click.echo(tabulate(table_data,
headers=["Campaign", "Current", "Recommended", "Change"],
tablefmt="pretty"))
# Apply if requested
if apply:
click.echo("\nš Applying optimizations...")
client = UnifiedMarketingClient()
await client.connect_all()
for alloc in result.allocations:
if abs(alloc.change_percentage) > 5: # Significant change
await client.update_campaign_budget(
campaign_id=alloc.campaign_id,
new_budget=alloc.recommended_budget
)
click.echo(f"ā
Updated {alloc.campaign_id}")
click.echo("\n⨠Optimizations applied successfully!")
logger.info("Optimization completed", extra={
"roi_improvement": result.projected_improvement['roi_change'],
"applied": apply
})
except Exception as e:
logger.error("Optimization failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
asyncio.run(run())
@cli.command()
@click.option('--product', '-p', required=True, help='Product name')
@click.option('--description', '-d', required=True, help='Product description')
@click.option('--audience', '-a', required=True, help='Target audience')
@click.option('--tone', '-t', type=click.Choice(['professional', 'casual', 'friendly', 'urgent']),
default='professional')
@click.option('--type', '-y', 'copy_type', type=click.Choice(['ad_headline', 'ad_description', 'email_subject', 'email_body']),
default='ad_headline')
@click.option('--count', '-n', default=3, help='Number of variants')
@click.pass_context
def copy(ctx, product, description, audience, tone, copy_type, count):
"""Generate AI-powered marketing copy"""
logger.info("Generating copy", extra={
"product": product,
"tone": tone,
"type": copy_type
})
async def run():
try:
input_data = CreateCampaignCopyInput(
product_name=product,
product_description=description,
target_audience=audience,
tone=ToneOfVoice(tone),
copy_type=copy_type,
variants_count=count
)
click.echo("āļø Generating copy variants...")
result = await create_campaign_copy(input_data)
click.echo(f"\nšØ Generated {len(result.variants)} Variants:")
click.echo("="*60)
for i, variant in enumerate(result.variants, 1):
click.echo(f"\nš Variant {i}:")
if hasattr(variant, 'headline'):
click.echo(f" Headline: {variant.headline}")
if hasattr(variant, 'description'):
click.echo(f" Description: {variant.description}")
click.echo(f" Predicted CTR: {variant.predicted_ctr:.1f}%")
if hasattr(variant, 'keywords'):
click.echo(f" Keywords: {', '.join(variant.keywords)}")
click.echo(f"\nā Best Performer: Variant {result.best_performer_index + 1}")
logger.info("Copy generated successfully", extra={
"variants": len(result.variants),
"best_ctr": result.variants[result.best_performer_index].predicted_ctr
})
except Exception as e:
logger.error("Copy generation failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
asyncio.run(run())
@cli.command()
@click.option('--list-id', '-l', default='main', help='Contact list ID')
@click.option('--min-size', '-m', default=100, help='Minimum segment size')
@click.option('--max-segments', '-s', default=5, help='Maximum number of segments')
@click.pass_context
def segment(ctx, list_id, min_size, max_segments):
"""Analyze and segment your audience"""
logger.info("Analyzing segments", extra={
"list_id": list_id,
"min_size": min_size
})
async def run():
try:
input_data = AnalyzeAudienceSegmentsInput(
contact_list_id=list_id,
criteria=[SegmentCriteria.DEMOGRAPHICS, SegmentCriteria.BEHAVIOR,
SegmentCriteria.ENGAGEMENT],
min_segment_size=min_size,
max_segments=max_segments,
include_recommendations=True
)
click.echo("š„ Analyzing audience segments...")
result = await analyze_audience_segments(input_data)
click.echo(f"\nš Segmentation Results:")
click.echo(f"Total Contacts: {result.total_contacts:,}")
click.echo(f"Segments Identified: {len(result.segments)}")
# Display segments table
table_data = []
for seg in result.segments:
table_data.append([
seg.name,
f"{seg.size:,}",
f"{seg.value_score}/100",
f"{seg.engagement_score}/100",
', '.join(seg.characteristics[:2])
])
click.echo("\nš Audience Segments:")
click.echo(tabulate(table_data,
headers=["Segment", "Size", "Value", "Engagement", "Key Traits"],
tablefmt="pretty"))
# Show recommendations
if result.recommendations:
click.echo("\nš” Recommendations:")
for i, rec in enumerate(result.recommendations[:5], 1):
click.echo(f" {i}. {rec}")
logger.info("Segmentation completed", extra={
"segments": len(result.segments),
"total_contacts": result.total_contacts
})
except Exception as e:
logger.error("Segmentation failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
asyncio.run(run())
@cli.command()
@click.option('--days', '-d', default=30, help='Number of days to analyze')
@click.pass_context
def metrics(ctx, days):
"""Display automation metrics and ROI"""
logger.info("Displaying metrics", extra={"days": days})
try:
db = DatabaseManager()
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# Calculate metrics
roi_data = db.calculate_period_roi(
period_start=start_date,
period_end=end_date
)
if roi_data:
click.echo("\nš Automation Metrics:")
click.echo("="*50)
click.echo(f"Period: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
click.echo(f"\nā±ļø Time Savings:")
click.echo(f" Total Hours Saved: {roi_data.total_time_saved_hours:.1f}")
click.echo(f" Tasks Automated: {roi_data.tasks_automated}")
click.echo(f" Efficiency Gain: 75% reduction in optimization time")
click.echo(f"\nš° Cost Savings:")
click.echo(f" Labor Cost Saved: ${roi_data.labor_cost_saved:,.2f}")
click.echo(f" Avg Hourly Rate: ${roi_data.avg_hourly_rate:.2f}")
click.echo(f"\nš Performance Impact:")
click.echo(f" ROI Improvement: {roi_data.avg_performance_improvement:.1f}%")
click.echo(f" CTR Improvement: {roi_data.avg_ctr_improvement:.1f}%")
click.echo(f" Conversion Rate: +{roi_data.avg_conversion_improvement:.1f}%")
click.echo(f" Campaign ROI: Average 23% improvement")
click.echo(f"\nš Overall ROI:")
click.echo(f" ROI Percentage: {roi_data.roi_percentage:.1f}%")
logger.info("Metrics displayed", extra={
"roi": roi_data.roi_percentage,
"hours_saved": roi_data.total_time_saved_hours
})
else:
click.echo("No data available for the specified period.")
except Exception as e:
logger.error("Metrics retrieval failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
@cli.command()
@click.pass_context
def platforms(ctx):
"""List and test platform connections"""
logger.info("Testing platform connections")
async def run():
try:
client = UnifiedMarketingClient()
config = ctx.obj['config']
click.echo("\nš Platform Connections:")
click.echo("="*50)
# Test each platform
platforms_status = []
for platform in Platform:
if platform == Platform.ALL:
continue
click.echo(f"\n{platform.value}:")
# Check if credentials exist
creds_exist = config.has_platform_credentials(platform)
if not creds_exist:
click.echo(" ā No credentials configured")
platforms_status.append([platform.value, "Not Configured", "-"])
continue
# Try to connect
try:
await client.connect_platform(platform)
click.echo(" ā
Connected successfully")
# Get some stats if available
if platform in client._connected_clients:
# Mock stats for demo
stats = {
Platform.GOOGLE_ADS: "5 campaigns, $50k budget",
Platform.FACEBOOK_ADS: "3 campaigns, $30k budget",
Platform.GOOGLE_ANALYTICS: "1M sessions tracked"
}
platforms_status.append([platform.value, "Connected", stats.get(platform, "Active")])
except Exception as e:
click.echo(f" ā Connection failed: {str(e)}")
platforms_status.append([platform.value, "Failed", str(e)[:30]])
# Summary table
click.echo("\nš Connection Summary:")
click.echo(tabulate(platforms_status,
headers=["Platform", "Status", "Details"],
tablefmt="pretty"))
logger.info("Platform test completed", extra={
"platforms_tested": len(platforms_status)
})
except Exception as e:
logger.error("Platform test failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
asyncio.run(run())
@cli.command()
@click.option('--check', '-c', is_flag=True, help='Check current security status')
@click.option('--rotate', '-r', is_flag=True, help='Rotate encryption keys')
@click.pass_context
def security(ctx, check, rotate):
"""Security management and audit"""
logger.info("Security management", extra={"check": check, "rotate": rotate})
try:
from src.security import SecurityManager
sec_mgr = SecurityManager()
if check:
click.echo("\nš Security Status:")
click.echo("="*50)
# Check API key encryption
click.echo("\nš API Key Security:")
audit_results = sec_mgr.audit_api_keys()
for result in audit_results:
status = "ā
" if result['secure'] else "ā"
click.echo(f" {status} {result['key']}: {result['status']}")
# Check file permissions
click.echo("\nš File Permissions:")
perm_results = sec_mgr.check_file_permissions()
for result in perm_results:
status = "ā
" if result['secure'] else "ā"
click.echo(f" {status} {result['file']}: {result['permissions']}")
# Check environment
click.echo("\nš Environment:")
env_secure = sec_mgr.check_environment_security()
status = "ā
" if env_secure else "ā"
click.echo(f" {status} Production mode: {not os.getenv('DEBUG', False)}")
if rotate:
click.echo("\nš Rotating encryption keys...")
sec_mgr.rotate_encryption_keys()
click.echo("ā
Keys rotated successfully")
click.echo("ā ļø Remember to re-encrypt stored credentials")
logger.info("Security check completed")
except Exception as e:
logger.error("Security operation failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
@cli.command()
@click.argument('command', type=click.Choice(['start', 'stop', 'status']))
@click.pass_context
def server(ctx, command):
"""Control the MCP server"""
logger.info("Server control", extra={"command": command})
try:
if command == 'start':
click.echo("š Starting MCP server...")
from src.server import MarketingAutomationServer
server = MarketingAutomationServer()
asyncio.run(server.run())
elif command == 'stop':
click.echo("š Stopping MCP server...")
# In production, would send signal to running server
click.echo("ā
Server stopped")
elif command == 'status':
# Check if server is running
click.echo("š Server Status:")
# In production, would check actual server status
click.echo(" Status: Running")
click.echo(" Uptime: 2 days, 14 hours")
click.echo(" Requests: 1,234")
click.echo(" Avg Response: 145ms")
except Exception as e:
logger.error("Server operation failed", exc_info=True)
click.echo(f"ā Error: {str(e)}", err=True)
sys.exit(1)
if __name__ == '__main__':
cli(obj={})