Skip to main content
Glama

stripe

Official
by stripe
main.py2.72 kB
import env import asyncio from emailer import Emailer, Email from typing import Union, List import support_agent import markdown as markdown import json from agents import ( ItemHelpers, TResponseInputItem, ) env.ensure("STRIPE_SECRET_KEY") env.ensure("OPENAI_API_KEY") email_address = env.ensure("EMAIL_ADDRESS") support_address = env.get_or("SUPPORT_ADDRESS", email_address) email_password = env.ensure("EMAIL_PASSWORD") emailer = Emailer(email_address, email_password, support_address) def unsure(str: str) -> bool: return ( "not sure" in str or "unsure" in str or "don't know" in str or "dont know" in str or "do not know" in str ) async def respond(thread: List[Email]) -> Union[Email, None]: most_recent = thread[-1] print(f"Got unread email:\n {json.dumps(most_recent.to_dict())}") # Loop through the entire thread to add historical context for the agent input_items: list[TResponseInputItem] = [] for email in thread: input_items.append( { "content": ( "This is an earlier email:" f"Email from: {email.from_address}\n" f"To: {email.to_address}\n" f"Subject: {email.subject}\n\n" f"{email.body}" ), "role": "user", } ) input_items.append( { "content": ( "This the latest email" "You can use context from earlier emails" "but reply specifically to the following email:" f"Email from: {most_recent.from_address}\n" f"To: {most_recent.to_address}\n" f"Subject: {most_recent.subject}\n\n" f"{most_recent.body}" ), "role": "user", } ) print(f"Sending to agent:\n {json.dumps(input_items)}") output = await support_agent.run(input_items) body_md = ItemHelpers.text_message_outputs(output.new_items) # Handle answers that the agent doesn't know if unsure(body_md.lower()): print( f"Agent doesn't know, ignore response and keep email in the inbox:\n{body_md}" ) return None # OpenAI often returns the body in html fences, trim those body_html = markdown.markdown(body_md, extensions=["tables"]) return Email( from_address=most_recent.to_address, to_address=most_recent.from_address, subject=most_recent.subject, body=body_html, ) async def main(): await emailer.run(respond, delay=30, mark_read=True) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stripe/agent-toolkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server