#!/usr/bin/env python3
import asyncio
import json
import re
from datetime import datetime, timedelta
from src.mcp_client import create_mcp_client, list_tools, call_tool
def load_config():
try:
with open('config.json', 'r') as f:
return json.load(f)
except:
return {'google': {}}
def analyze_emails_for_actions(emails):
actions = []
if not emails or not emails.get('messages') or len(emails['messages']) == 0:
return actions
for email in emails['messages']:
subject = email.get('subject', '').lower()
body = email.get('body', '').lower()
if 'meeting' in subject or 'schedule' in subject or 'meeting' in body:
actions.append({
'type': 'calendar_check',
'email': email,
'reason': 'Email mentions meeting - checking calendar'
})
if 'task' in subject or 'todo' in subject or 'please add' in body:
actions.append({
'type': 'sheet_add',
'email': email,
'reason': 'Email mentions task - adding to sheet'
})
email_from = email.get('from', '').lower()
if 'urgent' in email_from or 'urgent' in subject:
actions.append({
'type': 'priority',
'email': email,
'reason': 'Urgent email detected'
})
return actions
def extract_meeting_info(email):
subject = email.get('subject', '')
body = email.get('body', '')
time_pattern = r'(\d{1,2}):(\d{2})\s*(am|pm)'
date_pattern = r'(monday|tuesday|wednesday|thursday|friday|saturday|sunday|tomorrow|today)'
time_matches = re.findall(time_pattern, body, re.IGNORECASE)
date_matches = re.findall(date_pattern, body, re.IGNORECASE)
return {
'hasTime': len(time_matches) > 0,
'hasDate': len(date_matches) > 0,
'subject': subject
}
def generate_event_from_email(email, spreadsheet_id):
meeting_info = extract_meeting_info(email)
tomorrow = datetime.now() + timedelta(days=1)
tomorrow = tomorrow.replace(hour=10, minute=0, second=0, microsecond=0)
end_time = tomorrow + timedelta(hours=1)
from_email = email.get('from', '')
email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', from_email)
attendee_email = email_match.group(0) if email_match else None
return {
'summary': email.get('subject', 'Meeting from Email'),
'description': f"Meeting requested via email from {from_email}\n\n{email.get('body', '')[:200]}",
'start': tomorrow.isoformat(),
'end': end_time.isoformat(),
'location': '',
'attendees': [attendee_email] if attendee_email else []
}
def generate_task_from_email(email):
return {
'task': email.get('subject', 'Task from Email'),
'source': email.get('from', 'Unknown'),
'date': datetime.now().isoformat(),
'status': 'Pending',
'description': email.get('body', '')[:200] if email.get('body') else ''
}
async def process_decision(session, decision, config):
try:
if decision['type'] == 'calendar_check':
print('Checking calendar for upcoming events...')
events = await call_tool(session, 'list_calendar_events', {
'maxResults': 5
})
if events.get('success') and events.get('events'):
print(f"Found {len(events['events'])} upcoming events")
if len(events['events']) > 0:
print('Upcoming events:')
for event in events['events']:
print(f" - {event.get('summary', '')} on {event.get('start', '')}")
elif decision['type'] == 'sheet_add':
print('Adding task to spreadsheet...')
task = generate_task_from_email(decision['email'])
values = [[
task['date'],
task['task'],
task['source'],
task['status'],
task['description']
]]
result = await call_tool(session, 'append_sheet', {
'spreadsheetId': config.get('google', {}).get('spreadsheetId'),
'range': 'Sheet1',
'values': values
})
if result.get('success'):
print('Task added to spreadsheet successfully')
else:
print(f"Failed to add task: {result.get('error', 'Unknown error')}")
elif decision['type'] == 'calendar_create':
print('Creating calendar event...')
event_data = generate_event_from_email(decision['email'], config.get('google', {}).get('spreadsheetId'))
result = await call_tool(session, 'create_calendar_event', event_data)
if result.get('success'):
print(f"Event created: {result.get('eventId', '')}")
print(f"Event link: {result.get('htmlLink', 'N/A')}")
else:
print(f"Failed to create event: {result.get('error', 'Unknown error')}")
elif decision['type'] == 'email_response':
print('Sending email response...')
result = await call_tool(session, 'send_email', {
'to': decision['email'].get('from', ''),
'subject': f"Re: {decision['email'].get('subject', '')}",
'body': decision.get('response', 'Thank you for your email. I have processed your request.')
})
if result.get('success'):
print('Email sent successfully')
else:
print(f"Failed to send email: {result.get('error', 'Unknown error')}")
elif decision['type'] == 'priority':
print('Urgent email detected - taking priority action')
task = generate_task_from_email(decision['email'])
task['status'] = 'Urgent'
sheet_result = await call_tool(session, 'append_sheet', {
'spreadsheetId': config.get('google', {}).get('spreadsheetId'),
'range': 'Sheet1',
'values': [[
task['date'],
task['task'],
task['source'],
task['status'],
task['description']
]]
})
if sheet_result.get('success'):
print('Urgent task added to spreadsheet')
except Exception as error:
print(f"Error processing decision {decision.get('type', 'unknown')}: {str(error)}")
async def make_automatic_decisions(session, config):
print('Starting automatic decision-making process...\n')
print('Step 1: Checking emails...')
emails = await call_tool(session, 'list_emails', {
'maxResults': 10,
'query': 'in:inbox'
})
if not emails.get('success'):
print(f"Failed to fetch emails: {emails.get('error', 'Unknown error')}")
return
print(f"Found {len(emails.get('messages', []))} emails\n")
print('Step 2: Analyzing emails for actions...')
actions = analyze_emails_for_actions(emails)
print(f"Identified {len(actions)} potential actions\n")
if len(actions) == 0:
print('No actions required based on email analysis')
return
print('Step 3: Processing decisions...\n')
for action in actions:
print(f"Processing: {action['reason']}")
if action['type'] == 'calendar_check':
meeting_info = extract_meeting_info(action['email'])
if meeting_info['hasTime'] or meeting_info['hasDate']:
action['type'] = 'calendar_create'
action['reason'] = 'Email contains meeting info - creating calendar event'
if action['type'] == 'sheet_add' and action['email'].get('from'):
action['response'] = f"I have added your request \"{action['email'].get('subject', '')}\" to my task list."
await process_decision(session, action, config)
response_action = {
'type': 'email_response',
'email': action['email'],
'response': action['response']
}
await process_decision(session, response_action, config)
else:
await process_decision(session, action, config)
print('')
print('Step 4: Checking calendar status...')
events = await call_tool(session, 'list_calendar_events', {
'maxResults': 5
})
if events.get('success'):
print(f"Calendar has {len(events.get('events', []))} upcoming events")
print('\nAutomatic decision-making process completed')
async def main():
try:
print('Initializing MCP Client...')
session = await create_mcp_client()
print('Fetching available tools...')
tools = await list_tools(session)
print(f"Connected. Available tools: {len(tools)}\n")
config = load_config()
await make_automatic_decisions(session, config)
print('\nAgent session ended')
except Exception as error:
print(f'Agent error: {str(error)}')
import traceback
traceback.print_exc()
exit(1)
if __name__ == '__main__':
asyncio.run(main())