Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
services.py14.8 kB
""" Screening domain services. This module contains pure business logic services that operate on screening entities and value objects without any external dependencies. """ from datetime import datetime from decimal import Decimal from typing import Any, Protocol from .entities import ScreeningResult, ScreeningResultCollection from .value_objects import ( ScreeningCriteria, ScreeningLimits, ScreeningStrategy, SortingOptions, ) class IStockRepository(Protocol): """Protocol defining the interface for stock data access.""" def get_maverick_stocks( self, limit: int = 20, min_score: int | None = None ) -> list[dict[str, Any]]: """Get Maverick bullish stocks.""" ... def get_maverick_bear_stocks( self, limit: int = 20, min_score: int | None = None ) -> list[dict[str, Any]]: """Get Maverick bearish stocks.""" ... def get_trending_stocks( self, limit: int = 20, min_momentum_score: Decimal | None = None, filter_moving_averages: bool = False, ) -> list[dict[str, Any]]: """Get trending stocks.""" ... class ScreeningService: """ Pure domain service for stock screening business logic. This service contains no external dependencies and focuses solely on the business rules and logic for screening operations. """ def __init__(self): """Initialize the screening service.""" self._default_limits = ScreeningLimits() def create_screening_result_from_raw_data( self, raw_data: dict[str, Any], screening_date: datetime | None = None ) -> ScreeningResult: """ Create a ScreeningResult entity from raw database data. This method handles the transformation of raw data into a properly validated domain entity. 
""" if screening_date is None: screening_date = datetime.utcnow() return ScreeningResult( stock_symbol=raw_data.get("stock", ""), screening_date=screening_date, open_price=Decimal(str(raw_data.get("open", 0))), high_price=Decimal(str(raw_data.get("high", 0))), low_price=Decimal(str(raw_data.get("low", 0))), close_price=Decimal(str(raw_data.get("close", 0))), volume=int(raw_data.get("volume", 0)), ema_21=Decimal(str(raw_data.get("ema_21", 0))), sma_50=Decimal(str(raw_data.get("sma_50", 0))), sma_150=Decimal(str(raw_data.get("sma_150", 0))), sma_200=Decimal(str(raw_data.get("sma_200", 0))), momentum_score=Decimal(str(raw_data.get("momentum_score", 0))), avg_volume_30d=Decimal( str(raw_data.get("avg_vol_30d", raw_data.get("avg_volume_30d", 0))) ), adr_percentage=Decimal(str(raw_data.get("adr_pct", 0))), atr=Decimal(str(raw_data.get("atr", 0))), pattern=raw_data.get("pat"), squeeze=raw_data.get("sqz"), vcp=raw_data.get("vcp"), entry_signal=raw_data.get("entry"), combined_score=int(raw_data.get("combined_score", 0)), bear_score=int(raw_data.get("score", 0)), # Bear score uses 'score' field compression_score=int(raw_data.get("compression_score", 0)), pattern_detected=int(raw_data.get("pattern_detected", 0)), # Bearish-specific fields rsi_14=Decimal(str(raw_data["rsi_14"])) if raw_data.get("rsi_14") is not None else None, macd=Decimal(str(raw_data["macd"])) if raw_data.get("macd") is not None else None, macd_signal=Decimal(str(raw_data["macd_s"])) if raw_data.get("macd_s") is not None else None, macd_histogram=Decimal(str(raw_data["macd_h"])) if raw_data.get("macd_h") is not None else None, distribution_days_20=raw_data.get("dist_days_20"), atr_contraction=raw_data.get("atr_contraction"), big_down_volume=raw_data.get("big_down_vol"), ) def apply_screening_criteria( self, results: list[ScreeningResult], criteria: ScreeningCriteria ) -> list[ScreeningResult]: """ Apply screening criteria to filter results. 
This method implements all the business rules for filtering screening results based on the provided criteria. """ if not criteria.has_any_filters(): return results filtered_results = results # Momentum Score filters if criteria.min_momentum_score is not None: filtered_results = [ r for r in filtered_results if r.momentum_score >= criteria.min_momentum_score ] if criteria.max_momentum_score is not None: filtered_results = [ r for r in filtered_results if r.momentum_score <= criteria.max_momentum_score ] # Volume filters if criteria.min_volume is not None: filtered_results = [ r for r in filtered_results if r.avg_volume_30d >= criteria.min_volume ] if criteria.max_volume is not None: filtered_results = [ r for r in filtered_results if r.avg_volume_30d <= criteria.max_volume ] # Price filters if criteria.min_price is not None: filtered_results = [ r for r in filtered_results if r.close_price >= criteria.min_price ] if criteria.max_price is not None: filtered_results = [ r for r in filtered_results if r.close_price <= criteria.max_price ] # Score filters if criteria.min_combined_score is not None: filtered_results = [ r for r in filtered_results if r.combined_score >= criteria.min_combined_score ] if criteria.min_bear_score is not None: filtered_results = [ r for r in filtered_results if r.bear_score >= criteria.min_bear_score ] # ADR filters if criteria.min_adr_percentage is not None: filtered_results = [ r for r in filtered_results if r.adr_percentage >= criteria.min_adr_percentage ] if criteria.max_adr_percentage is not None: filtered_results = [ r for r in filtered_results if r.adr_percentage <= criteria.max_adr_percentage ] # Pattern filters if criteria.require_pattern_detected: filtered_results = [r for r in filtered_results if r.pattern_detected > 0] if criteria.require_squeeze: filtered_results = [ r for r in filtered_results if r.squeeze is not None and r.squeeze.strip() ] if criteria.require_vcp: filtered_results = [ r for r in filtered_results if r.vcp is 
not None and r.vcp.strip() ] if criteria.require_entry_signal: filtered_results = [ r for r in filtered_results if r.entry_signal is not None and r.entry_signal.strip() ] # Moving average filters if criteria.require_above_sma50: filtered_results = [r for r in filtered_results if r.close_price > r.sma_50] if criteria.require_above_sma150: filtered_results = [ r for r in filtered_results if r.close_price > r.sma_150 ] if criteria.require_above_sma200: filtered_results = [ r for r in filtered_results if r.close_price > r.sma_200 ] if criteria.require_ma_alignment: filtered_results = [ r for r in filtered_results if (r.sma_50 > r.sma_150 and r.sma_150 > r.sma_200) ] return filtered_results def sort_screening_results( self, results: list[ScreeningResult], sorting: SortingOptions ) -> list[ScreeningResult]: """ Sort screening results according to the specified options. This method implements the business rules for ranking and ordering screening results. """ def get_sort_value(result: ScreeningResult, field: str) -> Any: """Get the value for sorting from a result.""" if field == "combined_score": return result.combined_score elif field == "bear_score": return result.bear_score elif field == "momentum_score": return result.momentum_score elif field == "close_price": return result.close_price elif field == "volume": return result.volume elif field == "avg_volume_30d": return result.avg_volume_30d elif field == "adr_percentage": return result.adr_percentage elif field == "quality_score": return result.get_quality_score() else: return 0 # Sort by primary field sorted_results = sorted( results, key=lambda r: get_sort_value(r, sorting.field), reverse=sorting.descending, ) # Apply secondary sort if specified if sorting.secondary_field: sorted_results = sorted( sorted_results, key=lambda r: ( get_sort_value(r, sorting.field), get_sort_value(r, sorting.secondary_field), ), reverse=sorting.descending, ) return sorted_results def create_screening_collection( self, results: 
list[ScreeningResult], strategy: ScreeningStrategy, total_candidates: int, ) -> ScreeningResultCollection: """ Create a ScreeningResultCollection from individual results. This method assembles the aggregate root with proper validation. """ return ScreeningResultCollection( results=results, strategy_used=strategy.value, screening_timestamp=datetime.utcnow(), total_candidates_analyzed=total_candidates, ) def validate_screening_limits(self, requested_limit: int) -> int: """ Validate and adjust the requested result limit. Business rule: Limits must be within acceptable bounds. """ return self._default_limits.validate_limit(requested_limit) def calculate_screening_statistics( self, collection: ScreeningResultCollection ) -> dict[str, Any]: """ Calculate comprehensive statistics for a screening collection. This method provides business intelligence metrics for screening result analysis. """ base_stats = collection.get_statistics() # Add additional business metrics results = collection.results if not results: return base_stats # Quality distribution quality_scores = [r.get_quality_score() for r in results] base_stats.update( { "quality_distribution": { "high_quality": sum(1 for q in quality_scores if q >= 80), "medium_quality": sum(1 for q in quality_scores if 50 <= q < 80), "low_quality": sum(1 for q in quality_scores if q < 50), }, "avg_quality_score": sum(quality_scores) / len(quality_scores), } ) # Risk/reward analysis risk_rewards = [r.calculate_risk_reward_ratio() for r in results] valid_ratios = [rr for rr in risk_rewards if rr > 0] if valid_ratios: base_stats.update( { "risk_reward_analysis": { "avg_ratio": float(sum(valid_ratios) / len(valid_ratios)), "favorable_setups": sum(1 for rr in valid_ratios if rr >= 2), "conservative_setups": sum( 1 for rr in valid_ratios if 1 <= rr < 2 ), "risky_setups": sum(1 for rr in valid_ratios if rr < 1), } } ) # Strategy-specific metrics if collection.strategy_used == ScreeningStrategy.MAVERICK_BULLISH.value: 
base_stats["momentum_analysis"] = self._calculate_momentum_metrics(results) elif collection.strategy_used == ScreeningStrategy.MAVERICK_BEARISH.value: base_stats["weakness_analysis"] = self._calculate_weakness_metrics(results) elif collection.strategy_used == ScreeningStrategy.TRENDING_STAGE2.value: base_stats["trend_analysis"] = self._calculate_trend_metrics(results) return base_stats def _calculate_momentum_metrics( self, results: list[ScreeningResult] ) -> dict[str, Any]: """Calculate momentum-specific metrics for bullish screens.""" return { "high_momentum": sum(1 for r in results if r.combined_score >= 80), "pattern_breakouts": sum(1 for r in results if r.pattern_detected > 0), "strong_momentum": sum(1 for r in results if r.momentum_score >= 90), } def _calculate_weakness_metrics( self, results: list[ScreeningResult] ) -> dict[str, Any]: """Calculate weakness-specific metrics for bearish screens.""" return { "severe_weakness": sum(1 for r in results if r.bear_score >= 80), "distribution_signals": sum( 1 for r in results if r.distribution_days_20 is not None and r.distribution_days_20 >= 5 ), "breakdown_candidates": sum( 1 for r in results if r.close_price < r.sma_200 ), } def _calculate_trend_metrics( self, results: list[ScreeningResult] ) -> dict[str, Any]: """Calculate trend-specific metrics for trending screens.""" return { "strong_trends": sum(1 for r in results if r.is_trending_stage2()), "perfect_alignment": sum( 1 for r in results if (r.sma_50 > r.sma_150 and r.sma_150 > r.sma_200) ), "elite_momentum": sum(1 for r in results if r.momentum_score >= 95), }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.