from fastapi import FastAPI, HTTPException, Request from fastapi.responses import Response, HTMLResponse, StreamingResponse, JSONResponse from fastapi.staticfiles import StaticFiles from contextlib import asynccontextmanager import requests, uvicorn, json import threading, queue import argparse import pandas as pd from datetime import datetime, time as dtime, timedelta from fastapi.responses import Response import logging import logging.config from dotenv import load_dotenv from pprint import pprint import nasio import io, os, platform, time import xml.etree.ElementTree as ET import re from PIL import Image, ImageDraw, ImageFont from io import BytesIO import warnings warnings.filterwarnings( "ignore", message="Data Validation extension is not supported and will be removed", category=UserWarning, module="openpyxl", ) parser = argparse.ArgumentParser() parser = argparse.ArgumentParser() parser.add_argument("--league", default="vtb") parser.add_argument("--team", required=True) parser.add_argument("--lang", default="en") args = parser.parse_args() MYHOST = platform.node() if not os.path.exists("logs"): os.makedirs("logs") def get_fqdn(): system_name = platform.system() if system_name == "Linux": hostname = platform.node().lower() fqdn = f"https://{hostname}.tvstart.ru" else: fqdn = "http://127.0.0.1:8000" return fqdn FQDN = get_fqdn() telegram_bot_token = os.getenv("TELEGRAM_TOKEN") telegram_chat_id = str(os.getenv("TELEGRAM_CHAT_ID")) log_config = { "version": 1, "handlers": { "telegram": { "class": "telegram_handler.TelegramHandler", "level": "INFO", "token": telegram_bot_token, "chat_id": telegram_chat_id, "formatter": "telegram", }, "console": { "class": "logging.StreamHandler", "level": "INFO", "formatter": "simple", "stream": "ext://sys.stdout", }, "file": { "class": "logging.FileHandler", "level": "DEBUG", "formatter": "simple", "filename": f"logs/GFX_{MYHOST}.log", "encoding": "utf-8", }, }, "loggers": { __name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"}, }, "formatters": { "telegram": { "class": "telegram_handler.HtmlFormatter", "format": f"%(levelname)s [{MYHOST.upper()}]\n%(message)s", "use_emoji": "True", }, "simple": { "class": "logging.Formatter", "format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s", "datefmt": "%d.%m.%Y %H:%M:%S", }, }, } logging.config.dictConfig(log_config) logger = logging.getLogger(__name__) logger.handlers[2].formatter.use_emoji = True pprint(f"Локальный файл окружения = {load_dotenv(verbose=True)}") LEAGUE = args.league TEAM = args.team LANG = args.lang HOST = os.getenv("API_BASE_URL") SYNO_PATH_EXCEL = f'{os.getenv("SYNO_PATH_EXCEL")}MATCH INFO.xlsx' SYNO_URL = os.getenv("SYNO_URL") SYNO_USERNAME = os.getenv("SYNO_USERNAME") SYNO_PASSWORD = os.getenv("SYNO_PASSWORD") SYNO_PATH_VMIX = os.getenv("SYNO_PATH_VMIX") SYNO_FONT_PATH = os.getenv("SYNO_FONT_PATH") _syno_font_path = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=os.getenv("SYNO_FONT_PATH"), ) if isinstance(_syno_font_path, BytesIO): _syno_font_path = _syno_font_path.getvalue() SYNO_FONT_PATH = _syno_font_path # bytes или None # ---- ИКОНКА ПРОМАХА ---- _syno_miss_raw = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=os.getenv("SYNO_MISS"), ) if isinstance(_syno_miss_raw, BytesIO): _syno_miss_raw = _syno_miss_raw.getvalue() SYNO_MISS = _syno_miss_raw # bytes или None # ---- ИКОНКА ПОПАДАНИЯ ---- _syno_goal_raw = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=os.getenv("SYNO_GOAL"), ) if isinstance(_syno_goal_raw, BytesIO): _syno_goal_raw = _syno_goal_raw.getvalue() SYNO_GOAL = _syno_goal_raw # bytes или None CALENDAR = None STATUS = False GAME_ID = None SEASON = None GAME_START_DT = None # datetime начала матча (локальная из календаря) GAME_TODAY = False # флаг: игра сегодня GAME_SOON = False # флаг: игра сегодня и скоро (<1 часа) OFFLINE_SWITCH_AT = None # timestamp, когда надо уйти в оффлайн OFFLINE_DELAY_SEC = 600 # 10 минут SLEEP_NOTICE_SENT = False # 👈 чтобы не слать уведомление повторно # --- preload lock --- PRELOAD_LOCK = False # когда True — consumer будет принимать только preloaded game PRELOADED_GAME_ID = None # ID матча, который мы держим «тёплым» PRELOAD_HOLD_UNTIL = None # timestamp, до какого момента держим (T-1:15) # общая очередь results_q = queue.Queue() # тут будем хранить последние данные latest_data = {} # событие для остановки потоков stop_event = threading.Event() # отдельные события для разных наборов потоков stop_event_live = threading.Event() stop_event_offline = threading.Event() # чтобы из consumer можно было их гасить threads_live = [] threads_offline = [] # какой режим сейчас запущен: "live" / "offline" / None CURRENT_THREADS_MODE = None CLEAR_OUTPUT_FOR_VMIX = False EMPTY_PHOTO_PATH = r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\EMPTY.png" # 🔥 кэш картинок в оперативной памяти SHOTMAP_CACHE: dict[str, bytes] = {} # новое хранилище shotmaps по startNum SHOTMAPS: dict[int, dict] = {} SHOTMAPS_LOCK = threading.Lock() URLS = { "seasons": "{host}api/abc/comps/seasons?Tag={league}", "actual-standings": "{host}api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}", "calendar": "{host}api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000", "game": "{host}api/abc/games/game?Id={game_id}&Lang={lang}", "pregame": "{host}api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}", "pregame-full-stats": "{host}api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}", "live-status": "{host}api/abc/games/live-status?id={game_id}", "box-score": "{host}api/abc/games/box-score?id={game_id}", "play-by-play": "{host}api/abc/games/play-by-play?id={game_id}", "players-stats-league": "{host}/api/abc/comps/players-stats?tag={league}&lang={lang}&maxResultCount=10000", "players-stats-season": "{host}/api/abc/comps/players-stats?tag={league}&season={season}&lang={lang}&maxResultCount=10000", } def maybe_clear_for_vmix(payload): """ Если включён режим очистки — возвращаем payload, где все значения заменены на "". Иначе — возвращаем как есть. """ if CLEAR_OUTPUT_FOR_VMIX: return wipe_json_values(payload) return payload def start_offline_threads(season, game_id): """Запускаем редкие запросы, когда матча нет или он уже сыгран.""" global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data if CURRENT_THREADS_MODE == "offline": logger.debug("[threads] already in OFFLINE mode → skip start_offline_threads") return stop_live_threads() stop_offline_threads() logger.info("[threads] switching to OFFLINE mode ...") stop_event_offline.clear() threads_offline = [ threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 150, # опрашиваем раз в секунду/реже stop_event_offline, False, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 300, stop_event_offline, False, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 300, stop_event_offline, False, True, ), daemon=True, ), ] for t in threads_offline: t.start() CURRENT_THREADS_MODE = "offline" logger.info("[threads] OFFLINE threads started (data cleaned)") def start_live_threads(season, game_id): """Запускаем частые онлайн-запросы, когда матч идёт/вот-вот.""" global threads_live, CURRENT_THREADS_MODE, stop_event_live # если уже в лайве — не дублируем if CURRENT_THREADS_MODE == "live": return # на всякий случай гасим офлайн stop_offline_threads() stop_event_live.clear() threads_live = [ threading.Thread( target=get_data_from_API, args=( "pregame", URLS["pregame"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 300, stop_event_live, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 300, stop_event_live, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 300, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 150, # часто stop_event_live, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "live-status", URLS["live-status"].format(host=HOST, game_id=game_id), 0.5, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "box-score", URLS["box-score"].format(host=HOST, game_id=game_id), 0.5, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "play-by-play", URLS["play-by-play"].format(host=HOST, game_id=game_id), 1, stop_event_live, ), daemon=True, ), ] for t in threads_live: t.start() CURRENT_THREADS_MODE = "live" logger.info("[threads] LIVE threads started") def stop_live_threads(): """Гасим только live-треды.""" global threads_live current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") if not threads_live: logger.info("[threads] LIVE threads stopped (nothing to stop)") return logger.info(f"[threads] Stopping {len(threads_live)} LIVE thread(s)...") stop_event_live.set() still_alive = [] for t in threads_live: t.join(timeout=2) if t.is_alive(): logger.warning( f"[{current_time}] [threads] LIVE thread is still alive: {t.name}" ) still_alive.append(t.name) threads_live = [] if still_alive: logger.warning( f"[{current_time}] [threads] Some LIVE threads did not stop: {still_alive}" ) else: logger.info("[threads] LIVE threads stopped") CURRENT_THREADS_MODE = None # 👈 сбрасываем режим def stop_offline_threads(): """Гасим только offline-треды.""" global threads_offline if not threads_offline: return stop_event_offline.set() for t in threads_offline: t.join(timeout=1) threads_offline = [] CURRENT_THREADS_MODE = None # 👈 сбрасываем режим logger.info("[threads] OFFLINE threads stopped") def has_full_game_ready() -> bool: game = latest_data.get("game") if not game: return False payload = game.get("data", game) return ( isinstance(payload, dict) and isinstance(payload.get("data"), dict) and isinstance(payload["data"].get("result"), dict) and "teams" in payload["data"]["result"] ) # Функция запускаемая в потоках def get_data_from_API( name: str, url: str, sleep_time: float, stop_event: threading.Event, stop_when_live=False, stop_after_success: bool = False, # 👈 флаг "останавливаемся после ОК" ): did_first_fetch = False while not stop_event.is_set(): current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") # останов при live + полная игра уже есть if ( stop_when_live and globals().get("STATUS") == "live" and has_full_game_ready() ): logger.info( f"{[{current_time}]} [{name}] stopping because STATUS='live' and full game is ready" ) break # ---------- запрос к API ---------- try: value = requests.get(url, timeout=5).json() did_first_fetch = True # помечаем, что один заход сделали except json.JSONDecodeError as json_err: logger.warning( f"[{current_time}] [{name}] Ошибка парсинга JSON: {json_err}" ) value = {"error": f"JSON decode error: {json_err}"} except requests.exceptions.Timeout: logger.warning(f"[{current_time}] [{name}] Таймаут при запросе {url}") value = {"error": "timeout"} except requests.exceptions.RequestException as req_err: logger.warning(f"[{current_time}] [{name}] Ошибка запроса: {req_err}") value = {"error": str(req_err)} except Exception as ex: logger.warning(f"[{current_time}] [{name}] Неизвестная ошибка: {ex}") value = {"error": str(ex)} # ---------- проверка статуса ответа ---------- # если API сам вернул status = error/fail/no-status if isinstance(value, dict) and str(value.get("status", "")).lower() in ( "error", "fail", "no-status", ): logger.warning( f"[{current_time}] [{name}] API вернул статус '{value.get('status')}'" ) ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] results_q.put({"source": name, "ts": ts, "data": value}) logger.debug(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}") ok_status = not ( isinstance(value, dict) and ( str(value.get("status", "")).lower() in ("error", "fail", "no-status") or "error" in value ) ) # print(name, ok_status) # ---------- быстрый retry при плохом ответе ---------- if not ok_status: # короткая задержка, чтобы не ушатать API частыми запросами quick_delay = min(2, sleep_time if sleep_time > 0 else 1) logger.warning( f"[{current_time}] [{name}] плохой ответ (status={value.get('status', 'no-status')}) → быстрый повтор через {quick_delay} сек." ) slept_q = 0.0 while slept_q < quick_delay: if stop_event.is_set(): break if ( stop_when_live and globals().get("STATUS") == "live" and has_full_game_ready() ): logger.info( f"[{name}] stopping during quick retry sleep because STATUS='live' and full game is ready" ) return time.sleep(0.5) slept_q += 0.5 # сразу на новую попытку, без длинного sleep_time continue # ---------- успешный ответ ---------- if stop_after_success and ok_status: logger.info( f"[{name}] got successful response → stopping thread (stop_after_success)" ) return # ---------- обычный сон между успешными запросами ---------- slept = 0 while slept < sleep_time: if stop_event.is_set(): break if ( stop_when_live and globals().get("STATUS") == "live" and has_full_game_ready() ): logger.info( f"[{name}] stopping during sleep because STATUS='live' and full game is ready" ) return time.sleep(1) slept += 1 # Получение результатов из всех запущенных потоков def results_consumer(): while not stop_event.is_set(): # ⬇️ проверяем, не пора ли в оффлайн (отложенный переход) current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") off_at = globals().get("OFFLINE_SWITCH_AT") if off_at is not None and time.time() >= off_at: # делаем переход ТОЛЬКО если ещё не оффлайн if globals().get("CURRENT_THREADS_MODE") != "offline": logger.info("[status] switching to OFFLINE (delayed)") stop_live_threads() start_offline_threads(SEASON, GAME_ID) # чтобы не повторять globals()["OFFLINE_SWITCH_AT"] = None try: msg = results_q.get(timeout=0.5) except queue.Empty: continue try: source = msg.get("source") payload = msg.get("data") or {} # универсальный статус (может не быть) incoming_status = payload.get("status") # может быть None # print(source, incoming_status) if source == "game": # принимаем ТОЛЬКО тот game_id, который держим в PRELOAD_LOCK try: if PRELOAD_LOCK: incoming_gid = extract_game_id_from_payload(payload) if not incoming_gid or str(incoming_gid) != str( PRELOADED_GAME_ID ): logger.debug( f"results_consumer: skip game (gid={incoming_gid}) due to PRELOAD_LOCK; keep {PRELOADED_GAME_ID}" ) continue except Exception as _e: logger.debug(f"results_consumer: preload lock check error: {_e}") else: latest_data[source] = { "ts": msg["ts"], "data": incoming_status if incoming_status is not None else payload, } # 1) play-by-play if "play-by-play" in source: game = latest_data.get("game") # если игра уже нормальная — приклеиваем плейи if ( game and isinstance(game, dict) and "data" in game and "result" in game["data"] ): # у pbp тоже может не быть data/result if "result" in payload: game["data"]["result"]["plays"] = payload["result"] # а вот статус у play-by-play иногда просто "no-status" # 2) box-score elif "box-score" in source: game = latest_data.get("game") if ( game and "data" in game and "result" in game["data"] and "teams" in game["data"]["result"] and "result" in payload and "teams" in payload["result"] ): # обновляем команды game["data"]["result"]["game"]["fullScore"] = payload["result"][ "fullScore" ] game["data"]["result"]["game"][ "score" ] = f'{payload["result"]["teams"][0]["total"]["points"]}:{payload["result"]["teams"][1]["total"]["points"]}' for team in game["data"]["result"]["teams"]: if team["teamNumber"] != 0: box_team = [ t for t in payload["result"]["teams"] if t["teamNumber"] == team["teamNumber"] ] if not box_team: print("ERROR: box-score team not found") continue box_team = box_team[0] for player in team["starts"]: box_player = [ p for p in box_team["starts"] if p["startNum"] == player["startNum"] ] if box_player: player["stats"] = box_player[0] team["total"] = box_team["total"] team["startTotal"] = box_team["startTotal"] team["benchTotal"] = box_team["benchTotal"] team["maxLeading"] = box_team["maxLeading"] team["pointsInRow"] = box_team["pointsInRow"] team["maxPointsInRow"] = box_team["maxPointsInRow"] elif "live-status" in source: latest_data[source] = { "ts": msg["ts"], "data": payload, } try: ls_data = payload.get("result") or payload raw_ls_status = ( ls_data.get("status") or ls_data.get("gameStatus") or ls_data.get("state") ) if raw_ls_status: raw_ls_status_low = str(raw_ls_status).lower() finished_markers = [ "finished", "result", "resultconfirmed", "ended", "final", "game over", ] # 1) матч ЗАКОНЧЕН → запускаем ОТСРОЧЕННЫЙ переход # ##TODO - Усложненый код? Нужен, если статус бывает сложнее, чем "result", # а что-то в стиле "result 1:0", т.е. слова из finished_markers являются # состовной частью настоящего статуса # В противном случае вполне рабочий вариант: # if raw_ls_status_low in finished_markers: if any(m in raw_ls_status_low for m in finished_markers): now_ts = time.time() # если ещё не назначали переход — назначим if globals().get("OFFLINE_SWITCH_AT") is None: switch_at = now_ts + globals().get( "OFFLINE_DELAY_SEC", 600 ) globals()["OFFLINE_SWITCH_AT"] = switch_at # статус тоже обозначим, что он завершён, но ждёт if ( GAME_START_DT and GAME_START_DT.date() == datetime.now().date() ): globals()["STATUS"] = "finished_wait" globals()["CLEAR_OUTPUT_FOR_VMIX"] = False else: globals()["STATUS"] = "finished_wait" globals()["CLEAR_OUTPUT_FOR_VMIX"] = False human_time = datetime.fromtimestamp(switch_at).strftime( "%H:%M:%S" ) logger.info( f"[status] match finished → will switch to OFFLINE at {human_time} " f"(in {globals().get('OFFLINE_DELAY_SEC', 600)}s)" ) else: # уже ждём — можно в debug logger.debug( "[status] match finished → OFFLINE already scheduled" ) # 2) матч снова стал онлайном → СБРАСЫВАЕМ отложенный переход elif ( "online" in raw_ls_status_low or "live" in raw_ls_status_low ): # если до этого стояла отложка — уберём globals()[ "CLEAR_OUTPUT_FOR_VMIX" ] = False # 👈 выключаем очистку if globals().get("OFFLINE_SWITCH_AT") is not None: logger.info( "[status] match back to LIVE → cancel scheduled OFFLINE" ) globals()["OFFLINE_SWITCH_AT"] = None if globals().get("STATUS") != "live": logger.info( "[status] match became LIVE → switch to LIVE threads" ) globals()["STATUS"] = "live" start_live_threads(SEASON, GAME_ID) except Exception as e: logger.warning( f"[{current_time}] results_consumer: live-status postprocess error: {e}" ) else: if source == "game": has_game_already = "game" in latest_data and isinstance( latest_data.get("game"), dict ) # Полная структура? is_full = ( isinstance(payload, dict) and "data" in payload and isinstance(payload["data"], dict) and "result" in payload["data"] and "teams" in payload["data"]["result"] ) # ⚙️ ЛОГИКА: # 1) Пока матч НЕ online (STATUS != 'live'): обновляем всегда, # чтобы /status видел "живость" раз в 5 минут независимо от полноты JSON. if globals().get("STATUS") != "live": latest_data["game"] = {"ts": msg["ts"], "data": payload} logger.debug( "results_consumer: pre-live game → updated (full=%s)", is_full, ) else: # ✅ если игры ещё НЕТ в кэше — примем ПЕРВЫЙ game даже неполный, # чтобы box-score/play-by-play могли его дорастить if is_full or not has_game_already: latest_data["game"] = {"ts": msg["ts"], "data": payload} logger.debug( "results_consumer: LIVE → stored (full=%s, had=%s)", is_full, has_game_already, ) else: logger.debug( "results_consumer: LIVE & partial game → keep previous one" ) continue else: latest_data[source] = { "ts": msg["ts"], "data": payload, } continue # ... остальная обработка ... except Exception as e: logger.warning(f"[{current_time}] results_consumer error: {repr(e)}") continue def get_items(data: dict) -> list: """ Мелкий хелпер: берём первый список в ответе API. Многие ручки отдают {"result":[...]} или {"seasons":[...]}. Если находим список — возвращаем его. Если нет — возвращаем None (значит, нужно брать весь dict). ВНИМАНИЕ: если списков в data несколько - вернет случайный """ for k, v in data.items(): if isinstance(v, list): return data[k] return None def pick_game_for_team(calendar_json): """ Возвращает: game_id: str | None game_dt: datetime | None is_today: bool cal_status: str | None # Scheduled / Online / Result / ResultConfirmed Логика: 1. если в календаре есть игра КОМАНДЫ на сегодня — берём ЕЁ и возвращаем её gameStatus 2. иначе — берём последнюю прошедшую и тоже возвращаем её gameStatus """ items = get_items(calendar_json) if not items: return None, None, False, None today = datetime.now().date() # 1) сначала — сегодняшняя for game in reversed(items): if game["team1"]["name"].lower() != TEAM.lower(): continue gdt = extract_game_datetime(game) gdate = ( gdt.date() if gdt else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date() ) if gdate == today: cal_status = game["game"].get("gameStatus") return game["game"]["id"], gdt, True, cal_status # 2) если на сегодня нет — берём последнюю прошедшую # TODO - код повторяется почти без изменений. # Можно без сожаления свести в один проход. last_id = None last_dt = None last_status = None for game in reversed(items): if game["team1"]["name"].lower() != TEAM.lower(): continue gdt = extract_game_datetime(game) gdate = ( gdt.date() if gdt else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date() ) if gdate <= today: last_id = game["game"]["id"] last_dt = gdt last_status = game["game"].get("gameStatus") break return last_id, last_dt, False, last_status def extract_game_datetime(game_item: dict) -> datetime | None: """ Из элемента календаря достаём datetime матча. В календаре есть localDate и часто localTime. Если localTime нет — берём 00:00. """ try: date_str = game_item["game"]["localDate"] # '31.10.2025' dt_date = datetime.strptime(date_str, "%d.%m.%Y").date() time_str = game_item["game"].get("defaultZoneTime") # '19:30' if time_str: hh, mm = map(int, time_str.split(":")) dt_time = dtime(hour=hh, minute=mm) else: dt_time = dtime(hour=0, minute=0) return datetime.combine(dt_date, dt_time) except Exception: return None def build_pretty_status_message(): """ Собирает одно красивое сообщение про текущее состояние онлайна. Если game ещё нет — шлём хотя бы статусы источников. """ lines = [] cgid = get_cached_game_id() lines.append(f"🏀 {LEAGUE.upper()} • {TEAM}") lines.append(f"📌 Game ID: {cgid or GAME_ID}") lines.append(f"🕒 {GAME_START_DT}") # сначала попробуем собрать нормальный game game_wrap = latest_data.get("game") has_game = False if game_wrap: raw = game_wrap.get("data") if isinstance(game_wrap, dict) else game_wrap # raw может быть: dict (полный payload) | dict (уже result) | str ("ok"/"no-status") result = {} if isinstance(raw, dict): # ваш нормальный полный ответ по game имеет структуру: {"data": {"result": {...}}} # но на всякий случай поддержим и вариант, где сразу {"result": {...}} или уже {"game": ...} result = ( raw.get("data", {}).get("result", {}) if "data" in raw else (raw.get("result") or raw) ) else: result = {} game_info = result.get("game") or {} team1_name = (result.get("team1") or {}).get("name", "Team 1") team2_name = (result.get("team2") or {}).get("name", "Team 2") lines.append(f"👥 {team1_name} vs {team2_name}") score_now = game_info.get("score") or "" full_score = game_info.get("fullScore") or "" if score_now: lines.append(f"🔢 Score: {score_now}") if isinstance(full_score, str) and full_score: quarters = full_score.split(",") q_text = " | ".join(f"Q{i+1} {q}" for i, q in enumerate(quarters) if q) if q_text: lines.append(f"🧱 By quarters: {q_text}") has_game = bool(result) # live-status отдельно # ls = latest_data.get("live-status", {}) # ls_raw = ls.get("data") or {} # ls_status = ( # ls_raw.get("status") or ls_raw.get("gameStatus") or ls_raw.get("state") or "—" # ) # lines.append(f"🟢 LIVE status: {ls_status}") ls_wrap = latest_data.get("live-status") ls_status = "—" if ls_wrap: raw = ls_wrap.get("data") if isinstance(raw, dict): ls_dict = raw.get("result") or raw ls_status = ( ls_dict.get("status") or ls_dict.get("gameStatus") or ls_dict.get("state") or "—" ) elif isinstance(raw, str): # API/consumer могли положить просто строку статуса: "ok", "no-status", "error" ls_status = raw lines.append(f"🟢 LIVE status: {ls_status}") # добавим блок по источникам — это как раз “состояние запросов” sort_order = ["game", "live-status", "box-score", "play-by-play"] keys = [k for k in sort_order if k in latest_data] + sorted( [k for k in latest_data if k not in sort_order] ) src_lines = [] for k in keys: if "excel" not in k.lower(): d = latest_data.get(k) or {} ts = d.get("ts", "—") dat = d.get("data") if isinstance(dat, dict) and "status" in dat: st = str(dat["status"]).lower() else: st = str(dat).lower() # Эмодзи-кружки для статусов if any(x in st for x in ["ok", "success", "live", "online"]): emoji = "🟢" elif any( x in st for x in ["error", "fail", "no-status", "none", "timeout"] ): emoji = "🔴" else: emoji = "🟡" src_lines.append(f"{emoji} {k}: {st} ({ts})") if src_lines: lines.append("📡 Sources:") lines.extend(src_lines) # даже если game не успел — мы всё равно что-то вернём return "\n".join(lines) def status_broadcaster(): """ Если матч live — сразу шлём статус. Потом — раз в 5 минут. Если матч не live — ждём и проверяем снова. """ INTERVAL = 300 # 5 минут last_text = None first_live_sent = False while not stop_event.is_set(): # если игра не идёт — спим по чуть-чуть и крутимся if STATUS not in ("live", "live_soon"): first_live_sent = False # чтобы при новом лайве снова сразу отправить time.sleep(5) continue # сюда попадаем только если live text = build_pretty_status_message() if text and text != last_text: logger.info(text) last_text = text first_live_sent = True # после первого лайва ждём 5 минут, а до него — 10 секунд wait_sec = INTERVAL if first_live_sent else 10 for _ in range(wait_sec): if stop_event.is_set(): break time.sleep(1) def get_cached_game_id() -> str | None: game = latest_data.get("game") if not game: return None payload = game.get("data", game) if not isinstance(payload, dict): return None # структура может быть {"data":{"result":{...}}} или {"result":{...}} result = ( payload.get("data", {}).get("result") if "data" in payload else payload.get("result") ) if not isinstance(result, dict): return None g = result.get("game") if isinstance(g, dict): return g.get("id") return None def extract_game_id_from_payload(payload: dict) -> str | None: if not isinstance(payload, dict): return None root = payload.get("data") if isinstance(payload.get("data"), dict) else payload res = root.get("result") if isinstance(root.get("result"), dict) else None if not isinstance(res, dict): return None g = res.get("game") if isinstance(g, dict): return g.get("id") return None def start_offline_prevgame(season, prev_game_id: str): """ Специальный оффлайн для ПРЕДЫДУЩЕЙ игры: - гасит любые текущие треды - запускает только 'game' для prev_game_id - НЕ останавливается после первого 'ok' (stop_after_success=False) """ global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data # всегда переключаемся чисто stop_live_threads() stop_offline_threads() logger.info("[threads] switching to OFFLINE mode (previous game) ...") stop_event_offline.clear() threads_offline = [ threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=prev_game_id, lang=LANG), 150, # редкий опрос stop_event_offline, False, # stop_when_live False, # ✅ stop_after_success=False (держим тред) ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=prev_game_id, lang=LANG, ), 600, stop_event_offline, False, False, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 600, stop_event_offline, False, False, ), daemon=True, ), ] for t in threads_offline: t.start() CURRENT_THREADS_MODE = "offline" logger.info(f"[threads] OFFLINE prev-game thread started for {prev_game_id}") def start_prestart_watcher(game_dt: datetime | None): """ Логика на день игры: 1) Немедленно подгружаем ДАННЫЕ ПРОШЛОГО МАТЧА (один раз, оффлайн-поток 'game'), чтобы программа имела данные до старта. 2) Ровно за 1:15 до начала — СБРАСЫВАЕМ эти данные (останавливаем оффлайн, чистим latest_data). 3) Ровно за 1:10 до начала — ВКЛЮЧАЕМ LIVE-треды. """ if not game_dt: return # разовое уведомление о "спячке" global SLEEP_NOTICE_SENT, STATUS, SEASON, GAME_ID now = datetime.now() if not SLEEP_NOTICE_SENT and game_dt > now: logger.info( "🛌 Тред ушёл в спячку до начала игры.\n" f"⏰ Матч начинается сегодня в {game_dt.strftime('%H:%M')}." ) SLEEP_NOTICE_SENT = True def _runner(): from datetime import time as dtime # для резервного парсинга времени global STATUS PRELOAD_LEAD = timedelta(hours=1, minutes=15) # T-1:15 → сброс LIVE_LEAD = timedelta(hours=1, minutes=10) # T-1:10 → live RESET_AT = game_dt - PRELOAD_LEAD LIVE_AT = game_dt - LIVE_LEAD PRELOAD_MAXWAIT_SEC = 180 # ждём до 3 мин готовности full game при предзагрузке did_preload = False did_reset = False did_live = False # --- вспомогательное: поиск предыдущей игры команды ДО сегодняшнего матча --- def _find_prev_game_id( calendar_json: dict, cutoff_dt: datetime ) -> tuple[str | None, datetime | None]: items = get_items(calendar_json) or [] prev_id, prev_dt = None, None team_norm = (TEAM or "").strip().casefold() for g in reversed(items): try: t1 = (g["team1"]["name"] or "").strip().casefold() if team_norm not in (t1): continue except Exception: continue gdt = extract_game_datetime(g) if not gdt: try: gd = datetime.strptime( g["game"]["localDate"], "%d.%m.%Y" ).date() gdt = datetime.combine(gd, dtime(0, 0)) except Exception: continue if gdt < cutoff_dt: prev_id, prev_dt = g["game"]["id"], gdt break return prev_id, prev_dt # --- Шаг 1: сразу включаем оффлайн по ПРЕДЫДУЩЕЙ игре и держим до T-1:15 --- try: now = datetime.now() if now < RESET_AT: calendar_resp = requests.get( URLS["calendar"].format( host=HOST, league=LEAGUE, season=SEASON, lang=LANG ), timeout=6, ).json() prev_game_id, prev_game_dt = _find_prev_game_id(calendar_resp, game_dt) # print(prev_game_id, prev_game_dt) if prev_game_id and str(prev_game_id) != str(GAME_ID): logger.info( f"[preload] старт оффлайна по предыдущей игре {prev_game_id} ({prev_game_dt})" ) # включаем «замок», чтобы consumer принимал только старую игру globals()["PRELOAD_LOCK"] = True globals()["PRELOADED_GAME_ID"] = str(prev_game_id) globals()["PRELOAD_HOLD_UNTIL"] = RESET_AT.timestamp() # поднимаем один оффлайн-тред по старой игре (без stop_after_success) start_offline_prevgame(SEASON, prev_game_id) did_preload = True else: logger.warning("[preload] предыдущая игра не найдена — пропускаем") else: logger.info( "[preload] уже поздно для предзагрузки (прошло T-1:15) — пропуск" ) except Exception as e: logger.warning(f"[preload] ошибка предзагрузки прошлой игры: {e}") # --- Основной цикл ожидания контрольных моментов --- while not stop_event.is_set(): now = datetime.now() # если матч уже в другом конечном состоянии — выходим if STATUS in ("live", "finished", "finished_wait", "finished_today"): break # Шаг 2: ровно T-1:15 — сбрасываем предзагруженные данные if not did_reset and now >= RESET_AT: logger.info( f"[reset] {now:%H:%M:%S} → T-1:15: сбрасываем предзагруженные данные" ) try: stop_offline_threads() # на всякий # for key in latest_data: # latest_data[key] = wipe_json_values(latest_data[key]) # latest_data.clear() # полный сброс кэша # снять замок предзагрузки globals()["PRELOAD_LOCK"] = False globals()["PRELOADED_GAME_ID"] = None globals()["PRELOAD_HOLD_UNTIL"] = None logger.info( "[reset] latest_data очищен; ждём T-1:10 для запуска live" ) except Exception as e: logger.warning(f"[reset] ошибка при очистке: {e}") globals()["CLEAR_OUTPUT_FOR_VMIX"] = True did_reset = True # Шаг 3: T-1:10 — включаем live-треды if not did_live and now >= LIVE_AT: logger.info( f"[prestart] {now:%H:%M:%S}, игра в {game_dt:%H:%M}, включаем LIVE threads по правилу T-1:10" ) STATUS = "live_soon" globals()[ "CLEAR_OUTPUT_FOR_VMIX" ] = False # можно оставить пустоту до первых живых данных stop_offline_threads() # на всякий случай start_live_threads(SEASON, GAME_ID) did_live = True break time.sleep(15) t = threading.Thread(target=_runner, daemon=True) t.start() def get_excel(): return nasio.load_formatted( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=SYNO_PATH_EXCEL, # sheet="TEAMS LEGEND", ) def excel_worker(): """ Раз в минуту читает ВСЕ вкладки Excel и сохраняет их в latest_data с префиксом excel_. """ global latest_data while not stop_event.is_set(): try: sheets = get_excel() # <- теперь это dict: {sheet_name: DataFrame} ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] if isinstance(sheets, dict): for sheet_name, df in sheets.items(): # пропускаем странные объекты if not hasattr(df, "fillna"): # logger.warning(f"[excel] Лист '{sheet_name}' не DataFrame") continue # ЧИСТИМ NaN и конвертируем df = df.fillna("") data_json = df.to_dict(orient="records") # ключ в latest_data: excel_<имя_вкладки> key = f"excel_{sheet_name}".replace(" ", "_").replace("-", "_") latest_data[key] = { "ts": ts, "data": data_json, } # logger.info("[excel] Все вкладки Excel обновлены") else: pass # logger.warning("[excel] get_excel() вернул не словарь") except Exception as e: logger.warning(f"[excel] ошибка при чтении Excel: {e}") # пауза 60 сек for _ in range(60): if stop_event.is_set(): break time.sleep(1) @asynccontextmanager async def lifespan(app: FastAPI): global STATUS, GAME_ID, SEASON, GAME_START_DT, GAME_TODAY, GAME_SOON, CALENDAR # 1. проверяем API: seasons try: seasons_resp = requests.get( URLS["seasons"].format(host=HOST, league=LEAGUE) ).json() season = seasons_resp["items"][0]["season"] except Exception: now = datetime.now() if now.month > 9: season = now.year + 1 else: season = now.year logger.info(f"предположили номер сезона: {season}") SEASON = season # 2. берём календарь try: calendar = requests.get( URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG) ).json() except Exception as ex: logger.error(f"не получилось проверить работу API. код ошибки: {ex}") # тут можно вообще не запускать сервер, но оставим как есть calendar = None CALENDAR = calendar # 3. определяем игру game_id, game_dt, is_today, cal_status = ( pick_game_for_team(calendar) if calendar else (None, None, False, None) ) GAME_ID = game_id GAME_START_DT = game_dt GAME_TODAY = is_today logger.info(f"Лига: {LEAGUE}\nСезон: {season}\nКоманда: {TEAM}\nGame ID: {game_id}") thread_excel = threading.Thread( target=excel_worker, daemon=True, ) thread_excel.start() try: season_stats = requests.get( URLS["players-stats-season"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ) ).json() except Exception as ex: season_stats = None try: league_stats = requests.get( URLS["players-stats-league"].format(host=HOST, league=LEAGUE, lang=LANG) ).json() except Exception as ex: league_stats = None # 👇 ДОБАВЬ ЭТО ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] if season_stats is not None: latest_data["season_stats"] = { "ts": ts, "data": season_stats, } if league_stats is not None: latest_data["league_stats"] = { "ts": ts, "data": league_stats, } # 👆 ДО СЮДА # 4. запускаем "длинные" потоки (они у тебя и так всегда) thread_result_consumer = threading.Thread( target=results_consumer, daemon=True, ) thread_result_consumer.start() thread_status_broadcaster = threading.Thread( target=status_broadcaster, daemon=True, ) thread_status_broadcaster.start() # новый поток для shotmaps thread_shotmap = threading.Thread( target=shotmap_worker, daemon=True, ) thread_shotmap.start() # 5. решаем, что запускать if not is_today: STATUS = "no_game_today" start_offline_threads(SEASON, GAME_ID) else: # игра сегодня # в любом случае запускаем сторож, если знаем время игры start_prestart_watcher(game_dt) if cal_status is None: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) elif cal_status == "Scheduled": if game_dt: delta = game_dt - datetime.now() # если мы уже МЕНЬШЕ чем за 1:10 до игры — сразу в live if delta <= timedelta(hours=1, minutes=10): STATUS = "live_soon" start_live_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" # start_offline_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" # start_offline_threads(SEASON, GAME_ID) elif cal_status == "Online": STATUS = "live" start_live_threads(SEASON, GAME_ID) elif cal_status in ["Result", "ResultConfirmed"]: STATUS = "finished_today" start_offline_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) yield # -------- shutdown -------- stop_event.set() stop_live_threads() stop_offline_threads() thread_result_consumer.join(timeout=1) thread_status_broadcaster.join(timeout=1) thread_excel.join(timeout=1) thread_shotmap.join(timeout=1) app = FastAPI( lifespan=lifespan, docs_url=None, # ❌ отключает /docs redoc_url=None, # ❌ отключает /redoc openapi_url=None, # ❌ отключает /openapi.json ) def format_time(seconds: float | int) -> str: """ Удобный формат времени для игроков: 71 -> "1:11" 0 -> "0:00" Любые кривые значения -> "0:00". """ try: total_seconds = int(float(seconds)) minutes = total_seconds // 60 sec = total_seconds % 60 return f"{minutes}:{sec:02}" except (ValueError, TypeError): return "0:00" @app.get("/team1") async def team1(): game = get_latest_game_safe("game") if not game: # если данных вообще нет (ещё ни одной игры) — тут реально нечего отдавать raise HTTPException(status_code=503, detail="game data not ready") data = await team("team1") return maybe_clear_for_vmix(data) @app.get("/team2") async def team2(): game = get_latest_game_safe("game") if not game: raise HTTPException(status_code=503, detail="game data not ready") data = await team("team2") return maybe_clear_for_vmix(data) @app.get("/top_team1") async def top_team1(): data = await team("team1") top = await top_sorted_team(data) return maybe_clear_for_vmix(top) @app.get("/top_team2") async def top_team2(): data = await team("team2") top = await top_sorted_team(data) return maybe_clear_for_vmix(top) def _b(v) -> bool: if isinstance(v, bool): return v if isinstance(v, (int, float)): return v != 0 if isinstance(v, str): return v.strip().lower() in ("1", "true", "yes", "on") return False def _placeholders(n=5): return [ { "NameGFX": "", "Name1GFX": "", "Name2GFX": "", "isOnCourt": False, "num": "", "photoGFX": EMPTY_PHOTO_PATH, } for _ in range(n) ] def wipe_json_values(obj): """ Рекурсивно заменяет все значения JSON на пустые строки. Если ключ содержит "photo", заменяет значение на EMPTY_PHOTO_PATH. """ # если словарь — обрабатываем ключи if isinstance(obj, dict): new_dict = {} for k, v in obj.items(): if "photo" in str(k).lower(): # ключ содержит photo → отдаём пустую картинку new_dict[k] = EMPTY_PHOTO_PATH else: new_dict[k] = wipe_json_values(v) return new_dict # если список — рекурсивно обработать элементы elif isinstance(obj, list): return [wipe_json_values(v) for v in obj] # любое конечное значение → "" else: return " " @app.get("/started_team1") async def started_team1(sort_by: str = None): data = await team("team1") players = await started_team(data) or [] # нормализуем флаги for p in players: p["isStart"] = _b(p.get("isStart", False)) p["isOnCourt"] = _b(p.get("isOnCourt", False)) if sort_by and sort_by.strip().lower() == "isstart": starters = [p for p in players if p["isStart"]] return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5)) if sort_by and sort_by.strip().lower() == "isoncourt": on_court = [p for p in players if p["isOnCourt"]] return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5)) # дефолт — без фильтра, как раньше return maybe_clear_for_vmix(players) @app.get("/started_team2") async def started_team2(sort_by: str = None): data = await team("team2") players = await started_team(data) or [] for p in players: p["isStart"] = _b(p.get("isStart", False)) p["isOnCourt"] = _b(p.get("isOnCourt", False)) if sort_by and sort_by.strip().lower() == "isstart": starters = [p for p in players if p["isStart"]] return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5)) if sort_by and sort_by.strip().lower() == "isoncourt": on_court = [p for p in players if p["isOnCourt"]] return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5)) return maybe_clear_for_vmix(players) @app.get("/latest_data") async def game(): return latest_data @app.get("/status") async def status(request: Request): global STATUS # будем его править, если live-status свежее def color_for_status(status_value: str) -> str: """Подбор цвета статуса в HEX""" status_value = str(status_value).lower() if status_value in ["ok", "success", "live", "live_soon", "online"]: return "#00FF00" # зелёный elif status_value in ["scheduled", "today_not_started", "upcoming"]: return "#FFFF00" # жёлтый elif status_value in [ "result", "resultconfirmed", "finished", "finished_today", ]: return "#FF0000" # красный elif status_value in ["no_game_today", "unknown", "none"]: return "#FFFFFF" # белый else: return "#FF7700" # серый (неизвестный статус) # ✳️ сортируем latest_data в нужном порядке sort_order = ["game", "live-status", "box-score", "play-by-play"] sorted_keys = [k for k in sort_order if k in latest_data] + sorted( [k for k in latest_data if k not in sort_order] ) # убираем excel_* из списка ключей sorted_keys = [k for k in sorted_keys if "excel" not in k.lower()] cached_game_id = get_cached_game_id() or GAME_ID note = "" if cached_game_id and GAME_ID and str(cached_game_id) != str(GAME_ID): note = ( f' (предзагружены данные прошлой игры)' ) data = { "league": LEAGUE, "team": TEAM, "game_id": cached_game_id, "game_status": STATUS, "statuses": [ { "name": TEAM, "status": STATUS, "ts": ( GAME_START_DT.strftime("%Y-%m-%d %H:%M") if GAME_START_DT else "N/A" ), "link": LEAGUE, "color": color_for_status(STATUS), } ] + [ { "name": item, "status": ( latest_data[item]["data"]["status"] if isinstance(latest_data[item]["data"], dict) and "status" in latest_data[item]["data"] else latest_data[item]["data"] ), "ts": latest_data[item]["ts"], "link": URLS[item].format( host=HOST, league=LEAGUE, season=SEASON, lang=LANG, game_id=cached_game_id, ), "color": color_for_status( latest_data[item]["data"]["status"] if isinstance(latest_data[item]["data"], dict) and "status" in latest_data[item]["data"] else latest_data[item]["data"] ), } for item in sorted_keys if item not in ["league_stats", "season_stats"] ], } accept = request.headers.get("accept", "") if "text/html" in accept: status_raw = str(STATUS).lower() if status_raw in ["live", "online"]: gs_class = "live" gs_text = "🟢 LIVE" elif status_raw in ["live_soon", "today_not_started"]: gs_class = "live" gs_text = "🟢 GAME TODAY (soon)" elif status_raw in ["finished_wait"]: gs_class = "upcoming" off_at = OFFLINE_SWITCH_AT if off_at: human = datetime.fromtimestamp(off_at).strftime("%H:%M:%S") gs_text = f"🟡 Game finished, cooling down → OFFLINE at {human}" else: gs_text = "🟡 Game finished, cooling down" elif status_raw in ["finished_today", "finished"]: gs_class = "finished" gs_text = "🔴 Game finished" else: gs_class = status_raw gs_text = "⚪ Unknown" html = f"""

📊 Game Status Monitor

League: {LEAGUE}

Team: {TEAM}

Game ID: {cached_game_id}{note}

Game Status: {gs_text}

""" # ВАЖНО: цикл только добавляет строки, return будет ПОСЛЕ цикла for s in data["statuses"]: status_text = str(s["status"]).strip().lower() if any( x in status_text for x in ["ok", "success", "live", "live_soon", "online"] ): color_class = "ok" elif any( x in status_text for x in ["scheduled", "today_not_started", "upcoming"] ): color_class = "warn" elif any( x in status_text for x in ["result", "resultconfirmed", "finished", "finished_today"] ): color_class = "fail" else: color_class = "unknown" html += f""" """ # закрываем таблицу и страницу УЖЕ ПОСЛЕ цикла html += """
NameStatusTimestampLink
{s["name"]} {status_text} {s["ts"]} {s["link"]}
""" return HTMLResponse(content=html, media_type="text/html") # JSON для API (красиво отформатированный) formatted = json.dumps(data, indent=4, ensure_ascii=False) response = Response(content=formatted, media_type="application/json") response.headers["Refresh"] = "1" return maybe_clear_for_vmix(response) @app.get("/scores") async def scores(): game = get_latest_game_safe("game") if not game: # игры ещё нет или пришёл только частичный ответ # отдаём пустую структуру, чтобы фронт не падал return [ {"Q": "Q1", "score1": "", "score2": ""}, {"Q": "Q2", "score1": "", "score2": ""}, {"Q": "Q3", "score1": "", "score2": ""}, {"Q": "Q4", "score1": "", "score2": ""}, ] game_data = game["data"] if "data" in game else game result = game_data.get("result", {}) game_info = result.get("game", {}) full_score = game_info.get("fullScore") if not full_score: # поле есть, но ещё пустое/None return [ {"Q": "Q1", "score1": "", "score2": ""}, {"Q": "Q2", "score1": "", "score2": ""}, {"Q": "Q3", "score1": "", "score2": ""}, {"Q": "Q4", "score1": "", "score2": ""}, ] quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"] score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters] full_score_list = full_score.split(",") for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]): parts = score_str.split(":") if len(parts) == 2: score_by_quarter[i]["score1"] = parts[0] score_by_quarter[i]["score2"] = parts[1] return maybe_clear_for_vmix(score_by_quarter) async def top_sorted_team(data): top_sorted_team = sorted( (p for p in data if p.get("startRole") in ["Player", ""]), key=lambda x: ( x.get("pts", 0), x.get("dreb", 0) + x.get("oreb", 0), x.get("ast", 0), x.get("stl", 0), x.get("blk", 0), x.get("time", "0:00"), ), reverse=True, ) # пустые строки не должны ломать UI процентами фолов/очков for player in top_sorted_team: if player.get("num", "") == "": player["pts"] = "" player["foul"] = "" return top_sorted_team def get_latest_game_safe(name: str): """ Безопасно достаём актуальный game из latest_data. Возвращаем None, если структура ещё не готова или прилетел "плохой" game (например, с {"status": "no-status"} без data/result). """ game = latest_data.get(name) if not game: return None # у нас в latest_data["game"] лежит {"ts": ..., "data": {...}} или сразу {...} # в consumer мы клали {"ts": ..., "data": payload}, так что берём .get("data") if "data" in game: game_data = game["data"] else: # на всякий случай, если где-то клали сразу payload game_data = game if not isinstance(game_data, dict): return None result = game_data.get("result") if not result: return None # если всё ок — вернём в исходном виде (с ts и т.п.) return game def _pick_last_avg_and_sum(stats_list: list) -> tuple[dict, dict]: """Возвращает (season_sum, season_avg) из seasonStats. Безопасно при пустых данных.""" if not isinstance(stats_list, list) or len(stats_list) == 0: return {}, {} # В JSON конец массива: ... {"class":"Sum"}, {"class":"Avg"} last = stats_list[-1] if stats_list else None prev = stats_list[-2] if len(stats_list) >= 2 else None season_avg = ( last.get("stats", {}) if isinstance(last, dict) and str(last.get("class")).lower() == "avg" else {} ) season_sum = ( prev.get("stats", {}) if isinstance(prev, dict) and str(prev.get("class")).lower() == "sum" else {} ) # Бывают инверсии порядка (на всякий случай): попробуем найти явно if not season_avg or not season_sum: for x in reversed(stats_list): if ( isinstance(x, dict) and str(x.get("class")).lower() == "avg" and not season_avg ): season_avg = x.get("stats", {}) or {} if ( isinstance(x, dict) and str(x.get("class")).lower() == "sum" and not season_sum ): season_sum = x.get("stats", {}) or {} if season_avg and season_sum: break return season_sum, season_avg def _pick_career_sum_and_avg(carrier_list: list) -> tuple[dict, dict]: """Возвращает (career_sum, career_avg) из carrier. В API встречаются блоки с class: Normal/Sum/Avg.""" if not isinstance(carrier_list, list) or len(carrier_list) == 0: return {}, {} career_sum, career_avg = {}, {} # Ищем явные «Sum» и «Avg» for x in reversed(carrier_list): if isinstance(x, dict): cls = str(x.get("class", "")).lower() stats = x.get("stats", {}) or {} if cls == "sum" and not career_sum: career_sum = stats elif cls == "avg" and not career_avg: career_avg = stats if career_sum and career_avg: break # Если «Avg» нет (часто для карьеры бывает только Normal/Sum) — ок, оставим пустым return career_sum, career_avg def _as_int(v, default=0): try: # в JSON часто строки; пустые строки -> 0 if v in ("", None): return default return int(float(v)) except Exception: return default def _safe(d: dict) -> dict: return d if isinstance(d, dict) else {} async def team(who: str): """ Возвращает данные по команде (team1 / team2) из актуального game. Защищена от ситуации, когда latest_data["game"] ещё не прогрелся или в него прилетел "плохой" ответ от API. """ game = get_latest_game_safe("game") if not game: # игра ещё не подгружена или структура кривоватая raise HTTPException(status_code=503, detail="game data not ready") full_stat = get_latest_game_safe("pregame-full-stats") if not full_stat: # ⚠️ full_stat_data отсутствует — работаем только с game_data logger.debug( f"[{who}] full_stat_data not found → continuing with game_data only" ) full_stat_data = {} else: full_stat_data = full_stat["data"] if "data" in full_stat else full_stat # нормализуем доступ к данным game_data = game["data"] if "data" in game else game result = game_data[ "result" ] # здесь уже безопасно, мы проверили в get_latest_game_safe result_full = full_stat_data.get("result", {}) if full_stat_data else {} # в result ожидаем "teams" teams = result.get("teams") if not teams: raise HTTPException(status_code=503, detail="game teams not ready") # выбираем команду if who == "team1": payload = next((t for t in teams if t.get("teamNumber") == 1), None) payload_full = result_full.get("team1PlayersStats") if result_full else [] else: payload = next((t for t in teams if t.get("teamNumber") == 2), None) payload_full = result_full.get("team2PlayersStats") if result_full else [] if payload is None: raise HTTPException(status_code=404, detail=f"{who} not found in game data") # дальше — твоя исходная логика формирования ответа по команде # я не знаю весь твой оригинальный код ниже, поэтому вставляю каркас # и показываю, где нужно аккуратно брать plays/box-score из latest_data role_list = [ ("Center", "C"), ("Guard", "G"), ("Forward", "F"), ("Power Forward", "PF"), ("Small Forward", "SF"), ("Shooting Guard", "SG"), ("Point Guard", "PG"), ("Forward-Center", "FC"), ] starts = payload.get("starts", []) team_rows = [] for item in starts: stats = item.get("stats") or {} pid = str(item.get("personId")) full_obj = next( (p for p in (payload_full or []) if str(p.get("personId")) == pid), None ) season_sum = season_avg = career_sum = career_avg = {} if full_obj: # сезон season_sum, season_avg = _pick_last_avg_and_sum( full_obj.get("seasonStats") or [] ) # карьера career_sum, career_avg = _pick_career_sum_and_avg( full_obj.get("carrier") or [] ) season_sum = _safe(season_sum) season_avg = _safe(season_avg) career_sum = _safe(career_sum) career_avg = _safe(career_avg) # Полезные числа для Totals+Live # live-поля в box-score называются goal1/2/3, shot1/2/3, defReb/offReb и т.п. g1 = _as_int(stats.get("goal1")) s1 = _as_int(stats.get("shot1")) g2 = _as_int(stats.get("goal2")) s2 = _as_int(stats.get("shot2")) g3 = _as_int(stats.get("goal3")) s3 = _as_int(stats.get("shot3")) # Сезонные суммы из pregame-full-stats ss_pts = _as_int(season_sum.get("points")) ss_ast = _as_int(season_sum.get("assist")) ss_blk = _as_int(season_sum.get("blockShot")) ss_dreb = _as_int(season_sum.get("defRebound")) ss_oreb = _as_int(season_sum.get("offRebound")) ss_reb = _as_int(season_sum.get("rebound")) ss_stl = _as_int(season_sum.get("steal")) ss_to = _as_int(season_sum.get("turnover")) ss_foul = _as_int(season_sum.get("foul")) ss_sec = _as_int(season_sum.get("second")) ss_gms = _as_int(season_sum.get("games")) ss_st = _as_int(season_sum.get("isStarts")) ss_g1 = _as_int(season_sum.get("goal1")) ss_s1 = _as_int(season_sum.get("shot1")) ss_g2 = _as_int(season_sum.get("goal2")) ss_s2 = _as_int(season_sum.get("shot2")) ss_g3 = _as_int(season_sum.get("goal3")) ss_s3 = _as_int(season_sum.get("shot3")) # Карьерные суммы из pregame-full-stats car_ss_pts = _as_int(career_sum.get("points")) car_ss_ast = _as_int(career_sum.get("assist")) car_ss_blk = _as_int(career_sum.get("blockShot")) car_ss_dreb = _as_int(career_sum.get("defRebound")) car_ss_oreb = _as_int(career_sum.get("offRebound")) car_ss_reb = _as_int(career_sum.get("rebound")) car_ss_stl = _as_int(career_sum.get("steal")) car_ss_to = _as_int(career_sum.get("turnover")) car_ss_foul = _as_int(career_sum.get("foul")) car_ss_sec = _as_int(career_sum.get("second")) car_ss_gms = _as_int(career_sum.get("games")) car_ss_st = _as_int(career_sum.get("isStarts")) car_ss_g1 = _as_int(career_sum.get("goal1")) car_ss_s1 = _as_int(career_sum.get("shot1")) car_ss_g2 = _as_int(career_sum.get("goal2")) car_ss_s2 = _as_int(career_sum.get("shot2")) car_ss_g3 = _as_int(career_sum.get("goal3")) car_ss_s3 = _as_int(career_sum.get("shot3")) # Totals по сезону, «с учётом текущего матча»: T_points = ss_pts + _as_int(stats.get("points")) T_assist = ss_ast + _as_int(stats.get("assist")) T_block = ss_blk + _as_int(stats.get("block")) T_dreb = ss_dreb + _as_int(stats.get("defReb")) T_oreb = ss_oreb + _as_int(stats.get("offReb")) T_reb = ss_reb + (_as_int(stats.get("defReb")) + _as_int(stats.get("offReb"))) T_steal = ss_stl + _as_int(stats.get("steal")) T_turn = ss_to + _as_int(stats.get("turnover")) T_foul = ss_foul + _as_int(stats.get("foul")) T_sec = ss_sec + _as_int(stats.get("second")) T_gms = ss_gms + (1 if _as_int(stats.get("second")) > 0 else 0) T_starts = ss_st + (1 if bool(stats.get("isStart")) else 0) T_g1 = ss_g1 + g1 T_s1 = ss_s1 + s1 T_g2 = ss_g2 + g2 T_s2 = ss_s2 + s2 T_g3 = ss_g3 + g3 T_s3 = ss_s3 + s3 # Totals по карьере, «с учётом текущего матча»: car_T_points = car_ss_pts + _as_int(stats.get("points")) car_T_assist = car_ss_ast + _as_int(stats.get("assist")) car_T_block = car_ss_blk + _as_int(stats.get("block")) car_T_dreb = car_ss_dreb + _as_int(stats.get("defReb")) car_T_oreb = car_ss_oreb + _as_int(stats.get("offReb")) car_T_reb = car_ss_reb + ( _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")) ) car_T_steal = car_ss_stl + _as_int(stats.get("steal")) car_T_turn = car_ss_to + _as_int(stats.get("turnover")) car_T_foul = car_ss_foul + _as_int(stats.get("foul")) car_T_sec = car_ss_sec + _as_int(stats.get("second")) car_T_gms = car_ss_gms + (1 if _as_int(stats.get("second")) > 0 else 0) car_T_starts = car_ss_st + (1 if bool(stats.get("isStart")) else 0) car_T_g1 = car_ss_g1 + g1 car_T_s1 = car_ss_s1 + s1 car_T_g2 = car_ss_g2 + g2 car_T_s2 = car_ss_s2 + s2 car_T_g3 = car_ss_g3 + g3 car_T_s3 = car_ss_s3 + s3 # Проценты (без деления на 0) def _pct(goal, shot): return f"{round(goal*100/shot, 1)}%" if shot else "0.0%" # Для «23» используем сумму 2-х и 3-х T_g23 = T_g2 + T_g3 T_s23 = T_s2 + T_s3 car_T_g23 = car_T_g2 + car_T_g3 car_T_s23 = car_T_s2 + car_T_s3 # print(avg_season, total_season) row = { "id": item.get("personId") or "", "num": item.get("displayNumber"), "startRole": item.get("startRole"), "role": item.get("positionName"), "roleShort": ( [ r[1] for r in role_list if r[0].lower() == (item.get("positionName") or "").lower() ][0] if any( r[0].lower() == (item.get("positionName") or "").lower() for r in role_list ) else "" ), "NameGFX": ( f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip() if item.get("firstName") is not None and item.get("lastName") is not None else "Команда" ), "Name1GFX": (item.get("firstName") or "").strip(), "Name2GFX": (item.get("lastName") or "").strip(), "captain": item.get("isCapitan", False), "age": item.get("age") or 0, "height": f"{item.get('height')} cm" if item.get("height") else 0, "weight": f"{item.get('weight')} kg" if item.get("weight") else 0, "isStart": stats.get("isStart", False), "isOn": "🏀" if stats.get("isOnCourt") is True else "", "isOnCourt": stats.get("isOnCourt", False), "flag": ( "https://flagicons.lipis.dev/flags/4x3/" + ( "ru" if item.get("countryId") is None and item.get("countryName") == "Russia" else ( "" if item.get("countryId") is None else ( (item.get("countryId") or "").lower() if item.get("countryName") is not None else "" ) ) ) + ".svg" ), "photoGFX": ( os.path.join( "D:\\Photos", result["league"]["abcName"], result[who]["name"], f"{item.get('displayNumber')}.png", ) if item.get("startRole") == "Player" else os.path.join( "D:\\Photos", result["league"]["abcName"], result[who]["name"], f'Head Coach_{(item.get("lastName") or "").strip()} {(item.get("firstName") or "").strip()}.png', ) ), # live-стата "pts": _as_int(stats.get("points")), "pt-2": f"{g2}/{s2}", "pt-3": f"{g3}/{s3}", "pt-1": f"{g1}/{s1}", "fg": f"{g2+g3}/{s2+s3}", "ast": _as_int(stats.get("assist")), "stl": _as_int(stats.get("steal")), "blk": _as_int(stats.get("block")), "blkVic": _as_int(stats.get("blocked")), "dreb": _as_int(stats.get("defReb")), "oreb": _as_int(stats.get("offReb")), "reb": _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")), "to": _as_int(stats.get("turnover")), "foul": _as_int(stats.get("foul")), "foulT": _as_int(stats.get("foulT")), "foulD": _as_int(stats.get("foulD")), "foulC": _as_int(stats.get("foulC")), "foulB": _as_int(stats.get("foulB")), "fouled": _as_int(stats.get("foulsOn")), "plusMinus": _as_int(stats.get("plusMinus")), "dunk": _as_int(stats.get("dunk")), "kpi": ( _as_int(stats.get("points")) + _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")) + _as_int(stats.get("assist")) + _as_int(stats.get("steal")) + _as_int(stats.get("block")) + _as_int(stats.get("foulsOn")) + (g1 - s1) + (g2 - s2) + (g3 - s3) - _as_int(stats.get("turnover")) - _as_int(stats.get("foul")) ), "time": format_time(_as_int(stats.get("second"))), # сезон — средние (из последнего Avg) "AvgPoints": season_avg.get("points") or "0.0", "AvgAssist": season_avg.get("assist") or "0.0", "AvgBlocks": season_avg.get("blockShot") or "0.0", "AvgDefRebound": season_avg.get("defRebound") or "0.0", "AvgOffRebound": season_avg.get("offRebound") or "0.0", "AvgRebound": season_avg.get("rebound") or "0.0", "AvgSteal": season_avg.get("steal") or "0.0", "AvgTurnover": season_avg.get("turnover") or "0.0", "AvgFoul": season_avg.get("foul") or "0.0", "AvgOpponentFoul": season_avg.get("foulsOnPlayer") or "0.0", "AvgDunk": season_avg.get("dunk") or "0.0", "AvgPlayedTime": season_avg.get("playedTime") or "0:00", "Shot1Percent": season_avg.get("shot1Percent") or "0.0%", "Shot2Percent": season_avg.get("shot2Percent") or "0.0%", "Shot3Percent": season_avg.get("shot3Percent") or "0.0%", "Shot23Percent": season_avg.get("shot23Percent") or "0.0%", # сезон — Totals (суммы из Sum + live) "TPoints": T_points, "TShots1": f"{T_g1}/{T_s1}", "TShots2": f"{T_g2}/{T_s2}", "TShots3": f"{T_g3}/{T_s3}", "TShots23": f"{T_g23}/{T_s23}", "TShot1Percent": _pct(T_g1, T_s1), "TShot2Percent": _pct(T_g2, T_s2), "TShot3Percent": _pct(T_g3, T_s3), "TShot23Percent": _pct(T_g23, T_s23), "TAssist": T_assist, "TBlocks": T_block, "TDefRebound": T_dreb, "TOffRebound": T_oreb, "TRebound": T_reb, "TSteal": T_steal, "TTurnover": T_turn, "TFoul": T_foul, "TPlayedTime": format_time(T_sec), "TGameCount": T_gms, "TStartCount": T_starts, # карьера — средние (из последнего Avg) "Career_AvgPoints": career_avg.get("points") or "0.0", "Career_AvgAssist": career_avg.get("assist") or "0.0", "Career_AvgBlocks": career_avg.get("blockShot") or "0.0", "Career_AvgDefRebound": career_avg.get("defRebound") or "0.0", "Career_AvgOffRebound": career_avg.get("offRebound") or "0.0", "Career_AvgRebound": career_avg.get("rebound") or "0.0", "Career_AvgSteal": career_avg.get("steal") or "0.0", "Career_AvgTurnover": career_avg.get("turnover") or "0.0", "Career_AvgFoul": career_avg.get("foul") or "0.0", "Career_AvgOpponentFoul": career_avg.get("foulsOnPlayer") or "0.0", "Career_AvgDunk": career_avg.get("dunk") or "0.0", "Career_AvgPlayedTime": career_avg.get("playedTime") or "0:00", "Career_Shot1Percent": career_avg.get("shot1Percent") or "0.0%", "Career_Shot2Percent": career_avg.get("shot2Percent") or "0.0%", "Career_Shot3Percent": career_avg.get("shot3Percent") or "0.0%", "Career_Shot23Percent": career_avg.get("shot23Percent") or "0.0%", # карьера — Totals (суммы из Sum + live) "Career_TPoints": car_T_points, "Career_TShots1": f"{car_T_g1}/{car_T_s1}", "Career_TShots2": f"{car_T_g2}/{car_T_s2}", "Career_TShots3": f"{car_T_g3}/{car_T_s3}", "Career_TShots23": f"{car_T_g23}/{car_T_s23}", "Career_TShot1Percent": _pct(car_T_g1, car_T_s1), "Career_TShot2Percent": _pct(car_T_g2, car_T_s2), "Career_TShot3Percent": _pct(car_T_g3, car_T_s3), "Career_TShot23Percent": _pct(car_T_g23, car_T_s23), "Career_TAssist": car_T_assist, "Career_TBlocks": car_T_block, "Career_TDefRebound": car_T_dreb, "Career_TOffRebound": car_T_oreb, "Career_TRebound": car_T_reb, "Career_TSteal": car_T_steal, "Career_TTurnover": car_T_turn, "Career_TFoul": car_T_foul, "Career_TPlayedTime": format_time(car_T_sec), "Career_TGameCount": car_T_gms, "Career_TStartCount": car_T_starts, "startNum": stats.get("startNum"), "photoShotMapGFX": "", "mask": "#FFFFFF", } team_rows.append(row) # добиваем до 12 строк, чтобы UI был ровный count_player = sum(1 for x in team_rows if x["startRole"] == "Player") if count_player < 12 and team_rows: filler_count = (4 if count_player <= 4 else 12) - count_player template_keys = list(team_rows[0].keys()) for _ in range(filler_count): empty_row = {} for key in template_keys: if key in ["captain", "isStart", "isOnCourt"]: empty_row[key] = False elif key in [ "id", "pts", "weight", "height", "age", "ast", "stl", "blk", "blkVic", "dreb", "oreb", "reb", "to", "foul", "foulT", "foulD", "foulC", "foulB", "fouled", "plusMinus", "dunk", "kpi", ]: empty_row[key] = 0 elif key == "mask": empty_row[key] = "#FFFFFF00" else: empty_row[key] = "" team_rows.append(empty_row) # сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д. role_priority = { "Player": 0, "": 1, "Coach": 2, "Team": 3, None: 4, "Other": 5, } sorted_team = sorted( team_rows, key=lambda x: role_priority.get(x.get("startRole", 99), 99), ) # --- 👇 ДОБАВЛЯЕМ КАРТЫ БРОСКОВ ПО startNum --- # # Берём play-by-play из текущего game, если он есть plays = result.get("plays") or [] # Собираем множество startNum текущей команды, чтобы не ловить чужих игроков team_startnums = { p.get("startNum") for p in payload.get("starts", []) if p.get("startRole") == "Player" } # startNum -> список (x, y, is_made) shots_by_startnum: dict[int, list[tuple[float, float, bool]]] = {} for ev in plays: play_code = ev.get("play") if play_code not in (2, 3, 5, 6): continue sn = ev.get("startNum") if sn not in team_startnums: continue x = ev.get("x") y = ev.get("y") if x is None or y is None: continue sec = ev.get("sec") period = ev.get("period") is_made = play_code in (2, 3) # 2,3 — точные, 5,6 — промахи shots_by_startnum.setdefault(sn, []).append((x, y, is_made, sec, period)) # --- shotmaps: используем уже готовые пути из глобального SHOTMAPS --- with SHOTMAPS_LOCK: shotmaps_snapshot = dict(SHOTMAPS) for row in sorted_team: sn = row.get("startNum") info = shotmaps_snapshot.get(sn) if info and info.get("count", 0) > 0: # сюда кладём путь, который воркер получил из get_image: # например: "https://host/image/23_shots_7" row["photoShotMapGFX"] = FQDN + info.get("url", "") else: row["photoShotMapGFX"] = EMPTY_PHOTO_PATH return sorted_team async def started_team(data): return data def add_new_team_stat( data: dict, avg_age: float, points, avg_height: float, timeout_str: str, timeout_left: int, ) -> dict: """ Берёт словарь total по команде (очки, подборы, броски и т.д.), добавляет: - проценты попаданий - средний возраст / рост - очки старт / бенч - информацию по таймаутам и всё приводит к строкам (для UI, чтобы не ловить типы). Возвращает обновлённый словарь. """ def safe_int(v): try: return int(v) except (ValueError, TypeError): return 0 def format_percent(goal, shot): goal, shot = safe_int(goal), safe_int(shot) return f"{round(goal * 100 / shot)}%" if shot else "0%" goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1")) goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2")) goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3")) def_reb = safe_int(data.get("defReb")) off_reb = safe_int(data.get("offReb")) data.update( { "pt-1": f"{goal1}/{shot1}", "pt-2": f"{goal2}/{shot2}", "pt-3": f"{goal3}/{shot3}", "fg": f"{goal2 + goal3}/{shot2 + shot3}", "pt-1_pro": format_percent(goal1, shot1), "pt-2_pro": format_percent(goal2, shot2), "pt-3_pro": format_percent(goal3, shot3), "fg_pro": format_percent(goal2 + goal3, shot2 + shot3), "Reb": str(def_reb + off_reb), "avgAge": str(avg_age), "ptsStart": str(points[0]), "ptsStart_pro": str(points[1]), "ptsBench": str(points[2]), "ptsBench_pro": str(points[3]), "avgHeight": f"{avg_height} cm", "timeout_left": str(timeout_left), "timeout_str": str(timeout_str), } ) for k in data: data[k] = str(data[k]) return data def time_outs_func(data_pbp): """ Считает таймауты для обеих команд и формирует читабельные строки вида: "2 Time-outs left in 2nd half" Возвращает: (строка_для_команды1, остаток1, строка_для_команды2, остаток2) """ timeout1 = [] timeout2 = [] for event in data_pbp: if event.get("play") == 23: # 23 == таймаут if event.get("startNum") == 1: timeout1.append(event) elif event.get("startNum") == 2: timeout2.append(event) def timeout_status(timeout_list, last_event: dict): period = last_event.get("period", 0) sec = last_event.get("sec", 0) if period < 3: timeout_max = 2 count = sum(1 for t in timeout_list if t.get("period", 0) <= period) quarter = "1st half" elif period < 5: count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period) quarter = "2nd half" if period == 4 and sec >= 4800 and count in (0, 1): timeout_max = 2 else: timeout_max = 3 else: timeout_max = 1 count = sum(1 for t in timeout_list if t.get("period", 0) == period) quarter = f"OverTime {period - 4}" left = max(0, timeout_max - count) word = "Time-outs" if left != 1 else "Time-out" text = f"{left if left != 0 else 'No'} {word} left in {quarter}" return text, left if not data_pbp: return "", 0, "", 0 last_event = data_pbp[-1] t1_str, t1_left = timeout_status(timeout1, last_event) t2_str, t2_left = timeout_status(timeout2, last_event) return t1_str, t1_left, t2_str, t2_left def add_data_for_teams(new_data): """ Считает командные агрегаты: - средний возраст - очки со старта vs со скамейки, + их проценты - средний рост Возвращает кортеж: (avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm) """ players = [item for item in new_data if item["startRole"] == "Player"] points_start = 0 points_bench = 0 total_age = 0 total_height = 0 player_count = len(players) for player in players: # print(player) stats = player["stats"] if stats: # print(stats) if stats["isStart"] is True: points_start += stats["points"] elif stats["isStart"] is False: points_bench += stats["points"] total_age += player["age"] total_height += player["height"] total_points = points_start + points_bench points_start_pro = ( f"{round(points_start * 100 / total_points)}%" if total_points else "0%" ) points_bench_pro = ( f"{round(points_bench * 100 / total_points)}%" if total_points else "0%" ) avg_age = round(total_age / player_count, 1) if player_count else 0 avg_height = round(total_height / player_count, 1) if player_count else 0 points = [points_start, points_start_pro, points_bench, points_bench_pro] return avg_age, points, avg_height stat_name_list = [ ("points", "Очки", "points"), ("pt-1", "Штрафные", "free throws"), ("pt-1_pro", "штрафные, процент", "free throws pro"), ("pt-2", "2-очковые", "2-points"), ("pt-2_pro", "2-очковые, процент", "2-points pro"), ("pt-3", "3-очковые", "3-points"), ("pt-3_pro", "3-очковые, процент", "3-points pro"), ("fg", "очки с игры", "field goals"), ("fg_pro", "Очки с игры, процент", "field goals pro"), ("assist", "Передачи", "assists"), ("pass", "", ""), ("defReb", "подборы в защите", ""), ("offReb", "подборы в нападении", ""), ("Reb", "Подборы", "rebounds"), ("steal", "Перехваты", "steals"), ("block", "Блокшоты", "blocks"), ("blocked", "", ""), ("turnover", "Потери", "turnovers"), ("foul", "Фолы", "fouls"), ("foulsOn", "", ""), ("foulT", "", ""), ("foulD", "", ""), ("foulC", "", ""), ("foulB", "", ""), ("second", "секунды", "seconds"), ("dunk", "данки", "dunks"), ("fastBreak", "быстрые отрывы", "fast breaks"), ("plusMinus", "+/-", "+/-"), ("avgAge", "", "avg Age"), ("ptsBench", "Скамейка, очки", "Bench PTS"), ("ptsBench_pro", "Скамейка, %", "Bench PTS, %"), ("ptsStart", "Стартовая пятерка, очки", "Start PTS"), ("ptsStart_pro", "Стартовая пятерка, %", "Start PTS, %"), ("avgHeight", "", "avg height"), ("timeout_left", "", "timeout left"), ("timeout_str", "", "timeout str"), ] @app.get("/team_stats") async def team_stats(): teams = latest_data["game"]["data"]["result"]["teams"] plays = latest_data["game"]["data"]["result"]["plays"] team_1 = next((t for t in teams if t["teamNumber"] == 1), None) team_2 = next((t for t in teams if t["teamNumber"] == 2), None) timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays) avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1["starts"]) avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2["starts"]) total_1 = add_new_team_stat( team_1["total"], avg_age_1, points_1, avg_height_1, timeout_str1, timeout_left1, ) total_2 = add_new_team_stat( team_2["total"], avg_age_2, points_2, avg_height_2, timeout_str2, timeout_left2, ) result_json = [] for key in total_1: val1 = total_1[key] val2 = total_2[key] stat_rus = "" stat_eng = "" for metric_name, rus, eng in stat_name_list: if metric_name == key: stat_rus, stat_eng = rus, eng break result_json.append( { "name": key, "nameGFX_rus": stat_rus, "nameGFX_eng": stat_eng, "val1": val1, "val2": val2, } ) return maybe_clear_for_vmix(result_json) @app.get("/referee") async def referee(): desired_order = [ "Crew chief", "Referee 1", "Referee 2", "Commissioner", "Ст.судья", "Судья 1", "Судья 2", "Комиссар", ] # Найти судей (teamNumber == 0) team_ref = next( ( t for t in latest_data["game"]["data"]["result"]["teams"] if t["teamNumber"] == 0 ), None, ) referees_raw = team_ref.get("starts", []) referees = [] for r in referees_raw: flag_code = r.get("countryId", "").lower() if r.get("countryName") else "" referees.append( { "displayNumber": r.get("displayNumber", ""), "positionName": r.get("positionName", ""), "lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(), "secondName": r.get("secondName", ""), "birthday": r.get("birthday", ""), "age": r.get("age", 0), "flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg", } ) # Сортировка по позиции referees = sorted( referees, key=lambda x: ( desired_order.index(x["positionName"]) if x["positionName"] in desired_order else len(desired_order) ), ) return maybe_clear_for_vmix(referees) @app.get("/team_comparison") async def team_comparison(): if STATUS not in ["no_game_today", "finished_today"]: data = latest_data["pregame"]["data"]["result"] teams = [] for data_team in (data["teamStats1"], data["teamStats2"]): temp_team = { "team": data_team["team"]["name"], "games": data_team["games"], "points": round( (data_team["totalStats"]["points"] / data_team["games"]), 1 ), "points_2": round( ( data_team["totalStats"]["goal2"] * 100 / data_team["totalStats"]["shot2"] ), 1, ), "points_3": round( ( data_team["totalStats"]["goal3"] * 100 / data_team["totalStats"]["shot3"] ), 1, ), "points_23": round( ( data_team["totalStats"]["goal23"] * 100 / data_team["totalStats"]["shot23"] ), 1, ), "points_1": round( ( data_team["totalStats"]["goal1"] * 100 / data_team["totalStats"]["shot1"] ), 1, ), "assists": round( (data_team["totalStats"]["assist"] / data_team["games"]), 1 ), "rebounds": round( ( ( data_team["totalStats"]["defRebound"] + data_team["totalStats"]["offRebound"] ) / data_team["games"] ), 1, ), "steals": round( (data_team["totalStats"]["steal"] / data_team["games"]), 1 ), "turnovers": round( (data_team["totalStats"]["turnover"] / data_team["games"]), 1 ), "blocks": round( (data_team["totalStats"]["blockShot"] / data_team["games"]), 1 ), "fouls": round( (data_team["totalStats"]["foul"] / data_team["games"]), 1 ), } teams.append(temp_team) return maybe_clear_for_vmix(teams) else: return [{"Данных о сравнении команд нет!"}] @app.get("/standings") async def regular_standings(): data = latest_data["actual-standings"]["data"]["items"] for item in data: # if item["comp"]["name"] == "Regular Season": if item.get("standings"): standings_rows = item["standings"] df = pd.json_normalize(standings_rows) if "scores" in df.columns: df = df.drop(columns=["scores"]) if ( "totalWin" in df.columns and "totalDefeat" in df.columns and "totalGames" in df.columns and "totalGoalPlus" in df.columns and "totalGoalMinus" in df.columns ): tw = ( pd.to_numeric(df["totalWin"], errors="coerce").fillna(0).astype(int) ) td = ( pd.to_numeric(df["totalDefeat"], errors="coerce") .fillna(0) .astype(int) ) df["w_l"] = tw.astype(str) + " / " + td.astype(str) def calc_percent(row): win = row.get("totalWin", 0) games = row.get("totalGames", 0) # гарантируем числа try: win = int(win) except (TypeError, ValueError): win = 0 try: games = int(games) except (TypeError, ValueError): games = 0 if games == 0 or row["w_l"] == "0 / 0": return 0 return round(win * 100 / games + 0.000005) df["procent"] = df.apply(calc_percent, axis=1) tg_plus = ( pd.to_numeric(df["totalGoalPlus"], errors="coerce") .fillna(0) .astype(int) ) tg_minus = ( pd.to_numeric(df["totalGoalMinus"], errors="coerce") .fillna(0) .astype(int) ) df["plus_minus"] = tg_plus - tg_minus standings_payload = df.to_dict(orient="records") return maybe_clear_for_vmix(standings_payload) @app.get("/live_status") async def live_status(): # если матч реально идёт/вот-вот — пытаемся отдать то, что есть if STATUS in ["live", "live_soon"]: ls = latest_data.get("live-status") if not ls: # live-status ещё не прилетел return maybe_clear_for_vmix( [{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}] ) raw = ls.get("data") # 1) если это уже готовый dict и в нём есть result → как было раньше if isinstance(raw, dict): # иногда API кладёт всё прямо в root, иногда внутрь result if "result" in raw and isinstance(raw["result"], dict): return maybe_clear_for_vmix([raw["result"]]) else: # отдадим как есть, но в списке, чтобы фронт не сломать return maybe_clear_for_vmix([raw]) # 2) если это просто строка статуса ("ok" / "no-status" / "error") if isinstance(raw, str): return maybe_clear_for_vmix([{"status": raw}]) # fallback return maybe_clear_for_vmix( [{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}] ) else: # матч не идёт — как у тебя было return maybe_clear_for_vmix( [{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}] ) def get_excel_row_for_team(team_name: str) -> dict: """ Ищем строку из Excel по имени команды (колонка 'Team'). Читаем из latest_data["excel"]["data"] (список dict'ов). Возвращаем dict по команде или {} если не нашли / нет Excel. """ excel_wrap = latest_data.get("excel_TEAMS_LEGEND") if not excel_wrap: return {} rows = excel_wrap.get("data") or [] # это list[dict] после df.to_dict("records") team_norm = (team_name or "").strip().casefold() for row in rows: name = str(row.get("Team", "")).strip().casefold() if name == team_norm: return row return {} @app.get("/info") async def info(): data = latest_data["game"]["data"]["result"] team1_name = data["team1"]["name"] team2_name = data["team2"]["name"] team1_name_short_api = data["team1"]["abcName"] team2_name_short_api = data["team2"]["abcName"] team1_logo_api = data["team1"]["logo"] team2_logo_api = data["team2"]["logo"] arena_api = data["arena"]["name"] arena_short_api = data["arena"]["shortName"] region_api = data["region"]["name"] date_obj_api = datetime.strptime(data["game"]["localDate"], "%d.%m.%Y") league_api = data["league"]["abcName"] league_full_api = data["league"]["name"] season_api = ( f'{str(data["league"]["season"]-1)}/{str(data["league"]["season"])[2:]}' ) stadia_api = data["comp"]["name"] row_team1 = get_excel_row_for_team(team1_name) row_team2 = get_excel_row_for_team(team2_name) team1_logo_exl = row_team1.get("TeamLogo", "") team1_logo_left_exl = row_team1.get("TeamLogo(LEFT-LOOP)", "") team1_logo_right_exl = row_team1.get("TeamLogo(RIGHT-LOOP)", "") team1_tla_exl = row_team1.get("TeamTLA", "") team1_teamstat_exl = row_team1.get("TeamStats", "") team1_2line_exl = row_team1.get("TeamName2Lines", "") team2_logo_exl = row_team2.get("TeamLogo", "") team2_logo_left_exl = row_team2.get("TeamLogo(LEFT-LOOP)", "") team2_logo_right_exl = row_team2.get("TeamLogo(RIGHT-LOOP)", "") team2_tla_exl = row_team2.get("TeamTLA", "") team2_teamstat_exl = row_team2.get("TeamStats", "") team2_2line_exl = row_team2.get("TeamName2Lines", "") fon_exl = latest_data.get("excel_INFO")["data"][1]["SELECT TEAM1"] swape_exl = latest_data.get("excel_INFO")["data"][2]["SELECT TEAM1"] logo_exl = latest_data.get("excel_INFO")["data"][3]["SELECT TEAM1"] try: full_format = date_obj_api.strftime("%A, %-d %B %Y") short_format = date_obj_api.strftime("%A, %-d %b") except ValueError: full_format = date_obj_api.strftime("%A, %#d %B %Y") short_format = date_obj_api.strftime("%A, %#d %b") return maybe_clear_for_vmix( [ { "team1": team1_name, "team2": team2_name, "team1_short_api": team1_name_short_api, "team2_short_api": team2_name_short_api, "logo1_api": team1_logo_api, "logo2_api": team2_logo_api, "arena_api": arena_api, "short_arena_api": arena_short_api, "region_api": region_api, "league_api": league_api, "league_full_api": league_full_api, "season_api": season_api, "stadia_api": stadia_api, "date1_api": str(full_format), "date2_api": str(short_format), "team1_logo_exl": team1_logo_exl, "team1_logo_left_exl": team1_logo_left_exl, "team1_logo_right_exl": team1_logo_right_exl, "team1_tla_exl": team1_tla_exl, "team1_teamstat_exl": team1_teamstat_exl, "team1_2line_exl": team1_2line_exl, "team2_logo_exl": team2_logo_exl, "team2_logo_left_exl": team2_logo_left_exl, "team2_logo_right_exl": team2_logo_right_exl, "team2_tla_exl": team2_tla_exl, "team2_teamstat_exl": team2_teamstat_exl, "team2_2line_exl": team2_2line_exl, "fon_exl": fon_exl, "swape_exl": swape_exl, "logo_exl": logo_exl, } ] ) @app.get("/play_by_play") async def play_by_play(): data = latest_data["game"]["data"]["result"] data_pbp = data["plays"] team1_name = data["team1"]["name"] team2_name = data["team2"]["name"] team1_startnum = [ i["startNum"] for i in next( (t for t in data["teams"] if t["teamNumber"] == 1), None, )["starts"] if i["startRole"] == "Player" ] team2_startnum = [ i["startNum"] for i in next( (t for t in data["teams"] if t["teamNumber"] == 2), None, )["starts"] if i["startRole"] == "Player" ] # если вообще нет плей-бай-плея — просто отдаём пустой список if not data_pbp: return maybe_clear_for_vmix([]) df_data_pbp = pd.DataFrame(data_pbp[::-1]) last_event = data_pbp[-1] if "play" not in df_data_pbp: return maybe_clear_for_vmix([]) if ( "live-status" in latest_data and latest_data["live-status"]["data"] != "Not Found" ): json_quarter = latest_data["live-status"]["data"]["result"]["period"] json_second = latest_data["live-status"]["data"]["result"]["second"] else: json_quarter = last_event["period"] json_second = 0 if "3x3" in LEAGUE: df_data_pbp["play"].replace({2: 1, 3: 2}, inplace=True) df_goals = df_data_pbp.loc[df_data_pbp["play"].isin([1, 2, 3])].copy() if df_goals.empty: return maybe_clear_for_vmix([]) df_goals.loc[df_goals["startNum"].isin(team1_startnum), "score1"] = df_goals["play"] df_goals.loc[df_goals["startNum"].isin(team2_startnum), "score2"] = df_goals["play"] df_goals["score_sum1"] = df_goals["score1"].fillna(0).cumsum() df_goals["score_sum2"] = df_goals["score2"].fillna(0).cumsum() df_goals["new_sec"] = df_goals["sec"].astype(str).str.slice(0, -1).astype(int) df_goals["time_now"] = (600 if json_quarter < 5 else 300) - json_second df_goals["quar"] = json_quarter - df_goals["period"] # без numpy: diff_time через маски pandas same_quarter = df_goals["quar"] == 0 other_quarter = ~same_quarter df_goals.loc[same_quarter, "diff_time"] = ( df_goals.loc[same_quarter, "time_now"] - df_goals.loc[same_quarter, "new_sec"] ) df_goals.loc[other_quarter, "diff_time"] = ( 600 * df_goals.loc[other_quarter, "quar"] - df_goals.loc[other_quarter, "new_sec"] + df_goals.loc[other_quarter, "time_now"] ) df_goals["diff_time"] = df_goals["diff_time"].astype(int) df_goals["diff_time_str"] = df_goals["diff_time"].apply( lambda x: f"{x // 60}:{str(x % 60).zfill(2)}" ) df_goals["team"] = df_goals.apply( lambda row: team1_name if not pd.isna(row["score1"]) else team2_name, axis=1, ) df_goals["text_rus"] = df_goals.apply( lambda row: ( f"рывок {int(row['score_sum1'])}-{int(row['score_sum2'])}" if not pd.isna(row["score1"]) else f"рывок {int(row['score_sum2'])}-{int(row['score_sum1'])}" ), axis=1, ) df_goals["text_time_rus"] = df_goals.apply( lambda row: ( f"рывок {int(row['score_sum1'])}-{int(row['score_sum2'])} за {row['diff_time_str']}" if not pd.isna(row["score1"]) else f"рывок {int(row['score_sum2'])}-{int(row['score_sum1'])} за {row['diff_time_str']}" ), axis=1, ) df_goals["text"] = df_goals.apply( lambda row: ( f"{team1_name} {int(row['score_sum1'])}-{int(row['score_sum2'])} run" if not pd.isna(row["score1"]) else f"{team2_name} {int(row['score_sum2'])}-{int(row['score_sum1'])} run" ), axis=1, ) df_goals["text_time"] = df_goals.apply( lambda row: ( f"{team1_name} {int(row['score_sum1'])}-{int(row['score_sum2'])} run in last {row['diff_time_str']}" if not pd.isna(row["score1"]) else f"{team2_name} {int(row['score_sum2'])}-{int(row['score_sum1'])} run in last {row['diff_time_str']}" ), axis=1, ) new_order = ["text", "text_time"] + [ col for col in df_goals.columns if col not in ["text", "text_time"] ] df_goals = df_goals[new_order] for _ in ["children", "start", "stop", "hl", "sort", "startNum", "zone", "x", "y"]: if _ in df_goals.columns: del df_goals[_] # 👉 здесь избавляемся от NaN: только для score1/score2 df_goals["score1"] = df_goals["score1"].fillna("") df_goals["score2"] = df_goals["score2"].fillna("") # если хочешь вообще никаких NaN во всём JSON — можно так: # df_goals = df_goals.fillna("") # print(payload) payload = df_goals.to_dict(orient="records") return maybe_clear_for_vmix(payload) def change_vmix_datasource_urls(xml_data, new_base_url: str) -> bytes: """ Ищет все и меняет внутри на new_base_url + endpoint. """ # 1. Приводим вход к bytes if isinstance(xml_data, (bytes, bytearray)): raw_bytes = bytes(xml_data) elif isinstance(xml_data, str): raw_bytes = xml_data.encode("utf-8") elif isinstance(xml_data, io.IOBase) or hasattr(xml_data, "read"): # nasio.load_bio, скорее всего, возвращает BytesIO raw_bytes = xml_data.read() try: xml_data.seek(0) except Exception: pass else: raise TypeError(f"Unsupported xml_data type: {type(xml_data)}") # 2. Декодируем text = raw_bytes.decode("utf-8", errors="replace") # 3. Парсим XML root = ET.fromstring(text) # 4. Меняем URL for url_tag in root.findall( ".//datasource[@friendlyName='JSON']//instance//state/xml/url" ): old_url = url_tag.text.strip() pattern = r"https?:\/\/[^\/]+" new_url = re.sub(pattern, new_base_url, old_url, count=0, flags=0) url_tag.text = new_url # 5. Сериализуем обратно в bytes new_xml = ET.tostring(root, encoding="utf-8", method="xml") return new_xml @app.get("/vmix") async def vmix_project(): vmix_bio = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=SYNO_PATH_VMIX, ) # system_name = platform.system() # if system_name == "Windows": # pass # else: # ❗ На Linux/Synology/Docker — заменяем URL edited_vmix = change_vmix_datasource_urls(vmix_bio, FQDN) # 2. гарантируем, что это bytes if isinstance(edited_vmix, str): edited_vmix = edited_vmix.encode("utf-8") return StreamingResponse( io.BytesIO(edited_vmix), media_type="application/octet-stream", headers={"Content-Disposition": f'attachment; filename="VTB_{MYHOST}.vmix"'}, ) @app.get("/quarter") async def select_quarter(): return latest_data["excel_QUARTER"] def resolve_period(ls: dict, game: dict) -> str: try: period_num = int(ls.get("period", 0)) except (TypeError, ValueError): return game.get("period", "") try: seconds_left = int(ls.get("second", 0)) except (TypeError, ValueError): seconds_left = 0 if period_num <= 0: return game.get("period", "") score_a = ls.get("scoreA", game.get("score1")) score_b = ls.get("scoreB", game.get("score2")) if seconds_left == 0: # период закончился if period_num == 1: return "End 1q" if period_num == 2: return "HT" if period_num == 3: return "End 3q" if period_num == 4: return "End 4q" if score_a == score_b else "" # овертаймы return f"End OT{period_num - 4}".replace("1", "") else: # период в процессе if period_num <= 4: return f"Q{period_num}" return f"OT{period_num - 4}".replace("1", "") @app.get("/games_online") async def games_online(): if not CALENDAR or "items" not in CALENDAR: raise HTTPException(status_code=503, detail="calendar data not ready") today = datetime.now().date() # today = (datetime.now() + timedelta(days=1)).date() todays_games = [] final_states = {"result", "resultconfirmed", "finished"} for item in CALENDAR["items"]: game = item.get("game") or {} status_raw = str(game.get("gameStatus", "") or "").lower() need_refresh = status_raw not in final_states dt_str = game.get("defaultZoneDateTime") or "" try: game_dt = datetime.fromisoformat(dt_str).date() except ValueError: continue if game_dt == today: row_team1 = get_excel_row_for_team(item["team1"]["name"]) or {} row_team2 = get_excel_row_for_team(item["team2"]["name"]) or {} game["team1_xls"] = row_team1.get("TeamName2Lines", "") game["team1_logo_xls"] = row_team1.get("TeamLogo", "") game["team2_xls"] = row_team2.get("TeamName2Lines", "") game["team2_logo_xls"] = row_team2.get("TeamLogo", "") game_id = game.get("id") if game_id and need_refresh: try: resp = requests.get( URLS["live-status"].format(host=HOST, game_id=game_id), timeout=5, ).json() ls = resp.get("result") or resp msg = str(ls.get("message") or "").lower() status = str(ls.get("status") or "").lower() if msg == "not found" or status == "404": pass elif ( ls.get("message") != "Not found" and str(ls.get("gameStatus")).lower() == "online" ): game["score1"] = ls.get("scoreA", game.get("score1", "")) game["score2"] = ls.get("scoreB", game.get("score2", "")) game["period"] = resolve_period(ls, game) game["gameStatus"] = ls.get( "gameStatus", game.get("gameStatus", "") ) except Exception as ex: print(ex) scores = [game.get("score1"), game.get("score2")] todays_games.append( { "gameStatus": game["gameStatus"], "score1": ( game["score1"] if any((s or 0) > 0 for s in scores) else "" ), "score2": ( game["score2"] if any((s or 0) > 0 for s in scores) else "" ), "period": ( game["period"] if "period" in game and any((s or 0) > 0 for s in scores) else "" ), "defaultZoneTime": game["defaultZoneTime"], "team1": item["team1"]["name"], "team2": item["team2"]["name"], "team1_xls": game["team1_xls"], "team1_logo_xls": game["team1_logo_xls"], "team2_xls": game["team2_xls"], "team2_logo_xls": game["team2_logo_xls"], "mask1": ( "#FFFFFF00" if any((s or 0) > 0 for s in scores) else "#FFFFFF" ), "mask2": ( "#FFFFFF00" if ( game["period"] if "period" in game and any((s or 0) > 0 for s in scores) else "" ) == "" else "#FFFFFF" ), "mask3": ( "#FFFFFF00" if (game["period"] if "period" in game else "") != "" else "#FFFFFF" ), "split": ( ":" if any((s or 0) > 0 for s in scores) and ( game["period"] if "period" in game and any((s or 0) > 0 for s in scores) else "" ) == "" else "" ), } ) return todays_games def get_image(points, bib, count_point): """ points: список кортежей (x, y, is_made, sec, period) x, y — координаты из API, где (0,0) = центр кольца is_made — True (play 2,3) или False (play 5,6) bib: startNum/номер игрока count_point: строка-версия (количество бросков) """ if not points: return b"" base_image = Image.new("RGBA", (1500, 2800), (0, 0, 0, 0)) width, height = base_image.size draw = ImageDraw.Draw(base_image) # === Диапазон координат === COURT_WIDTH_UNITS = 150.0 # по X COURT_LENGTH_UNITS = 280.0 # по Y scale_x = height / COURT_LENGTH_UNITS scale_y = width / COURT_WIDTH_UNITS HOOP_X_PX = 750 HOOP_Y_PX = 157.5 def to_px(x, y): px = int(HOOP_X_PX - x * scale_x) py = int(HOOP_Y_PX + y * scale_y) return px, py point_radius = 30 # # --- шрифт --- # font = ImageFont.load_default() # try: # nasio_font = SYNO_FONT_PATH # if nasio_font: # if isinstance(nasio_font, BytesIO): # nasio_font = nasio_font.getvalue() # if isinstance(nasio_font, (bytes, bytearray)): # font = ImageFont.truetype(BytesIO(nasio_font), 18) # else: # font = ImageFont.truetype(nasio_font, 18) # except Exception as e: # logger.warning(f"[shotmap] не удалось загрузить шрифт: {e}") for x_raw, y_raw, is_made, sec, period in points: try: x = float(x_raw) y = float(y_raw) except (TypeError, ValueError): continue px, py = to_px(x, y) # --- выбираем bytes иконки --- icon_bytes = SYNO_GOAL if is_made else SYNO_MISS used_icon = False if isinstance(icon_bytes, (bytes, bytearray)) and icon_bytes: try: buf = BytesIO(icon_bytes) icon = Image.open(buf).convert("RGBA") size = point_radius * 3 icon = icon.resize((size, size), Image.LANCZOS) base_image.paste(icon, (px - point_radius, py - point_radius), icon) used_icon = True except Exception as e: logger.warning( f"[shotmap] не удалось открыть иконку из bytes " f"(is_made={is_made}, len={len(icon_bytes)}): {e}" ) else: logger.warning( f"[shotmap] icon_bytes пустой или не bytes " f"(is_made={is_made}, type={type(icon_bytes)})" ) # --- Fallback: кружок, если картинка не нарисовалась --- if not used_icon: bbox = ( px - point_radius, py - point_radius, px + point_radius, py + point_radius, ) color = (0, 255, 0, 255) if is_made else (255, 0, 0, 255) draw.ellipse(bbox, fill=color) # # --- подпись Q{period} по центру --- # label = f"Q{period}" # try: # bbox = draw.textbbox((0, 0), label, font=font) # text_w = bbox[2] - bbox[0] # text_h = bbox[3] - bbox[1] # except AttributeError: # text_w, text_h = draw.textsize(label, font=font) # center_x, center_y = px, py # text_x = center_x - text_w // 2 # text_y = center_y - text_h // 2 # draw.text((text_x + 1, text_y + 1), label, font=font, fill=(255, 255, 255, 255)) # draw.text((text_x, text_y), label, font=font, fill=(0, 0, 0, 255)) filename = f"{bib}_shots_{count_point}" buf = BytesIO() try: base_image = base_image.transpose(Image.ROTATE_90) base_image.save(buf, format="PNG", compress_level=1) except Exception as e: logger.warning(f"[shotmap] не удалось сохранить shotmap в память: {e}") return "" data = buf.getvalue() SHOTMAP_CACHE[filename] = data # кладём в RAM # относительный путь для HTTP-эндпоинта public_url = f"/image/{filename}" return public_url def shotmap_worker(): """ Фоновый поток: следит за latest_data['game'] и пересчитывает карты бросков по каждому startNum. Картинки лежат только в RAM (SHOTMAP_CACHE/SHOTMAPS). """ last_counts: dict[int, int] = {} while not stop_event.is_set(): try: game = get_latest_game_safe("game") if not game: time.sleep(1) continue # в get_latest_game_safe уже нормализована структура, # но на всякий случай ещё раз берём data/result game_data = game.get("data") if isinstance(game, dict) else None if not isinstance(game_data, dict): time.sleep(1) continue result = game_data.get("result") or {} plays = result.get("plays") or [] # собираем все броски по startNum shots_by_startnum: dict[int, list[tuple]] = {} for ev in plays: play_code = ev.get("play") # 2,3 — попали; 5,6 — промахи if play_code not in (2, 3, 5, 6): continue sn = ev.get("startNum") if sn is None: continue x = ev.get("x") y = ev.get("y") if x is None or y is None: continue sec = ev.get("sec") period = ev.get("period") is_made = play_code in (2, 3) shots_by_startnum.setdefault(sn, []).append( (x, y, is_made, sec, period) ) # обновляем только тех, у кого поменялось количество бросков for sn, points in shots_by_startnum.items(): count = len(points) if count == 0: continue if last_counts.get(sn) == count: # количество бросков и так то же — картинка уже есть continue version = str(count) bib = str(sn) # get_image: # - рисует shotmap # - кладёт PNG bytes в SHOTMAP_CACHE[filename] # - возвращает полный URL вида f"{FQDN}/image/{filename}" url = get_image(points, bib, version) if not url: continue with SHOTMAPS_LOCK: SHOTMAPS[sn] = { "count": count, "url": url, # готовый путь, который можно отдавать в vMix } last_counts[sn] = count except Exception as e: logger.warning(f"[shotmap_worker] error: {e}") time.sleep(1) @app.get("/image/{player_id_shots}") async def get_shotmap_image(player_id_shots: str): """ Отдаёт картинку карты бросков из оперативной памяти. player_id_shots должен совпадать с ключом в SHOTMAP_CACHE, например "23_shots". """ # async with SHOTMAPS_LOCK: data = SHOTMAP_CACHE.get(player_id_shots) if not data: raise HTTPException(status_code=404, detail="Shotmap not found in memory") return Response(content=data, media_type="image/png") @app.get("/last_5_games") async def last_5_games(): # достаём актуальный game game = get_latest_game_safe("game") if not game: raise HTTPException(status_code=503, detail="game data not ready") game_data = game["data"] if "data" in game else game result = game_data.get("result", {}) or {} team1_info = result.get("team1") or {} team2_info = result.get("team2") or {} team1_id = team1_info.get("teamId") team2_id = team2_info.get("teamId") team1_name = team1_info.get("name", "") team2_name = team2_info.get("name", "") if not team1_id or not team2_id: raise HTTPException(status_code=503, detail="team ids not ready") if not CALENDAR or "items" not in CALENDAR: raise HTTPException(status_code=503, detail="calendar data not ready") final_states = {"result", "resultconfirmed", "finished"} # последние N завершённых игр по команде def collect_last_games_for_team(team_id: int, limit: int = 5): matches = [] for item in CALENDAR["items"]: game_info = item.get("game") or {} if not game_info: continue status_raw = str(game_info.get("gameStatus", "") or "").lower() if status_raw not in final_states: # пропускаем незавершённые матчи continue t1 = item.get("team1") or {} t2 = item.get("team2") or {} if team_id not in (t1.get("teamId"), t2.get("teamId")): continue # пропускаем текущий матч gid = game_info.get("id") if gid is not None and GAME_ID is not None and str(gid) == str(GAME_ID): continue dt_str = game_info.get("defaultZoneDateTime") or "" try: dt = datetime.fromisoformat(dt_str) except ValueError: continue matches.append({"dt": dt, "item": item}) matches.sort(key=lambda x: x["dt"], reverse=True) return [m["item"] for m in matches[:limit]] # считаем W/L для списка матчей команды def calc_results_list(games: list[dict], team_id: int): wl_list = [] for item in games: game_info = item.get("game") or {} t1 = item.get("team1") or {} t2 = item.get("team2") or {} score1 = game_info.get("score1") score2 = game_info.get("score2") id1 = t1.get("teamId") id2 = t2.get("teamId") is_win = None if isinstance(score1, (int, float)) and isinstance(score2, (int, float)): if team_id == id1: is_win = score1 > score2 elif team_id == id2: is_win = score2 > score1 wl_list.append( r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\LAST 5 GAMES\W.png" if is_win else ( r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\LAST 5 GAMES\L.png" if is_win is not None else EMPTY_PHOTO_PATH ) ) return wl_list[::-1] # ищем СЛЕДУЮЩИЙ матч (ближайший в будущем) для команды def find_next_game(team_id: int): if not CALENDAR or "items" not in CALENDAR: return {"opponent": "", "date": "", "place": "", "place_ru": ""} WEEKDAYS_EN = [ "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", ] MONTHS_EN = [ "january", "february", "march", "april", "may", "june", "july", "august", "september", "october", "november", "december", ] now = datetime.now() # наивное "сейчас" best = None # {"dt": ..., "opp": ..., "place": "home"/"away"} for item in CALENDAR["items"]: game_info = item.get("game") or {} t1 = item.get("team1") or {} t2 = item.get("team2") or {} if team_id not in (t1.get("teamId"), t2.get("teamId")): continue dt_str = game_info.get("defaultZoneDateTime") or "" try: dt = datetime.fromisoformat(dt_str) except ValueError: continue # убираем tzinfo, чтобы можно было сравнивать с now if dt.tzinfo is not None: dt = dt.replace(tzinfo=None) # только будущие матчи if dt <= now: continue # определяем соперника и место if team_id == t1.get("teamId"): opp_name = t2.get("name", "") place = "home" # команда дома else: opp_name = t1.get("name", "") place = "away" # команда в гостях if best is None or dt < best["dt"]: best = {"dt": dt, "opp": opp_name, "place": place} if not best: return {"opponent": "", "date": "", "place": "", "place_ru": ""} place_ru = "дома" if best["place"] == "home" else "в гостях" # 🆕 формируем английскую строку dt = best["dt"] weekday_en = WEEKDAYS_EN[dt.weekday()] # monday..sunday month_en = MONTHS_EN[dt.month - 1] # january..december day = dt.day # 1..31 place_en = "home" if best["place"] == "home" else "away" formatted = ( f"{weekday_en}, {month_en} {day}, at {place_en} against {best['opp']}" ) return { "opponent": best["opp"], "date": best["dt"].strftime("%Y-%m-%d %H:%M"), "place": best["place"], # "home" / "away" "place_ru": place_ru, # "дома" / "в гостях" "formatted": formatted, # 🆕 "wednesday, march 26, at home against astana" } # последние 5 игр и результаты team1_games = collect_last_games_for_team(team1_id) team2_games = collect_last_games_for_team(team2_id) team1_wl = calc_results_list(team1_games, team1_id) team2_wl = calc_results_list(team2_games, team2_id) # следующий матч next1 = find_next_game(team1_id) next2 = find_next_game(team2_id) data = [ { "teamName": team1_name, "teamId": team1_id, "team_results": team1_wl, "nextOpponent": next1["opponent"], "nextGameDate": next1["date"], "nextGamePlace": next1["place_ru"], # "дома" / "в гостях" "nextGameHomeAway": next1["place"], # "home" / "away" (если нужно в логике) "nextGameFormatted": next1["formatted"], }, { "teamName": team2_name, "teamId": team2_id, "team_results": team2_wl, "nextOpponent": next2["opponent"], "nextGameDate": next2["date"], "nextGamePlace": next2["place_ru"], "nextGameHomeAway": next2["place"], "nextGameFormatted": next2["formatted"], }, ] return data @app.get("/commentary", response_class=HTMLResponse) async def commentary(): game_wrap = get_latest_game_safe("game") if not game_wrap: return HTMLResponse( "

Данные матча ещё не готовы

", status_code=503, ) game_data = game_wrap["data"] if "data" in game_wrap else game_wrap result = game_data.get("result", {}) or {} game_info = result.get("game", {}) or {} team1_info = result.get("team1", {}) or {} team2_info = result.get("team2", {}) or {} team1_name = team1_info.get("name", "Team 1") team2_name = team2_info.get("name", "Team 2") score_now = game_info.get("score", "") full_score = game_info.get("fullScore", "") # live-status ls_wrap = latest_data.get("live-status", {}) ls_raw = ls_wrap.get("data", {}) if isinstance(ls_wrap, dict) else {} ls_dict = ls_raw.get("result") or ls_raw if isinstance(ls_raw, dict) else {} live_status = ( ls_dict.get("status") or ls_dict.get("gameStatus") or ls_dict.get("state") or "—" ) # счёт по четвертям quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"] score_rows = [] if isinstance(full_score, str) and full_score: fs_list = [x.strip() for x in full_score.split(",") if x.strip()] for i, q in enumerate(quarters[: len(fs_list)]): parts = fs_list[i].split(":") s1, s2 = (parts + ["", ""])[:2] score_rows.append((q, s1, s2)) # как в /team1 и /team2 team1_players = await team("team1") team2_players = await team("team2") team1_json = json.dumps(team1_players, ensure_ascii=False) team2_json = json.dumps(team2_players, ensure_ascii=False) def render_players_table(players, title, team_key: str) -> str: header = """ """ rows = [] for p in players: # только игроки if p.get("startRole") not in ("Player", ""): continue num = p.get("num", "") pid = p.get("id", "") name = p.get("NameGFX") or p.get("name", "") pts = p.get("pts", "") reb = p.get("reb", (p.get("dreb", 0) or 0) + (p.get("oreb", 0) or 0)) ast = p.get("ast", "") stl = p.get("stl", "") blk = p.get("blk", "") time_played = p.get("time", "") fg = p.get("fg", "") pt2 = p.get("pt-2", "") pt3 = p.get("pt-3", "") pt1 = p.get("pt-1", "") to = p.get("to", "") foul = p.get("foul", "") plus_minus = p.get("plusMinus", "") kpi = p.get("kpi", "") rows.append( f""" """ ) return f"""

{title}

{header} {''.join(rows)}
# Игрок PTS REB AST STL BLK MIN FG 2PT 3PT FT TO Foul +/- KPI
{num} {name} {pts} {reb} {ast} {stl} {blk} {time_played} {fg} {pt2} {pt3} {pt1} {to} {foul} {plus_minus} {kpi}
""" # (pbp можно добавить сюда, я его пока опустил, чтобы не раздувать код) pbp_html = "" game_time_str = GAME_START_DT.strftime("%d.%m.%Y %H:%M") if GAME_START_DT else "N/A" html = f""" Комментаторский дашборд

{team1_name} vs {team2_name}

Счёт: {score_now or "—"} • Статус: {live_status} • Начало: {game_time_str}

Счёт по четвертям

{''.join(f"".format(q=q, s1=s1, s2=s2) for (q, s1, s2) in score_rows)}
Период{team1_name}{team2_name}
{{q}}{{s1}}{{s2}}
{render_players_table(team1_players, team1_name, "team1")}
{render_players_table(team2_players, team2_name, "team2")}
Кликни по фамилии игрока, чтобы показать сезон/карьеру.
{pbp_html} """ return HTMLResponse(content=html) @app.get("/dashboard", response_class=HTMLResponse) async def dashboard(): """ HTML-дашборд для комментаторов: - слева/справа: данные по командам и игрокам - по центру: сравнительная командная статистика Берём данные только из latest_data (game + team_stats-подобные агрегаты). """ game_wrap = get_latest_game_safe("game") if not game_wrap: return HTMLResponse( "

Данные матча ещё не готовы

", status_code=503, ) game_data = game_wrap["data"] if "data" in game_wrap else game_wrap result = game_data.get("result", {}) or {} game_info = result.get("game", {}) or {} team1_info = result.get("team1", {}) or {} team2_info = result.get("team2", {}) or {} score_now = game_info.get("score", "") full_score = game_info.get("fullScore", "") team1_name = team1_info.get("name", "Team 1") team2_name = team2_info.get("name", "Team 2") # --- счёт по четвертям, как в game.fullScore --- quarters_labels = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"] quarter_rows = [] if isinstance(full_score, str) and full_score: parts = [p.strip() for p in full_score.split(",") if p.strip()] for i, part in enumerate(parts): if i >= len(quarters_labels): break sep = ":" if ":" in part else "-" # на всякий случай left, right = (part.split(sep) + ["", ""])[:2] quarter_rows.append((quarters_labels[i], left.strip(), right.strip())) # --- агрегированная командная статистика (как /team_stats) --- teams = result.get("teams") or [] plays = result.get("plays") or [] team_1 = next((t for t in teams if t.get("teamNumber") == 1), None) team_2 = next((t for t in teams if t.get("teamNumber") == 2), None) if not team_1 or not team_2: return HTMLResponse( "

Нет данных по командам в latest_data['game']

", status_code=503, ) timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays) avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1.get("starts", [])) avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2.get("starts", [])) total_1 = add_new_team_stat( team_1.get("total", {}), avg_age_1, points_1, avg_height_1, timeout_str1, timeout_left1, ) total_2 = add_new_team_stat( team_2.get("total", {}), avg_age_2, points_2, avg_height_2, timeout_str2, timeout_left2, ) # словарь "метрика -> русское имя" из stat_name_list stat_labels = {name: rus for (name, rus, eng) in stat_name_list} # порядок вывода метрик в центральном столбце (как на твоём дашборде) center_stats_order = [ "pt-1", # штрафные "pt-1_pro", "pt-2", "pt-2_pro", "pt-3", "pt-3_pro", "fg", "fg_pro", "defReb", "offReb", "Reb", "assist", "steal", "turnover", "block", "foul", "dunk", "fastBreak", "ptsStart", "ptsStart_pro", "ptsBench", "ptsBench_pro", ] center_rows_html = [] for key in center_stats_order: v1 = total_1.get(key, "") v2 = total_2.get(key, "") if v1 == "" and v2 == "": continue label_rus = stat_labels.get(key, key) center_rows_html.append( f"" f"{v1}" f"{label_rus}" f"{v2}" f"" ) center_stats_html = "".join(center_rows_html) # --- данные по игрокам для левой / правой таблиц --- team1_players = await team("team1") team2_players = await team("team2") def pick_coach(players_list): for p in players_list: if p.get("startRole") == "Coach": return p return None coach1 = pick_coach(team1_players) coach2 = pick_coach(team2_players) coach1_name = (coach1 or {}).get("NameGFX") or (coach1 or {}).get("name", "") coach2_name = (coach2 or {}).get("NameGFX") or (coach2 or {}).get("name", "") avg_height1 = total_1.get("avgHeight", "") avg_age1 = total_1.get("avgAge", "") avg_height2 = total_2.get("avgHeight", "") avg_age2 = total_2.get("avgAge", "") timeouts_left1 = total_1.get("timeout_left", "") timeouts_left2 = total_2.get("timeout_left", "") def render_players_column(players_list, team_key): rows = [] for p in players_list: if p.get("startRole") != "Player": continue num = p.get("num", "") name = p.get("NameGFX") or p.get("name", "") pts = p.get("pts", "") fouls = p.get("foul", "") is_on = bool(p.get("isOnCourt")) # <-- тут берём флаг ball = "🏀" if is_on else "" rows.append( f""" {num} {ball} {name} {pts} {fouls} """ ) return "".join(rows) team1_players_html = render_players_column(team1_players, "team1") team2_players_html = render_players_column(team2_players, "team2") html = f""" Game Dashboard
HEAD COACH
{coach1_name}
TIME-OUTS LEFT: {timeouts_left1}
AVG. HEIGHT: {avg_height1}
AVG. AGE: {avg_age1}
{team1_players_html}
# PLAYER PTS FOULS
{team1_name} vs {team2_name}
{score_now}
{''.join(f"" for (q, s1, s2) in quarter_rows)}
{team1_name}{team2_name}
{q}{s1}{s2}
{center_stats_html}
HEAD COACH
{coach2_name}
TIME-OUTS LEFT: {timeouts_left2}
AVG. HEIGHT: {avg_height2}
AVG. AGE: {avg_age2}
{team2_players_html}
# PLAYER PTS FOULS
""" return HTMLResponse(content=html) @app.get("/game_history") async def game_history(): pregame = get_latest_game_safe("pregame") if not pregame: return [{"Данных об истории команд нет!"}] pregame_data = pregame["data"] if "data" in pregame else pregame result = pregame_data.get("result", {}).get("gameHistory", {}) or {} history = [] for row in result: row_team1 = get_excel_row_for_team(row["team1"]["name"]) or {} row_team2 = get_excel_row_for_team(row["team2"]["name"]) or {} history.append( { "team1": row_team1.get("TeamTLA", ""), "team2": row_team2.get("TeamTLA", ""), "team1_logo": row_team1.get("TeamLogo", ""), "team2_logo": row_team2.get("TeamLogo", ""), "score1": row["game"]["score1"], "score2": row["game"]["score2"], "localDate": row["game"]["localDate"], "team1Win": pregame_data.get("result", {}).get("team1Win"), "team2Win": pregame_data.get("result", {}).get("team2Win"), } ) return history if __name__ == "__main__": uvicorn.run( "get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug" )