from fastapi import FastAPI from fastapi.responses import Response, HTMLResponse from fastapi import HTTPException from fastapi import Request from contextlib import asynccontextmanager import requests import threading import time import queue import argparse import uvicorn import os import pandas as pd import json from datetime import datetime, time as dtime, timedelta from fastapi.responses import Response import logging import logging.config import platform # передадим параметры через аргументы или глобальные переменные parser = argparse.ArgumentParser() parser = argparse.ArgumentParser() parser.add_argument("--league", default="vtb") parser.add_argument("--team", required=True) parser.add_argument("--lang", default="en") args = parser.parse_args() MYHOST = platform.node() if not os.path.exists("logs"): os.makedirs("logs") telegram_bot_token = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY" # TELEGRAM_CHAT_ID = 228977654 telegram_chat_id = -4803699526 log_config = { "version": 1, "handlers": { "telegram": { "class": "telegram_handler.TelegramHandler", "level": "INFO", "token": telegram_bot_token, "chat_id": telegram_chat_id, "formatter": "telegram", }, "console": { "class": "logging.StreamHandler", "level": "INFO", "formatter": "simple", "stream": "ext://sys.stdout", }, "file": { "class": "logging.FileHandler", "level": "DEBUG", "formatter": "simple", "filename": f"logs/GFX_{MYHOST}.log", "encoding": "utf-8", }, }, "loggers": { __name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"}, }, "formatters": { "telegram": { "class": "telegram_handler.HtmlFormatter", "format": f"%(levelname)s [{MYHOST.upper()}] %(message)s", "use_emoji": "True", }, "simple": { "class": "logging.Formatter", "format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s", "datefmt": "%d.%m.%Y %H:%M:%S", }, }, } logging.config.dictConfig(log_config) logger = logging.getLogger(__name__) logger.handlers[2].formatter.use_emoji = True LEAGUE = args.league TEAM = args.team LANG = args.lang HOST = "https://deti.russiabasket.org" STATUS = False GAME_ID = None SEASON = None GAME_START_DT = None # datetime начала матча (локальная из календаря) GAME_TODAY = False # флаг: игра сегодня GAME_SOON = False # флаг: игра сегодня и скоро (<1 часа) URLS = { "seasons": "{host}/api/abc/comps/seasons?Tag={league}", "actual-standings": "{host}/api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}", "calendar": "{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000", "game": "{host}/api/abc/games/game?Id={game_id}&Lang={lang}", "pregame": "{host}/api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}", "pregame-full-stats": "{host}/api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}", "live-status": "{host}/api/abc/games/live-status?id={game_id}", "box-score": "{host}/api/abc/games/box-score?id={game_id}", "play-by-play": "{host}/api/abc/games/play-by-play?id={game_id}", } def start_offline_threads(season, game_id): """Запускаем редкие запросы, когда матча нет или он уже сыгран.""" global threads_offline, CURRENT_THREADS_MODE, stop_event_offline # если уже работаем в офлайне — не дублируем if CURRENT_THREADS_MODE == "offline": return # на всякий случай гасим лайв stop_live_threads() stop_event_offline.clear() threads_offline = [ threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 1, # раз в секунду/реже stop_event_offline, ), daemon=True, ) ] for t in threads_offline: t.start() CURRENT_THREADS_MODE = "offline" logger.info("[threads] OFFLINE threads started") def start_live_threads(season, game_id): """Запускаем частые онлайн-запросы, когда матч идёт/вот-вот.""" global threads_live, CURRENT_THREADS_MODE, stop_event_live # если уже в лайве — не дублируем if CURRENT_THREADS_MODE == "live": return # на всякий случай гасим офлайн stop_offline_threads() stop_event_live.clear() threads_live = [ threading.Thread( target=get_data_from_API, args=( "pregame", URLS["pregame"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 0.0016667, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 0.0016667, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 0.0016667, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 0.00016, # часто stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "live-status", URLS["live-status"].format(host=HOST, game_id=game_id), 1, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "box-score", URLS["box-score"].format(host=HOST, game_id=game_id), 1, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "play-by-play", URLS["play-by-play"].format(host=HOST, game_id=game_id), 1, stop_event_live, ), daemon=True, ), ] for t in threads_live: t.start() CURRENT_THREADS_MODE = "live" logger.info("[threads] LIVE threads started") def stop_live_threads(): """Гасим только live-треды.""" global threads_live if not threads_live: return stop_event_live.set() for t in threads_live: t.join(timeout=1) threads_live = [] logger.info("[threads] LIVE threads stopped") def stop_offline_threads(): """Гасим только offline-треды.""" global threads_offline if not threads_offline: return stop_event_offline.set() for t in threads_offline: t.join(timeout=1) threads_offline = [] logger.info("[threads] OFFLINE threads stopped") # общая очередь results_q = queue.Queue() # тут будем хранить последние данные latest_data = {} # событие для остановки потоков stop_event = threading.Event() # отдельные события для разных наборов потоков stop_event_live = threading.Event() stop_event_offline = threading.Event() # чтобы из consumer можно было их гасить threads_live = [] threads_offline = [] # какой режим сейчас запущен: "live" / "offline" / None CURRENT_THREADS_MODE = None # Функция запускаемая в потоках def get_data_from_API( name: str, url: str, quantity: float, stop_event: threading.Event ): if quantity <= 0: raise ValueError("quantity must be > 0") sleep_time = 1.0 / quantity # это и есть "раз в N секунд" while not stop_event.is_set(): start = time.time() try: value = requests.get(url, timeout=5).json() except Exception as ex: value = {"error": str(ex)} ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] results_q.put({"source": name, "ts": ts, "data": value}) logger.debug(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}") # сколько уже заняло elapsed = time.time() - start # сколько надо доспать, чтобы в сумме вышла нужная частота to_sleep = sleep_time - elapsed if to_sleep > 0: time.sleep(to_sleep) # если запрос занял дольше — просто сразу следующую итерацию # Получение результатов из всех запущенных потоков def results_consumer(): while not stop_event.is_set(): try: msg = results_q.get(timeout=0.5) except queue.Empty: continue try: source = msg.get("source") payload = msg.get("data") or {} # универсальный статус (может не быть) incoming_status = payload.get("status") # может быть None # 1) play-by-play if "play-by-play" in source: game = latest_data.get("game") # если игра уже нормальная — приклеиваем плейи if ( game and isinstance(game, dict) and "data" in game and "result" in game["data"] ): # у pbp тоже может не быть data/result if "result" in payload: game["data"]["result"]["plays"] = payload["result"] # а вот статус у play-by-play иногда просто "no-status" latest_data[source] = { "ts": msg["ts"], "data": incoming_status if incoming_status is not None else payload, } # 2) box-score elif "box-score" in source: game = latest_data.get("game") if ( game and "data" in game and "result" in game["data"] and "teams" in game["data"]["result"] and "result" in payload and "teams" in payload["result"] ): # обновляем команды game["data"]["result"]["game"]["fullScore"] = payload["result"][ "fullScore" ] for team in game["data"]["result"]["teams"]: if team["teamNumber"] != 0: box_team = [ t for t in payload["result"]["teams"] if t["teamNumber"] == team["teamNumber"] ] if not box_team: print("ERROR: box-score team not found") continue box_team = box_team[0] for player in team["starts"]: box_player = [ p for p in box_team["starts"] if p["startNum"] == player["startNum"] ] if box_player: player["stats"] = box_player[0] team["total"] = box_team["total"] team["startTotal"] = box_team["startTotal"] team["benchTotal"] = box_team["benchTotal"] team["maxLeading"] = box_team["maxLeading"] team["pointsInRow"] = box_team["pointsInRow"] team["maxPointsInRow"] = box_team["maxPointsInRow"] # в любом случае сохраняем сам факт, что box-score пришёл latest_data[source] = { "ts": msg["ts"], "data": incoming_status if incoming_status is not None else payload, } elif "live-status" in source: latest_data[source] = { "ts": msg["ts"], "data": payload, } try: ls_data = payload.get("result") or payload raw_ls_status = ( ls_data.get("status") or ls_data.get("gameStatus") or ls_data.get("state") ) if raw_ls_status: raw_ls_status_low = str(raw_ls_status).lower() finished_markers = [ "finished", "result", "resultconfirmed", "ended", "final", "game over", ] # матч ЗАКОНЧЕН → гасим live и включаем offline if any(m in raw_ls_status_low for m in finished_markers): print("[status] match finished → switch to OFFLINE") if ( GAME_START_DT and GAME_START_DT.date() == datetime.now().date() ): globals()["STATUS"] = "finished_today" else: globals()["STATUS"] = "finished" stop_live_threads() start_offline_threads(SEASON, GAME_ID) # матч СТАЛ онлайном (напр., из Scheduled → Online) elif ( "online" in raw_ls_status_low or "live" in raw_ls_status_low ): if globals().get("STATUS") not in ["live", "live_soon"]: print( "[status] match became LIVE → switch to LIVE threads" ) globals()["STATUS"] = "live" start_live_threads(SEASON, GAME_ID) except Exception as e: print("results_consumer: live-status postprocess error:", e) else: if source == "game": has_game_already = "game" in latest_data # есть ли в ответе ПОЛНАЯ структура is_full = ( "data" in payload and isinstance(payload["data"], dict) and "result" in payload["data"] ) if is_full: # полный game — всегда кладём latest_data["game"] = { "ts": msg["ts"], "data": payload, } else: # game неполный if not has_game_already: # 👉 раньше game вообще не было — лучше положить хоть что-то latest_data["game"] = { "ts": msg["ts"], "data": payload, } else: # 👉 уже есть какой-то game — неполным НЕ затираем print( "results_consumer: got partial game, keeping previous one" ) # и обязательно continue/return из этого elif/if else: latest_data[source] = { "ts": msg["ts"], "data": payload, } continue # ... остальная обработка ... except Exception as e: print("results_consumer error:", repr(e)) continue def get_items(data: dict) -> list: """ Мелкий хелпер: берём первый список в ответе API. Многие ручки отдают {"result":[...]} или {"seasons":[...]}. Если находим список — возвращаем его. Если нет — возвращаем None (значит, нужно брать весь dict). """ for k, v in data.items(): if isinstance(v, list): return data[k] return None from datetime import datetime def pick_game_for_team(calendar_json): """ Возвращает: game_id: str | None game_dt: datetime | None is_today: bool cal_status: str | None # Scheduled / Online / Result / ResultConfirmed Логика: 1. если в календаре есть игра КОМАНДЫ на сегодня — берём ЕЁ и возвращаем её gameStatus 2. иначе — берём последнюю прошедшую и тоже возвращаем её gameStatus """ items = get_items(calendar_json) if not items: return None, None, False, None today = datetime.now().date() # 1) сначала — сегодняшняя for game in reversed(items): if game["team1"]["name"].lower() != TEAM.lower(): continue gdt = extract_game_datetime(game) gdate = ( gdt.date() if gdt else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date() ) if gdate == today: cal_status = game["game"].get("gameStatus") return game["game"]["id"], gdt, True, cal_status # 2) если на сегодня нет — берём последнюю прошедшую last_id = None last_dt = None last_status = None for game in reversed(items): if game["team1"]["name"].lower() != TEAM.lower(): continue gdt = extract_game_datetime(game) gdate = ( gdt.date() if gdt else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date() ) if gdate <= today: last_id = game["game"]["id"] last_dt = gdt last_status = game["game"].get("gameStatus") break return last_id, last_dt, False, last_status def extract_game_datetime(game_item: dict) -> datetime | None: """ Из элемента календаря достаём datetime матча. В календаре есть localDate и часто localTime. Если localTime нет — берём 00:00. """ try: date_str = game_item["game"]["localDate"] # '31.10.2025' dt_date = datetime.strptime(date_str, "%d.%m.%Y").date() time_str = game_item["game"].get("localTime") # '19:30' if time_str: hh, mm = map(int, time_str.split(":")) dt_time = dtime(hour=hh, minute=mm) else: dt_time = dtime(hour=0, minute=0) return datetime.combine(dt_date, dt_time) except Exception: return None @asynccontextmanager async def lifespan(app: FastAPI): global STATUS, GAME_ID, SEASON, GAME_START_DT, GAME_TODAY, GAME_SOON # 1. проверяем API: seasons try: seasons_resp = requests.get( URLS["seasons"].format(host=HOST, league=LEAGUE) ).json() season = seasons_resp["items"][0]["season"] except Exception: now = datetime.now() if now.month > 9: season = now.year + 1 else: season = now.year print("не удалось получить последний сезон.") SEASON = season # 2. берём календарь try: calendar = requests.get( URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG) ).json() except Exception as ex: print(f"не получилось проверить работу API. код ошибки: {ex}") # тут можно вообще не запускать сервер, но оставим как есть calendar = None # 3. определяем игру game_id, game_dt, is_today, cal_status = ( pick_game_for_team(calendar) if calendar else (None, None, False, None) ) GAME_ID = game_id GAME_START_DT = game_dt GAME_TODAY = is_today # 4. запускаем "длинные" потоки (они у тебя и так всегда) thread_result_consumer = threading.Thread( target=results_consumer, daemon=True, ) thread_result_consumer.start() # 5. Подготовим онлайн и офлайн наборы (как у тебя) threads_live = [ threading.Thread( target=get_data_from_API, args=( "pregame", URLS["pregame"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 0.0016667, stop_event, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 0.0016667, stop_event, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 0.0016667, stop_event, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 0.00016, stop_event, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "live-status", URLS["live-status"].format(host=HOST, game_id=game_id), 1, stop_event, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "box-score", URLS["box-score"].format(host=HOST, game_id=game_id), 1, stop_event, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "play-by-play", URLS["play-by-play"].format(host=HOST, game_id=game_id), 1, stop_event, ), daemon=True, ), ] threads_offline = [ threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 1, # реже stop_event, ), daemon=True, ) ] # 5. решаем, что запускать if not is_today: STATUS = "no_game_today" start_offline_threads(SEASON, GAME_ID) else: # игра сегодня if cal_status is None: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) elif cal_status == "Scheduled": if game_dt: delta = game_dt - datetime.now() if delta <= timedelta(hours=1): STATUS = "live_soon" start_live_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) elif cal_status == "Online": STATUS = "live" start_live_threads(SEASON, GAME_ID) elif cal_status in ["Result", "ResultConfirmed"]: STATUS = "finished_today" start_offline_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) yield # -------- shutdown -------- stop_event.set() # завершить consumer stop_live_threads() stop_offline_threads() thread_result_consumer.join(timeout=1) app = FastAPI(lifespan=lifespan) def format_time(seconds: float | int) -> str: """ Удобный формат времени для игроков: 71 -> "1:11" 0 -> "0:00" Любые кривые значения -> "0:00". """ try: total_seconds = int(float(seconds)) minutes = total_seconds // 60 sec = total_seconds % 60 return f"{minutes}:{sec:02}" except (ValueError, TypeError): return "0:00" @app.get("/team1") async def team1(): game = get_latest_game_safe() if not game: raise HTTPException(status_code=503, detail="game data not ready") return await team("team1") @app.get("/team2") async def team2(): game = get_latest_game_safe() if not game: raise HTTPException(status_code=503, detail="game data not ready") return await team("team2") @app.get("/top_team1") async def top_team1(): data = await team("team1") return await top_sorted_team(data) @app.get("/top_team2") async def top_team2(): data = await team("team2") return await top_sorted_team(data) @app.get("/started_team1") async def started_team1(): data = await team("team1") return await started_team(data) @app.get("/started_team2") async def started_team2(): data = await team("team2") return await started_team(data) @app.get("/game") async def game(): return latest_data["game"] @app.get("/status") async def status(request: Request): def color_for_status(status_value: str) -> str: """Подбор цвета статуса в HEX""" status_value = str(status_value).lower() if status_value in ["ok", "success", "live", "live_soon", "online"]: return "#00FF00" # зелёный elif status_value in ["scheduled", "today_not_started", "upcoming"]: return "#FFFF00" # жёлтый elif status_value in [ "result", "resultconfirmed", "finished", "finished_today", ]: return "#FF0000" # красный elif status_value in ["no_game_today", "unknown", "none"]: return "#FFFFFF" # белый else: return "#808080" # серый (неизвестный статус) # ✳️ сортируем latest_data в нужном порядке sort_order = ["game", "live-status", "box-score", "play-by-play"] sorted_keys = ( [k for k in sort_order if k in latest_data] + sorted([k for k in latest_data if k not in sort_order]) ) data = { "league": LEAGUE, "team": TEAM, "game_id": GAME_ID, "game_status": STATUS, "statuses": [ { "name": TEAM, "status": STATUS, "ts": ( GAME_START_DT.strftime("%Y-%m-%d %H:%M") if GAME_START_DT else "N/A" ), "link": LEAGUE, "color": color_for_status(STATUS), } ] + [ { "name": item, "status": ( latest_data[item]["data"]["status"] if isinstance(latest_data[item]["data"], dict) and "status" in latest_data[item]["data"] else latest_data[item]["data"] ), "ts": latest_data[item]["ts"], "link": URLS[item].format( host=HOST, league=LEAGUE, season=SEASON, lang=LANG, game_id=GAME_ID, ), "color": color_for_status( latest_data[item]["data"]["status"] if isinstance(latest_data[item]["data"], dict) and "status" in latest_data[item]["data"] else latest_data[item]["data"] ), } for item in sorted_keys # ← используем отсортированный порядок ], } accept = request.headers.get("accept", "") if "text/html" in accept: status_raw = str(STATUS).lower() if status_raw in ["live"]: gs_class = "live" gs_text = "🟢 LIVE" elif status_raw in ["live_soon"]: gs_class = "live" gs_text = "🟢 GAME TODAY (soon)" elif status_raw == "today_not_started": gs_class = "upcoming" gs_text = "🟡 Game today, not started" elif status_raw in ["finished_today", "finished"]: gs_class = "finished" gs_text = "🔴 Game finished" elif status_raw == "no_game_today": gs_class = "unknown" gs_text = "⚪ No game today" else: gs_class = "unknown" gs_text = "⚪ UNKNOWN" html = f"""

📊 Game Status Monitor

League: {LEAGUE}

Team: {TEAM}

Game ID: {GAME_ID}

Game Status: {gs_text}

""" for s in data["statuses"]: status_text = str(s["status"]).strip().lower() if any( x in status_text for x in ["ok", "success", "live", "live_soon", "online"] ): color_class = "ok" elif any( x in status_text for x in ["scheduled", "today_not_started", "upcoming"] ): color_class = "warn" elif any( x in status_text for x in ["result", "resultconfirmed", "finished", "finished_today"] ): color_class = "fail" else: color_class = "unknown" html += f""" """ html += """
NameStatusTimestampLink
{s["name"]} {status_text} {s["ts"]} {s["link"]}
""" return HTMLResponse(content=html, media_type="text/html") # JSON для API (красиво отформатированный) formatted = json.dumps(data, indent=4, ensure_ascii=False) response = Response(content=formatted, media_type="application/json") response.headers["Refresh"] = "1" return response @app.get("/scores") async def scores(): game = get_latest_game_safe() if not game: # игры ещё нет или пришёл только частичный ответ # отдаём пустую структуру, чтобы фронт не падал return [ {"Q": "Q1", "score1": "", "score2": ""}, {"Q": "Q2", "score1": "", "score2": ""}, {"Q": "Q3", "score1": "", "score2": ""}, {"Q": "Q4", "score1": "", "score2": ""}, ] game_data = game["data"] if "data" in game else game result = game_data.get("result", {}) game_info = result.get("game", {}) full_score = game_info.get("fullScore") if not full_score: # поле есть, но ещё пустое/None return [ {"Q": "Q1", "score1": "", "score2": ""}, {"Q": "Q2", "score1": "", "score2": ""}, {"Q": "Q3", "score1": "", "score2": ""}, {"Q": "Q4", "score1": "", "score2": ""}, ] quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"] score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters] full_score_list = full_score.split(",") for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]): parts = score_str.split(":") if len(parts) == 2: score_by_quarter[i]["score1"] = parts[0] score_by_quarter[i]["score2"] = parts[1] return score_by_quarter async def top_sorted_team(data): top_sorted_team = sorted( (p for p in data if p.get("startRole") in ["Player", ""]), key=lambda x: ( x.get("pts", 0), x.get("dreb", 0) + x.get("oreb", 0), x.get("ast", 0), x.get("stl", 0), x.get("blk", 0), x.get("time", "0:00"), ), reverse=True, ) # пустые строки не должны ломать UI процентами фолов/очков for player in top_sorted_team: if player.get("num", "") == "": player["pts"] = "" player["foul"] = "" return top_sorted_team def get_latest_game_safe(): """ Безопасно достаём актуальный game из latest_data. Возвращаем None, если структура ещё не готова или прилетел "плохой" game (например, с {"status": "no-status"} без data/result). """ game = latest_data.get("game") if not game: return None # у нас в latest_data["game"] лежит {"ts": ..., "data": {...}} или сразу {...} # в consumer мы клали {"ts": ..., "data": payload}, так что берём .get("data") if "data" in game: game_data = game["data"] else: # на всякий случай, если где-то клали сразу payload game_data = game if not isinstance(game_data, dict): return None result = game_data.get("result") if not result: return None # если всё ок — вернём в исходном виде (с ts и т.п.) return game async def team(who: str): """ Возвращает данные по команде (team1 / team2) из актуального game. Защищена от ситуации, когда latest_data["game"] ещё не прогрелся или в него прилетел "плохой" ответ от API. """ game = get_latest_game_safe() if not game: # игра ещё не подгружена или структура кривоватая raise HTTPException(status_code=503, detail="game data not ready") # нормализуем доступ к данным game_data = game["data"] if "data" in game else game result = game_data[ "result" ] # здесь уже безопасно, мы проверили в get_latest_game_safe # в result ожидаем "teams" teams = result.get("teams") if not teams: raise HTTPException(status_code=503, detail="game teams not ready") # выбираем команду if who == "team1": payload = next((t for t in teams if t.get("teamNumber") == 1), None) else: payload = next((t for t in teams if t.get("teamNumber") == 2), None) if payload is None: raise HTTPException(status_code=404, detail=f"{who} not found in game data") # дальше — твоя исходная логика формирования ответа по команде # я не знаю весь твой оригинальный код ниже, поэтому вставляю каркас # и показываю, где нужно аккуратно брать plays/box-score из latest_data role_list = [ ("Center", "C"), ("Guard", "G"), ("Forward", "F"), ("Power Forward", "PF"), ("Small Forward", "SF"), ("Shooting Guard", "SG"), ("Point Guard", "PG"), ("Forward-Center", "FC"), ] starts = payload.get("starts", []) team_rows = [] for item in starts: stats = item.get("stats") or {} row = { "id": item.get("personId") or "", "num": item.get("displayNumber"), "startRole": item.get("startRole"), "role": item.get("positionName"), "roleShort": ( [ r[1] for r in role_list if r[0].lower() == (item.get("positionName") or "").lower() ][0] if any( r[0].lower() == (item.get("positionName") or "").lower() for r in role_list ) else "" ), "NameGFX": ( f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip() if item.get("firstName") is not None and item.get("lastName") is not None else "Команда" ), "captain": item.get("isCapitan", False), "age": item.get("age") or 0, "height": f"{item.get('height')} cm" if item.get("height") else 0, "weight": f"{item.get('weight')} kg" if item.get("weight") else 0, "isStart": stats.get("isStart", False), "isOn": "🏀" if stats.get("isOnCourt") is True else "", "flag": ( "https://flagicons.lipis.dev/flags/4x3/" + ( "ru" if item.get("countryId") is None and item.get("countryName") == "Russia" else ( "" if item.get("countryId") is None else ( (item.get("countryId") or "").lower() if item.get("countryName") is not None else "" ) ) ) + ".svg" ), "pts": stats.get("points", 0), "pt-2": f"{stats.get('goal2',0)}/{stats.get('shot2',0)}" if stats else 0, "pt-3": f"{stats.get('goal3',0)}/{stats.get('shot3',0)}" if stats else 0, "pt-1": f"{stats.get('goal1',0)}/{stats.get('shot1',0)}" if stats else 0, "fg": ( f"{stats.get('goal2',0)+stats.get('goal3',0)}/" f"{stats.get('shot2',0)+stats.get('shot3',0)}" if stats else 0 ), "ast": stats.get("assist", 0), "stl": stats.get("steal", 0), "blk": stats.get("block", 0), "blkVic": stats.get("blocked", 0), "dreb": stats.get("defReb", 0), "oreb": stats.get("offReb", 0), "reb": stats.get("defReb", 0) + stats.get("offReb", 0), "to": stats.get("turnover", 0), "foul": stats.get("foul", 0), "foulT": stats.get("foulT", 0), "foulD": stats.get("foulD", 0), "foulC": stats.get("foulC", 0), "foulB": stats.get("foulB", 0), "fouled": stats.get("foulsOn", 0), "plusMinus": stats.get("plusMinus", 0), "dunk": stats.get("dunk", 0), "kpi": ( stats.get("points", 0) + stats.get("defReb", 0) + stats.get("offReb", 0) + stats.get("assist", 0) + stats.get("steal", 0) + stats.get("block", 0) + stats.get("foulsOn", 0) + (stats.get("goal1", 0) - stats.get("shot1", 0)) + (stats.get("goal2", 0) - stats.get("shot2", 0)) + (stats.get("goal3", 0) - stats.get("shot3", 0)) - stats.get("turnover", 0) - stats.get("foul", 0) ), "time": format_time(stats.get("second", 0)), "pts1q": 0, "pts2q": 0, "pts3q": 0, "pts4q": 0, "pts1h": 0, "pts2h": 0, "Name1GFX": (item.get("firstName") or "").strip(), "Name2GFX": (item.get("lastName") or "").strip(), "photoGFX": ( os.path.join( "D:\\Photos", LEAGUE.lower(), result[who]["name"], f"{item.get('displayNumber')}.png", ) if item.get("startRole") == "Player" else "" ), "isOnCourt": stats.get("isOnCourt", False), } team_rows.append(row) # добиваем до 12 строк, чтобы UI был ровный count_player = sum(1 for x in team_rows if x["startRole"] == "Player") if count_player < 12 and team_rows: filler_count = (4 if count_player <= 4 else 12) - count_player template_keys = list(team_rows[0].keys()) for _ in range(filler_count): empty_row = {} for key in template_keys: if key in ["captain", "isStart", "isOnCourt"]: empty_row[key] = False elif key in [ "id", "pts", "weight", "height", "age", "ast", "stl", "blk", "blkVic", "dreb", "oreb", "reb", "to", "foul", "foulT", "foulD", "foulC", "foulB", "fouled", "plusMinus", "dunk", "kpi", ]: empty_row[key] = 0 else: empty_row[key] = "" team_rows.append(empty_row) # сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д. role_priority = { "Player": 0, "": 1, "Coach": 2, "Team": 3, None: 4, "Other": 5, } sorted_team = sorted( team_rows, key=lambda x: role_priority.get(x.get("startRole", 99), 99), ) return sorted_team async def started_team(data): started_team = sorted( ( p for p in data if p.get("startRole") == "Player" and p.get("isOnCourt") is True ), key=lambda x: int(x.get("num") or 0), ) return started_team def add_new_team_stat( data: dict, avg_age: float, points, avg_height: float, timeout_str: str, timeout_left: int, ) -> dict: """ Берёт словарь total по команде (очки, подборы, броски и т.д.), добавляет: - проценты попаданий - средний возраст / рост - очки старт / бенч - информацию по таймаутам и всё приводит к строкам (для UI, чтобы не ловить типы). Возвращает обновлённый словарь. """ def safe_int(v): try: return int(v) except (ValueError, TypeError): return 0 def format_percent(goal, shot): goal, shot = safe_int(goal), safe_int(shot) return f"{round(goal * 100 / shot)}%" if shot else "0%" goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1")) goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2")) goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3")) def_reb = safe_int(data.get("defReb")) off_reb = safe_int(data.get("offReb")) data.update( { "pt-1": f"{goal1}/{shot1}", "pt-2": f"{goal2}/{shot2}", "pt-3": f"{goal3}/{shot3}", "fg": f"{goal2 + goal3}/{shot2 + shot3}", "pt-1_pro": format_percent(goal1, shot1), "pt-2_pro": format_percent(goal2, shot2), "pt-3_pro": format_percent(goal3, shot3), "fg_pro": format_percent(goal2 + goal3, shot2 + shot3), "Reb": str(def_reb + off_reb), "avgAge": str(avg_age), "ptsStart": str(points[0]), "ptsStart_pro": str(points[1]), "ptsBench": str(points[2]), "ptsBench_pro": str(points[3]), "avgHeight": f"{avg_height} cm", "timeout_left": str(timeout_left), "timeout_str": str(timeout_str), } ) for k in data: data[k] = str(data[k]) return data def time_outs_func(data_pbp): """ Считает таймауты для обеих команд и формирует читабельные строки вида: "2 Time-outs left in 2nd half" Возвращает: (строка_для_команды1, остаток1, строка_для_команды2, остаток2) """ timeout1 = [] timeout2 = [] for event in data_pbp: if event.get("play") == 23: # 23 == таймаут if event.get("startNum") == 1: timeout1.append(event) elif event.get("startNum") == 2: timeout2.append(event) def timeout_status(timeout_list, last_event: dict): period = last_event.get("period", 0) sec = last_event.get("sec", 0) if period < 3: timeout_max = 2 count = sum(1 for t in timeout_list if t.get("period", 0) <= period) quarter = "1st half" elif period < 5: count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period) quarter = "2nd half" if period == 4 and sec >= 4800 and count in (0, 1): timeout_max = 2 else: timeout_max = 3 else: timeout_max = 1 count = sum(1 for t in timeout_list if t.get("period", 0) == period) quarter = f"OverTime {period - 4}" left = max(0, timeout_max - count) word = "Time-outs" if left != 1 else "Time-out" text = f"{left if left != 0 else 'No'} {word} left in {quarter}" return text, left if not data_pbp: return "", 0, "", 0 last_event = data_pbp[-1] t1_str, t1_left = timeout_status(timeout1, last_event) t2_str, t2_left = timeout_status(timeout2, last_event) return t1_str, t1_left, t2_str, t2_left def add_data_for_teams(new_data): """ Считает командные агрегаты: - средний возраст - очки со старта vs со скамейки, + их проценты - средний рост Возвращает кортеж: (avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm) """ players = [item for item in new_data if item["startRole"] == "Player"] points_start = 0 points_bench = 0 total_age = 0 total_height = 0 player_count = len(players) for player in players: # print(player) stats = player["stats"] if stats: # print(stats) if stats["isStart"] is True: points_start += stats["points"] elif stats["isStart"] is False: points_bench += stats["points"] total_age += player["age"] total_height += player["height"] total_points = points_start + points_bench points_start_pro = ( f"{round(points_start * 100 / total_points)}%" if total_points else "0%" ) points_bench_pro = ( f"{round(points_bench * 100 / total_points)}%" if total_points else "0%" ) avg_age = round(total_age / player_count, 1) if player_count else 0 avg_height = round(total_height / player_count, 1) if player_count else 0 points = [points_start, points_start_pro, points_bench, points_bench_pro] return avg_age, points, avg_height stat_name_list = [ ("points", "Очки", "points"), ("pt-1", "Штрафные", "free throws"), ("pt-1_pro", "штрафные, процент", "free throws pro"), ("pt-2", "2-очковые", "2-points"), ("pt-2_pro", "2-очковые, процент", "2-points pro"), ("pt-3", "3-очковые", "3-points"), ("pt-3_pro", "3-очковые, процент", "3-points pro"), ("fg", "очки с игры", "field goals"), ("fg_pro", "Очки с игры, процент", "field goals pro"), ("assist", "Передачи", "assists"), ("pass", "", ""), ("defReb", "подборы в защите", ""), ("offReb", "подборы в нападении", ""), ("Reb", "Подборы", "rebounds"), ("steal", "Перехваты", "steals"), ("block", "Блокшоты", "blocks"), ("blocked", "", ""), ("turnover", "Потери", "turnovers"), ("foul", "Фолы", "fouls"), ("foulsOn", "", ""), ("foulT", "", ""), ("foulD", "", ""), ("foulC", "", ""), ("foulB", "", ""), ("second", "секунды", "seconds"), ("dunk", "данки", "dunks"), ("fastBreak", "", "fast breaks"), ("plusMinus", "+/-", "+/-"), ("avgAge", "", "avg Age"), ("ptsBench", "", "Bench PTS"), ("ptsBench_pro", "", "Bench PTS, %"), ("ptsStart", "", "Start PTS"), ("ptsStart_pro", "", "Start PTS, %"), ("avgHeight", "", "avg height"), ("timeout_left", "", "timeout left"), ("timeout_str", "", "timeout str"), ] @app.get("/team_stats") async def team_stats(): teams = latest_data["game"]["data"]["result"]["teams"] plays = latest_data["game"]["data"]["result"]["plays"] team_1 = next((t for t in teams if t["teamNumber"] == 1), None) team_2 = next((t for t in teams if t["teamNumber"] == 2), None) timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays) avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1["starts"]) avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2["starts"]) total_1 = add_new_team_stat( team_1["total"], avg_age_1, points_1, avg_height_1, timeout_str1, timeout_left1, ) total_2 = add_new_team_stat( team_2["total"], avg_age_2, points_2, avg_height_2, timeout_str2, timeout_left2, ) result_json = [] for key in total_1: val1 = total_1[key] val2 = total_2[key] stat_rus = "" stat_eng = "" for metric_name, rus, eng in stat_name_list: if metric_name == key: stat_rus, stat_eng = rus, eng break result_json.append( { "name": key, "nameGFX_rus": stat_rus, "nameGFX_eng": stat_eng, "val1": val1, "val2": val2, } ) return result_json @app.get("/referee") async def referee(): desired_order = [ "Crew chief", "Referee 1", "Referee 2", "Commissioner", "Ст.судья", "Судья 1", "Судья 2", "Комиссар", ] # Найти судей (teamNumber == 0) team_ref = next( ( t for t in latest_data["game"]["data"]["result"]["teams"] if t["teamNumber"] == 0 ), None, ) referees_raw = team_ref.get("starts", []) referees = [] for r in referees_raw: flag_code = r.get("countryId", "").lower() if r.get("countryName") else "" referees.append( { "displayNumber": r.get("displayNumber", ""), "positionName": r.get("positionName", ""), "lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(), "secondName": r.get("secondName", ""), "birthday": r.get("birthday", ""), "age": r.get("age", 0), "flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg", } ) # Сортировка по позиции referees = sorted( referees, key=lambda x: ( desired_order.index(x["positionName"]) if x["positionName"] in desired_order else len(desired_order) ), ) return referees @app.get("/team_comparison") async def team_comparison(): if STATUS not in ["no_game_today", "finished_today"]: data = latest_data["pregame"]["data"]["result"] teams = [] for data_team in (data["teamStats1"], data["teamStats2"]): temp_team = { "team": data_team["team"]["name"], "games": data_team["games"], "points": round( (data_team["totalStats"]["points"] / data_team["games"]), 1 ), "points_2": round( ( data_team["totalStats"]["goal2"] * 100 / data_team["totalStats"]["shot2"] ), 1, ), "points_3": round( ( data_team["totalStats"]["goal3"] * 100 / data_team["totalStats"]["shot3"] ), 1, ), "points_23": round( ( data_team["totalStats"]["goal23"] * 100 / data_team["totalStats"]["shot23"] ), 1, ), "points_1": round( ( data_team["totalStats"]["goal1"] * 100 / data_team["totalStats"]["shot1"] ), 1, ), "assists": round( (data_team["totalStats"]["assist"] / data_team["games"]), 1 ), "rebounds": round( ( ( data_team["totalStats"]["defRebound"] + data_team["totalStats"]["offRebound"] ) / data_team["games"] ), 1, ), "steals": round( (data_team["totalStats"]["steal"] / data_team["games"]), 1 ), "turnovers": round( (data_team["totalStats"]["turnover"] / data_team["games"]), 1 ), "blocks": round( (data_team["totalStats"]["blockShot"] / data_team["games"]), 1 ), "fouls": round( (data_team["totalStats"]["foul"] / data_team["games"]), 1 ), } teams.append(temp_team) return teams else: return [{"Данных о сравнении команд нет!"}] @app.get("/standings") async def regular_standings(): data = latest_data["actual-standings"]["data"]["items"] for item in data: if item["comp"]["name"] == "Regular Season": if item.get("standings"): standings_rows = item["standings"] df = pd.json_normalize(standings_rows) if "scores" in df.columns: df = df.drop(columns=["scores"]) if ( "totalWin" in df.columns and "totalDefeat" in df.columns and "totalGames" in df.columns and "totalGoalPlus" in df.columns and "totalGoalMinus" in df.columns ): tw = ( pd.to_numeric(df["totalWin"], errors="coerce") .fillna(0) .astype(int) ) td = ( pd.to_numeric(df["totalDefeat"], errors="coerce") .fillna(0) .astype(int) ) df["w_l"] = tw.astype(str) + " / " + td.astype(str) def calc_percent(row): win = row.get("totalWin", 0) games = row.get("totalGames", 0) # гарантируем числа try: win = int(win) except (TypeError, ValueError): win = 0 try: games = int(games) except (TypeError, ValueError): games = 0 if games == 0 or row["w_l"] == "0 / 0": return 0 return round(win * 100 / games + 0.000005) df["procent"] = df.apply(calc_percent, axis=1) tg_plus = ( pd.to_numeric(df["totalGoalPlus"], errors="coerce") .fillna(0) .astype(int) ) tg_minus = ( pd.to_numeric(df["totalGoalMinus"], errors="coerce") .fillna(0) .astype(int) ) df["plus_minus"] = tg_plus - tg_minus standings_payload = df.to_dict(orient="records") return standings_payload @app.get("/live_status") async def live_status(): # если матч реально идёт/вот-вот — пытаемся отдать то, что есть if STATUS in ["live", "live_soon"]: ls = latest_data.get("live-status") if not ls: # live-status ещё не прилетел return [{"foulsA": 0, "foulsB": 0}] raw = ls.get("data") # 1) если это уже готовый dict и в нём есть result → как было раньше if isinstance(raw, dict): # иногда API кладёт всё прямо в root, иногда внутрь result if "result" in raw and isinstance(raw["result"], dict): return [raw["result"]] else: # отдадим как есть, но в списке, чтобы фронт не сломать return [raw] # 2) если это просто строка статуса ("ok" / "no-status" / "error") if isinstance(raw, str): return [{"status": raw}] # fallback return [{"foulsA": 0, "foulsB": 0}] else: # матч не идёт — как у тебя было return [{"foulsA": 0, "foulsB": 0}] if __name__ == "__main__": uvicorn.run("get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="critical")