Bilan complet toutes phases
ffbb_bilanRetrieve a complete team record across all phases in one call. Aggregates wins, losses, draws, and points scored/allowed from all competitions. Query by club name, ID, or team category.
Instructions
Bilan complet d'une équipe toutes phases confondues en UN seul appel.
⚡ C'est l'outil à utiliser en priorité pour toute question de type
"quel est le bilan de X cette saison ?" ou "résultats de U11M1".
Encapsule en interne : recherche club → équipes → classements de toutes
les phases en parallèle → agrégation V/D/N et paniers marqués/encaissés.
Retourne :
- bilan_total : total V/D/N, paniers marqués/encaissés, différence
- phases : détail par compétition/phase (position, V/D/N, paniers)Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| club_name | No | Nom du club (ex: 'Stade Clermontois', 'ASVEL'). | |
| organisme_id | No | ID FFBB du club (alternative plus rapide à club_name). | |
| categorie | No | Catégorie + genre + numéro d'équipe (ex: 'U11M1', 'U13F2', 'U15M', 'Senior'). | |
| force_refresh | No | Si True, contourne le cache pour récupérer des données fraîches. |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/ffbb_mcp/services.py:1560-1768 (handler)Core service function `ffbb_bilan_service` that executes the bilan logic: resolves club, fetches équipes, aggregates poules data across all phases, and returns bilan_total + phases + equipes_bilan.
async def ffbb_bilan_service( club_name: str | None = None, organisme_id: int | str | None = None, categorie: str | None = None, force_refresh: bool = False, ) -> dict[str, Any]: """ Bilan complet d'une équipe toutes phases confondues en un seul appel. Workflow interne : search → equipes → poules (parallèle) → agrégation V/D/N + paniers. Args: force_refresh: Si True, bypass le cache pour obtenir des données fraîches. """ cache_key = f"bilan:{organisme_id or ''}:{_normalize_name(club_name or '')}:{_normalize_name(categorie or '')}" async def _fetch() -> dict[str, Any]: # 1. Résoudre l'organisme_id (CENTRALISÉ) resolved_clubs, org_data = await _resolve_club_and_org( club_name=club_name, organisme_id=organisme_id, categorie=categorie ) target_org_ids = [str(c["organisme_id"]) for c in resolved_clubs] club_nom = resolved_clubs[0]["nom"] if resolved_clubs else (club_name or "") if not target_org_ids: return {"error": f"Club '{club_name}' introuvable"} # 2. Récupérer les équipes filtrées en parallèle eq_tasks = [] for oid in target_org_ids: # On passe org_data seulement si c'est l'organisme cible direct # pour optimiser l'appel interne. is_target = organisme_id and str(oid) == str(organisme_id) pass_org = org_data if is_target else None eq_tasks.append( ffbb_equipes_club_service( organisme_id=oid, filtre=categorie, org_data=pass_org ) ) eq_results = await asyncio.gather(*eq_tasks, return_exceptions=True) equipes: list[dict[str, Any]] = [] for res in eq_results: if isinstance(res, list): # FILTRE: on ignore les dictionnaires d'erreur (Hinting) pour le traitement interne equipes.extend( [e for e in res if isinstance(e, dict) and "error" not in e] ) elif isinstance(res, Exception): logger.error("Erreur lors de la récupération des équipes: %s", res) if not equipes: return {"error": f"Aucune équipe trouvée pour la catégorie '{categorie}'"} # Dédupliquer les équipes par engagement_id pour éviter les doublons de matchs deduped_equipes: list[dict[str, Any]] = [] seen_engagement_ids: set[str] = set() for equipe in equipes: if not isinstance(equipe, dict): continue engagement_id = equipe.get("engagement_id") if engagement_id is None: deduped_equipes.append(equipe) continue engagement_key = str(engagement_id) if engagement_key in seen_engagement_ids: continue seen_engagement_ids.add(engagement_key) deduped_equipes.append(equipe) equipes = deduped_equipes # 3. Récupérer toutes les poules concernées unique_poule_ids = list( dict.fromkeys(str(e.get("poule_id")) for e in equipes if e.get("poule_id")) ) # FIX: print() → logger.debug() (les print polluaient stdout/Coolify en prod) logger.debug(f"ffbb_bilan: club_nom={club_nom} cible_orgs={target_org_ids}") logger.debug( f"ffbb_bilan: equipes_count={len(equipes)} unique_poules={unique_poule_ids}" ) async def _fetch_poule_bilan(pid: str) -> dict[str, Any] | Exception: try: return await get_poule_service(pid) except Exception as e: return e poules_raw = await asyncio.gather( *[_fetch_poule_bilan(pid) for pid in unique_poule_ids], return_exceptions=True, ) logger.debug("ffbb_bilan: poules_raw=%s", poules_raw) poules_map: dict[str, dict[str, Any]] = { pid: pd # type: ignore for pid, pd in zip(unique_poule_ids, poules_raw, strict=False) if not isinstance(pd, Exception) and pd } # type: ignore logger.debug("ffbb_bilan: poules_map_keys=%s", list(poules_map.keys())) # Map poule_id → engagement_ids du club + nom compétition + numero_equipe poule_to_eng: dict[str, set[str]] = {} poule_to_comp: dict[str, str] = {} eng_to_num: dict[str, str] = {} # engagement_id → numero_equipe org_ids_str = set(target_org_ids) for e in equipes: pid = str(e.get("poule_id", "")) eid = str(e.get("engagement_id", "")) num = str(e.get("numero_equipe") or "") if pid and eid: poule_to_eng.setdefault(pid, set()).add(eid) if num: eng_to_num[eid] = num if pid and e.get("competition"): poule_to_comp[pid] = e["competition"] # 4. Agréger par phase phases: list[dict[str, Any]] = [] totaux = _new_bilan_totals() for pid, poule_data in poules_map.items(): if not isinstance(poule_data, dict): continue eng_ids_here = poule_to_eng.get(pid, set()) for entry in poule_data.get("classements", []) or []: if not isinstance(entry, dict): continue eng = entry.get("id_engagement", {}) or {} entry_eng_id = str(eng.get("id", "")) entry_org_id = str(entry.get("organisme_id", "")) if entry_eng_id in eng_ids_here: pass elif entry_org_id in org_ids_str: logger.debug( "ffbb_bilan: fallback org_id utilisé pour entry_eng_id=%s org_id=%s", entry_eng_id, entry_org_id, ) else: continue stats = _extract_and_accumulate_bilan(entry, totaux) # Résolution du numéro d'équipe : priorité au mapping issu de # ffbb_equipes_club_service (le plus fiable), sinon lecture directe # dans l'entrée de classement retournée par l'API (fallback propre). num_equipe = eng_to_num.get(entry_eng_id) or str( eng.get("numero_equipe") or "" ) phases.append( { "competition": poule_to_comp.get(pid, ""), "poule_id": pid, "numero_equipe": num_equipe, "position": entry.get("position"), **stats, } ) # Tri déterministe : par compétition puis par numéro d'équipe pour que # les phases d'une même équipe soient toujours regroupées dans l'ordre. phases.sort(key=lambda x: (x["competition"], x["numero_equipe"] or "")) # Structure groupée par numéro d'équipe pour éliminer toute ambiguïté # lorsqu'un club engage plusieurs équipes dans la même catégorie. equipes_bilan: dict[str, Any] = {} for p in phases: num = p["numero_equipe"] or "1" if num not in equipes_bilan: equipes_bilan[num] = { "numero_equipe": num, "bilan": _new_bilan_totals(), "phases": [], } equipes_bilan[num]["phases"].append(p) b = equipes_bilan[num]["bilan"] for f in _BILAN_STAT_FIELDS: b[f] += p[f] # Identifier la phase la plus récente pour l'équipe principale phase_courante = None if phases: target_phases = [ p for p in phases if str(p.get("numero_equipe", "1")) == "1" ] phase_courante = target_phases[-1] if target_phases else phases[-1] return { "club": club_nom, "categorie": categorie or "", "bilan_total": totaux, "phase_courante": phase_courante, "equipes_bilan": equipes_bilan, "phases": phases, } # Force refresh : bypass le cache et appel direct if force_refresh and state.cache_bilan is not None: logger.debug(f"force_refresh=True, bypass cache pour {cache_key}") state.cache_bilan.pop(cache_key, None) return await _dedupe_inflight( cache=state.cache_bilan, cache_key=cache_key, inflight_map=state.inflight_bilan, make_coro=_fetch, cache_name="bilan", ) - src/ffbb_mcp/server.py:467-524 (registration)MCP tool registration of `ffbb_bilan` via `@mcp.tool()` decorator. Defines the tool name, title, annotations, input schema (club_name, organisme_id, categorie, force_refresh), and delegates to `ffbb_bilan_service`.
@mcp.tool( name="ffbb_bilan", title="Bilan complet toutes phases", annotations=_READONLY_ANNOTATIONS, ) @zipai_surgical async def ffbb_bilan( club_name: Annotated[ str | None, Field(description="Nom du club (ex: 'Stade Clermontois', 'ASVEL')."), ] = None, organisme_id: Annotated[ int | str | None, Field(description="ID FFBB du club (alternative plus rapide à club_name)."), ] = None, categorie: Annotated[ str | None, Field( description=( "Catégorie + genre + numéro d'équipe (ex: 'U11M1', 'U13F2', 'U15M', 'Senior')." ), ), ] = None, force_refresh: Annotated[ bool, Field( description="Si True, contourne le cache pour récupérer des données fraîches." ), ] = False, ctx: Context[Any, Any, Any] | None = None, ) -> dict[str, Any]: """Bilan complet d'une équipe toutes phases confondues en UN seul appel. ⚡ C'est l'outil à utiliser en priorité pour toute question de type "quel est le bilan de X cette saison ?" ou "résultats de U11M1". Encapsule en interne : recherche club → équipes → classements de toutes les phases en parallèle → agrégation V/D/N et paniers marqués/encaissés. Retourne : - bilan_total : total V/D/N, paniers marqués/encaissés, différence - phases : détail par compétition/phase (position, V/D/N, paniers) """ try: if ctx: await ctx.report_progress(0, total=3, message="Résolution du club…") effective_refresh = force_refresh result = await ffbb_bilan_service( club_name=club_name, organisme_id=organisme_id, categorie=categorie, force_refresh=effective_refresh, ) if ctx: await ctx.report_progress(3, total=3, message="Bilan prêt.") return result except Exception as e: raise handle_api_error(e) from e - src/ffbb_mcp/services.py:293-301 (schema)Helper tuple `_BILAN_STAT_FIELDS` defining the schema fields (match_joues, gagnes, perdus, nuls, paniers_marques, paniers_encaisses, difference) used for bilan aggregation.
_BILAN_STAT_FIELDS: tuple[str, ...] = ( "match_joues", "gagnes", "perdus", "nuls", "paniers_marques", "paniers_encaisses", "difference", ) - src/ffbb_mcp/services.py:304-315 (helper)Helper `_new_bilan_totals` and `_extract_and_accumulate_bilan` for initializing and accumulating bilan statistics from classement entries.
def _new_bilan_totals() -> dict[str, int]: return dict.fromkeys(_BILAN_STAT_FIELDS, 0) def _extract_and_accumulate_bilan( entry: dict[str, Any], totaux: dict[str, int] ) -> dict[str, int]: stats = {f: int(entry.get(f) or 0) for f in _BILAN_STAT_FIELDS} for f, v in stats.items(): totaux[f] += v return stats - src/ffbb_mcp/services.py:137-181 (helper)TTL cache setup for bilan data: `_ttu_bilan` and `state.cache_bilan` (TLRUCache) with deduplication inflight map `state.inflight_bilan`.
def _ttu_bilan(k, v, now): ttl = ( v.get("_ttl", get_static_ttl("bilan")) if isinstance(v, dict) else get_static_ttl("bilan") ) return now + ttl def _ttu_poule(k, v, now): # Retrieve ttl from the wrapped object, fallback to default get_static_ttl ttl = ( v.get("_ttl", get_static_ttl("poule")) if isinstance(v, dict) else get_static_ttl("poule") ) return now + ttl def _ttu_calendrier(k, v, now): return now + _read_positive_int_env( "FFBB_CACHE_TTL_CALENDRIER", get_static_ttl("calendrier") ) # TTL des caches froids : le TTL est calculé une seule fois à l'initialisation (au chargement du module). # Changer la variable d'environnement à chaud n'aura pas d'effet sur ces caches. C'est le comportement intentionnel # pour éviter des ré-évaluations constantes pour des données relativement statiques ou basées sur un restart du serveur. state.cache_lives = TTLCache( maxsize=1, ttl=_read_positive_int_env("FFBB_CACHE_TTL_LIVES", get_static_ttl("lives")), ) state.cache_search = TTLCache( maxsize=256, ttl=_read_positive_int_env("FFBB_CACHE_TTL_SEARCH", get_static_ttl("search")), ) state.cache_detail = TTLCache( maxsize=128, ttl=_read_positive_int_env("FFBB_CACHE_TTL_DETAIL", get_static_ttl("organisme")), ) state.cache_calendrier = TLRUCache( maxsize=64, ttu=_ttu_calendrier, ) state.cache_bilan = TLRUCache(maxsize=64, ttu=_ttu_bilan)