#!/usr/bin/env python3.11
"""
Secure Stevia Store MCP Server with Database & Security
A production-ready e-commerce MCP server for Organital's stevia products
Includes SQLite database, password hashing, encryption, and security features
"""
import asyncio
import argparse
import sqlite3
import hashlib
import secrets
import bcrypt
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from cryptography.fernet import Fernet
from mcp.server.fastmcp import FastMCP
# Initialize the MCP server; the tool functions in this file register on it
# through the @mcp.tool() decorator.
mcp = FastMCP("๐ Secure Stevia Store")
# Database configuration: relative path of the SQLite file that every function
# opens with sqlite3.connect().
DATABASE_PATH = "stevia_store.db"
# Relative path of the file holding the Fernet key (read or created by
# get_encryption_key).
ENCRYPTION_KEY_FILE = "encryption.key"
# Load or generate encryption key
def get_encryption_key():
    """Return the Fernet key stored in ENCRYPTION_KEY_FILE, creating it if needed.

    The key file is read first (EAFP, so there is no exists()/open() race).
    If the file is missing or empty, a fresh key is generated and written to a
    file created with owner-only permissions (0o600), because the key protects
    every value encrypted by this module.

    Returns:
        bytes: the url-safe base64 key accepted by ``Fernet``.
    """
    try:
        with open(ENCRYPTION_KEY_FILE, 'rb') as f:
            key = f.read().strip()
    except FileNotFoundError:
        key = b""
    if key:
        return key

    # No usable key on disk: generate one and persist it.
    key = Fernet.generate_key()
    # O_BINARY only exists on Windows; it is a no-op elsewhere.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(ENCRYPTION_KEY_FILE, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(key)
    return key
# Initialize encryption. This runs at import time, so importing the module
# reads (or creates) the key file.
ENCRYPTION_KEY = get_encryption_key()
cipher_suite = Fernet(ENCRYPTION_KEY)
# In-memory session management: plain module-level dicts, so their contents do
# not survive a process restart.
# session token -> {"user_id", "email", "full_name", "expires_at"} (filled by login_user)
active_sessions: Dict[str, Dict] = {}
# session token -> {str(product_id): {"name", "price", "quantity", "total_price"}}
shopping_carts: Dict[str, Dict] = {} # session_id -> cart
def encrypt_data(data: str) -> str:
    """Encrypt a text value with the module-level Fernet cipher.

    Args:
        data: plaintext string.

    Returns:
        The Fernet token decoded to ``str`` so it can be stored as text.
    """
    plaintext_bytes = data.encode()
    token = cipher_suite.encrypt(plaintext_bytes)
    return token.decode()
def decrypt_data(encrypted_data: str) -> str:
    """Decrypt a token produced by ``encrypt_data``.

    Args:
        encrypted_data: Fernet token as text.

    Returns:
        The original plaintext string.
    """
    token = encrypted_data.encode()
    plaintext_bytes = cipher_suite.decrypt(token)
    return plaintext_bytes.decode()
def hash_password(password: str) -> str:
    """Hash a password with bcrypt using a freshly generated salt.

    Args:
        password: plaintext password.

    Returns:
        The bcrypt hash decoded to a UTF-8 string for storage.
    """
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password.encode('utf-8'), salt)
    return digest.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        password: plaintext password supplied by the caller.
        hashed: bcrypt hash string as returned by ``hash_password``.

    Returns:
        The result of ``bcrypt.checkpw`` for the two values.
    """
    candidate = password.encode('utf-8')
    stored = hashed.encode('utf-8')
    return bcrypt.checkpw(candidate, stored)
def generate_session_token() -> str:
    """Create a random URL-safe session token from 32 bytes of entropy."""
    entropy_bytes = 32
    token = secrets.token_urlsafe(entropy_bytes)
    return token
def init_database():
    """Create the SQLite schema if it is missing and seed the sample data.

    Opens DATABASE_PATH, creates every table with ``CREATE TABLE IF NOT
    EXISTS`` and, for the products, FAQ-category and FAQ-item tables, inserts
    the bundled sample rows only when the table is empty. Everything is
    committed in one transaction; the connection is closed even when a
    statement fails, so an error no longer leaks the handle.

    Raises:
        sqlite3.Error: propagated unchanged if any statement fails.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        # Enable foreign keys
        # NOTE(review): SQLite documents this pragma as a per-connection
        # setting, so it presumably does not carry over to the connections the
        # tool functions open later - confirm.
        cursor.execute("PRAGMA foreign_keys = ON")
        # Users table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                full_name TEXT NOT NULL,
                phone TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_login TIMESTAMP,
                is_active BOOLEAN DEFAULT 1
            )
        """)
        # Products table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                category TEXT NOT NULL,
                price REAL NOT NULL,
                description TEXT,
                weight TEXT,
                origin TEXT,
                kosher BOOLEAN DEFAULT 0,
                organic BOOLEAN DEFAULT 0,
                rating REAL DEFAULT 0,
                reviews INTEGER DEFAULT 0,
                stock_quantity INTEGER DEFAULT 0,
                sku TEXT UNIQUE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Customers table (for guest checkouts)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT NOT NULL,
                full_name TEXT NOT NULL,
                phone TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Addresses table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS addresses (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                customer_id INTEGER,
                address_line TEXT NOT NULL,
                city TEXT,
                postal_code TEXT,
                is_default BOOLEAN DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id),
                FOREIGN KEY (customer_id) REFERENCES customers(id)
            )
        """)
        # Orders table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tracking_number TEXT UNIQUE NOT NULL,
                user_id INTEGER,
                customer_id INTEGER,
                customer_name TEXT NOT NULL,
                customer_email TEXT NOT NULL,
                customer_phone TEXT,
                shipping_address TEXT NOT NULL,
                notes TEXT,
                subtotal REAL NOT NULL,
                discount_amount REAL DEFAULT 0,
                applied_discount TEXT,
                shipping_cost REAL DEFAULT 0,
                total_amount REAL NOT NULL,
                status TEXT DEFAULT 'pending',
                order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                estimated_delivery TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id),
                FOREIGN KEY (customer_id) REFERENCES customers(id)
            )
        """)
        # Order items table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS order_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                order_id INTEGER NOT NULL,
                product_id INTEGER NOT NULL,
                product_name TEXT NOT NULL,
                price REAL NOT NULL,
                quantity INTEGER NOT NULL,
                total_price REAL NOT NULL,
                FOREIGN KEY (order_id) REFERENCES orders(id),
                FOREIGN KEY (product_id) REFERENCES products(id)
            )
        """)
        # Sessions table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_token TEXT UNIQUE NOT NULL,
                user_id INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP NOT NULL,
                is_active BOOLEAN DEFAULT 1,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        """)
        # FAQ categories table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS faq_categories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                display_order INTEGER DEFAULT 0
            )
        """)
        # FAQ items table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS faq_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category_id INTEGER NOT NULL,
                question TEXT NOT NULL,
                answer TEXT NOT NULL,
                keywords TEXT,
                views INTEGER DEFAULT 0,
                helpful_votes INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (category_id) REFERENCES faq_categories(id)
            )
        """)
        # Support tickets table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS support_tickets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticket_number TEXT UNIQUE NOT NULL,
                user_id INTEGER,
                customer_email TEXT NOT NULL,
                customer_name TEXT NOT NULL,
                subject TEXT NOT NULL,
                message TEXT NOT NULL,
                category TEXT,
                priority TEXT DEFAULT 'medium',
                status TEXT DEFAULT 'open',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                resolved_at TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        """)
        # Support responses table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS support_responses (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticket_id INTEGER NOT NULL,
                responder_name TEXT DEFAULT 'ืืขืจืืช ืืชืืืื',
                message TEXT NOT NULL,
                is_internal BOOLEAN DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (ticket_id) REFERENCES support_tickets(id)
            )
        """)
        # Insert sample products (only into an empty table)
        cursor.execute("SELECT COUNT(*) FROM products")
        if cursor.fetchone()[0] == 0:
            # Tuple order matches the INSERT column list below.
            sample_products = [
                ("ืกืืืืื ืืืขืืช ืืืจืื ืืช 100 ืืจื", "ืืืจื ืืื", 45.90,
                 "ืกืืืืื ืืืขืืช 100% ืืืจืื ืืช ืืื ืชืืกืคืืช ืืืืืืช, ืืงืืจ ืืฉืืจ ืืืงืืืื ืืคืจืืืืื",
                 "100 ืืจื", "ืคืจืืืืื", 1, 1, 4.8, 127, 150, "STV-ORG-100"),
                ("ืกืืืืื ืืืืืืืช - 60 ืืืืืืช", "ืืืฆืจืื ืืืื ืื", 29.90,
                 "ืืืืืืช ืกืืืืื ื ืืืืช ืืฉืืืืฉ ืืงืคื ืืชื, ืืื ืงืืืจืืืช ืืืชืืืืืช ืืกืืืจืชืืื",
                 "60 ืืืืืืช", "ืืฉืจืื", 1, 0, 4.5, 89, 200, "STV-TAB-60"),
                ("ืกืืืืื ื ืืืืืช ืืืขื ืื ืืื - 50ml", "ืืืฆืจืื ืืืื ืื", 39.90,
                 "ืชืืฆืืช ืกืืืืื ื ืืืืืช ืขื ืืขื ืื ืืื ืืืขื, ืืืืืืืืช ืืืฉืงืืื ืืืชืืื ืื",
                 "50ml", "ืืฉืจืื", 1, 1, 4.6, 156, 100, "STV-LIQ-VAN"),
                ("ืืืงืช ืกืืืืื ืืืคืื - 200 ืืจื", "ืืืฉืื ืืืคืื", 55.90,
                 "ืืืงืช ืกืืืืื ืืืืืืช ืืืคืื ืืืืฉืื, ืขืืืื ืืืื ืืืื ืืืชืืืื ืืื ืืืชืืื ืื",
                 "200 ืืจื", "ืคืจืืืืื", 1, 1, 4.7, 203, 80, "STV-BAK-200"),
                ("ืืืจื ืืขืืื - 4 ืกืืืื", "ืืืจืืื", 89.90,
                 "ืืืจื ืืขืืื ืืืืื 4 ืกืืื ืกืืืืื ืฉืื ืื - ืืืืืืื ืืื ืฉืจืืฆื ืื ืกืืช ืืจืืฉืื ื",
                 "4 ืืืืืืช", "ืืฉืจืื", 1, 1, 4.9, 95, 50, "STV-PACK-4"),
                ("ืกืืืืื ืคืจืืืืื ืืืจืื ืืช - 50 ืืจื", "ืคืจืืืืื", 79.90,
                 "ืกืืืืื ืคืจืืืืื ืืขืื ืืกืืืืื ืืืืืื ืืืืชืจ, ืขืืืจ ืืืืงืืช ืืืืืช ืืืืืจืืช",
                 "50 ืืจื", "ืืจืืื", 1, 1, 4.9, 67, 30, "STV-PREM-50"),
                ("ืกืืืืื ื ืืืืืช ืืืขืืช - 30ml", "ืืืฆืจืื ืืืื ืื", 34.90,
                 "ืชืืฆืืช ืกืืืืื ื ืืืืืช ืืืขืืช ืืื ืชืืกืคืืช, ืืขื ื ืงื ืืืชืืง ืืืื",
                 "30ml", "ืืฉืจืื", 1, 1, 4.4, 134, 120, "STV-LIQ-NAT"),
                ("ืกืืืืื ืืฉืชืืื ืงืจื - 100 ืืืืืืช", "ืืืฆืจืื ืืืื ืื", 49.90,
                 "ืืืืืืช ืกืืืืื ืืืืืืืช ืฉืืชืืืกืกืืช ืืืจ ืืืฉืงืืื ืงืจืื, ืืืืืืืืืช ืืงืืฅ",
                 "100 ืืืืืืช", "ืืฉืจืื", 1, 0, 4.3, 78, 90, "STV-COLD-100")
            ]
            cursor.executemany("""
                INSERT INTO products (name, category, price, description, weight, origin,
                kosher, organic, rating, reviews, stock_quantity, sku)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, sample_products)
        # Insert FAQ categories (only into an empty table)
        cursor.execute("SELECT COUNT(*) FROM faq_categories")
        if cursor.fetchone()[0] == 0:
            faq_categories = [
                ("ืขื ืกืืืืื", "ืืืืข ืืืื ืขื ืืฆืื ืืืชืจืื ืืชืื", 1),
                ("ืฉืืืืฉ ืืืชืืื ืื", "ืืื ืืืฉืชืืฉ ืืกืืืืื ืืืชืืื ืื", 2),
                ("ืืืื ืืช ืืืฉืืืืื", "ืืืืข ืขื ืชืืืื ืืืืื ื ืืืืฉืืื", 3),
                ("ืชืฉืืืืื", "ืืืฆืขื ืชืฉืืื ืืืืืจืืช", 4),
                ("ืืขืืืช ืืื ืืืช", "ืืขืืืช ืืืชืจ ืืืืขืจืืช", 5),
                ("ืืืฆืจืื", "ืฉืืืืช ืขื ืืืืฆืจืื ืืืืืื", 6)
            ]
            cursor.executemany("""
                INSERT INTO faq_categories (name, description, display_order)
                VALUES (?, ?, ?)
            """, faq_categories)
        # Insert FAQ items (only into an empty table)
        cursor.execute("SELECT COUNT(*) FROM faq_items")
        if cursor.fetchone()[0] == 0:
            # The first tuple element is a hard-coded category id (1-6).
            # NOTE(review): this presumes the categories seeded above received
            # ids 1-6 in insertion order - confirm for pre-existing databases.
            faq_items = [
                # Category 1
                (1, "ืื ืื ืกืืืืื?",
                 "ืกืืืืื ืืื ืฆืื ืืืขื ืืืฉืคืืช ืืืจืฆืืช ืืืืื ืืืจืื ืืืจืืงื. ืืขืืื ืฉืื ืืืืืื ืืืืจืื ืืืชืืงืื ืืืขืืื ืฉืื ืืชืืงืื ืคื 200-300 ืืืกืืืจ ืื ืืื ืงืืืจืืืช.",
                 "ืกืืืืื, ืฆืื, ืืืขื, ืืืชืืง"),
                (1, "ืืื ืกืืืืื ืืืืื?",
                 "ืื! ืกืืืืื ืืืืจืช ืืืืืื ืขื ืืื ื-FDA, ืืจืฉืืช ืืืืจืืคืืช ืืืืื ืืืฉืจื ืืืจืืืืช ืืืฉืจืื. ืืื ืืฉืืฉืช ืืืืช ืฉื ืื ืืืจืื ืืืจืืงื.",
                 "ืืืืืืช, FDA, ืืืฉืืจ, ืืืืฉืจ"),
                (1, "ืืื ืกืืืืื ืืฉืคืืขื ืขื ืจืืช ืืกืืืจ?",
                 "ืกืืืืื ืื ืืขืื ืืช ืจืืช ืืกืืืจ ืืื ืืืฃ ืขืฉืืื ืืขืืืจ ืืืืืกืืช ืจืืืช ืืกืืืจ. ืืชืืืื ืืืืื ืกืืืจืช.",
                 "ืกืืืจืช, ืจืืช ืกืืืจ, ืืจืืืืช"),
                (1, "ืืื ืืฉ ืืกืืืืื ืืขื ืืืืื?",
                 "ืกืืืืื ืืืืืชืืช ืืื ืฉืื ื ืืืขื ืืื ืืฉืืืจื ืืขื ืืืืื. ืืฉืื ืืงื ืืช ืืืฆืจืื ืืืืืชืืื ืืืืฉืชืืฉ ืืืื ืื ืื ื ืืื ืื.",
                 "ืืขื ืืืืื, ืืืืืช, ืืื ืื"),
                # Category 2
                (2, "ืืื ืืฉืชืืฉืื ืืกืืืืื?",
                 "ืชืืื ืืกืื: ืืืงื - ืืคืืช ืงืื ื ืืืงืื ืืฃ ืกืืืจ, ืืืืืืช - ืืืช ืืืฉืงื, ื ืืืื - 3-5 ืืืคืืช ืืืฉืงื. ืืชืืืื ืืืขื ืืืืกืืคื ืืคื ืืืขื.",
                 "ืฉืืืืฉ, ืืื ืื, ืืืงื, ืืืืืืช, ื ืืืื"),
                (2, "ืืื ืืคืฉืจ ืืืคืืช ืขื ืกืืืืื?",
                 "ืืืืื! ืืฉืชืืฉื ืืืืงืช ืืกืืืืื ืืืืืืืช ืืืคืื ืฉืื ื. ืืื ืขืืืื ืืืื ืืืชืืืื ืืื ืกืืื ืืืชืืื ืื.",
                 "ืืคืื, ืืืฉืื, ืืชืืื ืื, ืืื"),
                (2, "ืืื ืืืืืคืื ืกืืืจ ืืกืืืืื?",
                 "ืืืจื ืืื: ืืืก ืกืืืจ = ืืฃ ืฉื ืืืงืช ืกืืืืื. ืืืฉืงืืืช: ืืฃ ืกืืืจ = ืืคืืช ืกืืืืื. ืืืืจื ืฉืื ื ืืืื ืืืจืื ืืืจืืช ืืคืืจื.",
                 "ืืืจื, ืกืืืจ, ืืืก, ืืื ืื"),
                (2, "ืืื ืืคืฉืจ ืืชืช ืืืืืื?",
                 "ืื! ืกืืืืื ืืืืื ืืืืืื ืืืขืฆื ืขืืืคื ืขื ืกืืืจ ืืืืื ื ืืจืืืืชืืช. ืืื ืื ืืืจืืช ืืขืฉืฉืช ืืื ืืฉืคืืขื ืขื ืืืชื ืืืืช.",
                 "ืืืืื, ืืืืืืช, ืขืฉืฉืช"),
                # Category 3
                (3, "ืืื ืืื ืืืงื ืืืฉืืื?",
                 "ืืืฉืืื ืืืงื 2-5 ืืื ืขืกืงืื ืืื ืืืจืฅ. ืืืืจ ืืืจืื - ืขื 2 ืืืื, ืืฆืคืื ืืืืจืื - ืขื 5 ืืืื.",
                 "ืืฉืืื, ืืื, ืืื ืขืกืงืื"),
                (3, "ืื ืขืืื ืืืฉืืื?",
                 "ืืืฉืืื ืขืืื 25 ืฉ\"ื, ืืื ืื ืืื ื ืขื ืืืื ืืช ืืขื 150 ืฉ\"ื!",
                 "ืขืืืช ืืฉืืื, ืืฉืืื ืืื ื"),
                (3, "ืืื ืื ื ืืืื ืืขืงืื ืืืจ ืืืืื ื?",
                 "ืชืงืื ืืกืคืจ ืืขืงื ืืืืจ ืืจืืืฉื. ืืฉืชืืฉ ืืคืื ืงืฆืื track_order ืขื ืืืกืคืจ ืฉืงืืืืช.",
                 "ืืขืงื, ืืกืคืจ ืืขืงื, ืกืืืืก"),
                (3, "ืืื ืืคืฉืจ ืืฉื ืืช ืืชืืืช ืืฉืืื?",
                 "ื ืืชื ืืฉื ืืช ืืชืืืช ืจืง ืขื ืฉืืืืฆืจ ื ืฉืื. ืฆืืจ ืงืฉืจ ืขืื ื ืืืืคืื 03-1234567 ืื ืคืชื ืืจืืืก ืชืืืื.",
                 "ืฉืื ืื ืืชืืืช, ืขืืืื"),
                # Category 4
                (4, "ืืื ืืคืฉืจ ืืฉืื?",
                 "ืื ืื ื ืืงืืืื ืืจืืืกื ืืฉืจืื (ืืืื, ืืืกืืจืงืืจื), ืืื, ืคืืืืืงืก ืืืขืืจื ืื ืงืืืช.",
                 "ืชืฉืืื, ืืจืืืก ืืฉืจืื, ืืื"),
                (4, "ืืื ืืคืฉืจ ืืืืืืจ ืืืฆืจืื?",
                 "ืื! ืืืืจื ืชืื 14 ืืืื ืืื ืฉืืืืช. ืืืืฆืจ ืฆืจืื ืืืืืช ืืืจืืื ืืงืืจืืช.",
                 "ืืืืจื, ืืืืจ ืืกืคื"),
                (4, "ืืชื ืืืืื ืืจืืืก ืืืฉืจืื?",
                 "ืืจืืืก ืืืฉืจืื ืืืืื ืจืง ืืืืจ ืฉืืืืฆืจ ื ืืจื ืื ืฉืื. ืื ื ืืืื ืืจืืฉ.",
                 "ืืืื, ืืจืืืก ืืฉืจืื, ืืชื"),
                # Category 5
                (5, "ืืืชืจ ืื ืขืืื ืืื",
                 "ื ืกื ืืจืขื ื ืืช ืืขืืื ืื ืื ืงืืช cookies. ืื ืืืขืื ื ืืฉืืช, ืฆืืจ ืงืฉืจ ืขืื ื ืขื ืคืจืื ืืืขืื.",
                 "ืืขืืืช ืืื ืืืช, ืืชืจ"),
                (5, "ืื ืืฆืืื ืืืชืืืจ ืืืขืจืืช",
                 "ืืืืง ืฉืืืืืืื ืืืกืืกืื ื ืืื ืื. ืื ืฉืืืช ืกืืกืื, ืืฉืชืืฉ ืืืคืฉืจืืช ืืืคืืก ืกืืกืื.",
                 "ืืชืืืจืืช, ืกืืกืื, ืืืืืื"),
                # Category 6
                (6, "ืืืื ืืืฆืจ ืืื ืืืืืฅ ืืืชืืืืื?",
                 "ืืืจื ืืืขืืื ืืื ืืื ืืชืืื ืืืชืืืืื - ืชืืื ืื ืกืืช 4 ืกืืืื ืฉืื ืื ืืืจืืืช ืื ืืื ืืชืืื ืื.",
                 "ืืชืืืืื, ืืืจื ืืขืืื, ืืืืฆื"),
                (6, "ืืชื ืืืืฆืจืื ืืืืจื ืืืืื?",
                 "ืื ืื ื ืืขืืื ืื ืืืื ืืืืคื ืงืืืข. ืื ืืืฆืจ ืืื, ืืืจื ืืื ืืื ืืืืจ ืชืื 3-7 ืืืื.",
                 "ืืืื, ืืื, ืืืจื")
            ]
            cursor.executemany("""
                INSERT INTO faq_items (category_id, question, answer, keywords)
                VALUES (?, ?, ?, ?)
            """, faq_items)
        conn.commit()
    finally:
        # Always release the handle, including when a statement above raised.
        conn.close()
# Discount codes, keyed by the upper-cased code that secure_checkout looks up.
# Per entry:
#   name / description - display text ("name" is echoed in the order summary)
#   discount_percent   - percentage taken off the cart subtotal
#   min_amount         - minimum subtotal required for the code to apply
PROMOTIONS = {
    "WELCOME10": {
        "name": "ืืจืืืื ืืืืื",
        "description": "ืื ืื 10% ืืืงืืืืช ืืืฉืื",
        "discount_percent": 10,
        "min_amount": 50
    },
    "ORGANIC15": {
        "name": "ืืืจืื ื 15%",
        "description": "ืื ืื 15% ืขื ืืืฆืจืื ืืืจืื ืืื",
        "discount_percent": 15,
        "min_amount": 100
    },
    "BULK20": {
        "name": "ืงื ืืื ืืืืืช",
        "description": "ืื ืื 20% ืขื ืืืื ืืช ืืขื 300 ืฉ\"ื",
        "discount_percent": 20,
        "min_amount": 300
    }
}
def sanitize_input(text: str) -> str:
    """Strip a fixed set of punctuation characters from user-supplied text.

    Args:
        text: raw user input.

    Returns:
        ``text`` with every occurrence of ``< > " ' & ; ( ) | ``` removed and
        surrounding whitespace trimmed; an empty string when ``text`` is not a
        ``str``.
    """
    if not isinstance(text, str):
        return ""
    # Delete all blacklisted characters in a single pass.
    deletions = {ord(ch): None for ch in ('<', '>', '"', "'", '&', ';', '(', ')', '|', '`')}
    cleaned = text.translate(deletions)
    return cleaned.strip()
def validate_email(email: str) -> bool:
    """Basic syntactic e-mail validation.

    Accepts ``local@domain.tld`` where the local part is made of letters,
    digits and ``._%+-``, the domain of letters, digits, dots and hyphens, and
    the TLD of at least two letters.

    Args:
        email: candidate address.

    Returns:
        True when the whole string matches the pattern; False otherwise,
        including for non-string input.
    """
    import re
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch instead of match + "$": "$" also matches just before a trailing
    # newline, which let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
def validate_phone(phone: str) -> bool:
    """Basic Israeli phone validation.

    Dashes and spaces are removed first; the remainder must be a leading ``0``
    followed by 8 to 10 ASCII digits (the same lengths the previous pattern
    accepted, e.g. 05X-XXXXXXX or 03-XXXXXXX).

    Args:
        phone: candidate phone number.

    Returns:
        True when the normalized number matches; False otherwise, including
        for non-string input.
    """
    import re
    if not isinstance(phone, str):
        return False
    digits = phone.replace('-', '').replace(' ', '')
    # [0-9] rather than \d: \d also matches non-ASCII decimal digits.
    # fullmatch rather than "$": "$" tolerates a trailing newline.
    return re.fullmatch(r'0[0-9]{8,10}', digits) is not None
@mcp.tool()
def register_user(email: str, password: str, full_name: str, phone: str = "") -> str:
    """
    Register a new user account.

    Args:
        email: E-mail address (required); stored lower-cased.
        password: Password, at least 8 characters after sanitizing.
        full_name: Full name, at least 2 characters.
        phone: Phone number in Israeli format (optional).

    Returns:
        A user-facing message: the registration confirmation, or a validation
        or database error description.
    """
    # Input validation
    # NOTE(review): the password also goes through sanitize_input, which removes
    # characters from it. login_user applies the same transformation, so the
    # two functions must be changed together if this is ever relaxed.
    email = sanitize_input(email).lower()
    password = sanitize_input(password)
    full_name = sanitize_input(full_name)
    phone = sanitize_input(phone)
    if not validate_email(email):
        return "โ ืืชืืืช ืืืืืื ืื ืชืงืื ื"
    if len(password) < 8:
        return "โ ืืกืืกืื ืืืืืช ืืืืืช ืืืืจื ืฉื ืืคืืืช 8 ืชืืืื"
    if len(full_name) < 2:
        return "โ ืฉื ืืื ืืืื ืืืืื ืืคืืืช 2 ืชืืืื"
    if phone and not validate_phone(phone):
        return "โ ืืกืคืจ ืืืคืื ืื ืชืงืื (ืคืืจืื ืืฉืจืืื ื ืืจืฉ)"
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Check if user already exists
        cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
        if cursor.fetchone():
            return "โ ืืฉืชืืฉ ืขื ืืืืืื ืื ืืืจ ืงืืื ืืืขืจืืช"
        # Hash password
        password_hash = hash_password(password)
        # Insert new user
        cursor.execute("""
            INSERT INTO users (email, password_hash, full_name, phone)
            VALUES (?, ?, ?, ?)
        """, (email, password_hash, full_name, phone))
        conn.commit()
        return f"""โ
**ืจืืฉืื ืืืฆืื!**
๐ค **ืคืจืื ืืืฉืชืืฉ:**
๐ง ืืืืืื: {email}
๐จโ๐ผ ืฉื: {full_name}
{'๐ฑ ืืืคืื: ' + phone if phone else ''}
๐ **ืืฆืืืช ืืืืจืฉื ืืื ืืช ืืกืืืืื ืฉื ืืืจืื ืืื!**
**ืืฉืืืื ืืืืื:**
1. ืืฉืชืืฉ ื-login ืืื ืืืชืืืจ
2. ืขืืื ืืืืฆืจืื ืขื browse_products
3. ืืืกืฃ ืืืฆืจืื ืืขืืื ืืืืื!
๐ **ืืื ืืก:** ืงืื ืื ืื WELCOME10 ืขืืืจ ืืืื ื ืจืืฉืื ื!"""
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on users.email fired: another registration for
        # the same address was inserted between the SELECT and the INSERT.
        # Report it the same way as the explicit existence check above.
        return "โ ืืฉืชืืฉ ืขื ืืืืืื ืื ืืืจ ืงืืื ืืืขืจืืช"
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืจืฉืื: {str(e)}"
    finally:
        # Close on every path, including errors (previously leaked on failure).
        if conn is not None:
            conn.close()
@mcp.tool()
def login_user(email: str, password: str) -> str:
    """
    Log a user in and open a 7-day session.

    Args:
        email: E-mail address of the account.
        password: Account password.

    Returns:
        A user-facing message: on success it contains the session token and
        its expiry; otherwise a validation, credential or database error.
    """
    email = sanitize_input(email).lower()
    # Same transformation register_user applied before hashing the password.
    password = sanitize_input(password)
    if not validate_email(email):
        return "โ ืืชืืืช ืืืืืื ืื ืชืงืื ื"
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Get user
        cursor.execute("""
            SELECT id, password_hash, full_name, is_active
            FROM users WHERE email = ?
        """, (email,))
        user = cursor.fetchone()
        if not user:
            return "โ ืืืืืื ืื ืกืืกืื ืฉืืืืื"
        user_id, stored_hash, full_name, is_active = user
        # Verify the password BEFORE reporting the account state, so a caller
        # who does not know the password cannot learn that an account exists
        # and is disabled.
        if not verify_password(password, stored_hash):
            return "โ ืืืืืื ืื ืกืืกืื ืฉืืืืื"
        if not is_active:
            return "โ ืืฉืืื ืืืฉืชืืฉ ืืืฉืืช. ืคื ื ืืชืืืื"
        # Generate session token
        session_token = generate_session_token()
        expires_at = datetime.now() + timedelta(days=7)  # 7 days session
        # Store session in database. The timestamp is bound as an explicit
        # "YYYY-MM-DD HH:MM:SS.ffffff" string - the format the implicit
        # datetime adapter (deprecated since Python 3.12) produced.
        cursor.execute("""
            INSERT INTO sessions (session_token, user_id, expires_at)
            VALUES (?, ?, ?)
        """, (session_token, user_id, expires_at.isoformat(" ")))
        # Update last login
        cursor.execute("""
            UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?
        """, (user_id,))
        conn.commit()
        # Store in memory for quick access
        active_sessions[session_token] = {
            "user_id": user_id,
            "email": email,
            "full_name": full_name,
            "expires_at": expires_at
        }
        # Initialize cart for user
        shopping_carts[session_token] = {}
        return f"""โ
**ืืชืืืจืืช ืืืฆืืืช!**
๐ ืืจืื ืืื, {full_name}!
๐ **Session Token:** `{session_token}`
โฐ **ืชืคืืื:** {expires_at.strftime('%d/%m/%Y %H:%M')}
**ืืืืฉื ืงื ืืื:**
โข `browse_products` - ืขืืื ืืืืฆืจืื
โข `search_products [ืืืื]` - ืืืคืืฉ
โข `view_profile` - ืฆืคืืื ืืคืจืืคืื
โข `view_cart` - ืขืืืช ืงื ืืืช
๐ **ืืืค:** ืื ืืคืขืืืืช ืฉืื ืืขืช ืืฉืืจื ืืืกื ืื ืชืื ืื!"""
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืชืืืจืืช: {str(e)}"
    finally:
        # Close on every path, including errors (previously leaked on failure).
        if conn is not None:
            conn.close()
def get_user_from_session(session_token: str) -> Optional[Dict]:
    """Look up the in-memory session for a token.

    Args:
        session_token: token returned by ``login_user``.

    Returns:
        The session dict stored in ``active_sessions``, or ``None`` when the
        token is empty, unknown or expired. An expired session is removed
        together with its shopping cart.
    """
    if not session_token:
        return None
    try:
        record = active_sessions[session_token]
    except KeyError:
        return None
    if datetime.now() > record["expires_at"]:
        # Expired: drop the session and any cart tied to it.
        del active_sessions[session_token]
        shopping_carts.pop(session_token, None)
        return None
    return record
@mcp.tool()
def browse_products(category: str = "", session_token: str = "") -> str:
    """
    Browse the catalog of in-stock products.

    Args:
        category: Exact category name to filter by (optional).
        session_token: Login token (optional). With a valid token the listing
            greets the user by name and shows add-to-cart hints.

    Returns:
        A formatted product listing, a "nothing found" message, or an error
        message.
    """
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Build query
        if category:
            category = sanitize_input(category)
            cursor.execute("""
                SELECT id, name, category, price, description, weight, origin,
                kosher, organic, rating, reviews, stock_quantity
                FROM products
                WHERE category = ? AND stock_quantity > 0
                ORDER BY name
            """, (category,))
        else:
            cursor.execute("""
                SELECT id, name, category, price, description, weight, origin,
                kosher, organic, rating, reviews, stock_quantity
                FROM products
                WHERE stock_quantity > 0
                ORDER BY category, price
            """)
        products = cursor.fetchall()
        if not products:
            return f"๐ฆ ืื ื ืืฆืื ืืืฆืจืื {'ืืงืืืืจืื ' + category if category else 'ืืืื ืื'}"
        # Check if user is logged in
        user_session = get_user_from_session(session_token) if session_token else None
        user_greeting = f"ืฉืืื {user_session['full_name']}! " if user_session else ""
        # Collect fragments and join once instead of repeated string +=.
        out = [f"๐ฟ **{user_greeting}ืืจืืืื ืืืืื ืืื ืืช ืืกืืืืื ืืืืืืืืช ืฉื ืืืจืื ืืื!**\n\n"]
        if category:
            out.append(f"๐ **ืงืืืืจืื: {category}**\n\n")
        else:
            out.append("๐ **ืื ืืืืฆืจืื ืฉืื ื:**\n\n")
        current_category = ""
        for product in products:
            (prod_id, name, cat, price, desc, weight, origin,
             kosher, organic, rating, reviews, stock) = product
            # Emit a heading whenever the category changes between rows.
            if cat != current_category:
                current_category = cat
                out.append(f"\n๐ท๏ธ **{current_category}:**\n{'='*50}\n")
            # Stock indicator
            if stock > 20:
                # NOTE(review): this literal contained a raw line-break
                # character (apparently U+0085, left over from a mis-decoded
                # emoji) that split it across two physical lines. It is written
                # as an escape so the literal is valid again - restore the
                # intended text once the file encoding is repaired.
                stock_indicator = "โ\x85ืืืืื ืจื"
            elif stock > 5:
                stock_indicator = f"๐ก ื ืืชืจื {stock} ืืืืืืช"
            else:
                stock_indicator = f"๐ด ื ืืชืจื ืจืง {stock} ืืืืืืช!"
            # Product details
            out.append(f"**#{prod_id} - {name}**\n")
            out.append(f"๐ฐ **ืืืืจ: โช{price:.2f}** | {stock_indicator}\n")
            out.append(f"๐ ืืฉืงื: {weight} | ๐ ืืงืืจ: {origin}\n")
            # Certifications
            certifications = []
            if kosher:
                certifications.append("โก๏ธ ืืฉืจ")
            if organic:
                certifications.append("๐ฑ ืืืจืื ื")
            if certifications:
                out.append(f"๐ {' | '.join(certifications)}\n")
            # Rating: one star glyph per whole rating point
            stars = "โญ" * int(rating)
            out.append(f"โญ {rating}/5 ({reviews} ืืืงืืจืืช) {stars}\n")
            out.append(f"๐ {desc}\n")
            if user_session:
                out.append(f"๐ *ืืืืกืคื ืืขืืื: add_to_cart ืขื ID #{prod_id} ืื-session_token ืฉืื*\n")
            else:
                out.append(f"๐ *ืืืืกืคื ืืขืืื: ืืชืืืจ ืงืืื ืขื login_user*\n")
            out.append(f"๐ *ืืคืจืืื ืืืืื: get_product_details ืขื ID #{prod_id}*\n\n")
        out.append("\n" + "="*60 + "\n")
        out.append("๐ **ืืขืจืืช ืืืืืืืช** - ืื ืื ืชืื ืื ืืืฆืคื ืื ืืฉืืืจืื ืืืกื ื ืชืื ืื\n")
        out.append("๐ **ืืฆืขืืช ืืืืืืืช:**\n")
        out.append("โข ๐ ืืฉืืื ืืื ื ืืขื โช150\n")
        out.append("โข ๐ ืื ืื 10% ืืืงืืืืช ืืืฉืื (ืงืื: WELCOME10)\n")
        out.append("โข ๐ ืืืขืืฅ ืืื ื: 03-1234567\n")
        out.append("โข ๐ ืืืืจื ืชืื 14 ืืืื")
        return "".join(out)
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืขืื ืช ืืืฆืจืื: {str(e)}"
    finally:
        # Close on every path, including errors (previously leaked on failure).
        if conn is not None:
            conn.close()
@mcp.tool()
def add_to_cart(product_id: int, quantity: int = 1, session_token: str = "") -> str:
    """
    Add a product to the logged-in user's in-memory shopping cart.

    Args:
        product_id: Id of the product to add.
        quantity: Number of units (default 1); must be positive.
        session_token: Login token (required).

    Returns:
        A confirmation with the cart summary, or an error message (not logged
        in, invalid quantity, unknown/out-of-stock product, insufficient
        stock, database error).
    """
    # Validate session
    user_session = get_user_from_session(session_token)
    if not user_session:
        return "๐ ื ืืจืฉืช ืืชืืืจืืช ืืืขืจืืช. ืืฉืชืืฉ ื-login_user ืชืืืื"
    if quantity <= 0:
        return "โ ืืืืช ืืืืืช ืืืืืช ืืืืืืช"
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Get product details
        cursor.execute("""
            SELECT id, name, price, stock_quantity
            FROM products WHERE id = ? AND stock_quantity > 0
        """, (product_id,))
        product = cursor.fetchone()
        if not product:
            return f"โ ืื ื ืืฆื ืืืฆืจ ืืืื ืขื ืืืื #{product_id}"
        prod_id, name, price, stock = product
        if quantity > stock:
            return f"โ ๏ธ ืืืืื ืจืง {stock} ืืืืืืช ืืืืืฆืจ '{name}'"
        # Initialize cart if needed
        cart = shopping_carts.setdefault(session_token, {})
        # Cart entries are keyed by the product id as a string.
        key = str(product_id)
        # Add or update product in cart
        if key in cart:
            total_quantity = cart[key]['quantity'] + quantity
            if total_quantity > stock:
                return f"โ ๏ธ ืื ื ืืชื ืืืืกืืฃ {quantity} ืืืืืืช ื ืืกืคืืช. ืืืืื {stock} ืืืืืืช ืืืขืืื ืืืจ {cart[key]['quantity']}"
            cart[key]['quantity'] = total_quantity
            cart[key]['total_price'] = total_quantity * price
        else:
            cart[key] = {
                'name': name,
                'price': price,
                'quantity': quantity,
                'total_price': quantity * price
            }
        # Calculate cart summary
        total_items = sum(item['quantity'] for item in cart.values())
        total_price = sum(item['total_price'] for item in cart.values())
        # Flat shipping fee of 25, waived from a cart total of 150.
        shipping_cost = 0 if total_price >= 150 else 25
        final_total = total_price + shipping_cost
        # NOTE(review): this literal contained a raw line-break character
        # (apparently U+0085, left over from a mis-decoded emoji) that split it
        # across two physical lines. It is written as an escape so the literal
        # is valid again - restore the intended text once the file encoding is
        # repaired.
        result = f"โ\x85**ืืืืฆืจ ื ืืกืฃ ืืขืืื ืืืืืืืืช!**\n\n"
        result += f"๐ค ืืฉืชืืฉ: {user_session['full_name']}\n"
        result += f"๐ **{name}**\n"
        result += f"๐ฆ ืืืืช: {quantity} ืืืืืืช\n"
        result += f"๐ฐ ืืืืจ: โช{(quantity * price):.2f}\n\n"
        result += "๐๏ธ **ืกืืืื ืืขืืื:**\n"
        result += f"๐ ืกื\"ื ืคืจืืืื: {total_items}\n"
        result += f"๐ต ืกื\"ื ืืืฆืจืื: โช{total_price:.2f}\n"
        if shipping_cost > 0:
            result += f"๐ ืืฉืืื: โช{shipping_cost:.2f}\n"
            result += f"๐ก ืืืกืฃ โช{(150 - total_price):.2f} ืืืฉืืื ืืื ื!\n"
        else:
            result += "๐ ืืฉืืื ืืื ื!\n"
        result += f"๐ณ **ืกื\"ื ืืชืฉืืื: โช{final_total:.2f}**\n\n"
        result += f"๐ ืืขืืื ืฉืืืจื ืืืืื ืืืกื ืื ืชืื ืื\n"
        result += "๐ ืืฆืคืืื ืืขืืื: view_cart ืขื session_token"
        return result
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืืกืคื ืืขืืื: {str(e)}"
    finally:
        # Close on every path, including errors (previously leaked on failure).
        if conn is not None:
            conn.close()
@mcp.tool()
def secure_checkout(customer_name: str, customer_email: str, customer_phone: str,
                    shipping_address: str, session_token: str, notes: str = "",
                    discount_code: str = "") -> str:
    """
    Place an order for the contents of the session's shopping cart.

    Args:
        customer_name: Full name (at least 2 characters).
        customer_email: E-mail address.
        customer_phone: Phone number in Israeli format.
        shipping_address: Full shipping address (at least 10 characters);
            stored encrypted.
        session_token: Login token (required).
        notes: Free-text notes (optional).
        discount_code: Promotion code from PROMOTIONS (optional); unknown codes
            and codes whose minimum amount is not met are ignored.

    Returns:
        The order confirmation with the tracking number, or an error message.
    """
    # Validate session
    user_session = get_user_from_session(session_token)
    if not user_session:
        return "๐ ื ืืจืฉืช ืืชืืืจืืช ืืืขืจืืช. ืืฉืชืืฉ ื-login_user ืชืืืื"
    # Validate cart
    cart = shopping_carts.get(session_token)
    if not cart:
        return "๐ ืืขืืื ืจืืงื. ืืืกืฃ ืืืฆืจืื ืืขืืื ืชืืืื"
    # Sanitize inputs
    customer_name = sanitize_input(customer_name)
    customer_email = sanitize_input(customer_email).lower()
    customer_phone = sanitize_input(customer_phone)
    shipping_address = sanitize_input(shipping_address)
    notes = sanitize_input(notes)
    discount_code = sanitize_input(discount_code).upper()
    # Validate inputs
    if not customer_name or len(customer_name) < 2:
        return "โ ืฉื ืืงืื ืืืื ืืืืื ืืคืืืช 2 ืชืืืื"
    if not validate_email(customer_email):
        return "โ ืืชืืืช ืืืืืื ืื ืชืงืื ื"
    if not validate_phone(customer_phone):
        return "โ ืืกืคืจ ืืืคืื ืื ืชืงืื (ืคืืจืื ืืฉืจืืื ื ืืจืฉ)"
    if not shipping_address or len(shipping_address) < 10:
        return "โ ืืชืืืช ืืฉืืื ืืืืืช ืืืืื ืืคืืืช 10 ืชืืืื"
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Calculate totals
        subtotal = sum(item['total_price'] for item in cart.values())
        discount_amount = 0
        applied_discount = ""
        # Apply discount if valid
        if discount_code and discount_code in PROMOTIONS:
            promo = PROMOTIONS[discount_code]
            if subtotal >= promo["min_amount"]:
                discount_amount = subtotal * (promo["discount_percent"] / 100)
                applied_discount = f"{promo['name']} ({promo['discount_percent']}%)"
        total_after_discount = subtotal - discount_amount
        # Flat shipping fee of 25, waived from a discounted total of 150.
        shipping_cost = 0 if total_after_discount >= 150 else 25
        final_total = total_after_discount + shipping_cost
        # One timestamp for the whole order, so the stored and the displayed
        # delivery estimate cannot drift apart.
        now = datetime.now()
        estimated_delivery = now + timedelta(days=3)
        # Tracking number: "ORG" + date + 4 random digits (0000-9999). Retry a
        # few times if the number is already taken; should every attempt
        # collide, the UNIQUE constraint still rejects the INSERT below.
        # NOTE(review): only 10,000 values per day, so the number is guessable
        # and should not be treated as a secret.
        date_part = now.strftime('%Y%m%d')
        for _ in range(10):
            tracking_number = f"ORG{date_part}{secrets.randbelow(10000):04d}"
            cursor.execute("SELECT 1 FROM orders WHERE tracking_number = ?",
                           (tracking_number,))
            if cursor.fetchone() is None:
                break
        # Reuse the customer row for this e-mail, or create it. customers.email
        # has no UNIQUE constraint, so the former INSERT OR IGNORE never
        # ignored anything and added a duplicate row on every checkout.
        cursor.execute("SELECT id FROM customers WHERE email = ?", (customer_email,))
        existing_customer = cursor.fetchone()
        if existing_customer:
            customer_id = existing_customer[0]
        else:
            cursor.execute("""
                INSERT INTO customers (email, full_name, phone)
                VALUES (?, ?, ?)
            """, (customer_email, customer_name, customer_phone))
            customer_id = cursor.lastrowid
        # Create order. The delivery timestamp is bound as an explicit
        # "YYYY-MM-DD HH:MM:SS.ffffff" string - the format the implicit
        # datetime adapter (deprecated since Python 3.12) produced.
        cursor.execute("""
            INSERT INTO orders (
                tracking_number, user_id, customer_id, customer_name, customer_email,
                customer_phone, shipping_address, notes, subtotal, discount_amount,
                applied_discount, shipping_cost, total_amount, status, estimated_delivery
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            tracking_number, user_session['user_id'], customer_id, customer_name,
            customer_email, customer_phone, encrypt_data(shipping_address), notes,
            subtotal, discount_amount, applied_discount, shipping_cost, final_total,
            "ืืขืืื", estimated_delivery.isoformat(" ")
        ))
        order_id = cursor.lastrowid
        # Insert order items
        # NOTE(review): products.stock_quantity is neither re-checked nor
        # decremented here, so stock never goes down and carts can oversell -
        # confirm the intended behavior before adding it.
        for product_id, item in cart.items():
            cursor.execute("""
                INSERT INTO order_items (
                    order_id, product_id, product_name, price, quantity, total_price
                ) VALUES (?, ?, ?, ?, ?, ?)
            """, (order_id, int(product_id), item['name'], item['price'],
                  item['quantity'], item['total_price']))
        conn.commit()
        # Clear cart after successful order
        shopping_carts[session_token] = {}
        # Format order confirmation
        result = f"๐ **ืืืื ื ืืืฆืขื ืืืฆืืื!**\n\n"
        result += f"๐ **ืืกืคืจ ืืืื ื:** #{order_id}\n"
        result += f"๐ฆ **ืืกืคืจ ืืขืงื ืืืืืื:** {tracking_number}\n"
        result += f"๐ค **ืืงืื:** {customer_name}\n"
        result += f"๐ง **ืืืืืื:** {customer_email}\n"
        result += f"๐ฑ **ืืืคืื:** {customer_phone}\n"
        result += f"๐ **ืืชืืืช ืืฉืืื:** {shipping_address}\n"
        if notes:
            result += f"๐ **ืืขืจืืช:** {notes}\n"
        result += f"\n๐ฐ **ืคืืจืื ืขืืืืืช:**\n"
        result += f"๐ต ืกื\"ื ืืืฆืจืื: โช{subtotal:.2f}\n"
        if discount_amount > 0:
            result += f"๐ซ ืื ืื ({applied_discount}): -โช{discount_amount:.2f}\n"
        if shipping_cost > 0:
            result += f"๐ ืืฉืืื: โช{shipping_cost:.2f}\n"
        else:
            result += f"๐ ืืฉืืื: ืืื ื! ๐\n"
        result += f"๐ณ **ืกื\"ื ืืชืฉืืื: โช{final_total:.2f}**\n\n"
        result += f"๐ฆ **ืคืจืื ืืฉืืื:**\n"
        result += f"โฐ ืืื ืขืืืื: 1-2 ืืื ืขืกืงืื\n"
        result += f"๐ ืืื ืืฉืืื: 2-5 ืืื ืขืกืงืื\n"
        # NOTE(review): this literal contained a raw line-break character
        # (apparently U+0085, left over from a mis-decoded emoji) that split it
        # across two physical lines. It is written as an escape so the literal
        # is valid again - restore the intended text once the file encoding is
        # repaired.
        result += f"๐\x85ืืฉืืื ืืฉืืขืจ: {estimated_delivery.strftime('%d/%m/%Y')}\n\n"
        result += f"๐ **ืืืืื:**\n"
        result += f"โข ืืืืื ื ืฉืืืจื ืืืืื ืืืกื ื ืชืื ืื ืืืฆืคื\n"
        result += f"โข ืคืจืื ืืืชืืืช ืืืฆืคื ืื\n"
        result += f"โข ืืกืคืจ ืืขืงื ืืืืืื ืืืืืืื\n\n"
        result += f"๐ **ืืฆืืจืช ืงืฉืจ:**\n"
        result += f"โข ืืืคืื: 03-1234567\n"
        result += f"โข ืืืืืื: support@organital.co.il\n"
        result += f"โข ืืขืงื ืืืื ื: track_order ืขื ืืกืคืจ {tracking_number}"
        return result
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืืฆืืข ืืืื ื: {str(e)}"
    finally:
        # Closing without commit discards a half-written order; previously the
        # connection was leaked on failure.
        if conn is not None:
            conn.close()
@mcp.tool()
def track_order(tracking_number: str, session_token: str = "") -> str:
    """
    Show the status and contents of an order by tracking number.

    Args:
        tracking_number: Tracking number returned by secure_checkout.
        session_token: Login token (optional). When a valid token is given,
            the order must belong to that user.

    Returns:
        The order status, items and totals, or an error message.
    """
    tracking_number = sanitize_input(tracking_number)
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Get order details
        query = """
            SELECT o.id, o.tracking_number, o.customer_name, o.total_amount,
            o.status, o.order_date, o.estimated_delivery, o.user_id
            FROM orders o
            WHERE o.tracking_number = ?
        """
        cursor.execute(query, (tracking_number,))
        order = cursor.fetchone()
        if not order:
            return f"โ ืื ื ืืฆืื ืืืื ื ืขื ืืกืคืจ ืืขืงื: {tracking_number}"
        order_id, track_num, customer_name, total, status, order_date, est_delivery, order_user_id = order
        # Check if user has permission to view this order
        # NOTE(review): the ownership check only runs for callers WITH a valid
        # session; a caller without a token can view any order by tracking
        # number. Kept as-is because session_token is documented as optional -
        # confirm whether anonymous tracking is intended.
        user_session = get_user_from_session(session_token) if session_token else None
        if user_session and user_session['user_id'] != order_user_id:
            return "โ ืืื ืื ืืจืฉืื ืืฆืคืืช ืืืืื ื ืื"
        # Get order items
        cursor.execute("""
            SELECT product_name, quantity, price, total_price
            FROM order_items
            WHERE order_id = ?
        """, (order_id,))
        items = cursor.fetchall()
        # Parse dates. A trailing "Z" is rewritten to "+00:00"; replace() is a
        # no-op when there is no "Z".
        order_date_dt = datetime.fromisoformat(order_date.replace('Z', '+00:00'))
        # estimated_delivery is a nullable column: skip it instead of failing.
        est_delivery_dt = (
            datetime.fromisoformat(est_delivery.replace('Z', '+00:00'))
            if est_delivery else None
        )
        result = f"๐ฆ **ืืขืงื ืืืื ื ืืืืืื**\n\n"
        result += f"๐ **ืืกืคืจ ืืขืงื:** {track_num}\n"
        result += f"๐ **ืืกืคืจ ืืืื ื:** #{order_id}\n"
        result += f"๐ค **ืืงืื:** {customer_name}\n"
        # NOTE(review): three literals in this function (here, in status_emoji
        # and in the last status branch) contained a raw line-break character
        # (apparently U+0085, left over from a mis-decoded emoji) that split
        # them across two physical lines. They are written with an escape so
        # the literals are valid again - restore the intended text once the
        # file encoding is repaired.
        result += f"๐\x85**ืชืืจืื ืืืื ื:** {order_date_dt.strftime('%d/%m/%Y %H:%M')}\n"
        # Status with emoji
        status_emoji = {
            "ืืขืืื": "โณ",
            "ื ืฉืื": "๐",
            "ืืืจื": "๐ฆ",
            "ื ืืกืจ": "โ\x85",
            "ืืืืื": "โ"
        }
        result += f"๐ **ืกืืืืก:** {status_emoji.get(status, '๐')} {status}\n"
        if est_delivery_dt is not None:
            result += f"๐ **ืืฉืืื ืืฉืืขืจ:** {est_delivery_dt.strftime('%d/%m/%Y')}\n\n"
        else:
            # Keep the blank line that normally follows the delivery estimate.
            result += "\n"
        result += "๐๏ธ **ืคืจืืื ืืืืื ื:**\n"
        result += "=" * 40 + "\n"
        for item in items:
            product_name, quantity, price, total_price = item
            result += f"โข **{product_name}**\n"
            result += f" ๐ฆ ืืืืช: {quantity} | ๐ฐ โช{price:.2f} | ืกื\"ื: โช{total_price:.2f}\n\n"
        result += f"๐ณ **ืกื\"ื ืืืื ื: โช{total:.2f}**\n\n"
        # Progress tracking
        if status == "ืืขืืื":
            result += "โณ **ืฉืื ื ืืืื:** ืืืืื ื ืืขืืืื ืืืืกื\n"
            result += "๐ **ืืฉืื ืืื:** ืืจืืื ืืืกืืจื ืืืืจืช ืืฉืืืืืืืช\n"
        elif status == "ื ืฉืื":
            result += "๐ **ืฉืื ื ืืืื:** ืืืืื ื ื ืืกืจื ืืืืจืช ืืฉืืืืืืืช\n"
            result += "๐ **ืืฉืื ืืื:** ืืืจื ืืืื\n"
        elif status == "ืืืจื":
            result += "๐ฆ **ืฉืื ื ืืืื:** ืืืืื ื ืืืจื ืืืื\n"
            result += "๐ **ืืฉืื ืืื:** ืืกืืจื ืืืขื\n"
        elif status == "ื ืืกืจ":
            result += "โ\x85**ืืืืื ื ื ืืกืจื ืืืฆืืื!**\n"
            result += "๐ ืชืืื ืฉืงื ืืช ืืืืจืื ืืื!\n"
        result += f"\n๐ **ืฉืืจืืช ืืงืืืืช:** 03-1234567\n"
        result += "๐ ืืขืจืืช ืืขืงื ืืืืืืืช ืืืืฆืคื ืช"
        return result
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืขืงื ืืืื ื: {str(e)}"
    finally:
        # Close on every path, including errors (previously leaked on failure).
        if conn is not None:
            conn.close()
@mcp.tool()
def search_faq(query: str = "", category: str = "") -> str:
    """
    Search the FAQ knowledge base.

    With neither a query nor a category, the FAQ categories are listed together
    with usage hints. Otherwise the matching FAQ items are returned and the
    view counter of every returned item is incremented.

    Args:
        query: Text matched as a substring (SQL LIKE) against each item's
            question, answer and keywords.
        category: Exact category name used to restrict the results (optional).

    Returns:
        A formatted user-facing message: the category overview, the matching
        questions with their answers, a "no results" notice, or an error
        message when the database call fails.
    """
    query = sanitize_input(query)
    category = sanitize_input(category)
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()

        if not query and not category:
            # Nothing to search for: show the category overview and usage hints.
            cursor.execute("""
                SELECT name, description
                FROM faq_categories
                ORDER BY display_order
            """)
            categories = cursor.fetchall()
            result = "โ **ืืจืื ืืฉืืืืช ืื ืคืืฆืืช - ืืืจืื ืืื**\n\n"
            result += "๐ **ืงืืืืจืืืช ืขืืจื:**\n\n"
            for cat_name, cat_desc in categories:
                result += f"๐ธ **{cat_name}**\n"
                result += f" {cat_desc}\n\n"
            result += "๐ **ืืื ืืืฉืชืืฉ:**\n"
            result += "โข ืืคืฉ ืฉืืื: `search_faq \"ืืื ืืฉืชืืฉืื ืืกืืืืื\"`\n"
            result += "โข ืขืืื ืืงืืืืจืื: `search_faq category=\"ืขื ืกืืืืื\"`\n"
            result += "โข ืงืื ืขืืจื: `create_support_ticket`"
            return result

        if query:
            # The same substring pattern is bound to all three LIKE placeholders.
            like_pattern = f"%{query}%"
            if category:
                cursor.execute("""
                    SELECT f.question, f.answer, c.name, f.id
                    FROM faq_items f
                    JOIN faq_categories c ON f.category_id = c.id
                    WHERE (f.question LIKE ? OR f.answer LIKE ? OR f.keywords LIKE ?)
                    AND c.name = ?
                    ORDER BY f.views DESC, f.helpful_votes DESC
                """, (like_pattern, like_pattern, like_pattern, category))
            else:
                cursor.execute("""
                    SELECT f.question, f.answer, c.name, f.id
                    FROM faq_items f
                    JOIN faq_categories c ON f.category_id = c.id
                    WHERE f.question LIKE ? OR f.answer LIKE ? OR f.keywords LIKE ?
                    ORDER BY f.views DESC, f.helpful_votes DESC
                """, (like_pattern, like_pattern, like_pattern))
        else:
            # Category given without a query: show everything in that category.
            cursor.execute("""
                SELECT f.question, f.answer, c.name, f.id
                FROM faq_items f
                JOIN faq_categories c ON f.category_id = c.id
                WHERE c.name = ?
                ORDER BY f.views DESC
            """, (category,))

        items = cursor.fetchall()
        if not items:
            return f"๐ ืื ื ืืฆืื ืชืืฆืืืช ืขืืืจ '{query}' {f'ืืงืืืืจืื {category}' if category else ''}\n\n๐ก ื ืกื ืืืืื ืืืจืืช ืื ืฆืืจ ืงืฉืจ ืขืื ื: `create_support_ticket`"

        # Count one view for every item that is about to be shown.
        cursor.executemany(
            "UPDATE faq_items SET views = views + 1 WHERE id = ?",
            [(row[3],) for row in items],
        )
        conn.commit()

        result = f"โ **ืชืืฆืืืช ืืืคืืฉ ืืืจืื ืืขืืจื**\n\n"
        if query:
            result += f"๐ **ืืืคืืฉ ืขืืืจ:** {query}\n"
        if category:
            result += f"๐ **ืงืืืืจืื:** {category}\n"
        result += f"๐ **ื ืืฆืื {len(items)} ืชืืฆืืืช**\n\n"
        for i, (question, answer, cat_name, item_id) in enumerate(items, 1):
            result += f"**โ ืฉืืื #{i}: {question}**\n"
            result += f"๐ ืงืืืืจืื: {cat_name}\n\n"
            # NOTE(review): this literal was split across two physical lines in
            # the file (an unterminated string). It is rejoined here with the
            # U+0085 character and following space that presumably caused the
            # split - confirm against the original UTF-8 source.
            result += f"โ\x85 **ืชืฉืืื:**\n{answer}\n\n"
            result += f"๐ *ืืฉืื ืื? ืืฉืชืืฉ ื-`rate_faq_helpful {item_id}`*\n"
            result += "โ" * 50 + "\n\n"
        result += "๐ **ืื ืืฆืืช ืื ืฉืืืคืฉืช?**\n"
        result += "โข ืคืชื ืืจืืืก ืชืืืื: `create_support_ticket`\n"
        result += "โข ืืคืฉ ืฉืื: `search_faq \"ืืืื ืืืจืช\"`\n"
        result += "โข ืฆืืจ ืงืฉืจ: 03-1234567"
        return result
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืืคืืฉ: {str(e)}"
    finally:
        # Close the connection on every path, including errors raised mid-query.
        if conn is not None:
            conn.close()
@mcp.tool()
def create_support_ticket(subject: str, message: str, customer_name: str,
                          customer_email: str, category: str = "ืืืื",
                          session_token: str = "") -> str:
    """
    Create a new support ticket and store an automatic first response.

    Inputs are sanitized and validated, a ticket number of the form
    ``TK<YYYYMMDD><4 digits>`` is generated, a priority is derived from
    keywords found in the subject and message, and the ticket plus an
    auto-generated reply are inserted in one transaction.

    Args:
        subject: Ticket subject; at least 3 characters after sanitizing.
        message: Description of the problem; at least 10 characters.
        customer_name: Customer's name; at least 2 characters.
        customer_email: Contact e-mail; lower-cased and checked with
            ``validate_email``.
        category: Category label stored with the ticket.
        session_token: Optional session token; when it resolves to a session,
            the ticket is linked to that session's ``user_id``.

    Returns:
        A formatted confirmation containing the ticket number, priority and
        automatic response, or a validation/database error message.
    """
    # Sanitize inputs
    subject = sanitize_input(subject)
    message = sanitize_input(message)
    customer_name = sanitize_input(customer_name)
    customer_email = sanitize_input(customer_email).lower()
    category = sanitize_input(category)

    # Validate inputs
    if not subject or len(subject) < 3:
        return "โ ื ืืฉื ืืคื ืืื ืืืื ืืืืื ืืคืืืช 3 ืชืืืื"
    if not message or len(message) < 10:
        return "โ ืชืืืืจ ืืืขืื ืืืื ืืืืื ืืคืืืช 10 ืชืืืื"
    if not customer_name or len(customer_name) < 2:
        return "โ ืฉื ืืืื ืืืืื ืืคืืืช 2 ืชืืืื"
    if not validate_email(customer_email):
        return "โ ืืชืืืช ืืืืืื ืื ืชืงืื ื"

    # Link the ticket to the logged-in user when a session can be resolved.
    user_session = get_user_from_session(session_token) if session_token else None
    user_id = user_session['user_id'] if user_session else None

    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()

        # randbelow(10000) covers the whole 0000-9999 suffix range.
        ticket_number = f"TK{datetime.now().strftime('%Y%m%d')}{secrets.randbelow(10000):04d}"

        # Determine priority based on keywords: urgent -> high,
        # billing/order related -> medium, everything else -> low.
        message_lc = message.lower()
        subject_lc = subject.lower()
        urgent_keywords = ["ืืืืฃ", "ืืขืื", "ืื ืขืืื", "ืชืงืืข", "ืฉืืืื"]
        high_keywords = ["ืชืฉืืื", "ืืจืืืก ืืฉืจืื", "ืืืื ื", "ืืืืจ"]
        if any(keyword in message_lc or keyword in subject_lc for keyword in urgent_keywords):
            priority = "high"
        elif any(keyword in message_lc or keyword in subject_lc for keyword in high_keywords):
            priority = "medium"
        else:
            priority = "low"

        # Insert support ticket
        cursor.execute("""
            INSERT INTO support_tickets (
                ticket_number, user_id, customer_email, customer_name,
                subject, message, category, priority, status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (ticket_number, user_id, customer_email, customer_name,
              subject, message, category, priority, "open"))
        ticket_id = cursor.lastrowid

        # Pick a canned auto-response from keywords in the message;
        # the final branch is the generic fallback.
        if "ืืฉืืื" in message_lc or "ืืขืงื" in message_lc:
            auto_response = "ืชืืื ืืขืงืื ืืืจ ืืืืื ื ืฉืื ืืืืฆืขืืช ืืกืคืจ ืืืขืงื ืฉืงืืืืช ืืืืื. ืื ืืื ืื ืืกืคืจ ืืขืงื, ื ืขืืืจ ืื ืืืฆืื ืืืชื."
        elif "ืืืืจ" in message_lc or "ืืืืจื" in message_lc:
            auto_response = "ืืืืจืืช ืืชืืคืฉืจืืช ืชืื 14 ืืืื ืืงืืืช ืืืืฆืจ. ืืืืฆืจ ืฆืจืื ืืืืืช ืืืจืืื ืืงืืจืืช ืืืืฆื ืฉืื."
        elif "ืกืืืืื" in message_lc and ("ืืื" in message_lc or "ืืฉืชืืฉ" in message_lc):
            auto_response = "ืืฉืืืืฉ ืืกืืืืื: ืืชืื ืขื ืืืืช ืงืื ื ืืืืกืฃ ืืคื ืืืขื. ืืืงื - ืืคืืช ืงืื ื ืืืงืื ืืฃ ืกืืืจ, ืืืืืืช - ืืืช ืืืฉืงื."
        else:
            auto_response = "ืชืืื ืขื ืคื ืืืชื! ื ืืืืจ ืืืื ืชืื 24 ืฉืขืืช. ืืื ืชืืื, ืืืืง ืืช ืืจืื ืืฉืืืืช ืื ืคืืฆืืช ืฉืื ื."
        if auto_response:
            cursor.execute("""
                INSERT INTO support_responses (ticket_id, responder_name, message)
                VALUES (?, ?, ?)
            """, (ticket_id, "ืืขืจืืช ืืืืืืืืช", auto_response))
        conn.commit()

        # Priority emoji
        priority_emoji = {"high": "๐ด", "medium": "๐ก", "low": "๐ข"}
        result = f"๐ซ **ืืจืืืก ืชืืืื ื ืืฆืจ ืืืฆืืื!**\n\n"
        result += f"๐ **ืืกืคืจ ืืจืืืก:** {ticket_number}\n"
        result += f"๐ง **ืืืืืื:** {customer_email}\n"
        result += f"๐ค **ืฉื:** {customer_name}\n"
        result += f"๐ **ืงืืืืจืื:** {category}\n"
        result += f"{priority_emoji.get(priority, '๐ก')} **ืขืืืคืืช:** {priority}\n"
        result += f"๐ **ื ืืฉื:** {subject}\n\n"
        result += f"๐ฌ **ืืืืขืชื:**\n{message}\n\n"
        if auto_response:
            result += f"๐ค **ืชืืืื ืืืืืืืืช:**\n{auto_response}\n\n"
        result += f"โฐ **ืืื ืชืืืื ืืฉืืขืจ:**\n"
        if priority == "high":
            result += "โข ืขืืืคืืช ืืืืื: ืชืื 4 ืฉืขืืช\n"
        elif priority == "medium":
            result += "โข ืขืืืคืืช ืืื ืื ืืช: ืชืื 12 ืฉืขืืช\n"
        else:
            result += "โข ืขืืืคืืช ื ืืืื: ืชืื 24 ืฉืขืืช\n"
        result += f"\n๐ **ืืฆืืจืช ืงืฉืจ ื ืืกืคืช:**\n"
        result += f"โข ืืืคืื: 03-1234567 (ื'-ื' 9:00-17:00)\n"
        result += f"โข ืืขืงื ืืจืืืก: `track_support_ticket {ticket_number}`\n"
        result += f"โข ืขืืืื ืคืจืืื: `update_support_ticket {ticket_number}`"
        return result
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืฆืืจืช ืืจืืืก ืชืืืื: {str(e)}"
    finally:
        # Close the connection on every path; an uncommitted insert is rolled back.
        if conn is not None:
            conn.close()
@mcp.tool()
def track_support_ticket(ticket_number: str, session_token: str = "") -> str:
    """
    Show the status and customer-visible response history of a support ticket.

    Args:
        ticket_number: Ticket number to look up (sanitized before use).
        session_token: Optional session token. When it resolves to a session
            and the ticket is linked to a different user, access is denied.

    Returns:
        A formatted message with the ticket details, its non-internal
        responses and a status-specific hint, or a not-found / permission /
        error message.
    """
    ticket_number = sanitize_input(ticket_number)

    def parse_timestamp(value: str) -> datetime:
        # A 'Z' suffix is rewritten to an explicit UTC offset before parsing;
        # values without 'Z' are passed through unchanged.
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()

        # Get ticket details
        cursor.execute("""
            SELECT id, customer_name, customer_email, subject, message, category,
                   priority, status, created_at, updated_at, user_id
            FROM support_tickets
            WHERE ticket_number = ?
        """, (ticket_number,))
        ticket = cursor.fetchone()
        if not ticket:
            return f"โ ืื ื ืืฆื ืืจืืืก ืชืืืื ืขื ืืกืคืจ: {ticket_number}"
        (ticket_id, customer_name, customer_email, subject, message, category,
         priority, status, created_at, updated_at, ticket_user_id) = ticket

        # Check permissions: a logged-in caller is refused only when the ticket
        # belongs to a different user. Tickets with no owner stay readable,
        # matching what a caller without a token can already do.
        # NOTE(review): callers without a valid session can read any ticket by
        # number, including user-owned ones - confirm this is intended.
        user_session = get_user_from_session(session_token) if session_token else None
        if (user_session and ticket_user_id is not None
                and user_session['user_id'] != ticket_user_id):
            return "โ ืืื ืื ืืจืฉืื ืืฆืคืืช ืืืจืืืก ืชืืืื ืื"

        # Get responses
        cursor.execute("""
            SELECT responder_name, message, created_at, is_internal
            FROM support_responses
            WHERE ticket_id = ?
            ORDER BY created_at ASC
        """, (ticket_id,))
        responses = cursor.fetchall()

        # Status and priority emojis
        # NOTE(review): the "resolved" label, the creation-date line and the
        # resolved hint below were each split across two physical lines in the
        # file (unterminated strings). They are rejoined with the U+0085
        # character and following space that presumably caused the split -
        # confirm against the original UTF-8 source.
        status_emoji = {
            "open": "๐ข ืคืชืื",
            "in_progress": "๐ก ืืืืคืื",
            "waiting_customer": "โณ ืืืชืื ืืชืืืื",
            "resolved": "โ\x85 ื ืคืชืจ",
            "closed": "๐ ืกืืืจ"
        }
        priority_emoji = {"high": "๐ด ืืืืื", "medium": "๐ก ืืื ืื ืืช", "low": "๐ข ื ืืืื"}

        result = f"๐ **ืืขืงื ืืจืืืก ืชืืืื**\n\n"
        result += f"๐ซ **ืืกืคืจ ืืจืืืก:** {ticket_number}\n"
        result += f"๐ค **ืฉื:** {customer_name}\n"
        result += f"๐ง **ืืืืืื:** {customer_email}\n"
        result += f"๐ **ืงืืืืจืื:** {category}\n"
        result += f"๐ **ืกืืืืก:** {status_emoji.get(status, status)}\n"
        result += f"โก **ืขืืืคืืช:** {priority_emoji.get(priority, priority)}\n\n"
        result += f"๐ **ื ืืฉื:** {subject}\n"
        result += f"๐ฌ **ืืืืขื ืืงืืจืืช:**\n{message}\n\n"

        created_dt = parse_timestamp(created_at)
        result += f"๐\x85 **ื ืืฆืจ:** {created_dt.strftime('%d/%m/%Y %H:%M')}\n"
        # Skip the "updated" line when the column is empty or unchanged.
        if updated_at and updated_at != created_at:
            updated_dt = parse_timestamp(updated_at)
            result += f"๐ **ืขืืืื:** {updated_dt.strftime('%d/%m/%Y %H:%M')}\n"

        result += "\n" + "="*50 + "\n"
        result += "๐ฌ **ืืืกืืืจืืืช ืชืืืืืช:**\n\n"
        # Internal messages are never shown to customers; filter them first so
        # an all-internal history still gets the "no responses yet" notice.
        visible_responses = [row for row in responses if not row[3]]
        if visible_responses:
            for responder, resp_message, resp_time, _is_internal in visible_responses:
                resp_dt = parse_timestamp(resp_time)
                if responder == "ืืขืจืืช ืืืืืืืืช":
                    result += f"๐ค **{responder}** - {resp_dt.strftime('%d/%m/%Y %H:%M')}\n"
                else:
                    result += f"๐จโ๐ผ **{responder}** - {resp_dt.strftime('%d/%m/%Y %H:%M')}\n"
                result += f"{resp_message}\n\n"
        else:
            result += "๐ญ ืืื ืชืืืืืช ืขืืืื\n\n"

        # Next steps based on status
        if status == "open":
            result += "โณ **ืืืจืืืก ืฉืื ืืืชืื ืืืืคืื ืฆืืืช ืืชืืืื**\n"
        elif status == "waiting_customer":
            result += "โ ๏ธ **ืื ืื ื ืืืชืื ืื ืืชืืืืชื!** ืืฉื ืขื ืืฉืืืืช ืฉื ืฉืืื\n"
        elif status == "resolved":
            result += "โ\x85 **ืืืจืืืก ื ืคืชืจ!** ืื ืืฉ ืื ืฉืืืืช ื ืืกืคืืช, ืคืชื ืืจืืืก ืืืฉ\n"
        result += f"\n๐ **ืืฆืืจืช ืงืฉืจ ื ืืกืคืช:** 03-1234567"
        return result
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืขืงื ืืจืืืก ืชืืืื: {str(e)}"
    finally:
        # Close the connection on every path, including early returns and errors.
        if conn is not None:
            conn.close()
@mcp.tool()
def rate_faq_helpful(faq_id: int) -> str:
    """
    Record a "helpful" vote for an FAQ item.

    Args:
        faq_id: Primary key of the row in ``faq_items`` to vote for.

    Returns:
        A thank-you message quoting the rated question, a not-found message
        when no item has that id, or an error message on failure.
    """
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()

        # Look the item up first: its question text is echoed in the reply.
        cursor.execute("SELECT question FROM faq_items WHERE id = ?", (faq_id,))
        faq = cursor.fetchone()
        if not faq:
            return f"โ ืื ื ืืฆืื ืฉืืื ืขื ืืืื #{faq_id}"

        # Update helpful votes
        cursor.execute("""
            UPDATE faq_items
            SET helpful_votes = helpful_votes + 1
            WHERE id = ?
        """, (faq_id,))
        conn.commit()
        return f"๐ **ืชืืื ืขื ืืืฉืื!**\n\nืืืจืืช ืืช ืืฉืืื '{faq[0]}' ืืืืขืืื.\n\n๐ก ืืืฉืื ืฉืื ืขืืืจ ืื ื ืืฉืคืจ ืืช ืืจืื ืืขืืจื!"
    except sqlite3.Error as e:
        return f"โ ืฉืืืื ืืืกื ืื ืชืื ืื: {str(e)}"
    except Exception as e:
        return f"โ ืฉืืืื ืืืืจืื: {str(e)}"
    finally:
        # Close the connection on every path, including errors.
        if conn is not None:
            conn.close()
@mcp.tool()
def get_support_categories() -> str:
    """
    List the available support categories.

    Returns:
        A formatted message with each category and its description, followed
        by an example ``create_support_ticket`` call and a pointer to
        ``search_faq``.
    """
    # Category label -> short description, rendered in insertion order.
    descriptions_by_category = {
        "ืืืื": "ืฉืืืืช ืืืืืืช ืขื ืืื ืืช ืืืฉืืจืืชืื",
        "ืืื ื": "ืืขืืืช ืืื ืืืช ืืืชืจ ืืืืขืจืืช ืืืืื ืืช",
        "ืืืื ื": "ืฉืืืืช ืขื ืืืื ืืช, ืืฉืืืืื ืืืขืงื",
        "ืชืฉืืื": "ืืขืืืช ืืชืฉืืื, ืืืืจืืช ืืืืืืื",
        "ืืืฆืจืื": "ืฉืืืืช ืขื ืืืืฆืจืื, ืืืืื ืืืืคืจืืื",
        "ืืฉืืื": "ืืขืืืช ืืืชืืืจืืช, ืจืืฉืื ืืคืจืืคืื ืืฉืชืืฉ"
    }

    # Collect the pieces in order and join once at the end.
    pieces = [
        "๐ **ืงืืืืจืืืช ืชืืืื - ืืืจืื ืืื**\n\n",
        "ืืืจ ืืช ืืงืืืืจืื ืืืชืืืื ืืืืชืจ ืืคื ืืืชื:\n\n",
    ]
    for label, blurb in descriptions_by_category.items():
        pieces.append(f"๐ธ **{label}**\n")
        pieces.append(f" {blurb}\n\n")

    # Usage example for opening a ticket, then the FAQ search hint.
    pieces.extend([
        "๐ซ **ืืืฆืืจืช ืืจืืืก ืชืืืื:**\n",
        "```\ncreate_support_ticket(\n",
        " subject=\"ื ืืฉื ืืคื ืืื\",\n",
        " message=\"ืชืืืืจ ืืืขืื ืื ืืฉืืื\",\n",
        " customer_name=\"ืืฉื ืฉืื\",\n",
        " customer_email=\"ืืืืืืื ืฉืื\",\n",
        " category=\"ืืืช ืืืงืืืืจืืืช ืืืขืื\"\n",
        ")\n```\n\n",
        "๐ **ืืืืคืืฉ ืืืืจ:** `search_faq \"ืืฉืืื ืฉืื\"`",
    ])
    return "".join(pieces)
# Initialize database on import
init_database()

if __name__ == "__main__":
    import sys

    # No options are defined; parsing still provides --help and rejects
    # unknown arguments.
    parser = argparse.ArgumentParser(description="Secure Stevia Store MCP Server")
    parser.parse_args()
    # mcp.run() is called with its default transport. With FastMCP that is
    # stdio, where stdout carries protocol messages only, so the startup
    # banner goes to stderr instead.
    print("๐ Starting Secure Stevia Store MCP Server...", file=sys.stderr, flush=True)
    mcp.run()