Skip to main content
Glama

MCP Project Orchestrator

delivery.py (22.6 kB)
"""
Delivery service integration for PrintCast Agent.

Handles shipping and delivery through various carriers:
- Czech Post (Česká pošta)
- Zásilkovna
- DPD
- PPL
- Generic courier services
"""

import asyncio
import hashlib
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta
from enum import Enum

import httpx
import structlog
from pydantic import BaseModel, Field

logger = structlog.get_logger(__name__)


class DeliveryMethod(Enum):
    """Supported delivery methods."""

    POST = "post"  # Czech Post
    ZASILKOVNA = "zasilkovna"  # Zásilkovna
    DPD = "dpd"
    PPL = "ppl"
    COURIER = "courier"  # Generic courier


class Address(BaseModel):
    """Delivery address."""

    name: str
    street: str
    city: str
    postal_code: str
    country: str = "CZ"
    phone: Optional[str] = None
    email: Optional[str] = None
    company: Optional[str] = None


class Shipment(BaseModel):
    """Shipment information."""

    shipment_id: str
    tracking_number: Optional[str] = None
    carrier: str
    status: str = "pending"
    recipient: Address
    sender: Optional[Address] = None
    weight_grams: int = 100
    dimensions_cm: Optional[Dict[str, float]] = None
    created_at: datetime = Field(default_factory=datetime.now)
    shipped_at: Optional[datetime] = None
    delivered_at: Optional[datetime] = None
    price: Optional[float] = None
    currency: str = "CZK"
    metadata: Dict[str, Any] = Field(default_factory=dict)


class DeliveryService:
    """
    Manages delivery and shipping operations.

    Integrates with multiple Czech and international carriers. Shipments are
    kept in an in-memory dictionary keyed by shipment ID; nothing is persisted.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize delivery service.

        Args:
            config: Configuration including:
                - carriers: Carrier-specific configurations
                - default_carrier: Default carrier to use
                - sender_address: Default sender address
                - api_keys: API keys for each carrier
        """
        self.config = config
        self.carriers = config.get("carriers", {})
        self.default_carrier = config.get("default_carrier", "post")
        self.sender_address = config.get("sender_address", {})
        self.api_keys = config.get("api_keys", {})

        # Created lazily in initialize(); None until then and after shutdown().
        self.client: Optional[httpx.AsyncClient] = None
        self.shipments: Dict[str, Shipment] = {}

        logger.info(
            "Delivery service initialized",
            carriers=list(self.carriers.keys()),
            default_carrier=self.default_carrier
        )

    async def initialize(self):
        """
        Initialize HTTP client and verify carrier APIs.

        Failures are logged and not re-raised, so a carrier outage does not
        prevent the service from starting.
        """
        try:
            self.client = httpx.AsyncClient(
                timeout=30.0,
                follow_redirects=True
            )

            # Verify only carriers that have an API key configured
            for carrier in self.carriers:
                if carrier in self.api_keys:
                    await self._verify_carrier_api(carrier)

        except Exception as e:
            logger.error("Failed to initialize delivery service", error=str(e))

    async def shutdown(self):
        """Cleanup resources."""
        if self.client:
            await self.client.aclose()
            # Drop the reference so a closed client is never reused.
            self.client = None

        logger.info("Delivery service shutdown")

    def is_configured(self) -> bool:
        """Check if delivery service is configured."""
        return bool(self.carriers and self.sender_address)

    async def _verify_carrier_api(self, carrier: str) -> bool:
        """
        Verify carrier API is accessible.

        Args:
            carrier: Carrier name

        Returns:
            True if API is accessible, False otherwise (including on any
            exception, which is logged as a warning)
        """
        try:
            if carrier == "zasilkovna":
                # Verify Zásilkovna API
                api_key = self.api_keys.get("zasilkovna")
                if api_key:
                    response = await self.client.get(
                        "https://api.packeta.com/v1/branches",
                        params={"apiPassword": api_key, "limit": 1}
                    )
                    if response.status_code == 200:
                        logger.info("Zásilkovna API verified")
                        return True

            elif carrier == "post":
                # Czech Post doesn't require API verification for basic services
                logger.info("Czech Post service available")
                return True

            # Add other carrier verifications as needed

        except Exception as e:
            logger.warning(
                "Failed to verify carrier API",
                carrier=carrier,
                error=str(e)
            )

        return False

    async def get_quote(
        self,
        address: Union[str, Address],
        method: str = "post",
        weight: int = 100,
        dimensions: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Get delivery quote.

        Args:
            address: Delivery address, either an Address or a comma-separated
                string of the form "street, city, postal code"
            method: Delivery method
            weight: Package weight in grams
            dimensions: Package dimensions in cm

        Returns:
            Delivery quote with pricing and timing
        """
        try:
            # Parse address if string
            if isinstance(address, str):
                # Simple parsing - in production would use proper address parser
                parts = address.split(",")
                parsed_address = Address(
                    name="Recipient",
                    street=parts[0].strip() if len(parts) > 0 else "",
                    city=parts[1].strip() if len(parts) > 1 else "Praha",
                    postal_code=parts[2].strip() if len(parts) > 2 else "10000",
                    country="CZ"
                )
            else:
                parsed_address = address
            # NOTE: parsed_address is not consulted by the flat-rate pricing
            # below; it is kept for when pricing moves to carrier APIs.

            # Calculate price based on carrier and weight
            price = await self._calculate_price(method, weight, dimensions)

            # Estimate delivery time
            delivery_days = self._estimate_delivery_days(method)
            estimated_delivery = datetime.now() + timedelta(days=delivery_days)

            return {
                "price": price,
                "currency": "CZK",
                "estimated_delivery": estimated_delivery.isoformat(),
                "delivery_days": delivery_days,
                "carrier": method,
                "service_type": self._get_service_type(method, weight)
            }

        except Exception as e:
            logger.error("Failed to get delivery quote", error=str(e))
            raise

    async def _calculate_price(
        self,
        method: str,
        weight: int,
        dimensions: Optional[Dict[str, float]] = None
    ) -> float:
        """
        Calculate delivery price.

        Args:
            method: Delivery method; unknown methods fall back to 100.0
            weight: Package weight in grams
            dimensions: Optional dict with "length", "width" and "height" in cm

        Returns:
            Price: base price plus weight and volume surcharges
        """
        # Simplified pricing - in production would use carrier APIs
        base_prices = {
            "post": 89.0,  # Czech Post standard letter
            "zasilkovna": 65.0,  # Zásilkovna to pickup point
            "dpd": 120.0,  # DPD standard
            "ppl": 115.0,  # PPL standard
            "courier": 150.0  # Generic courier
        }

        price = base_prices.get(method, 100.0)

        # Add weight surcharge
        if weight > 1000:  # Over 1kg
            price += (weight // 1000) * 20
        elif weight > 500:  # Over 500g
            price += 15

        # Add dimension surcharge for large packages
        if dimensions:
            volume = (
                dimensions.get("length", 0)
                * dimensions.get("width", 0)
                * dimensions.get("height", 0)
            )
            if volume > 50000:  # Over 50x50x20 cm
                price += 30

        return price

    def _estimate_delivery_days(self, method: str) -> int:
        """Estimate delivery time in days (3 for unknown methods)."""
        estimates = {
            "post": 2,  # Czech Post D+2
            "zasilkovna": 1,  # Next day to pickup point
            "dpd": 1,  # Next day delivery
            "ppl": 1,  # Next day delivery
            "courier": 0  # Same day possible
        }
        return estimates.get(method, 3)

    def _get_service_type(self, method: str, weight: int) -> str:
        """Determine service type based on method and weight."""
        if method == "post":
            if weight <= 50:
                return "Obyčejné psaní"
            elif weight <= 500:
                return "Doporučené psaní"
            else:
                return "Balík Do ruky"
        elif method == "zasilkovna":
            return "Na výdejní místo"
        else:
            return "Standard delivery"

    async def create_shipment(
        self,
        recipient: Address,
        method: str = "post",
        weight: int = 100,
        dimensions: Optional[Dict[str, float]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new shipment.

        The shipment is stored even when the carrier did not return a tracking
        number; in that case its status stays "pending".

        Args:
            recipient: Recipient address
            method: Delivery method
            weight: Package weight in grams
            dimensions: Package dimensions
            metadata: Additional metadata

        Returns:
            Shipment ID
        """
        try:
            # Generate shipment ID. A random suffix is used instead of a hash
            # of the recipient name: with a second-resolution timestamp, two
            # shipments to the same recipient in the same second would share
            # an ID and the later one would overwrite the earlier record.
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            shipment_id = f"ship_{timestamp}_{uuid.uuid4().hex[:8]}"

            # Create shipment record
            shipment = Shipment(
                shipment_id=shipment_id,
                carrier=method,
                recipient=recipient,
                sender=Address(**self.sender_address) if self.sender_address else None,
                weight_grams=weight,
                dimensions_cm=dimensions,
                metadata=metadata or {}
            )

            # Calculate price
            shipment.price = await self._calculate_price(method, weight, dimensions)

            # Create shipment with carrier
            tracking_number = await self._create_carrier_shipment(shipment)
            if tracking_number:
                shipment.tracking_number = tracking_number
                shipment.status = "created"

            self.shipments[shipment_id] = shipment

            logger.info(
                "Shipment created",
                shipment_id=shipment_id,
                carrier=method,
                tracking=tracking_number
            )

            return shipment_id

        except Exception as e:
            logger.error("Failed to create shipment", error=str(e))
            raise

    async def _create_carrier_shipment(self, shipment: Shipment) -> Optional[str]:
        """
        Create shipment with specific carrier.

        Args:
            shipment: Shipment details

        Returns:
            Tracking number if available, None if the carrier call failed
        """
        try:
            if shipment.carrier == "zasilkovna":
                return await self._create_zasilkovna_shipment(shipment)
            elif shipment.carrier == "post":
                return await self._create_post_shipment(shipment)
            else:
                # Simulated tracking number for other carriers
                return f"{shipment.carrier.upper()}{datetime.now().strftime('%Y%m%d%H%M%S')}"

        except Exception as e:
            logger.error(
                "Failed to create carrier shipment",
                carrier=shipment.carrier,
                error=str(e)
            )
            return None

    async def _create_zasilkovna_shipment(self, shipment: Shipment) -> Optional[str]:
        """
        Create shipment with Zásilkovna.

        Returns:
            The "barcode" field of the API response on HTTP 200, otherwise
            None (missing API key, non-200 response, or any exception)
        """
        api_key = self.api_keys.get("zasilkovna")
        if not api_key:
            logger.warning("Zásilkovna API key not configured")
            return None

        try:
            # Prepare packet data
            packet_data = {
                "apiPassword": api_key,
                "packet": {
                    "number": shipment.shipment_id,
                    "name": shipment.recipient.name,
                    "surname": "",  # Would need to parse from name
                    "email": shipment.recipient.email or "",
                    "phone": shipment.recipient.phone or "",
                    "addressId": 1,  # Would need to select pickup point
                    "currency": shipment.currency,
                    "value": shipment.price,
                    "weight": shipment.weight_grams / 1000,  # Convert to kg
                    "eshop": "PrintCast"
                }
            }

            response = await self.client.post(
                "https://api.packeta.com/v1/packets",
                json=packet_data
            )

            if response.status_code == 200:
                result = response.json()
                return result.get("barcode")
            else:
                logger.error(
                    "Zásilkovna API error",
                    status=response.status_code,
                    response=response.text
                )
                return None

        except Exception as e:
            logger.error("Failed to create Zásilkovna shipment", error=str(e))
            return None

    async def _create_post_shipment(self, shipment: Shipment) -> Optional[str]:
        """Create shipment with Czech Post (currently simulated)."""
        # Czech Post integration would require their B2B API
        # For now, return simulated tracking number
        tracking = f"RR{datetime.now().strftime('%Y%m%d%H%M')}CZ"

        logger.info(
            "Czech Post shipment simulated",
            tracking=tracking,
            recipient=shipment.recipient.city
        )

        return tracking

    async def ship_package(
        self,
        shipment_id: str,
        pickup_time: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """
        Mark package as shipped and arrange pickup.

        Args:
            shipment_id: Shipment ID
            pickup_time: Optional pickup time

        Returns:
            Shipping confirmation

        Raises:
            ValueError: If the shipment does not exist, or is already
                delivered or cancelled
        """
        shipment = self.shipments.get(shipment_id)
        if not shipment:
            raise ValueError(f"Shipment {shipment_id} not found")

        # Same terminal states cancel_shipment() refuses to act on; shipping
        # them again would leave shipped_at later than delivered_at.
        if shipment.status in ["delivered", "cancelled"]:
            raise ValueError(
                f"Shipment {shipment_id} is {shipment.status} and cannot be shipped"
            )

        try:
            # Arrange pickup first so a failure leaves the shipment untouched
            pickup_confirmation = None
            if pickup_time:
                pickup_confirmation = await self._arrange_pickup(
                    shipment,
                    pickup_time
                )

            # Update shipment status
            shipment.status = "shipped"
            shipment.shipped_at = datetime.now()

            logger.info(
                "Package shipped",
                shipment_id=shipment_id,
                tracking=shipment.tracking_number
            )

            return {
                "shipment_id": shipment_id,
                "tracking_number": shipment.tracking_number,
                "carrier": shipment.carrier,
                "status": "shipped",
                "pickup_confirmation": pickup_confirmation,
                "estimated_delivery": (
                    shipment.shipped_at
                    + timedelta(days=self._estimate_delivery_days(shipment.carrier))
                ).isoformat()
            }

        except Exception as e:
            logger.error("Failed to ship package", error=str(e))
            raise

    async def _arrange_pickup(
        self,
        shipment: Shipment,
        pickup_time: datetime
    ) -> Optional[str]:
        """Arrange carrier pickup (currently simulated)."""
        # In production, would call carrier pickup APIs
        confirmation = f"PICKUP-{shipment.carrier.upper()}-{pickup_time.strftime('%Y%m%d')}"

        logger.info(
            "Pickup arranged",
            carrier=shipment.carrier,
            time=pickup_time.isoformat(),
            confirmation=confirmation
        )

        return confirmation

    async def track_shipment(self, shipment_id: str) -> Dict[str, Any]:
        """
        Track shipment status.

        Args:
            shipment_id: Shipment ID

        Returns:
            Tracking information

        Raises:
            ValueError: If the shipment does not exist
        """
        shipment = self.shipments.get(shipment_id)
        if not shipment:
            raise ValueError(f"Shipment {shipment_id} not found")

        try:
            # Get tracking from carrier
            tracking_info = await self._get_carrier_tracking(shipment)

            # Update local status if changed
            if tracking_info.get("delivered"):
                shipment.status = "delivered"
                # Record the delivery time only once; repeated polling must
                # not move an already-recorded delivery timestamp.
                if shipment.delivered_at is None:
                    shipment.delivered_at = datetime.now()
            elif tracking_info.get("in_transit"):
                shipment.status = "in_transit"

            return {
                "shipment_id": shipment_id,
                "tracking_number": shipment.tracking_number,
                "status": shipment.status,
                "carrier": shipment.carrier,
                "shipped_at": shipment.shipped_at.isoformat() if shipment.shipped_at else None,
                "delivered_at": shipment.delivered_at.isoformat() if shipment.delivered_at else None,
                "tracking_events": tracking_info.get("events", [])
            }

        except Exception as e:
            logger.error("Failed to track shipment", error=str(e))
            raise

    async def _get_carrier_tracking(self, shipment: Shipment) -> Dict[str, Any]:
        """
        Get tracking info from carrier (currently simulated).

        Returns:
            Dict with "events" (list of timestamp/status/location dicts) and
            the boolean flags "in_transit" and "delivered", all derived from
            the locally stored shipment
        """
        # In production, would call carrier tracking APIs
        # For now, return simulated tracking
        events = []

        if shipment.shipped_at:
            events.append({
                "timestamp": shipment.shipped_at.isoformat(),
                "status": "Picked up",
                "location": "Praha"
            })

        # The timestamps are optional on the model, so guard each dereference:
        # a status set without its timestamp must not crash tracking.
        if shipment.status == "in_transit" and shipment.shipped_at:
            events.append({
                "timestamp": (shipment.shipped_at + timedelta(hours=4)).isoformat(),
                "status": "In transit",
                "location": "Distribution center"
            })

        if shipment.status == "delivered" and shipment.delivered_at:
            events.append({
                "timestamp": shipment.delivered_at.isoformat(),
                "status": "Delivered",
                "location": shipment.recipient.city
            })

        return {
            "events": events,
            "in_transit": shipment.status == "in_transit",
            "delivered": shipment.status == "delivered"
        }

    async def cancel_shipment(self, shipment_id: str) -> bool:
        """
        Cancel a shipment.

        Args:
            shipment_id: Shipment ID

        Returns:
            True if cancelled successfully; False if the shipment is unknown,
            already delivered or cancelled, or the carrier cancellation failed
        """
        shipment = self.shipments.get(shipment_id)
        if not shipment:
            return False

        if shipment.status in ["delivered", "cancelled"]:
            return False

        try:
            # Cancel with carrier
            if shipment.tracking_number:
                await self._cancel_carrier_shipment(shipment)

            shipment.status = "cancelled"

            logger.info(
                "Shipment cancelled",
                shipment_id=shipment_id
            )

            return True

        except Exception as e:
            logger.error(
                "Failed to cancel shipment",
                shipment_id=shipment_id,
                error=str(e)
            )
            return False

    async def _cancel_carrier_shipment(self, shipment: Shipment):
        """Cancel shipment with carrier (currently simulated)."""
        # In production, would call carrier cancellation APIs
        logger.info(
            "Carrier shipment cancellation simulated",
            carrier=shipment.carrier,
            tracking=shipment.tracking_number
        )

    def get_shipment_status(self, shipment_id: str) -> Optional[Dict[str, Any]]:
        """Get shipment status, or None if the shipment is unknown."""
        shipment = self.shipments.get(shipment_id)
        if not shipment:
            return None

        return {
            "shipment_id": shipment.shipment_id,
            "status": shipment.status,
            "tracking_number": shipment.tracking_number,
            "carrier": shipment.carrier,
            "recipient": {
                "name": shipment.recipient.name,
                "city": shipment.recipient.city,
                "country": shipment.recipient.country
            },
            "price": shipment.price,
            "currency": shipment.currency,
            "created": shipment.created_at.isoformat(),
            "shipped": shipment.shipped_at.isoformat() if shipment.shipped_at else None,
            "delivered": shipment.delivered_at.isoformat() if shipment.delivered_at else None
        }

    async def generate_shipping_label(
        self,
        shipment_id: str,
        format: str = "pdf"
    ) -> bytes:
        """
        Generate shipping label.

        Args:
            shipment_id: Shipment ID
            format: Label format (pdf, zpl, png). Currently ignored: a plain
                text placeholder is returned for every format.

        Returns:
            Label data (UTF-8 encoded text placeholder)

        Raises:
            ValueError: If the shipment does not exist
        """
        shipment = self.shipments.get(shipment_id)
        if not shipment:
            raise ValueError(f"Shipment {shipment_id} not found")

        # In production, would generate actual label
        # For now, return placeholder
        sender_name = shipment.sender.name if shipment.sender else "PrintCast"
        recipient = shipment.recipient

        label_lines = [
            "SHIPPING LABEL",
            "===============",
            "",
            f"From: {sender_name}",
            "",
            f"To: {recipient.name}",
            f"    {recipient.street}",
            f"    {recipient.postal_code} {recipient.city}",
            f"    {recipient.country}",
            "",
            f"Tracking: {shipment.tracking_number or 'N/A'}",
            f"Carrier: {shipment.carrier.upper()}",
            f"Weight: {shipment.weight_grams}g",
            "",
        ]

        return "\n".join(label_lines).encode("utf-8")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sparesparrow/mcp-project-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.