Skip to main content
Glama
liuguoping1024

SWLC MCP Server

predict_lottery

Generate predicted lottery numbers for various games using historical data analysis and customizable strategies.

Instructions

预测彩票号码,基于历史数据生成预测结果

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| lottery_type | Yes | 彩票类型 | |
| method | No | 预测方法 | rule |
| count | No | 预测组数 | 5 |
| strategy | No | 预测策略 | all |

Implementation Reference

  • Registration of the 'predict_lottery' MCP tool including input schema definition
        name="predict_lottery",
        description="预测彩票号码,基于历史数据生成预测结果",
        inputSchema={
            "type": "object",
            "properties": {
                "lottery_type": {
                    "type": "string",
                    "enum": ["双色球", "福彩3D", "七乐彩", "快乐8"],
                    "description": "彩票类型"
                },
                "method": {
                    "type": "string",
                    "enum": ["rule"],
                    "default": "rule",
                    "description": "预测方法"
                },
                "count": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 20,
                    "default": 5,
                    "description": "预测组数"
                },
                "strategy": {
                    "type": "string",
                    "enum": ["all", "balanced", "cold_recovery", "hot_focus", "interval_balance", "contrarian"],
                    "default": "all",
                    "description": "预测策略"
                }
            },
            "required": ["lottery_type"]
        }
    ),
  • Main MCP tool handler for 'predict_lottery': fetches historical data from database/service, converts to dicts, calls PredictionManager.predict(), formats results as text.
    elif name == "predict_lottery":
        lottery_type = arguments.get("lottery_type")
        method = arguments.get("method", "rule")
        count = arguments.get("count", 5)
        strategy = arguments.get("strategy", "all")
        
        try:
            # 获取历史数据用于预测
            historical_data = await lottery_service.get_historical_data(lottery_type, 120)
            if not historical_data:
                return [types.TextContent(type="text", text=f"获取{lottery_type}历史数据失败,无法进行预测")]
            
            # 转换为字典格式
            history_dict = [{
                'period': r.period,
                'numbers': r.numbers,
                'special_numbers': r.special_numbers,
                'draw_date': r.draw_date
            } for r in historical_data]
            
            # 执行预测
            predictions = await lottery_service.prediction_manager.predict(
                lottery_type, history_dict, method=method, count=count, strategy=strategy
            )
            
            if predictions:
                text_lines = [f"{lottery_type}预测结果(方法:{method},策略:{strategy}):\n"]
                for i, pred in enumerate(predictions, 1):
                    if pred.special_numbers:
                        numbers_str = f"{' '.join(pred.numbers)} + {' '.join(pred.special_numbers)}"
                    else:
                        numbers_str = ' '.join(pred.numbers)
                    text_lines.append(f"预测 {i}: {numbers_str} (置信度: {pred.confidence:.2%})")
                
                return [types.TextContent(type="text", text="\n".join(text_lines))]
            else:
                return [types.TextContent(type="text", text=f"{lottery_type}预测失败")]
                
        except Exception as e:
            logger.error(f"预测失败: {e}")
            return [types.TextContent(type="text", text=f"预测失败:{str(e)}")]
  • Core prediction logic in RuleBasedPredictor class implementing rule-based strategies (balanced, cold recovery, hot focus, etc.) based on historical frequency analysis for generating lottery predictions.
    class RuleBasedPredictor:
        """Rule-based lottery number predictor.

        Double Color Ball (``ssq`` / ``双色球``) predictions are derived from the
        red/blue ball frequencies found in the supplied history.  The other
        accepted games (``3d``, ``qlc``, ``kl8``) have no strategy implementation
        yet and receive uniformly random picks in the game's own number format.
        """

        # Accepted game identifiers (short code or Chinese name) -> short code.
        _TYPE_ALIASES = {
            "ssq": "ssq", "双色球": "ssq",
            "3d": "3d", "福彩3D": "3d",
            "qlc": "qlc", "七乐彩": "qlc",
            "kl8": "kl8", "快乐8": "kl8",
        }

        # Strategy key -> label stored in PredictionResult.method.  Insertion
        # order is the order used when every strategy is requested.
        _SSQ_STRATEGIES = {
            'balanced': '热冷均衡型',
            'cold_recovery': '冷门回补型',
            'hot_focus': '热门集中型',
            'interval_balance': '区间平衡型',
            'contrarian': '反向思维型',
        }

        # Picks per interval (1-11, 12-22, 23-33) cycled through by the
        # interval_balance variants; each split sums to six.
        _INTERVAL_SPLITS = [
            (2, 3, 1), (3, 2, 1), (1, 2, 3), (3, 1, 2), (1, 3, 2), (2, 1, 3),
        ]

        def predict(self, lottery_type: str, historical_data: List[Dict], count: int = 5, strategy: Optional[str] = None) -> List[PredictionResult]:
            """Predict ``count`` number groups for ``lottery_type``.

            Args:
                lottery_type: Short code or Chinese name of the game.
                historical_data: Past draws as dicts with ``numbers`` and
                    ``special_numbers`` entries.
                count: Number of prediction groups to return.
                strategy: Double Color Ball strategy key; ``None``, ``''`` or
                    ``'all'`` cycles through every strategy.  Ignored for the
                    other games.

            Raises:
                ValueError: If ``lottery_type`` is not a supported game.
            """
            game = self._TYPE_ALIASES.get(lottery_type)
            if game is None:
                raise ValueError(f"不支持的彩票类型: {lottery_type}")

            # Strategies exist only for Double Color Ball.
            if game == "ssq":
                return self._predict_ssq_with_strategies(historical_data, strategy, count)

            # Every other game gets random picks shaped for that game.
            return self._predict_fallback(historical_data, count, lottery_type=game)

        @staticmethod
        def _normalize_ball(value, low: int, high: int) -> Optional[str]:
            """Return ``value`` as a zero-padded two-digit string, or None when it
            is not a number inside ``low..high``."""
            try:
                number = int(value)
            except (TypeError, ValueError):
                return None
            if low <= number <= high:
                return f"{number:02d}"
            return None

        def _predict_ssq_with_strategies(self, historical_data: List[Dict], strategy: Optional[str], count: int) -> List[PredictionResult]:
            """Build Double Color Ball predictions from frequency rankings.

            The first group produced for a strategy is its canonical pick; any
            further group for the same strategy is a numbered variant
            (``metadata["variant"]``).  With ``strategy`` set to ``None``, ``''``
            or ``'all'`` the five strategies are cycled until ``count`` groups
            exist, so ``count`` is honoured in both directions.  A ``count``
            below one yields an empty list.
            """
            all_red = [f"{i:02d}" for i in range(1, 34)]
            all_blue = [f"{i:02d}" for i in range(1, 17)]

            # Count occurrences.  Keys are normalised so that ints or unpadded
            # strings in the history land on the same key as "01".."33";
            # malformed or out-of-range values are ignored.
            freq: Dict[str, int] = dict.fromkeys(all_red, 0)
            blue_freq: Dict[str, int] = dict.fromkeys(all_blue, 0)
            for draw in historical_data or []:
                for raw in draw.get('numbers') or []:
                    ball = self._normalize_ball(raw, 1, 33)
                    if ball is not None:
                        freq[ball] += 1
                for raw in draw.get('special_numbers') or []:
                    ball = self._normalize_ball(raw, 1, 16)
                    if ball is not None:
                        blue_freq[ball] += 1

            # Rankings of the red balls; ties are broken by ball number.
            mean = sum(freq.values()) / len(freq)
            hot = sorted(all_red, key=lambda n: (-freq[n], int(n)))
            cold = sorted(all_red, key=lambda n: (freq[n], int(n)))
            medium = sorted(all_red, key=lambda n: (abs(freq[n] - mean), int(n)))
            blue_ranked = sorted(all_blue, key=lambda n: (-blue_freq[n], int(n)))

            def pick_distinct(candidates: List[str], k: int, exclude: Optional[set] = None) -> List[str]:
                """Take the first ``k`` unseen candidates, topping up at random
                from the remaining red balls, and return them sorted."""
                res: List[str] = []
                seen = set(exclude or ())
                for n in candidates:
                    if len(res) >= k:
                        break
                    if n in seen:
                        continue
                    res.append(n)
                    seen.add(n)
                if len(res) < k:
                    pool = [x for x in all_red if x not in seen]
                    random.shuffle(pool)
                    res.extend(pool[:k - len(res)])
                return sorted(res, key=int)

            def pick_spread(candidates: List[str], k: int) -> List[str]:
                """Like pick_distinct, but first skips any number adjacent to one
                already chosen, so the result avoids consecutive numbers."""
                chosen: List[str] = []
                taken = set()
                for n in candidates:
                    if len(chosen) >= k:
                        break
                    value = int(n)
                    if value in taken or value - 1 in taken or value + 1 in taken:
                        continue
                    chosen.append(n)
                    taken.add(value)
                # Too few non-adjacent candidates: relax the adjacency rule.
                return pick_distinct(chosen + candidates, k)

            def sample_mix(primary: List[str], secondary: List[str], n_primary: int) -> List[str]:
                """Draw ``n_primary`` balls at random from the top of ``primary``
                and the rest of the six from the top of ``secondary``.  Each pool
                is twice as large as the number drawn from it."""
                n_secondary = 6 - n_primary
                pool = primary[:n_primary * 2]
                picks = random.sample(pool, min(n_primary, len(pool)))
                pool = [n for n in secondary[:n_secondary * 2] if n not in picks]
                picks += random.sample(pool, min(n_secondary, len(pool)))
                # Pools can overlap when frequencies tie; top up from primary.
                return pick_distinct(picks + primary, 6)

            def by_interval(sizes) -> List[str]:
                """Pick the given number of balls from each of the intervals
                1-11, 12-22 and 23-33, preferring near-average frequency."""
                picked: List[str] = []
                for (low, high), size in zip(((1, 11), (12, 22), (23, 33)), sizes):
                    ranked = [n for n in medium if low <= int(n) <= high]
                    picked.extend(ranked[:size])
                return sorted(picked, key=int)

            def base_red(strat: str) -> List[str]:
                """Canonical red-ball pick of a strategy."""
                if strat == 'balanced':
                    # Three hottest plus three coldest.
                    return pick_distinct(hot[:3] + cold, 6)
                if strat == 'cold_recovery':
                    return pick_distinct(cold, 6)
                if strat == 'interval_balance':
                    return by_interval((2, 2, 2))
                if strat == 'contrarian':
                    # Coldest first, never two consecutive numbers.
                    return pick_spread(cold, 6)
                # hot_focus and unrecognised strategies: the six hottest.
                return pick_distinct(hot, 6)

            def variant_red(strat: str, i: int) -> List[str]:
                """Red-ball pick for the ``i``-th (0-based) variant of a strategy."""
                if strat == 'balanced':
                    # Hot share 0.3, 0.4, ... of six, in integer arithmetic.
                    hot_count = max(1, min(5, 6 * (3 + i) // 10))
                    return sample_mix(hot, cold, hot_count)
                if strat == 'cold_recovery':
                    cold_count = max(2, min(5, 6 * (4 + i) // 10))
                    return sample_mix(cold, hot, cold_count)
                if strat == 'hot_focus':
                    hot_count = max(3, min(6, 6 * (5 + i) // 10))
                    return sample_mix(hot, cold, hot_count)
                if strat == 'interval_balance':
                    splits = self._INTERVAL_SPLITS
                    return by_interval(splits[i % len(splits)])
                if strat == 'contrarian':
                    # Exclude the hottest 30%..70%; capped so candidates remain.
                    tenths = min(3 + i, 7)
                    avoided = set(hot[:len(hot) * tenths // 10])
                    return pick_spread([n for n in cold if n not in avoided], 6)
                jitter = hot[:10] + cold[:10]
                random.shuffle(jitter)
                return pick_distinct(jitter, 6)

            def blue_pick(top_k: int = 4) -> str:
                """Random blue ball among the ``top_k`` most frequent ones."""
                return random.choice(blue_ranked[:top_k] or all_blue)

            if strategy in (None, '', 'all'):
                to_run = list(self._SSQ_STRATEGIES)
            else:
                to_run = [strategy]

            results: List[PredictionResult] = []
            for index in range(max(int(count), 0)):
                strat = to_run[index % len(to_run)]
                round_no = index // len(to_run)  # 0 = canonical, >= 1 = variant
                if round_no == 0:
                    red = base_red(strat)
                    metadata = {"strategy": strat}
                else:
                    red = variant_red(strat, round_no - 1)
                    metadata = {"strategy": strat, "variant": round_no}
                results.append(PredictionResult(
                    numbers=red,
                    special_numbers=[blue_pick()],
                    confidence=0.0,
                    method=self._SSQ_STRATEGIES.get(strat, strat),
                    timestamp=datetime.now().isoformat(),
                    metadata=metadata
                ))
            return results

        def _predict_fallback(self, historical_data: List[Dict], count: int, lottery_type: Optional[str] = None) -> List[PredictionResult]:
            """Return ``count`` uniformly random picks shaped for the game.

            ``historical_data`` is currently unused.  Without ``lottery_type``
            (or with an unrecognised one) the Double Color Ball layout is
            produced: six of 01-33 plus one of 01-16.
            """
            game = self._TYPE_ALIASES.get(lottery_type, "ssq")
            results: List[PredictionResult] = []
            for _ in range(max(int(count), 0)):
                special: List[str] = []
                if game == "3d":
                    # Three independent digits; position matters, so unsorted.
                    # NOTE(review): assumes 3D digits are stored unpadded.
                    numbers = [str(random.randint(0, 9)) for _ in range(3)]
                elif game == "qlc":
                    # Seven of 01-30.
                    numbers = [f"{n:02d}" for n in sorted(random.sample(range(1, 31), 7))]
                elif game == "kl8":
                    # Ten of 01-80, the largest playable selection.
                    # NOTE(review): switch to 20 if a full draw is expected.
                    numbers = [f"{n:02d}" for n in sorted(random.sample(range(1, 81), 10))]
                else:
                    numbers = [f"{n:02d}" for n in sorted(random.sample(range(1, 34), 6))]
                    special = [f"{random.randint(1, 16):02d}"]
                results.append(PredictionResult(
                    numbers=numbers,
                    special_numbers=special,
                    confidence=0.0,
                    method='rule',
                    timestamp=datetime.now().isoformat(),
                    metadata={"fallback": True}
                ))
            return results
  • PredictionManager wrapper that initializes and calls RuleBasedPredictor.predict(), used by the tool handler.
    class PredictionManager:
        """Entry point for predictions; delegates to RuleBasedPredictor."""

        def __init__(self):
            self.rule_predictor = RuleBasedPredictor()

        async def predict(self, lottery_type: str, historical_data: List[Dict], 
                         method: str = 'rule', count: int = 5, strategy: Optional[str] = None) -> List[PredictionResult]:
            """Run a prediction and return the generated groups.

            Args:
                lottery_type: Game identifier passed to the predictor.
                historical_data: Past draws as dicts.
                method: Prediction method.  Only ``'rule'`` exists; any other
                    value is reported and the rule predictor is used anyway.
                count: Number of prediction groups requested.
                strategy: Strategy key passed to the predictor.

            Raises:
                Exception: Whatever the rule predictor raises (for example
                    ``ValueError`` for an unsupported lottery type) is logged
                    and re-raised.
            """
            if method != 'rule':
                logger.warning(f"未实现的预测方法 {method},改用 rule")
            try:
                # Only the rule + strategy predictor is implemented.
                return self.rule_predictor.predict(lottery_type, historical_data, count=count, strategy=strategy)
            except Exception as e:
                logger.error(f"预测失败: {e}")
                # Repeating the identical call cannot recover from a
                # deterministic failure; let the caller handle it.
                raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liuguoping1024/swlc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server