from fastapi import FastAPI, HTTPException, Request from fastapi.responses import Response, HTMLResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from contextlib import asynccontextmanager import requests, uvicorn, json import threading, queue import argparse import pandas as pd from datetime import datetime, time as dtime, timedelta from fastapi.responses import Response import logging import logging.config from dotenv import load_dotenv from pprint import pprint import nasio import io, os, platform, time import xml.etree.ElementTree as ET import re from PIL import Image, ImageDraw, ImageFont from io import BytesIO parser = argparse.ArgumentParser() parser = argparse.ArgumentParser() parser.add_argument("--league", default="vtb") parser.add_argument("--team", required=True) parser.add_argument("--lang", default="en") args = parser.parse_args() MYHOST = platform.node() if not os.path.exists("logs"): os.makedirs("logs") def get_fqdn(): system_name = platform.system() if system_name == "Linux": hostname = platform.node() fqdn = f"https://{hostname}.tvstart.ru" else: fqdn = "http://127.0.0.1" return fqdn FQDN = get_fqdn def get_fqdn(): system_name = platform.system() if system_name == "Linux": hostname = platform.node() fqdn = f"https://{hostname}.tvstart.ru" else: fqdn = "http://127.0.0.1" return fqdn FQDN = get_fqdn telegram_bot_token = os.getenv("TELEGRAM_TOKEN") telegram_chat_id = str(os.getenv("TELEGRAM_CHAT_ID")) log_config = { "version": 1, "handlers": { "telegram": { "class": "telegram_handler.TelegramHandler", "level": "INFO", "token": telegram_bot_token, "chat_id": telegram_chat_id, "formatter": "telegram", }, "console": { "class": "logging.StreamHandler", "level": "INFO", "formatter": "simple", "stream": "ext://sys.stdout", }, "file": { "class": "logging.FileHandler", "level": "DEBUG", "formatter": "simple", "filename": f"logs/GFX_{MYHOST}.log", "encoding": "utf-8", }, }, "loggers": { __name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"}, }, "formatters": { "telegram": { "class": "telegram_handler.HtmlFormatter", "format": f"%(levelname)s [{MYHOST.upper()}]\n%(message)s", "use_emoji": "True", }, "simple": { "class": "logging.Formatter", "format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s", "datefmt": "%d.%m.%Y %H:%M:%S", }, }, } logging.config.dictConfig(log_config) logger = logging.getLogger(__name__) logger.handlers[2].formatter.use_emoji = True pprint(f"Локальный файл окружения ={load_dotenv(verbose=True)}") LEAGUE = args.league TEAM = args.team LANG = args.lang HOST = os.getenv("API_BASE_URL") SYNO_PATH_EXCEL = f'{os.getenv("SYNO_PATH_EXCEL")}MATCH INFO.xlsx' SYNO_URL = os.getenv("SYNO_URL") SYNO_USERNAME = os.getenv("SYNO_USERNAME") SYNO_PASSWORD = os.getenv("SYNO_PASSWORD") SYNO_PATH_VMIX = os.getenv("SYNO_PATH_VMIX") SYNO_FONT_PATH = os.getenv("SYNO_FONT_PATH") _syno_font_path = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=os.getenv("SYNO_FONT_PATH"), ) if isinstance(_syno_font_path, BytesIO): _syno_font_path = _syno_font_path.getvalue() SYNO_FONT_PATH = _syno_font_path # bytes или None # ---- ИКОНКА ПРОМАХА ---- _syno_miss_raw = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=os.getenv("SYNO_MISS"), ) if isinstance(_syno_miss_raw, BytesIO): _syno_miss_raw = _syno_miss_raw.getvalue() SYNO_MISS = _syno_miss_raw # bytes или None # ---- ИКОНКА ПОПАДАНИЯ ---- _syno_goal_raw = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=os.getenv("SYNO_GOAL"), ) if isinstance(_syno_goal_raw, BytesIO): _syno_goal_raw = _syno_goal_raw.getvalue() SYNO_GOAL = _syno_goal_raw # bytes или None SHOTMAP_SUBDIR = "shotmaps" SHOTMAP_DIR = os.path.join(os.getcwd(), "shotmaps") os.makedirs(SHOTMAP_DIR, exist_ok=True) try: for name in os.listdir(SHOTMAP_DIR): fp = os.path.join(SHOTMAP_DIR, name) if os.path.isfile(fp): os.remove(fp) logger.info(f"[shotmap] очищена папка {SHOTMAP_DIR} при старте") except Exception as e: logger.warning(f"[shotmap] не удалось очистить {SHOTMAP_DIR}: {e}") CALENDAR = None STATUS = False GAME_ID = None SEASON = None GAME_START_DT = None # datetime начала матча (локальная из календаря) GAME_TODAY = False # флаг: игра сегодня GAME_SOON = False # флаг: игра сегодня и скоро (<1 часа) OFFLINE_SWITCH_AT = None # timestamp, когда надо уйти в оффлайн OFFLINE_DELAY_SEC = 600 # 10 минут SLEEP_NOTICE_SENT = False # 👈 чтобы не слать уведомление повторно # --- preload lock --- PRELOAD_LOCK = False # когда True — consumer будет принимать только preloaded game PRELOADED_GAME_ID = None # ID матча, который мы держим «тёплым» PRELOAD_HOLD_UNTIL = None # timestamp, до какого момента держим (T-1:15) # 🔥 кэш картинок в оперативной памяти SHOTMAP_CACHE: dict[str, bytes] = {} # общая очередь results_q = queue.Queue() # тут будем хранить последние данные latest_data = {} # событие для остановки потоков stop_event = threading.Event() # отдельные события для разных наборов потоков stop_event_live = threading.Event() stop_event_offline = threading.Event() # чтобы из consumer можно было их гасить threads_live = [] threads_offline = [] # какой режим сейчас запущен: "live" / "offline" / None CURRENT_THREADS_MODE = None CLEAR_OUTPUT_FOR_VMIX = False EMPTY_PHOTO_PATH = r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\EMPTY.png" URLS = { "seasons": "{host}api/abc/comps/seasons?Tag={league}", "actual-standings": "{host}api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}", "calendar": "{host}api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000", "game": "{host}api/abc/games/game?Id={game_id}&Lang={lang}", "pregame": "{host}api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}", "pregame-full-stats": "{host}api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}", "live-status": "{host}api/abc/games/live-status?id={game_id}", "box-score": "{host}api/abc/games/box-score?id={game_id}", "play-by-play": "{host}api/abc/games/play-by-play?id={game_id}", } def maybe_clear_for_vmix(payload): """ Если включён режим очистки — возвращаем payload, где все значения заменены на "". Иначе — возвращаем как есть. """ if CLEAR_OUTPUT_FOR_VMIX: return wipe_json_values(payload) return payload def start_offline_threads(season, game_id): """Запускаем редкие запросы, когда матча нет или он уже сыгран.""" global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data if CURRENT_THREADS_MODE == "offline": logger.debug("[threads] already in OFFLINE mode → skip start_offline_threads") return stop_live_threads() stop_offline_threads() logger.info("[threads] switching to OFFLINE mode ...") stop_event_offline.clear() threads_offline = [ threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 150, # опрашиваем раз в секунду/реже stop_event_offline, False, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 300, stop_event_offline, False, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 300, stop_event_offline, False, True, ), daemon=True, ), ] for t in threads_offline: t.start() CURRENT_THREADS_MODE = "offline" logger.info("[threads] OFFLINE threads started (data cleaned)") def start_live_threads(season, game_id): """Запускаем частые онлайн-запросы, когда матч идёт/вот-вот.""" global threads_live, CURRENT_THREADS_MODE, stop_event_live # если уже в лайве — не дублируем if CURRENT_THREADS_MODE == "live": return # на всякий случай гасим офлайн stop_offline_threads() stop_event_live.clear() threads_live = [ threading.Thread( target=get_data_from_API, args=( "pregame", URLS["pregame"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 300, stop_event_live, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG ), 300, stop_event_live, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 300, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=game_id, lang=LANG), 150, # часто stop_event_live, True, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "live-status", URLS["live-status"].format(host=HOST, game_id=game_id), 0.5, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "box-score", URLS["box-score"].format(host=HOST, game_id=game_id), 0.5, stop_event_live, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "play-by-play", URLS["play-by-play"].format(host=HOST, game_id=game_id), 1, stop_event_live, ), daemon=True, ), ] for t in threads_live: t.start() CURRENT_THREADS_MODE = "live" logger.info("[threads] LIVE threads started") def stop_live_threads(): """Гасим только live-треды.""" global threads_live current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") if not threads_live: logger.info("[threads] LIVE threads stopped (nothing to stop)") return logger.info(f"[threads] Stopping {len(threads_live)} LIVE thread(s)...") stop_event_live.set() still_alive = [] for t in threads_live: t.join(timeout=2) if t.is_alive(): logger.warning( f"[{current_time}] [threads] LIVE thread is still alive: {t.name}" ) still_alive.append(t.name) threads_live = [] if still_alive: logger.warning( f"[{current_time}] [threads] Some LIVE threads did not stop: {still_alive}" ) else: logger.info("[threads] LIVE threads stopped") CURRENT_THREADS_MODE = None # 👈 сбрасываем режим def stop_offline_threads(): """Гасим только offline-треды.""" global threads_offline if not threads_offline: return stop_event_offline.set() for t in threads_offline: t.join(timeout=1) threads_offline = [] CURRENT_THREADS_MODE = None # 👈 сбрасываем режим logger.info("[threads] OFFLINE threads stopped") def has_full_game_ready() -> bool: game = latest_data.get("game") if not game: return False payload = game.get("data", game) return ( isinstance(payload, dict) and isinstance(payload.get("data"), dict) and isinstance(payload["data"].get("result"), dict) and "teams" in payload["data"]["result"] ) # Функция запускаемая в потоках def get_data_from_API( name: str, url: str, sleep_time: float, stop_event: threading.Event, stop_when_live=False, stop_after_success: bool = False, # 👈 новый флаг ): did_first_fetch = False while not stop_event.is_set(): current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") if ( stop_when_live and globals().get("STATUS") == "live" and has_full_game_ready() ): logger.info( f"{[{current_time}]} [{name}] stopping because STATUS='live' and full game is ready" ) break try: value = requests.get(url, timeout=5).json() did_first_fetch = True # помечаем, что один заход сделали except json.JSONDecodeError as json_err: logger.warning( f"[{current_time}] [{name}] Ошибка парсинга JSON: {json_err}" ) value = {"error": f"JSON decode error: {json_err}"} except requests.exceptions.Timeout: logger.warning(f"[{current_time}] [{name}] Таймаут при запросе {url}") value = {"error": "timeout"} except requests.exceptions.RequestException as req_err: logger.warning(f"[{current_time}] [{name}] Ошибка запроса: {req_err}") value = {"error": str(req_err)} except Exception as ex: logger.warning(f"[{current_time}] [{name}] Неизвестная ошибка: {ex}") value = {"error": str(ex)} # Проверяем, нет ли явного статуса ошибки в JSON if isinstance(value, dict) and str(value.get("status", "")).lower() in ( "error", "fail", "no-status", ): logger.warning( f"[{current_time}] [{name}] API вернул статус '{value.get('status')}'" ) ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] results_q.put({"source": name, "ts": ts, "data": value}) logger.debug(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}") ok_status = not ( isinstance(value, dict) and str(value.get("status", "")).lower() in ("error", "fail", "no-status") ) if stop_after_success and ok_status: logger.info( f"[{name}] got successful response → stopping thread (stop_after_success)" ) return slept = 0 while slept < sleep_time: if stop_event.is_set(): break if ( stop_when_live and globals().get("STATUS") == "live" and has_full_game_ready() ): logger.info( f"[{name}] stopping during sleep because STATUS='live' and full game is ready" ) return time.sleep(1) slept += 1 # если запрос занял дольше — просто сразу следующую итерацию # Получение результатов из всех запущенных потоков def results_consumer(): while not stop_event.is_set(): # ⬇️ проверяем, не пора ли в оффлайн (отложенный переход) current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") off_at = globals().get("OFFLINE_SWITCH_AT") if off_at is not None and time.time() >= off_at: # делаем переход ТОЛЬКО если ещё не оффлайн if globals().get("CURRENT_THREADS_MODE") != "offline": logger.info("[status] switching to OFFLINE (delayed)") stop_live_threads() start_offline_threads(SEASON, GAME_ID) # чтобы не повторять globals()["OFFLINE_SWITCH_AT"] = None try: msg = results_q.get(timeout=0.5) except queue.Empty: continue try: source = msg.get("source") payload = msg.get("data") or {} # универсальный статус (может не быть) incoming_status = payload.get("status") # может быть None # print(source, incoming_status) if source == "game": # принимаем ТОЛЬКО тот game_id, который держим в PRELOAD_LOCK try: if PRELOAD_LOCK: incoming_gid = extract_game_id_from_payload(payload) if not incoming_gid or str(incoming_gid) != str( PRELOADED_GAME_ID ): logger.debug( f"results_consumer: skip game (gid={incoming_gid}) due to PRELOAD_LOCK; keep {PRELOADED_GAME_ID}" ) continue except Exception as _e: logger.debug(f"results_consumer: preload lock check error: {_e}") else: latest_data[source] = { "ts": msg["ts"], "data": incoming_status if incoming_status is not None else payload, } # 1) play-by-play if "play-by-play" in source: game = latest_data.get("game") # если игра уже нормальная — приклеиваем плейи if ( game and isinstance(game, dict) and "data" in game and "result" in game["data"] ): # у pbp тоже может не быть data/result if "result" in payload: game["data"]["result"]["plays"] = payload["result"] # а вот статус у play-by-play иногда просто "no-status" # 2) box-score elif "box-score" in source: game = latest_data.get("game") if ( game and "data" in game and "result" in game["data"] and "teams" in game["data"]["result"] and "result" in payload and "teams" in payload["result"] ): # обновляем команды game["data"]["result"]["game"]["fullScore"] = payload["result"][ "fullScore" ] game["data"]["result"]["game"][ "score" ] = f'{payload["result"]["teams"][0]["total"]["points"]}:{payload["result"]["teams"][1]["total"]["points"]}' for team in game["data"]["result"]["teams"]: if team["teamNumber"] != 0: box_team = [ t for t in payload["result"]["teams"] if t["teamNumber"] == team["teamNumber"] ] if not box_team: print("ERROR: box-score team not found") continue box_team = box_team[0] for player in team["starts"]: box_player = [ p for p in box_team["starts"] if p["startNum"] == player["startNum"] ] if box_player: player["stats"] = box_player[0] team["total"] = box_team["total"] team["startTotal"] = box_team["startTotal"] team["benchTotal"] = box_team["benchTotal"] team["maxLeading"] = box_team["maxLeading"] team["pointsInRow"] = box_team["pointsInRow"] team["maxPointsInRow"] = box_team["maxPointsInRow"] elif "live-status" in source: latest_data[source] = { "ts": msg["ts"], "data": payload, } try: ls_data = payload.get("result") or payload raw_ls_status = ( ls_data.get("status") or ls_data.get("gameStatus") or ls_data.get("state") ) if raw_ls_status: raw_ls_status_low = str(raw_ls_status).lower() finished_markers = [ "finished", "result", "resultconfirmed", "ended", "final", "game over", ] # 1) матч ЗАКОНЧЕН → запускаем ОТСРОЧЕННЫЙ переход if any(m in raw_ls_status_low for m in finished_markers): now_ts = time.time() # если ещё не назначали переход — назначим if globals().get("OFFLINE_SWITCH_AT") is None: switch_at = now_ts + globals().get( "OFFLINE_DELAY_SEC", 600 ) globals()["OFFLINE_SWITCH_AT"] = switch_at # статус тоже обозначим, что он завершён, но ждёт if ( GAME_START_DT and GAME_START_DT.date() == datetime.now().date() ): globals()["STATUS"] = "finished_wait" globals()["CLEAR_OUTPUT_FOR_VMIX"] = False else: globals()["STATUS"] = "finished_wait" globals()["CLEAR_OUTPUT_FOR_VMIX"] = False human_time = datetime.fromtimestamp(switch_at).strftime( "%H:%M:%S" ) logger.info( f"[status] match finished → will switch to OFFLINE at {human_time} " f"(in {globals().get('OFFLINE_DELAY_SEC', 600)}s)" ) else: # уже ждём — можно в debug logger.debug( "[status] match finished → OFFLINE already scheduled" ) # 2) матч снова стал онлайном → СБРАСЫВАЕМ отложенный переход elif ( "online" in raw_ls_status_low or "live" in raw_ls_status_low ): # если до этого стояла отложка — уберём globals()[ "CLEAR_OUTPUT_FOR_VMIX" ] = False # 👈 выключаем очистку if globals().get("OFFLINE_SWITCH_AT") is not None: logger.info( "[status] match back to LIVE → cancel scheduled OFFLINE" ) globals()["OFFLINE_SWITCH_AT"] = None if globals().get("STATUS") != "live": logger.info( "[status] match became LIVE → switch to LIVE threads" ) globals()["STATUS"] = "live" start_live_threads(SEASON, GAME_ID) except Exception as e: logger.warning( f"[{current_time}] results_consumer: live-status postprocess error: {e}" ) else: if source == "game": has_game_already = "game" in latest_data and isinstance( latest_data.get("game"), dict ) # Полная структура? is_full = ( isinstance(payload, dict) and "data" in payload and isinstance(payload["data"], dict) and "result" in payload["data"] and "teams" in payload["data"]["result"] ) # ⚙️ ЛОГИКА: # 1) Пока матч НЕ online (STATUS != 'live'): обновляем всегда, # чтобы /status видел "живость" раз в 5 минут независимо от полноты JSON. if globals().get("STATUS") != "live": latest_data["game"] = {"ts": msg["ts"], "data": payload} logger.debug( "results_consumer: pre-live game → updated (full=%s)", is_full, ) else: # ✅ если игры ещё НЕТ в кэше — примем ПЕРВЫЙ game даже неполный, # чтобы box-score/play-by-play могли его дорастить if is_full or not has_game_already: latest_data["game"] = {"ts": msg["ts"], "data": payload} logger.debug( "results_consumer: LIVE → stored (full=%s, had=%s)", is_full, has_game_already, ) else: logger.debug( "results_consumer: LIVE & partial game → keep previous one" ) continue else: latest_data[source] = { "ts": msg["ts"], "data": payload, } continue # ... остальная обработка ... except Exception as e: logger.warning(f"[{current_time}] results_consumer error: {repr(e)}") continue def get_items(data: dict) -> list: """ Мелкий хелпер: берём первый список в ответе API. Многие ручки отдают {"result":[...]} или {"seasons":[...]}. Если находим список — возвращаем его. Если нет — возвращаем None (значит, нужно брать весь dict). """ for k, v in data.items(): if isinstance(v, list): return data[k] return None def pick_game_for_team(calendar_json): """ Возвращает: game_id: str | None game_dt: datetime | None is_today: bool cal_status: str | None # Scheduled / Online / Result / ResultConfirmed Логика: 1. если в календаре есть игра КОМАНДЫ на сегодня — берём ЕЁ и возвращаем её gameStatus 2. иначе — берём последнюю прошедшую и тоже возвращаем её gameStatus """ items = get_items(calendar_json) if not items: return None, None, False, None today = datetime.now().date() # 1) сначала — сегодняшняя for game in reversed(items): if game["team1"]["name"].lower() != TEAM.lower(): continue gdt = extract_game_datetime(game) gdate = ( gdt.date() if gdt else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date() ) if gdate == today: cal_status = game["game"].get("gameStatus") return game["game"]["id"], gdt, True, cal_status # 2) если на сегодня нет — берём последнюю прошедшую last_id = None last_dt = None last_status = None for game in reversed(items): if game["team1"]["name"].lower() != TEAM.lower(): continue gdt = extract_game_datetime(game) gdate = ( gdt.date() if gdt else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date() ) if gdate <= today: last_id = game["game"]["id"] last_dt = gdt last_status = game["game"].get("gameStatus") break return last_id, last_dt, False, last_status def extract_game_datetime(game_item: dict) -> datetime | None: """ Из элемента календаря достаём datetime матча. В календаре есть localDate и часто localTime. Если localTime нет — берём 00:00. """ try: date_str = game_item["game"]["localDate"] # '31.10.2025' dt_date = datetime.strptime(date_str, "%d.%m.%Y").date() time_str = game_item["game"].get("defaultZoneTime") # '19:30' if time_str: hh, mm = map(int, time_str.split(":")) dt_time = dtime(hour=hh, minute=mm) else: dt_time = dtime(hour=0, minute=0) return datetime.combine(dt_date, dt_time) except Exception: return None def build_pretty_status_message(): """ Собирает одно красивое сообщение про текущее состояние онлайна. Если game ещё нет — шлём хотя бы статусы источников. """ lines = [] cgid = get_cached_game_id() lines.append(f"🏀 {LEAGUE.upper()} • {TEAM}") lines.append(f"📌 Game ID: {cgid or GAME_ID}") lines.append(f"🕒 {GAME_START_DT}") # сначала попробуем собрать нормальный game game_wrap = latest_data.get("game") has_game = False if game_wrap: raw = game_wrap.get("data") if isinstance(game_wrap, dict) else game_wrap # raw может быть: dict (полный payload) | dict (уже result) | str ("ok"/"no-status") result = {} if isinstance(raw, dict): # ваш нормальный полный ответ по game имеет структуру: {"data": {"result": {...}}} # но на всякий случай поддержим и вариант, где сразу {"result": {...}} или уже {"game": ...} result = ( raw.get("data", {}).get("result", {}) if "data" in raw else (raw.get("result") or raw) ) else: result = {} game_info = result.get("game") or {} team1_name = (result.get("team1") or {}).get("name", "Team 1") team2_name = (result.get("team2") or {}).get("name", "Team 2") lines.append(f"👥 {team1_name} vs {team2_name}") score_now = game_info.get("score") or "" full_score = game_info.get("fullScore") or "" if score_now: lines.append(f"🔢 Score: {score_now}") if isinstance(full_score, str) and full_score: quarters = full_score.split(",") q_text = " | ".join(f"Q{i+1} {q}" for i, q in enumerate(quarters) if q) if q_text: lines.append(f"🧱 By quarters: {q_text}") has_game = bool(result) # live-status отдельно # ls = latest_data.get("live-status", {}) # ls_raw = ls.get("data") or {} # ls_status = ( # ls_raw.get("status") or ls_raw.get("gameStatus") or ls_raw.get("state") or "—" # ) # lines.append(f"🟢 LIVE status: {ls_status}") ls_wrap = latest_data.get("live-status") ls_status = "—" if ls_wrap: raw = ls_wrap.get("data") if isinstance(raw, dict): ls_dict = raw.get("result") or raw ls_status = ( ls_dict.get("status") or ls_dict.get("gameStatus") or ls_dict.get("state") or "—" ) elif isinstance(raw, str): # API/consumer могли положить просто строку статуса: "ok", "no-status", "error" ls_status = raw lines.append(f"🟢 LIVE status: {ls_status}") # добавим блок по источникам — это как раз “состояние запросов” sort_order = ["game", "live-status", "box-score", "play-by-play"] keys = [k for k in sort_order if k in latest_data] + sorted( [k for k in latest_data if k not in sort_order] ) src_lines = [] for k in keys: if "excel" not in k.lower(): d = latest_data.get(k) or {} ts = d.get("ts", "—") dat = d.get("data") if isinstance(dat, dict) and "status" in dat: st = str(dat["status"]).lower() else: st = str(dat).lower() # Эмодзи-кружки для статусов if any(x in st for x in ["ok", "success", "live", "online"]): emoji = "🟢" elif any( x in st for x in ["error", "fail", "no-status", "none", "timeout"] ): emoji = "🔴" else: emoji = "🟡" src_lines.append(f"{emoji} {k}: {st} ({ts})") if src_lines: lines.append("📡 Sources:") lines.extend(src_lines) # даже если game не успел — мы всё равно что-то вернём return "\n".join(lines) def status_broadcaster(): """ Если матч live — сразу шлём статус. Потом — раз в 5 минут. Если матч не live — ждём и проверяем снова. """ INTERVAL = 300 # 5 минут last_text = None first_live_sent = False while not stop_event.is_set(): # если игра не идёт — спим по чуть-чуть и крутимся if STATUS not in ("live", "live_soon"): first_live_sent = False # чтобы при новом лайве снова сразу отправить time.sleep(5) continue # сюда попадаем только если live text = build_pretty_status_message() if text and text != last_text: logger.info(text) last_text = text first_live_sent = True # после первого лайва ждём 5 минут, а до него — 10 секунд wait_sec = INTERVAL if first_live_sent else 10 for _ in range(wait_sec): if stop_event.is_set(): break time.sleep(1) def get_cached_game_id() -> str | None: game = latest_data.get("game") if not game: return None payload = game.get("data", game) if not isinstance(payload, dict): return None # структура может быть {"data":{"result":{...}}} или {"result":{...}} result = ( payload.get("data", {}).get("result") if "data" in payload else payload.get("result") ) if not isinstance(result, dict): return None g = result.get("game") if isinstance(g, dict): return g.get("id") return None def extract_game_id_from_payload(payload: dict) -> str | None: if not isinstance(payload, dict): return None root = payload.get("data") if isinstance(payload.get("data"), dict) else payload res = root.get("result") if isinstance(root.get("result"), dict) else None if not isinstance(res, dict): return None g = res.get("game") if isinstance(g, dict): return g.get("id") return None def start_offline_prevgame(season, prev_game_id: str): """ Специальный оффлайн для ПРЕДЫДУЩЕЙ игры: - гасит любые текущие треды - запускает только 'game' для prev_game_id - НЕ останавливается после первого 'ok' (stop_after_success=False) """ global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data # всегда переключаемся чисто stop_live_threads() stop_offline_threads() logger.info("[threads] switching to OFFLINE mode (previous game) ...") stop_event_offline.clear() threads_offline = [ threading.Thread( target=get_data_from_API, args=( "game", URLS["game"].format(host=HOST, game_id=prev_game_id, lang=LANG), 150, # редкий опрос stop_event_offline, False, # stop_when_live False, # ✅ stop_after_success=False (держим тред) ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "pregame-full-stats", URLS["pregame-full-stats"].format( host=HOST, league=LEAGUE, season=season, game_id=prev_game_id, lang=LANG, ), 600, stop_event_offline, False, False, ), daemon=True, ), threading.Thread( target=get_data_from_API, args=( "actual-standings", URLS["actual-standings"].format( host=HOST, league=LEAGUE, season=season, lang=LANG ), 600, stop_event_offline, False, False, ), daemon=True, ), ] for t in threads_offline: t.start() CURRENT_THREADS_MODE = "offline" logger.info(f"[threads] OFFLINE prev-game thread started for {prev_game_id}") def start_prestart_watcher(game_dt: datetime | None): """ Логика на день игры: 1) Немедленно подгружаем ДАННЫЕ ПРОШЛОГО МАТЧА (один раз, оффлайн-поток 'game'), чтобы программа имела данные до старта. 2) Ровно за 1:15 до начала — СБРАСЫВАЕМ эти данные (останавливаем оффлайн, чистим latest_data). 3) Ровно за 1:10 до начала — ВКЛЮЧАЕМ LIVE-треды. """ if not game_dt: return # разовое уведомление о "спячке" global SLEEP_NOTICE_SENT, STATUS, SEASON, GAME_ID now = datetime.now() if not SLEEP_NOTICE_SENT and game_dt > now: logger.info( "🛌 Тред ушёл в спячку до начала игры.\n" f"⏰ Матч начинается сегодня в {game_dt.strftime('%H:%M')}." ) SLEEP_NOTICE_SENT = True def _runner(): from datetime import time as dtime # для резервного парсинга времени global STATUS PRELOAD_LEAD = timedelta(hours=1, minutes=15) # T-1:15 → сброс LIVE_LEAD = timedelta(hours=1, minutes=10) # T-1:10 → live RESET_AT = game_dt - PRELOAD_LEAD LIVE_AT = game_dt - LIVE_LEAD PRELOAD_MAXWAIT_SEC = 180 # ждём до 3 мин готовности full game при предзагрузке did_preload = False did_reset = False did_live = False # --- вспомогательное: поиск предыдущей игры команды ДО сегодняшнего матча --- def _find_prev_game_id( calendar_json: dict, cutoff_dt: datetime ) -> tuple[str | None, datetime | None]: items = get_items(calendar_json) or [] prev_id, prev_dt = None, None team_norm = (TEAM or "").strip().casefold() for g in reversed(items): try: t1 = (g["team1"]["name"] or "").strip().casefold() if team_norm not in (t1): continue except Exception: continue print(g) gdt = extract_game_datetime(g) if not gdt: try: gd = datetime.strptime( g["game"]["localDate"], "%d.%m.%Y" ).date() gdt = datetime.combine(gd, dtime(0, 0)) except Exception: continue if gdt < cutoff_dt: prev_id, prev_dt = g["game"]["id"], gdt break return prev_id, prev_dt # --- Шаг 1: сразу включаем оффлайн по ПРЕДЫДУЩЕЙ игре и держим до T-1:15 --- try: now = datetime.now() if now < RESET_AT: calendar_resp = requests.get( URLS["calendar"].format( host=HOST, league=LEAGUE, season=SEASON, lang=LANG ), timeout=6, ).json() prev_game_id, prev_game_dt = _find_prev_game_id(calendar_resp, game_dt) # print(prev_game_id, prev_game_dt) if prev_game_id and str(prev_game_id) != str(GAME_ID): logger.info( f"[preload] старт оффлайна по предыдущей игре {prev_game_id} ({prev_game_dt})" ) # включаем «замок», чтобы consumer принимал только старую игру globals()["PRELOAD_LOCK"] = True globals()["PRELOADED_GAME_ID"] = str(prev_game_id) globals()["PRELOAD_HOLD_UNTIL"] = RESET_AT.timestamp() # поднимаем один оффлайн-тред по старой игре (без stop_after_success) start_offline_prevgame(SEASON, prev_game_id) did_preload = True else: logger.warning("[preload] предыдущая игра не найдена — пропускаем") else: logger.info( "[preload] уже поздно для предзагрузки (прошло T-1:15) — пропуск" ) except Exception as e: logger.warning(f"[preload] ошибка предзагрузки прошлой игры: {e}") # --- Основной цикл ожидания контрольных моментов --- while not stop_event.is_set(): now = datetime.now() # если матч уже в другом конечном состоянии — выходим if STATUS in ("live", "finished", "finished_wait", "finished_today"): break # Шаг 2: ровно T-1:15 — сбрасываем предзагруженные данные if not did_reset and now >= RESET_AT: logger.info( f"[reset] {now:%H:%M:%S} → T-1:15: сбрасываем предзагруженные данные" ) try: stop_offline_threads() # на всякий # for key in latest_data: # latest_data[key] = wipe_json_values(latest_data[key]) # latest_data.clear() # полный сброс кэша # снять замок предзагрузки globals()["PRELOAD_LOCK"] = False globals()["PRELOADED_GAME_ID"] = None globals()["PRELOAD_HOLD_UNTIL"] = None logger.info( "[reset] latest_data очищен; ждём T-1:10 для запуска live" ) except Exception as e: logger.warning(f"[reset] ошибка при очистке: {e}") globals()["CLEAR_OUTPUT_FOR_VMIX"] = True did_reset = True # Шаг 3: T-1:10 — включаем live-треды if not did_live and now >= LIVE_AT: logger.info( f"[prestart] {now:%H:%M:%S}, игра в {game_dt:%H:%M}, включаем LIVE threads по правилу T-1:10" ) STATUS = "live_soon" globals()[ "CLEAR_OUTPUT_FOR_VMIX" ] = False # можно оставить пустоту до первых живых данных stop_offline_threads() # на всякий случай start_live_threads(SEASON, GAME_ID) did_live = True break time.sleep(15) t = threading.Thread(target=_runner, daemon=True) t.start() def get_excel(): return nasio.load_formatted( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=SYNO_PATH_EXCEL, # sheet="TEAMS LEGEND", ) def excel_worker(): """ Раз в минуту читает ВСЕ вкладки Excel и сохраняет их в latest_data с префиксом excel_. """ global latest_data while not stop_event.is_set(): try: sheets = get_excel() # <- теперь это dict: {sheet_name: DataFrame} ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] if isinstance(sheets, dict): for sheet_name, df in sheets.items(): # пропускаем странные объекты if not hasattr(df, "fillna"): # logger.warning(f"[excel] Лист '{sheet_name}' не DataFrame") continue # ЧИСТИМ NaN и конвертируем df = df.fillna("") data_json = df.to_dict(orient="records") # ключ в latest_data: excel_<имя_вкладки> key = f"excel_{sheet_name}".replace(" ", "_").replace("-", "_") latest_data[key] = { "ts": ts, "data": data_json, } # logger.info("[excel] Все вкладки Excel обновлены") else: pass # logger.warning("[excel] get_excel() вернул не словарь") except Exception as e: logger.warning(f"[excel] ошибка при чтении Excel: {e}") # пауза 60 сек for _ in range(60): if stop_event.is_set(): break time.sleep(1) @asynccontextmanager async def lifespan(app: FastAPI): global STATUS, GAME_ID, SEASON, GAME_START_DT, GAME_TODAY, GAME_SOON, CALENDAR # 1. проверяем API: seasons try: seasons_resp = requests.get( URLS["seasons"].format(host=HOST, league=LEAGUE) ).json() season = seasons_resp["items"][0]["season"] except Exception: now = datetime.now() if now.month > 9: season = now.year + 1 else: season = now.year logger.info(f"предположили номер сезона: {season}") SEASON = season # 2. берём календарь try: calendar = requests.get( URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG) ).json() except Exception as ex: logger.error(f"не получилось проверить работу API. код ошибки: {ex}") # тут можно вообще не запускать сервер, но оставим как есть calendar = None CALENDAR = calendar # 3. определяем игру game_id, game_dt, is_today, cal_status = ( pick_game_for_team(calendar) if calendar else (None, None, False, None) ) GAME_ID = game_id GAME_START_DT = game_dt GAME_TODAY = is_today logger.info(f"Лига: {LEAGUE}\nСезон: {season}\nКоманда: {TEAM}\nGame ID: {game_id}") thread_excel = threading.Thread( target=excel_worker, daemon=True, ) thread_excel.start() # 4. запускаем "длинные" потоки (они у тебя и так всегда) thread_result_consumer = threading.Thread( target=results_consumer, daemon=True, ) thread_result_consumer.start() thread_status_broadcaster = threading.Thread( target=status_broadcaster, daemon=True, ) thread_status_broadcaster.start() # 5. решаем, что запускать if not is_today: STATUS = "no_game_today" start_offline_threads(SEASON, GAME_ID) else: # игра сегодня # в любом случае запускаем сторож, если знаем время игры start_prestart_watcher(game_dt) if cal_status is None: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) elif cal_status == "Scheduled": if game_dt: delta = game_dt - datetime.now() # если мы уже МЕНЬШЕ чем за 1:10 до игры — сразу в live if delta <= timedelta(hours=1, minutes=10): STATUS = "live_soon" start_live_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" # start_offline_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" # start_offline_threads(SEASON, GAME_ID) elif cal_status == "Online": STATUS = "live" start_live_threads(SEASON, GAME_ID) elif cal_status in ["Result", "ResultConfirmed"]: STATUS = "finished_today" start_offline_threads(SEASON, GAME_ID) else: STATUS = "today_not_started" start_offline_threads(SEASON, GAME_ID) yield # -------- shutdown -------- stop_event.set() stop_live_threads() stop_offline_threads() thread_result_consumer.join(timeout=1) thread_status_broadcaster.join(timeout=1) thread_excel.join(timeout=1) app = FastAPI( lifespan=lifespan, docs_url=None, # ❌ отключает /docs redoc_url=None, # ❌ отключает /redoc openapi_url=None, # ❌ отключает /openapi.json ) # раздаём /shotmaps как статику из SHOTMAP_DIR app.mount("/shotmaps", StaticFiles(directory=SHOTMAP_DIR), name="shotmaps") # @app.get("/shotmaps/{filename}") # async def get_shotmap(filename: str): # data = SHOTMAP_CACHE.get(filename) # if not data: # # если вдруг перезапустился процесс или такой карты нет # raise HTTPException(status_code=404, detail="Shotmap not found") # return Response(content=data, media_type="image/png") def format_time(seconds: float | int) -> str: """ Удобный формат времени для игроков: 71 -> "1:11" 0 -> "0:00" Любые кривые значения -> "0:00". """ try: total_seconds = int(float(seconds)) minutes = total_seconds // 60 sec = total_seconds % 60 return f"{minutes}:{sec:02}" except (ValueError, TypeError): return "0:00" @app.get("/team1") async def team1(): game = get_latest_game_safe("game") if not game: # если данных вообще нет (ещё ни одной игры) — тут реально нечего отдавать raise HTTPException(status_code=503, detail="game data not ready") data = await team("team1") return maybe_clear_for_vmix(data) @app.get("/team2") async def team2(): game = get_latest_game_safe("game") if not game: raise HTTPException(status_code=503, detail="game data not ready") data = await team("team2") return maybe_clear_for_vmix(data) @app.get("/top_team1") async def top_team1(): data = await team("team1") top = await top_sorted_team(data) return maybe_clear_for_vmix(top) @app.get("/top_team2") async def top_team2(): data = await team("team2") top = await top_sorted_team(data) return maybe_clear_for_vmix(top) def _b(v) -> bool: if isinstance(v, bool): return v if isinstance(v, (int, float)): return v != 0 if isinstance(v, str): return v.strip().lower() in ("1", "true", "yes", "on") return False def _placeholders(n=5): return [ { "NameGFX": "", "Name1GFX": "", "Name2GFX": "", "isOnCourt": False, "num": "", "photoGFX": EMPTY_PHOTO_PATH, } for _ in range(n) ] def wipe_json_values(obj): """ Рекурсивно заменяет все значения JSON на пустые строки. Если ключ содержит "photo", заменяет значение на EMPTY_PHOTO_PATH. """ # если словарь — обрабатываем ключи if isinstance(obj, dict): new_dict = {} for k, v in obj.items(): if "photo" in str(k).lower(): # ключ содержит photo → отдаём пустую картинку new_dict[k] = EMPTY_PHOTO_PATH else: new_dict[k] = wipe_json_values(v) return new_dict # если список — рекурсивно обработать элементы elif isinstance(obj, list): return [wipe_json_values(v) for v in obj] # любое конечное значение → "" else: return "" @app.get("/started_team1") async def started_team1(sort_by: str = None): data = await team("team1") players = await started_team(data) or [] # нормализуем флаги for p in players: p["isStart"] = _b(p.get("isStart", False)) p["isOnCourt"] = _b(p.get("isOnCourt", False)) if sort_by and sort_by.strip().lower() == "isstart": starters = [p for p in players if p["isStart"]] return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5)) if sort_by and sort_by.strip().lower() == "isoncourt": on_court = [p for p in players if p["isOnCourt"]] return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5)) # дефолт — без фильтра, как раньше return maybe_clear_for_vmix(players) @app.get("/started_team2") async def started_team2(sort_by: str = None): data = await team("team2") players = await started_team(data) or [] for p in players: p["isStart"] = _b(p.get("isStart", False)) p["isOnCourt"] = _b(p.get("isOnCourt", False)) if sort_by and sort_by.strip().lower() == "isstart": starters = [p for p in players if p["isStart"]] return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5)) if sort_by and sort_by.strip().lower() == "isoncourt": on_court = [p for p in players if p["isOnCourt"]] return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5)) return maybe_clear_for_vmix(players) @app.get("/latest_data") async def game(): return latest_data @app.get("/status") async def status(request: Request): global STATUS # будем его править, если live-status свежее def color_for_status(status_value: str) -> str: """Подбор цвета статуса в HEX""" status_value = str(status_value).lower() if status_value in ["ok", "success", "live", "live_soon", "online"]: return "#00FF00" # зелёный elif status_value in ["scheduled", "today_not_started", "upcoming"]: return "#FFFF00" # жёлтый elif status_value in [ "result", "resultconfirmed", "finished", "finished_today", ]: return "#FF0000" # красный elif status_value in ["no_game_today", "unknown", "none"]: return "#FFFFFF" # белый else: return "#FF7700" # серый (неизвестный статус) # ✳️ сортируем latest_data в нужном порядке sort_order = ["game", "live-status", "box-score", "play-by-play"] sorted_keys = [k for k in sort_order if k in latest_data] + sorted( [k for k in latest_data if k not in sort_order] ) # убираем excel_* из списка ключей sorted_keys = [k for k in sorted_keys if "excel" not in k.lower()] cached_game_id = get_cached_game_id() or GAME_ID note = "" if cached_game_id and GAME_ID and str(cached_game_id) != str(GAME_ID): note = ( f' (предзагружены данные прошлой игры)' ) data = { "league": LEAGUE, "team": TEAM, "game_id": cached_game_id, "game_status": STATUS, "statuses": [ { "name": TEAM, "status": STATUS, "ts": ( GAME_START_DT.strftime("%Y-%m-%d %H:%M") if GAME_START_DT else "N/A" ), "link": LEAGUE, "color": color_for_status(STATUS), } ] + [ { "name": item, "status": ( latest_data[item]["data"]["status"] if isinstance(latest_data[item]["data"], dict) and "status" in latest_data[item]["data"] else latest_data[item]["data"] ), "ts": latest_data[item]["ts"], "link": URLS[item].format( host=HOST, league=LEAGUE, season=SEASON, lang=LANG, game_id=cached_game_id, ), "color": color_for_status( latest_data[item]["data"]["status"] if isinstance(latest_data[item]["data"], dict) and "status" in latest_data[item]["data"] else latest_data[item]["data"] ), } for item in sorted_keys ], } accept = request.headers.get("accept", "") if "text/html" in accept: status_raw = str(STATUS).lower() if status_raw in ["live", "online"]: gs_class = "live" gs_text = "🟢 LIVE" elif status_raw in ["live_soon", "today_not_started"]: gs_class = "live" gs_text = "🟢 GAME TODAY (soon)" elif status_raw in ["finished_wait"]: gs_class = "upcoming" off_at = OFFLINE_SWITCH_AT if off_at: human = datetime.fromtimestamp(off_at).strftime("%H:%M:%S") gs_text = f"🟡 Game finished, cooling down → OFFLINE at {human}" else: gs_text = "🟡 Game finished, cooling down" elif status_raw in ["finished_today", "finished"]: gs_class = "finished" gs_text = "🔴 Game finished" else: gs_class = status_raw gs_text = "⚪ Unknown" html = f"""

📊 Game Status Monitor

League: {LEAGUE}

Team: {TEAM}

Game ID: {cached_game_id}{note}

Game Status: {gs_text}

""" # ВАЖНО: цикл только добавляет строки, return будет ПОСЛЕ цикла for s in data["statuses"]: status_text = str(s["status"]).strip().lower() if any( x in status_text for x in ["ok", "success", "live", "live_soon", "online"] ): color_class = "ok" elif any( x in status_text for x in ["scheduled", "today_not_started", "upcoming"] ): color_class = "warn" elif any( x in status_text for x in ["result", "resultconfirmed", "finished", "finished_today"] ): color_class = "fail" else: color_class = "unknown" html += f""" """ # закрываем таблицу и страницу УЖЕ ПОСЛЕ цикла html += """
NameStatusTimestampLink
{s["name"]} {status_text} {s["ts"]} {s["link"]}
""" return HTMLResponse(content=html, media_type="text/html") # JSON для API (красиво отформатированный) formatted = json.dumps(data, indent=4, ensure_ascii=False) response = Response(content=formatted, media_type="application/json") response.headers["Refresh"] = "1" return maybe_clear_for_vmix(response) @app.get("/scores") async def scores(): game = get_latest_game_safe("game") if not game: # игры ещё нет или пришёл только частичный ответ # отдаём пустую структуру, чтобы фронт не падал return [ {"Q": "Q1", "score1": "", "score2": ""}, {"Q": "Q2", "score1": "", "score2": ""}, {"Q": "Q3", "score1": "", "score2": ""}, {"Q": "Q4", "score1": "", "score2": ""}, ] game_data = game["data"] if "data" in game else game result = game_data.get("result", {}) game_info = result.get("game", {}) full_score = game_info.get("fullScore") if not full_score: # поле есть, но ещё пустое/None return [ {"Q": "Q1", "score1": "", "score2": ""}, {"Q": "Q2", "score1": "", "score2": ""}, {"Q": "Q3", "score1": "", "score2": ""}, {"Q": "Q4", "score1": "", "score2": ""}, ] quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"] score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters] full_score_list = full_score.split(",") for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]): parts = score_str.split(":") if len(parts) == 2: score_by_quarter[i]["score1"] = parts[0] score_by_quarter[i]["score2"] = parts[1] return maybe_clear_for_vmix(score_by_quarter) async def top_sorted_team(data): top_sorted_team = sorted( (p for p in data if p.get("startRole") in ["Player", ""]), key=lambda x: ( x.get("pts", 0), x.get("dreb", 0) + x.get("oreb", 0), x.get("ast", 0), x.get("stl", 0), x.get("blk", 0), x.get("time", "0:00"), ), reverse=True, ) # пустые строки не должны ломать UI процентами фолов/очков for player in top_sorted_team: if player.get("num", "") == "": player["pts"] = "" player["foul"] = "" return top_sorted_team def get_latest_game_safe(name: str): """ Безопасно достаём актуальный game из latest_data. Возвращаем None, если структура ещё не готова или прилетел "плохой" game (например, с {"status": "no-status"} без data/result). """ game = latest_data.get(name) if not game: return None # у нас в latest_data["game"] лежит {"ts": ..., "data": {...}} или сразу {...} # в consumer мы клали {"ts": ..., "data": payload}, так что берём .get("data") if "data" in game: game_data = game["data"] else: # на всякий случай, если где-то клали сразу payload game_data = game if not isinstance(game_data, dict): return None result = game_data.get("result") if not result: return None # если всё ок — вернём в исходном виде (с ts и т.п.) return game def _pick_last_avg_and_sum(stats_list: list) -> tuple[dict, dict]: """Возвращает (season_sum, season_avg) из seasonStats. Безопасно при пустых данных.""" if not isinstance(stats_list, list) or len(stats_list) == 0: return {}, {} # В JSON конец массива: ... {"class":"Sum"}, {"class":"Avg"} last = stats_list[-1] if stats_list else None prev = stats_list[-2] if len(stats_list) >= 2 else None season_avg = ( last.get("stats", {}) if isinstance(last, dict) and str(last.get("class")).lower() == "avg" else {} ) season_sum = ( prev.get("stats", {}) if isinstance(prev, dict) and str(prev.get("class")).lower() == "sum" else {} ) # Бывают инверсии порядка (на всякий случай): попробуем найти явно if not season_avg or not season_sum: for x in reversed(stats_list): if ( isinstance(x, dict) and str(x.get("class")).lower() == "avg" and not season_avg ): season_avg = x.get("stats", {}) or {} if ( isinstance(x, dict) and str(x.get("class")).lower() == "sum" and not season_sum ): season_sum = x.get("stats", {}) or {} if season_avg and season_sum: break return season_sum, season_avg def _pick_career_sum_and_avg(carrier_list: list) -> tuple[dict, dict]: """Возвращает (career_sum, career_avg) из carrier. В API встречаются блоки с class: Normal/Sum/Avg.""" if not isinstance(carrier_list, list) or len(carrier_list) == 0: return {}, {} career_sum, career_avg = {}, {} # Ищем явные «Sum» и «Avg» for x in reversed(carrier_list): if isinstance(x, dict): cls = str(x.get("class", "")).lower() stats = x.get("stats", {}) or {} if cls == "sum" and not career_sum: career_sum = stats elif cls == "avg" and not career_avg: career_avg = stats if career_sum and career_avg: break # Если «Avg» нет (часто для карьеры бывает только Normal/Sum) — ок, оставим пустым return career_sum, career_avg def _as_int(v, default=0): try: # в JSON часто строки; пустые строки -> 0 if v in ("", None): return default return int(float(v)) except Exception: return default def _safe(d: dict) -> dict: return d if isinstance(d, dict) else {} async def team(who: str): """ Возвращает данные по команде (team1 / team2) из актуального game. Защищена от ситуации, когда latest_data["game"] ещё не прогрелся или в него прилетел "плохой" ответ от API. """ game = get_latest_game_safe("game") if not game: # игра ещё не подгружена или структура кривоватая raise HTTPException(status_code=503, detail="game data not ready") full_stat = get_latest_game_safe("pregame-full-stats") if not full_stat: # ⚠️ full_stat_data отсутствует — работаем только с game_data logger.debug( f"[{who}] full_stat_data not found → continuing with game_data only" ) full_stat_data = {} else: full_stat_data = full_stat["data"] if "data" in full_stat else full_stat # нормализуем доступ к данным game_data = game["data"] if "data" in game else game result = game_data[ "result" ] # здесь уже безопасно, мы проверили в get_latest_game_safe result_full = full_stat_data.get("result", {}) if full_stat_data else {} # в result ожидаем "teams" teams = result.get("teams") if not teams: raise HTTPException(status_code=503, detail="game teams not ready") # выбираем команду if who == "team1": payload = next((t for t in teams if t.get("teamNumber") == 1), None) payload_full = result_full.get("team1PlayersStats") if result_full else [] else: payload = next((t for t in teams if t.get("teamNumber") == 2), None) payload_full = result_full.get("team2PlayersStats") if result_full else [] if payload is None: raise HTTPException(status_code=404, detail=f"{who} not found in game data") # дальше — твоя исходная логика формирования ответа по команде # я не знаю весь твой оригинальный код ниже, поэтому вставляю каркас # и показываю, где нужно аккуратно брать plays/box-score из latest_data role_list = [ ("Center", "C"), ("Guard", "G"), ("Forward", "F"), ("Power Forward", "PF"), ("Small Forward", "SF"), ("Shooting Guard", "SG"), ("Point Guard", "PG"), ("Forward-Center", "FC"), ] starts = payload.get("starts", []) team_rows = [] for item in starts: stats = item.get("stats") or {} pid = str(item.get("personId")) full_obj = next( (p for p in (payload_full or []) if str(p.get("personId")) == pid), None ) season_sum = season_avg = career_sum = career_avg = {} if full_obj: # сезон season_sum, season_avg = _pick_last_avg_and_sum( full_obj.get("seasonStats") or [] ) # карьера career_sum, career_avg = _pick_career_sum_and_avg( full_obj.get("carrier") or [] ) season_sum = _safe(season_sum) season_avg = _safe(season_avg) career_sum = _safe(career_sum) career_avg = _safe(career_avg) # Полезные числа для Totals+Live # live-поля в box-score называются goal1/2/3, shot1/2/3, defReb/offReb и т.п. g1 = _as_int(stats.get("goal1")) s1 = _as_int(stats.get("shot1")) g2 = _as_int(stats.get("goal2")) s2 = _as_int(stats.get("shot2")) g3 = _as_int(stats.get("goal3")) s3 = _as_int(stats.get("shot3")) # Сезонные суммы из pregame-full-stats ss_pts = _as_int(season_sum.get("points")) ss_ast = _as_int(season_sum.get("assist")) ss_blk = _as_int(season_sum.get("blockShot")) ss_dreb = _as_int(season_sum.get("defRebound")) ss_oreb = _as_int(season_sum.get("offRebound")) ss_reb = _as_int(season_sum.get("rebound")) ss_stl = _as_int(season_sum.get("steal")) ss_to = _as_int(season_sum.get("turnover")) ss_foul = _as_int(season_sum.get("foul")) ss_sec = _as_int(season_sum.get("second")) ss_gms = _as_int(season_sum.get("games")) ss_st = _as_int(season_sum.get("isStarts")) ss_g1 = _as_int(season_sum.get("goal1")) ss_s1 = _as_int(season_sum.get("shot1")) ss_g2 = _as_int(season_sum.get("goal2")) ss_s2 = _as_int(season_sum.get("shot2")) ss_g3 = _as_int(season_sum.get("goal3")) ss_s3 = _as_int(season_sum.get("shot3")) # Карьерные суммы из pregame-full-stats car_ss_pts = _as_int(career_sum.get("points")) car_ss_ast = _as_int(career_sum.get("assist")) car_ss_blk = _as_int(career_sum.get("blockShot")) car_ss_dreb = _as_int(career_sum.get("defRebound")) car_ss_oreb = _as_int(career_sum.get("offRebound")) car_ss_reb = _as_int(career_sum.get("rebound")) car_ss_stl = _as_int(career_sum.get("steal")) car_ss_to = _as_int(career_sum.get("turnover")) car_ss_foul = _as_int(career_sum.get("foul")) car_ss_sec = _as_int(career_sum.get("second")) car_ss_gms = _as_int(career_sum.get("games")) car_ss_st = _as_int(career_sum.get("isStarts")) car_ss_g1 = _as_int(career_sum.get("goal1")) car_ss_s1 = _as_int(career_sum.get("shot1")) car_ss_g2 = _as_int(career_sum.get("goal2")) car_ss_s2 = _as_int(career_sum.get("shot2")) car_ss_g3 = _as_int(career_sum.get("goal3")) car_ss_s3 = _as_int(career_sum.get("shot3")) # Totals по сезону, «с учётом текущего матча»: T_points = ss_pts + _as_int(stats.get("points")) T_assist = ss_ast + _as_int(stats.get("assist")) T_block = ss_blk + _as_int(stats.get("block")) T_dreb = ss_dreb + _as_int(stats.get("defReb")) T_oreb = ss_oreb + _as_int(stats.get("offReb")) T_reb = ss_reb + (_as_int(stats.get("defReb")) + _as_int(stats.get("offReb"))) T_steal = ss_stl + _as_int(stats.get("steal")) T_turn = ss_to + _as_int(stats.get("turnover")) T_foul = ss_foul + _as_int(stats.get("foul")) T_sec = ss_sec + _as_int(stats.get("second")) T_gms = ss_gms + (1 if _as_int(stats.get("second")) > 0 else 0) T_starts = ss_st + (1 if bool(stats.get("isStart")) else 0) T_g1 = ss_g1 + g1 T_s1 = ss_s1 + s1 T_g2 = ss_g2 + g2 T_s2 = ss_s2 + s2 T_g3 = ss_g3 + g3 T_s3 = ss_s3 + s3 # Totals по карьере, «с учётом текущего матча»: car_T_points = car_ss_pts + _as_int(stats.get("points")) car_T_assist = car_ss_ast + _as_int(stats.get("assist")) car_T_block = car_ss_blk + _as_int(stats.get("block")) car_T_dreb = car_ss_dreb + _as_int(stats.get("defReb")) car_T_oreb = car_ss_oreb + _as_int(stats.get("offReb")) car_T_reb = car_ss_reb + ( _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")) ) car_T_steal = car_ss_stl + _as_int(stats.get("steal")) car_T_turn = car_ss_to + _as_int(stats.get("turnover")) car_T_foul = car_ss_foul + _as_int(stats.get("foul")) car_T_sec = car_ss_sec + _as_int(stats.get("second")) car_T_gms = car_ss_gms + (1 if _as_int(stats.get("second")) > 0 else 0) car_T_starts = car_ss_st + (1 if bool(stats.get("isStart")) else 0) car_T_g1 = car_ss_g1 + g1 car_T_s1 = car_ss_s1 + s1 car_T_g2 = car_ss_g2 + g2 car_T_s2 = car_ss_s2 + s2 car_T_g3 = car_ss_g3 + g3 car_T_s3 = car_ss_s3 + s3 # Проценты (без деления на 0) def _pct(goal, shot): return f"{round(goal*100/shot, 1)}%" if shot else "0.0%" # Для «23» используем сумму 2-х и 3-х T_g23 = T_g2 + T_g3 T_s23 = T_s2 + T_s3 car_T_g23 = car_T_g2 + car_T_g3 car_T_s23 = car_T_s2 + car_T_s3 # print(avg_season, total_season) row = { "id": item.get("personId") or "", "num": item.get("displayNumber"), "startRole": item.get("startRole"), "role": item.get("positionName"), "roleShort": ( [ r[1] for r in role_list if r[0].lower() == (item.get("positionName") or "").lower() ][0] if any( r[0].lower() == (item.get("positionName") or "").lower() for r in role_list ) else "" ), "NameGFX": ( f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip() if item.get("firstName") is not None and item.get("lastName") is not None else "Команда" ), "Name1GFX": (item.get("firstName") or "").strip(), "Name2GFX": (item.get("lastName") or "").strip(), "captain": item.get("isCapitan", False), "age": item.get("age") or 0, "height": f"{item.get('height')} cm" if item.get("height") else 0, "weight": f"{item.get('weight')} kg" if item.get("weight") else 0, "isStart": stats.get("isStart", False), "isOn": "🏀" if stats.get("isOnCourt") is True else "", "isOnCourt": stats.get("isOnCourt", False), "flag": ( "https://flagicons.lipis.dev/flags/4x3/" + ( "ru" if item.get("countryId") is None and item.get("countryName") == "Russia" else ( "" if item.get("countryId") is None else ( (item.get("countryId") or "").lower() if item.get("countryName") is not None else "" ) ) ) + ".svg" ), "photoGFX": ( os.path.join( "D:\\Photos", result["league"]["abcName"], result[who]["name"], f"{item.get('displayNumber')}.png", ) if item.get("startRole") == "Player" else "" ), # live-стата "pts": _as_int(stats.get("points")), "pt-2": f"{g2}/{s2}", "pt-3": f"{g3}/{s3}", "pt-1": f"{g1}/{s1}", "fg": f"{g2+g3}/{s2+s3}", "ast": _as_int(stats.get("assist")), "stl": _as_int(stats.get("steal")), "blk": _as_int(stats.get("block")), "blkVic": _as_int(stats.get("blocked")), "dreb": _as_int(stats.get("defReb")), "oreb": _as_int(stats.get("offReb")), "reb": _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")), "to": _as_int(stats.get("turnover")), "foul": _as_int(stats.get("foul")), "foulT": _as_int(stats.get("foulT")), "foulD": _as_int(stats.get("foulD")), "foulC": _as_int(stats.get("foulC")), "foulB": _as_int(stats.get("foulB")), "fouled": _as_int(stats.get("foulsOn")), "plusMinus": _as_int(stats.get("plusMinus")), "dunk": _as_int(stats.get("dunk")), "kpi": ( _as_int(stats.get("points")) + _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")) + _as_int(stats.get("assist")) + _as_int(stats.get("steal")) + _as_int(stats.get("block")) + _as_int(stats.get("foulsOn")) + (g1 - s1) + (g2 - s2) + (g3 - s3) - _as_int(stats.get("turnover")) - _as_int(stats.get("foul")) ), "time": format_time(_as_int(stats.get("second"))), # сезон — средние (из последнего Avg) "AvgPoints": season_avg.get("points") or "0.0", "AvgAssist": season_avg.get("assist") or "0.0", "AvgBlocks": season_avg.get("blockShot") or "0.0", "AvgDefRebound": season_avg.get("defRebound") or "0.0", "AvgOffRebound": season_avg.get("offRebound") or "0.0", "AvgRebound": season_avg.get("rebound") or "0.0", "AvgSteal": season_avg.get("steal") or "0.0", "AvgTurnover": season_avg.get("turnover") or "0.0", "AvgFoul": season_avg.get("foul") or "0.0", "AvgOpponentFoul": season_avg.get("foulsOnPlayer") or "0.0", "AvgDunk": season_avg.get("dunk") or "0.0", "AvgPlayedTime": season_avg.get("playedTime") or "0:00", "Shot1Percent": season_avg.get("shot1Percent") or "0.0%", "Shot2Percent": season_avg.get("shot2Percent") or "0.0%", "Shot3Percent": season_avg.get("shot3Percent") or "0.0%", "Shot23Percent": season_avg.get("shot23Percent") or "0.0%", # сезон — Totals (суммы из Sum + live) "TPoints": T_points, "TShots1": f"{T_g1}/{T_s1}", "TShots2": f"{T_g2}/{T_s2}", "TShots3": f"{T_g3}/{T_s3}", "TShots23": f"{T_g23}/{T_s23}", "TShot1Percent": _pct(T_g1, T_s1), "TShot2Percent": _pct(T_g2, T_s2), "TShot3Percent": _pct(T_g3, T_s3), "TShot23Percent": _pct(T_g23, T_s23), "TAssist": T_assist, "TBlocks": T_block, "TDefRebound": T_dreb, "TOffRebound": T_oreb, "TRebound": T_reb, "TSteal": T_steal, "TTurnover": T_turn, "TFoul": T_foul, "TPlayedTime": format_time(T_sec), "TGameCount": T_gms, "TStartCount": T_starts, # карьера — средние (из последнего Avg) "Career_AvgPoints": career_avg.get("points") or "0.0", "Career_AvgAssist": career_avg.get("assist") or "0.0", "Career_AvgBlocks": career_avg.get("blockShot") or "0.0", "Career_AvgDefRebound": career_avg.get("defRebound") or "0.0", "Career_AvgOffRebound": career_avg.get("offRebound") or "0.0", "Career_AvgRebound": career_avg.get("rebound") or "0.0", "Career_AvgSteal": career_avg.get("steal") or "0.0", "Career_AvgTurnover": career_avg.get("turnover") or "0.0", "Career_AvgFoul": career_avg.get("foul") or "0.0", "Career_AvgOpponentFoul": career_avg.get("foulsOnPlayer") or "0.0", "Career_AvgDunk": career_avg.get("dunk") or "0.0", "Career_AvgPlayedTime": career_avg.get("playedTime") or "0:00", "Career_Shot1Percent": career_avg.get("shot1Percent") or "0.0%", "Career_Shot2Percent": career_avg.get("shot2Percent") or "0.0%", "Career_Shot3Percent": career_avg.get("shot3Percent") or "0.0%", "Career_Shot23Percent": career_avg.get("shot23Percent") or "0.0%", # карьера — Totals (суммы из Sum + live) "Career_TPoints": car_T_points, "Career_TShots1": f"{car_T_g1}/{car_T_s1}", "Career_TShots2": f"{car_T_g2}/{car_T_s2}", "Career_TShots3": f"{car_T_g3}/{car_T_s3}", "Career_TShots23": f"{car_T_g23}/{car_T_s23}", "Career_TShot1Percent": _pct(car_T_g1, car_T_s1), "Career_TShot2Percent": _pct(car_T_g2, car_T_s2), "Career_TShot3Percent": _pct(car_T_g3, car_T_s3), "Career_TShot23Percent": _pct(car_T_g23, car_T_s23), "Career_TAssist": car_T_assist, "Career_TBlocks": car_T_block, "Career_TDefRebound": car_T_dreb, "Career_TOffRebound": car_T_oreb, "Career_TRebound": car_T_reb, "Career_TSteal": car_T_steal, "Career_TTurnover": car_T_turn, "Career_TFoul": car_T_foul, "Career_TPlayedTime": format_time(car_T_sec), "Career_TGameCount": car_T_gms, "Career_TStartCount": car_T_starts, "startNum": stats.get("startNum"), "photoShotMapGFX": "", } team_rows.append(row) # добиваем до 12 строк, чтобы UI был ровный count_player = sum(1 for x in team_rows if x["startRole"] == "Player") if count_player < 12 and team_rows: filler_count = (4 if count_player <= 4 else 12) - count_player template_keys = list(team_rows[0].keys()) for _ in range(filler_count): empty_row = {} for key in template_keys: if key in ["captain", "isStart", "isOnCourt"]: empty_row[key] = False elif key in [ "id", "pts", "weight", "height", "age", "ast", "stl", "blk", "blkVic", "dreb", "oreb", "reb", "to", "foul", "foulT", "foulD", "foulC", "foulB", "fouled", "plusMinus", "dunk", "kpi", ]: empty_row[key] = 0 else: empty_row[key] = "" team_rows.append(empty_row) # сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д. role_priority = { "Player": 0, "": 1, "Coach": 2, "Team": 3, None: 4, "Other": 5, } sorted_team = sorted( team_rows, key=lambda x: role_priority.get(x.get("startRole", 99), 99), ) # --- 👇 ДОБАВЛЯЕМ КАРТЫ БРОСКОВ ПО startNum --- # # Берём play-by-play из текущего game, если он есть plays = result.get("plays") or [] # Собираем множество startNum текущей команды, чтобы не ловить чужих игроков team_startnums = { p.get("startNum") for p in payload.get("starts", []) if p.get("startRole") == "Player" } # startNum -> список (x, y, is_made) shots_by_startnum: dict[int, list[tuple[float, float, bool]]] = {} for ev in plays: play_code = ev.get("play") if play_code not in (2, 3, 5, 6): continue sn = ev.get("startNum") if sn not in team_startnums: continue x = ev.get("x") y = ev.get("y") if x is None or y is None: continue sec = ev.get("sec") period = ev.get("period") is_made = play_code in (2, 3) # 2,3 — точные, 5,6 — промахи shots_by_startnum.setdefault(sn, []).append((x, y, is_made, sec, period)) # Рендерим по картинке на каждого startNum shotmap_paths: dict[int, str] = {} for sn, points in shots_by_startnum.items(): # timestamp/версия — просто количество бросков этого игрока. # Увеличилось кол-во бросков → поменялся путь → vMix не возьмёт картинку из кеша. version = str(len(points)) bib = str(sn) # можно заменить на номер игрока, если нужно path = get_image(points, bib, version) shotmap_paths[sn] = path # Присоединяем пути к игрокам по startNum for row in sorted_team: sn = row.get("startNum") if sn in shotmap_paths: row["photoShotMapGFX"] = f"{shotmap_paths[sn]}" else: # если бросков не было — оставляем пустую строку row["photoShotMapGFX"] = "" return sorted_team async def started_team(data): return data def add_new_team_stat( data: dict, avg_age: float, points, avg_height: float, timeout_str: str, timeout_left: int, ) -> dict: """ Берёт словарь total по команде (очки, подборы, броски и т.д.), добавляет: - проценты попаданий - средний возраст / рост - очки старт / бенч - информацию по таймаутам и всё приводит к строкам (для UI, чтобы не ловить типы). Возвращает обновлённый словарь. """ def safe_int(v): try: return int(v) except (ValueError, TypeError): return 0 def format_percent(goal, shot): goal, shot = safe_int(goal), safe_int(shot) return f"{round(goal * 100 / shot)}%" if shot else "0%" goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1")) goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2")) goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3")) def_reb = safe_int(data.get("defReb")) off_reb = safe_int(data.get("offReb")) data.update( { "pt-1": f"{goal1}/{shot1}", "pt-2": f"{goal2}/{shot2}", "pt-3": f"{goal3}/{shot3}", "fg": f"{goal2 + goal3}/{shot2 + shot3}", "pt-1_pro": format_percent(goal1, shot1), "pt-2_pro": format_percent(goal2, shot2), "pt-3_pro": format_percent(goal3, shot3), "fg_pro": format_percent(goal2 + goal3, shot2 + shot3), "Reb": str(def_reb + off_reb), "avgAge": str(avg_age), "ptsStart": str(points[0]), "ptsStart_pro": str(points[1]), "ptsBench": str(points[2]), "ptsBench_pro": str(points[3]), "avgHeight": f"{avg_height} cm", "timeout_left": str(timeout_left), "timeout_str": str(timeout_str), } ) for k in data: data[k] = str(data[k]) return data def time_outs_func(data_pbp): """ Считает таймауты для обеих команд и формирует читабельные строки вида: "2 Time-outs left in 2nd half" Возвращает: (строка_для_команды1, остаток1, строка_для_команды2, остаток2) """ timeout1 = [] timeout2 = [] for event in data_pbp: if event.get("play") == 23: # 23 == таймаут if event.get("startNum") == 1: timeout1.append(event) elif event.get("startNum") == 2: timeout2.append(event) def timeout_status(timeout_list, last_event: dict): period = last_event.get("period", 0) sec = last_event.get("sec", 0) if period < 3: timeout_max = 2 count = sum(1 for t in timeout_list if t.get("period", 0) <= period) quarter = "1st half" elif period < 5: count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period) quarter = "2nd half" if period == 4 and sec >= 4800 and count in (0, 1): timeout_max = 2 else: timeout_max = 3 else: timeout_max = 1 count = sum(1 for t in timeout_list if t.get("period", 0) == period) quarter = f"OverTime {period - 4}" left = max(0, timeout_max - count) word = "Time-outs" if left != 1 else "Time-out" text = f"{left if left != 0 else 'No'} {word} left in {quarter}" return text, left if not data_pbp: return "", 0, "", 0 last_event = data_pbp[-1] t1_str, t1_left = timeout_status(timeout1, last_event) t2_str, t2_left = timeout_status(timeout2, last_event) return t1_str, t1_left, t2_str, t2_left def add_data_for_teams(new_data): """ Считает командные агрегаты: - средний возраст - очки со старта vs со скамейки, + их проценты - средний рост Возвращает кортеж: (avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm) """ players = [item for item in new_data if item["startRole"] == "Player"] points_start = 0 points_bench = 0 total_age = 0 total_height = 0 player_count = len(players) for player in players: # print(player) stats = player["stats"] if stats: # print(stats) if stats["isStart"] is True: points_start += stats["points"] elif stats["isStart"] is False: points_bench += stats["points"] total_age += player["age"] total_height += player["height"] total_points = points_start + points_bench points_start_pro = ( f"{round(points_start * 100 / total_points)}%" if total_points else "0%" ) points_bench_pro = ( f"{round(points_bench * 100 / total_points)}%" if total_points else "0%" ) avg_age = round(total_age / player_count, 1) if player_count else 0 avg_height = round(total_height / player_count, 1) if player_count else 0 points = [points_start, points_start_pro, points_bench, points_bench_pro] return avg_age, points, avg_height stat_name_list = [ ("points", "Очки", "points"), ("pt-1", "Штрафные", "free throws"), ("pt-1_pro", "штрафные, процент", "free throws pro"), ("pt-2", "2-очковые", "2-points"), ("pt-2_pro", "2-очковые, процент", "2-points pro"), ("pt-3", "3-очковые", "3-points"), ("pt-3_pro", "3-очковые, процент", "3-points pro"), ("fg", "очки с игры", "field goals"), ("fg_pro", "Очки с игры, процент", "field goals pro"), ("assist", "Передачи", "assists"), ("pass", "", ""), ("defReb", "подборы в защите", ""), ("offReb", "подборы в нападении", ""), ("Reb", "Подборы", "rebounds"), ("steal", "Перехваты", "steals"), ("block", "Блокшоты", "blocks"), ("blocked", "", ""), ("turnover", "Потери", "turnovers"), ("foul", "Фолы", "fouls"), ("foulsOn", "", ""), ("foulT", "", ""), ("foulD", "", ""), ("foulC", "", ""), ("foulB", "", ""), ("second", "секунды", "seconds"), ("dunk", "данки", "dunks"), ("fastBreak", "", "fast breaks"), ("plusMinus", "+/-", "+/-"), ("avgAge", "", "avg Age"), ("ptsBench", "", "Bench PTS"), ("ptsBench_pro", "", "Bench PTS, %"), ("ptsStart", "", "Start PTS"), ("ptsStart_pro", "", "Start PTS, %"), ("avgHeight", "", "avg height"), ("timeout_left", "", "timeout left"), ("timeout_str", "", "timeout str"), ] @app.get("/team_stats") async def team_stats(): teams = latest_data["game"]["data"]["result"]["teams"] plays = latest_data["game"]["data"]["result"]["plays"] team_1 = next((t for t in teams if t["teamNumber"] == 1), None) team_2 = next((t for t in teams if t["teamNumber"] == 2), None) timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays) avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1["starts"]) avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2["starts"]) total_1 = add_new_team_stat( team_1["total"], avg_age_1, points_1, avg_height_1, timeout_str1, timeout_left1, ) total_2 = add_new_team_stat( team_2["total"], avg_age_2, points_2, avg_height_2, timeout_str2, timeout_left2, ) result_json = [] for key in total_1: val1 = total_1[key] val2 = total_2[key] stat_rus = "" stat_eng = "" for metric_name, rus, eng in stat_name_list: if metric_name == key: stat_rus, stat_eng = rus, eng break result_json.append( { "name": key, "nameGFX_rus": stat_rus, "nameGFX_eng": stat_eng, "val1": val1, "val2": val2, } ) return maybe_clear_for_vmix(result_json) @app.get("/referee") async def referee(): desired_order = [ "Crew chief", "Referee 1", "Referee 2", "Commissioner", "Ст.судья", "Судья 1", "Судья 2", "Комиссар", ] # Найти судей (teamNumber == 0) team_ref = next( ( t for t in latest_data["game"]["data"]["result"]["teams"] if t["teamNumber"] == 0 ), None, ) referees_raw = team_ref.get("starts", []) referees = [] for r in referees_raw: flag_code = r.get("countryId", "").lower() if r.get("countryName") else "" referees.append( { "displayNumber": r.get("displayNumber", ""), "positionName": r.get("positionName", ""), "lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(), "secondName": r.get("secondName", ""), "birthday": r.get("birthday", ""), "age": r.get("age", 0), "flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg", } ) # Сортировка по позиции referees = sorted( referees, key=lambda x: ( desired_order.index(x["positionName"]) if x["positionName"] in desired_order else len(desired_order) ), ) return maybe_clear_for_vmix(referees) @app.get("/team_comparison") async def team_comparison(): if STATUS not in ["no_game_today", "finished_today"]: data = latest_data["pregame"]["data"]["result"] teams = [] for data_team in (data["teamStats1"], data["teamStats2"]): temp_team = { "team": data_team["team"]["name"], "games": data_team["games"], "points": round( (data_team["totalStats"]["points"] / data_team["games"]), 1 ), "points_2": round( ( data_team["totalStats"]["goal2"] * 100 / data_team["totalStats"]["shot2"] ), 1, ), "points_3": round( ( data_team["totalStats"]["goal3"] * 100 / data_team["totalStats"]["shot3"] ), 1, ), "points_23": round( ( data_team["totalStats"]["goal23"] * 100 / data_team["totalStats"]["shot23"] ), 1, ), "points_1": round( ( data_team["totalStats"]["goal1"] * 100 / data_team["totalStats"]["shot1"] ), 1, ), "assists": round( (data_team["totalStats"]["assist"] / data_team["games"]), 1 ), "rebounds": round( ( ( data_team["totalStats"]["defRebound"] + data_team["totalStats"]["offRebound"] ) / data_team["games"] ), 1, ), "steals": round( (data_team["totalStats"]["steal"] / data_team["games"]), 1 ), "turnovers": round( (data_team["totalStats"]["turnover"] / data_team["games"]), 1 ), "blocks": round( (data_team["totalStats"]["blockShot"] / data_team["games"]), 1 ), "fouls": round( (data_team["totalStats"]["foul"] / data_team["games"]), 1 ), } teams.append(temp_team) return maybe_clear_for_vmix(teams) else: return [{"Данных о сравнении команд нет!"}] @app.get("/standings") async def regular_standings(): data = latest_data["actual-standings"]["data"]["items"] for item in data: # if item["comp"]["name"] == "Regular Season": if item.get("standings"): standings_rows = item["standings"] df = pd.json_normalize(standings_rows) if "scores" in df.columns: df = df.drop(columns=["scores"]) if ( "totalWin" in df.columns and "totalDefeat" in df.columns and "totalGames" in df.columns and "totalGoalPlus" in df.columns and "totalGoalMinus" in df.columns ): tw = ( pd.to_numeric(df["totalWin"], errors="coerce").fillna(0).astype(int) ) td = ( pd.to_numeric(df["totalDefeat"], errors="coerce") .fillna(0) .astype(int) ) df["w_l"] = tw.astype(str) + " / " + td.astype(str) def calc_percent(row): win = row.get("totalWin", 0) games = row.get("totalGames", 0) # гарантируем числа try: win = int(win) except (TypeError, ValueError): win = 0 try: games = int(games) except (TypeError, ValueError): games = 0 if games == 0 or row["w_l"] == "0 / 0": return 0 return round(win * 100 / games + 0.000005) df["procent"] = df.apply(calc_percent, axis=1) tg_plus = ( pd.to_numeric(df["totalGoalPlus"], errors="coerce") .fillna(0) .astype(int) ) tg_minus = ( pd.to_numeric(df["totalGoalMinus"], errors="coerce") .fillna(0) .astype(int) ) df["plus_minus"] = tg_plus - tg_minus standings_payload = df.to_dict(orient="records") return maybe_clear_for_vmix(standings_payload) @app.get("/live_status") async def live_status(): # если матч реально идёт/вот-вот — пытаемся отдать то, что есть if STATUS in ["live", "live_soon"]: ls = latest_data.get("live-status") if not ls: # live-status ещё не прилетел return maybe_clear_for_vmix( [{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}] ) raw = ls.get("data") # 1) если это уже готовый dict и в нём есть result → как было раньше if isinstance(raw, dict): # иногда API кладёт всё прямо в root, иногда внутрь result if "result" in raw and isinstance(raw["result"], dict): return maybe_clear_for_vmix([raw["result"]]) else: # отдадим как есть, но в списке, чтобы фронт не сломать return maybe_clear_for_vmix([raw]) # 2) если это просто строка статуса ("ok" / "no-status" / "error") if isinstance(raw, str): return maybe_clear_for_vmix([{"status": raw}]) # fallback return maybe_clear_for_vmix( [{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}] ) else: # матч не идёт — как у тебя было return maybe_clear_for_vmix( [{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}] ) def get_excel_row_for_team(team_name: str) -> dict: """ Ищем строку из Excel по имени команды (колонка 'Team'). Читаем из latest_data["excel"]["data"] (список dict'ов). Возвращаем dict по команде или {} если не нашли / нет Excel. """ excel_wrap = latest_data.get("excel_TEAMS_LEGEND") if not excel_wrap: return {} rows = excel_wrap.get("data") or [] # это list[dict] после df.to_dict("records") team_norm = (team_name or "").strip().casefold() for row in rows: name = str(row.get("Team", "")).strip().casefold() if name == team_norm: return row return {} @app.get("/info") async def info(): data = latest_data["game"]["data"]["result"] team1_name = data["team1"]["name"] team2_name = data["team2"]["name"] team1_name_short_api = data["team1"]["abcName"] team2_name_short_api = data["team2"]["abcName"] team1_logo_api = data["team1"]["logo"] team2_logo_api = data["team2"]["logo"] arena_api = data["arena"]["name"] arena_short_api = data["arena"]["shortName"] region_api = data["region"]["name"] date_obj_api = datetime.strptime(data["game"]["localDate"], "%d.%m.%Y") league_api = data["league"]["abcName"] league_full_api = data["league"]["name"] season_api = ( f'{str(data["league"]["season"]-1)}/{str(data["league"]["season"])[2:]}' ) stadia_api = data["comp"]["name"] row_team1 = get_excel_row_for_team(team1_name) row_team2 = get_excel_row_for_team(team2_name) team1_logo_exl = row_team1.get("TeamLogo", "") team1_logo_left_exl = row_team1.get("TeamLogo(LEFT-LOOP)", "") team1_logo_right_exl = row_team1.get("TeamLogo(RIGHT-LOOP)", "") team1_tla_exl = row_team1.get("TeamTLA", "") team1_teamstat_exl = row_team1.get("TeamStats", "") team1_2line_exl = row_team1.get("TeamName2Lines", "") team2_logo_exl = row_team2.get("TeamLogo", "") team2_logo_left_exl = row_team2.get("TeamLogo(LEFT-LOOP)", "") team2_logo_right_exl = row_team2.get("TeamLogo(RIGHT-LOOP)", "") team2_tla_exl = row_team2.get("TeamTLA", "") team2_teamstat_exl = row_team2.get("TeamStats", "") team2_2line_exl = row_team2.get("TeamName2Lines", "") fon_exl = latest_data.get("excel_INFO")["data"][1]["SELECT TEAM1"] swape_exl = latest_data.get("excel_INFO")["data"][2]["SELECT TEAM1"] logo_exl = latest_data.get("excel_INFO")["data"][3]["SELECT TEAM1"] try: full_format = date_obj_api.strftime("%A, %-d %B %Y") short_format = date_obj_api.strftime("%A, %-d %b") except ValueError: full_format = date_obj_api.strftime("%A, %#d %B %Y") short_format = date_obj_api.strftime("%A, %#d %b") return maybe_clear_for_vmix( [ { "team1": team1_name, "team2": team2_name, "team1_short_api": team1_name_short_api, "team2_short_api": team2_name_short_api, "logo1_api": team1_logo_api, "logo2_api": team2_logo_api, "arena_api": arena_api, "short_arena_api": arena_short_api, "region_api": region_api, "league_api": league_api, "league_full_api": league_full_api, "season_api": season_api, "stadia_api": stadia_api, "date1_api": str(full_format), "date2_api": str(short_format), "team1_logo_exl": team1_logo_exl, "team1_logo_left_exl": team1_logo_left_exl, "team1_logo_right_exl": team1_logo_right_exl, "team1_tla_exl": team1_tla_exl, "team1_teamstat_exl": team1_teamstat_exl, "team1_2line_exl": team1_2line_exl, "team2_logo_exl": team2_logo_exl, "team2_logo_left_exl": team2_logo_left_exl, "team2_logo_right_exl": team2_logo_right_exl, "team2_tla_exl": team2_tla_exl, "team2_teamstat_exl": team2_teamstat_exl, "team2_2line_exl": team2_2line_exl, "fon_exl": fon_exl, "swape_exl": swape_exl, "logo_exl": logo_exl, } ] ) @app.get("/play_by_play") async def play_by_play(): data = latest_data["game"]["data"]["result"] data_pbp = data["plays"] team1_name = data["team1"]["name"] team2_name = data["team2"]["name"] team1_startnum = [ i["startNum"] for i in next( (t for t in data["teams"] if t["teamNumber"] == 1), None, )["starts"] if i["startRole"] == "Player" ] team2_startnum = [ i["startNum"] for i in next( (t for t in data["teams"] if t["teamNumber"] == 2), None, )["starts"] if i["startRole"] == "Player" ] # если вообще нет плей-бай-плея — просто отдаём пустой список if not data_pbp: return maybe_clear_for_vmix([]) df_data_pbp = pd.DataFrame(data_pbp[::-1]) last_event = data_pbp[-1] if "play" not in df_data_pbp: return maybe_clear_for_vmix([]) if ( "live-status" in latest_data and latest_data["live-status"]["data"] != "Not Found" ): json_quarter = latest_data["live-status"]["data"]["result"]["period"] json_second = latest_data["live-status"]["data"]["result"]["second"] else: json_quarter = last_event["period"] json_second = 0 if "3x3" in LEAGUE: df_data_pbp["play"].replace({2: 1, 3: 2}, inplace=True) df_goals = df_data_pbp.loc[df_data_pbp["play"].isin([1, 2, 3])].copy() if df_goals.empty: return maybe_clear_for_vmix([]) df_goals.loc[df_goals["startNum"].isin(team1_startnum), "score1"] = df_goals["play"] df_goals.loc[df_goals["startNum"].isin(team2_startnum), "score2"] = df_goals["play"] df_goals["score_sum1"] = df_goals["score1"].fillna(0).cumsum() df_goals["score_sum2"] = df_goals["score2"].fillna(0).cumsum() df_goals["new_sec"] = df_goals["sec"].astype(str).str.slice(0, -1).astype(int) df_goals["time_now"] = (600 if json_quarter < 5 else 300) - json_second df_goals["quar"] = json_quarter - df_goals["period"] # без numpy: diff_time через маски pandas same_quarter = df_goals["quar"] == 0 other_quarter = ~same_quarter df_goals.loc[same_quarter, "diff_time"] = ( df_goals.loc[same_quarter, "time_now"] - df_goals.loc[same_quarter, "new_sec"] ) df_goals.loc[other_quarter, "diff_time"] = ( 600 * df_goals.loc[other_quarter, "quar"] - df_goals.loc[other_quarter, "new_sec"] + df_goals.loc[other_quarter, "time_now"] ) df_goals["diff_time"] = df_goals["diff_time"].astype(int) df_goals["diff_time_str"] = df_goals["diff_time"].apply( lambda x: f"{x // 60}:{str(x % 60).zfill(2)}" ) df_goals["team"] = df_goals.apply( lambda row: team1_name if not pd.isna(row["score1"]) else team2_name, axis=1, ) df_goals["text_rus"] = df_goals.apply( lambda row: ( f"рывок {int(row['score_sum1'])}-{int(row['score_sum2'])}" if not pd.isna(row["score1"]) else f"рывок {int(row['score_sum2'])}-{int(row['score_sum1'])}" ), axis=1, ) df_goals["text_time_rus"] = df_goals.apply( lambda row: ( f"рывок {int(row['score_sum1'])}-{int(row['score_sum2'])} за {row['diff_time_str']}" if not pd.isna(row["score1"]) else f"рывок {int(row['score_sum2'])}-{int(row['score_sum1'])} за {row['diff_time_str']}" ), axis=1, ) df_goals["text"] = df_goals.apply( lambda row: ( f"{team1_name} {int(row['score_sum1'])}-{int(row['score_sum2'])} run" if not pd.isna(row["score1"]) else f"{team2_name} {int(row['score_sum2'])}-{int(row['score_sum1'])} run" ), axis=1, ) df_goals["text_time"] = df_goals.apply( lambda row: ( f"{team1_name} {int(row['score_sum1'])}-{int(row['score_sum2'])} run in last {row['diff_time_str']}" if not pd.isna(row["score1"]) else f"{team2_name} {int(row['score_sum2'])}-{int(row['score_sum1'])} run in last {row['diff_time_str']}" ), axis=1, ) new_order = ["text", "text_time"] + [ col for col in df_goals.columns if col not in ["text", "text_time"] ] df_goals = df_goals[new_order] for _ in ["children", "start", "stop", "hl", "sort", "startNum", "zone", "x", "y"]: if _ in df_goals.columns: del df_goals[_] # 👉 здесь избавляемся от NaN: только для score1/score2 df_goals["score1"] = df_goals["score1"].fillna("") df_goals["score2"] = df_goals["score2"].fillna("") # если хочешь вообще никаких NaN во всём JSON — можно так: # df_goals = df_goals.fillna("") # print(payload) payload = df_goals.to_dict(orient="records") return maybe_clear_for_vmix(payload) def change_vmix_datasource_urls(xml_data, new_base_url: str) -> bytes: """ Ищет все и меняет внутри на new_base_url + endpoint. """ # 1. Приводим вход к bytes if isinstance(xml_data, (bytes, bytearray)): raw_bytes = bytes(xml_data) elif isinstance(xml_data, str): raw_bytes = xml_data.encode("utf-8") elif isinstance(xml_data, io.IOBase) or hasattr(xml_data, "read"): # nasio.load_bio, скорее всего, возвращает BytesIO raw_bytes = xml_data.read() try: xml_data.seek(0) except Exception: pass else: raise TypeError(f"Unsupported xml_data type: {type(xml_data)}") # 2. Декодируем text = raw_bytes.decode("utf-8", errors="replace") # 3. Парсим XML root = ET.fromstring(text) # 4. Меняем URL for ds in root.findall(".//datasource[@friendlyName='JSON']"): for inst in ds.findall(".//instance"): url_tag = inst.find(".//state/xml/url") if url_tag is not None and url_tag.text: old_url = url_tag.text.strip() pattern = r"https?\:\/\/\w+\.\w+\.\w{2,}|https?\:\/\/\d{,3}\.\d{,3}\.\d{,3}\.\d{,3}\:\d*" new_url = re.sub(pattern, new_base_url, old_url, count=0, flags=0) url_tag.text = new_url # 5. Сериализуем обратно в bytes new_xml = ET.tostring(root, encoding="utf-8", method="xml") return new_xml @app.get("/vmix") async def vmix_project(): vmix_bio = nasio.load_bio( user=SYNO_USERNAME, password=SYNO_PASSWORD, nas_ip=SYNO_URL, nas_port="443", path=SYNO_PATH_VMIX, ) # system_name = platform.system() # if system_name == "Windows": # pass # else: # ❗ На Linux/Synology/Docker — заменяем URL edited_vmix = change_vmix_datasource_urls(vmix_bio, f"https://{MYHOST}.tvstart.ru") # 2. гарантируем, что это bytes if isinstance(edited_vmix, str): edited_vmix = edited_vmix.encode("utf-8") return StreamingResponse( io.BytesIO(edited_vmix), media_type="application/octet-stream", headers={"Content-Disposition": f'attachment; filename="VTB_{MYHOST}.vmix"'}, ) @app.get("/quarter") async def select_quarter(): return latest_data["excel_QUARTER"] def resolve_period(ls: dict, game: dict) -> str: try: period_num = int(ls.get("period", 0)) except (TypeError, ValueError): return game.get("period", "") try: seconds_left = int(ls.get("second", 0)) except (TypeError, ValueError): seconds_left = 0 if period_num <= 0: return game.get("period", "") score_a = ls.get("scoreA", game.get("score1")) score_b = ls.get("scoreB", game.get("score2")) if seconds_left == 0: # период закончился if period_num == 1: return "End 1q" if period_num == 2: return "HT" if period_num == 3: return "End 3q" if period_num == 4: return "End 4q" if score_a == score_b else "" # овертаймы return f"End OT{period_num - 4}".replace("1", "") else: # период в процессе if period_num <= 4: return f"Q{period_num}" return f"OT{period_num - 4}".replace("1", "") @app.get("/games_online") async def games_online(): if not CALENDAR or "items" not in CALENDAR: raise HTTPException(status_code=503, detail="calendar data not ready") today = datetime.now().date() # today = (datetime.now() + timedelta(days=1)).date() todays_games = [] final_states = {"result", "resultconfirmed", "finished"} for item in CALENDAR["items"]: game = item.get("game") or {} status_raw = str(game.get("gameStatus", "") or "").lower() need_refresh = status_raw not in final_states dt_str = game.get("defaultZoneDateTime") or "" try: game_dt = datetime.fromisoformat(dt_str).date() except ValueError: continue if game_dt == today: row_team1 = get_excel_row_for_team(item["team1"]["name"]) or {} row_team2 = get_excel_row_for_team(item["team2"]["name"]) or {} game["team1_xls"] = row_team1.get("TeamName2Lines", "") game["team1_logo_xls"] = row_team1.get("TeamLogo", "") game["team2_xls"] = row_team2.get("TeamName2Lines", "") game["team2_logo_xls"] = row_team2.get("TeamLogo", "") game_id = game.get("id") if game_id and need_refresh: try: resp = requests.get( URLS["live-status"].format(host=HOST, game_id=game_id), timeout=5, ).json() ls = resp.get("result") or resp msg = str(ls.get("message") or "").lower() status = str(ls.get("status") or "").lower() if msg == "not found" or status == "404": pass elif ( ls.get("message") != "Not found" and str(ls.get("gameStatus")).lower() == "online" ): game["score1"] = ls.get("scoreA", game.get("score1", "")) game["score2"] = ls.get("scoreB", game.get("score2", "")) game["period"] = resolve_period(ls, game) game["gameStatus"] = ls.get( "gameStatus", game.get("gameStatus", "") ) except Exception as ex: print(ex) scores = [game.get("score1"), game.get("score2")] todays_games.append( { "gameStatus": game["gameStatus"], "score1": ( game["score1"] if any((s or 0) > 0 for s in scores) else "" ), "score2": ( game["score2"] if any((s or 0) > 0 for s in scores) else "" ), "period": ( game["period"] if "period" in game and any((s or 0) > 0 for s in scores) else "" ), "defaultZoneTime": game["defaultZoneTime"], "team1": item["team1"]["name"], "team2": item["team2"]["name"], "team1_xls": game["team1_xls"], "team1_logo_xls": game["team1_logo_xls"], "team2_xls": game["team2_xls"], "team2_logo_xls": game["team2_logo_xls"], "mask1": ( "#FFFFFF00" if any((s or 0) > 0 for s in scores) else "#FFFFFF" ), "mask2": ( "#FFFFFF00" if ( game["period"] if "period" in game and any((s or 0) > 0 for s in scores) else "" ) == "" else "#FFFFFF" ), "mask3": ( "#FFFFFF00" if (game["period"] if "period" in game else "") != "" else "#FFFFFF" ), "split": ( ":" if any((s or 0) > 0 for s in scores) and ( game["period"] if "period" in game and any((s or 0) > 0 for s in scores) else "" ) == "" else "" ), } ) return todays_games def get_image(points, bib, count_point): """ points: список кортежей (x, y, is_made, sec, period) x, y — координаты из API, где (0,0) = центр кольца is_made — True (play 2,3) или False (play 5,6) bib: startNum/номер игрока count_point: строка-версия (анти-кэш vMix) """ if not points: return "" base_image = Image.new("RGBA", (1500, 2800), (0, 0, 0, 0)) width, height = base_image.size draw = ImageDraw.Draw(base_image) # === Диапазон координат === COURT_WIDTH_UNITS = 150.0 # по X COURT_LENGTH_UNITS = 280.0 # по Y scale_x = height / COURT_LENGTH_UNITS scale_y = width / COURT_WIDTH_UNITS HOOP_X_PX = 750 HOOP_Y_PX = 157.5 def to_px(x, y): px = int(HOOP_X_PX - x * scale_x) py = int(HOOP_Y_PX + y * scale_y) return px, py point_radius = 30 # --- шрифт --- font = ImageFont.load_default() try: nasio_font = SYNO_FONT_PATH if nasio_font: if isinstance(nasio_font, BytesIO): nasio_font = nasio_font.getvalue() if isinstance(nasio_font, (bytes, bytearray)): font = ImageFont.truetype(BytesIO(nasio_font), 18) else: font = ImageFont.truetype(nasio_font, 18) except Exception as e: logger.warning(f"[shotmap] не удалось загрузить шрифт: {e}") for x_raw, y_raw, is_made, sec, period in points: try: x = float(x_raw) y = float(y_raw) except (TypeError, ValueError): continue px, py = to_px(x, y) # --- выбираем bytes иконки --- icon_bytes = SYNO_GOAL if is_made else SYNO_MISS used_icon = False if isinstance(icon_bytes, (bytes, bytearray)) and icon_bytes: try: buf = BytesIO(icon_bytes) icon = Image.open(buf).convert("RGBA") size = point_radius * 2 icon = icon.resize((size, size), Image.LANCZOS) base_image.paste(icon, (px - point_radius, py - point_radius), icon) used_icon = True except Exception as e: logger.warning( f"[shotmap] не удалось открыть иконку из bytes " f"(is_made={is_made}, len={len(icon_bytes)}): {e}" ) else: logger.warning( f"[shotmap] icon_bytes пустой или не bytes " f"(is_made={is_made}, type={type(icon_bytes)})" ) # --- Fallback: кружок, если картинка не нарисовалась --- if not used_icon: bbox = ( px - point_radius, py - point_radius, px + point_radius, py + point_radius, ) color = (0, 255, 0, 255) if is_made else (255, 0, 0, 255) draw.ellipse(bbox, fill=color) # --- подпись Q{period} по центру --- label = f"Q{period}" # узнаём размер текста (Pillow 10+ — textbbox, старые — textsize) try: bbox = draw.textbbox((0, 0), label, font=font) text_w = bbox[2] - bbox[0] text_h = bbox[3] - bbox[1] except AttributeError: # fallback для старых версий Pillow text_w, text_h = draw.textsize(label, font=font) # центр иконки = центр текста center_x, center_y = px, py text_x = center_x - text_w // 2 text_y = center_y - text_h // 2 # тень draw.text((text_x + 1, text_y + 1), label, font=font, fill=(0, 0, 0, 255)) # текст draw.text((text_x, text_y), label, font=font, fill=(255, 255, 255, 255)) # --- сохраняем картинку в оперативную память --- filename = f"{bib}_shots_{count_point}" buf = BytesIO() try: # compress_level=1 — быстрее, чем дефолт base_image.save(buf, format="PNG", compress_level=1) except Exception as e: logger.warning(f"[shotmap] не удалось сохранить shotmap в память: {e}") return "" data = buf.getvalue() SHOTMAP_CACHE[filename] = data # кладём в RAM # формируем URL для vMix public_base = f"https://{MYHOST}.tvstart.ru" public_url = f"{public_base.rstrip('/')}/image/{filename}" # logger.info( # f"[shotmap] generated in-memory shotmap for bib={bib}, ver={count_point} " # f"-> {filename}, url={public_url}" # ) return public_url @app.get("/image/{player_id_shots}") async def get_shotmap_image(player_id_shots: str): """ Отдаёт картинку карты бросков из оперативной памяти. player_id_shots должен совпадать с ключом в SHOTMAP_CACHE, например "23_shots". """ data = SHOTMAP_CACHE.get(player_id_shots) if not data: raise HTTPException(status_code=404, detail="Shotmap not found in memory") return Response(content=data, media_type="image/png") if __name__ == "__main__": uvicorn.run( "get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug" )