Files
RFB/get_data.py
2025-11-18 18:49:11 +03:00

3503 lines
130 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import Response, HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
import requests, uvicorn, json
import threading, queue
import argparse
import pandas as pd
from datetime import datetime, time as dtime, timedelta
from fastapi.responses import Response
import logging
import logging.config
from dotenv import load_dotenv
from pprint import pprint
import nasio
import io, os, platform, time
import xml.etree.ElementTree as ET
import re
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--league", default="vtb")
parser.add_argument("--team", required=True)
parser.add_argument("--lang", default="en")
args = parser.parse_args()
MYHOST = platform.node()
if not os.path.exists("logs"):
os.makedirs("logs")
telegram_bot_token = os.getenv("TELEGRAM_TOKEN")
telegram_chat_id = str(os.getenv("TELEGRAM_CHAT_ID"))
log_config = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": telegram_bot_token,
"chat_id": telegram_chat_id,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s <b>[{MYHOST.upper()}]</b>\n%(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
pprint(f"Локальный файл окружения ={load_dotenv(verbose=True)}")
LEAGUE = args.league
TEAM = args.team
LANG = args.lang
HOST = os.getenv("API_BASE_URL")
SYNO_PATH_EXCEL = f'{os.getenv("SYNO_PATH_EXCEL")}MATCH INFO.xlsx'
SYNO_URL = os.getenv("SYNO_URL")
SYNO_USERNAME = os.getenv("SYNO_USERNAME")
SYNO_PASSWORD = os.getenv("SYNO_PASSWORD")
SYNO_PATH_VMIX = os.getenv("SYNO_PATH_VMIX")
SYNO_FONT_PATH = os.getenv("SYNO_FONT_PATH")
_syno_font_path = nasio.load_bio(
user=SYNO_USERNAME,
password=SYNO_PASSWORD,
nas_ip=SYNO_URL,
nas_port="443",
path=os.getenv("SYNO_FONT_PATH"),
)
if isinstance(_syno_font_path, BytesIO):
_syno_font_path = _syno_font_path.getvalue()
SYNO_FONT_PATH = _syno_font_path # bytes или None
# ---- ИКОНКА ПРОМАХА ----
_syno_miss_raw = nasio.load_bio(
user=SYNO_USERNAME,
password=SYNO_PASSWORD,
nas_ip=SYNO_URL,
nas_port="443",
path=os.getenv("SYNO_MISS"),
)
if isinstance(_syno_miss_raw, BytesIO):
_syno_miss_raw = _syno_miss_raw.getvalue()
SYNO_MISS = _syno_miss_raw # bytes или None
# ---- ИКОНКА ПОПАДАНИЯ ----
_syno_goal_raw = nasio.load_bio(
user=SYNO_USERNAME,
password=SYNO_PASSWORD,
nas_ip=SYNO_URL,
nas_port="443",
path=os.getenv("SYNO_GOAL"),
)
if isinstance(_syno_goal_raw, BytesIO):
_syno_goal_raw = _syno_goal_raw.getvalue()
SYNO_GOAL = _syno_goal_raw # bytes или None
SHOTMAP_SUBDIR = "shotmaps"
SHOTMAP_DIR = os.path.join(os.getcwd(), "shotmaps")
os.makedirs(SHOTMAP_DIR, exist_ok=True)
try:
for name in os.listdir(SHOTMAP_DIR):
fp = os.path.join(SHOTMAP_DIR, name)
if os.path.isfile(fp):
os.remove(fp)
logger.info(f"[shotmap] очищена папка {SHOTMAP_DIR} при старте")
except Exception as e:
logger.warning(f"[shotmap] не удалось очистить {SHOTMAP_DIR}: {e}")
CALENDAR = None
STATUS = False
GAME_ID = None
SEASON = None
GAME_START_DT = None # datetime начала матча (локальная из календаря)
GAME_TODAY = False # флаг: игра сегодня
GAME_SOON = False # флаг: игра сегодня и скоро (<1 часа)
OFFLINE_SWITCH_AT = None # timestamp, когда надо уйти в оффлайн
OFFLINE_DELAY_SEC = 600 # 10 минут
SLEEP_NOTICE_SENT = False # 👈 чтобы не слать уведомление повторно
# --- preload lock ---
PRELOAD_LOCK = False # когда True — consumer будет принимать только preloaded game
PRELOADED_GAME_ID = None # ID матча, который мы держим «тёплым»
PRELOAD_HOLD_UNTIL = None # timestamp, до какого момента держим (T-1:15)
# 🔥 кэш картинок в оперативной памяти
SHOTMAP_CACHE: dict[str, bytes] = {}
# общая очередь
results_q = queue.Queue()
# тут будем хранить последние данные
latest_data = {}
# событие для остановки потоков
stop_event = threading.Event()
# отдельные события для разных наборов потоков
stop_event_live = threading.Event()
stop_event_offline = threading.Event()
# чтобы из consumer можно было их гасить
threads_live = []
threads_offline = []
# какой режим сейчас запущен: "live" / "offline" / None
CURRENT_THREADS_MODE = None
CLEAR_OUTPUT_FOR_VMIX = False
EMPTY_PHOTO_PATH = r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\EMPTY.png"
URLS = {
"seasons": "{host}api/abc/comps/seasons?Tag={league}",
"actual-standings": "{host}api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}",
"calendar": "{host}api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000",
"game": "{host}api/abc/games/game?Id={game_id}&Lang={lang}",
"pregame": "{host}api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}",
"pregame-full-stats": "{host}api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}",
"live-status": "{host}api/abc/games/live-status?id={game_id}",
"box-score": "{host}api/abc/games/box-score?id={game_id}",
"play-by-play": "{host}api/abc/games/play-by-play?id={game_id}",
}
def maybe_clear_for_vmix(payload):
"""
Если включён режим очистки — возвращаем payload,
где все значения заменены на "".
Иначе — возвращаем как есть.
"""
if CLEAR_OUTPUT_FOR_VMIX:
return wipe_json_values(payload)
return payload
def start_offline_threads(season, game_id):
"""Запускаем редкие запросы, когда матча нет или он уже сыгран."""
global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data
if CURRENT_THREADS_MODE == "offline":
logger.debug("[threads] already in OFFLINE mode → skip start_offline_threads")
return
stop_live_threads()
stop_offline_threads()
logger.info("[threads] switching to OFFLINE mode ...")
stop_event_offline.clear()
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
150, # опрашиваем раз в секунду/реже
stop_event_offline,
False,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
300,
stop_event_offline,
False,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
300,
stop_event_offline,
False,
True,
),
daemon=True,
),
]
for t in threads_offline:
t.start()
CURRENT_THREADS_MODE = "offline"
logger.info("[threads] OFFLINE threads started (data cleaned)")
def start_live_threads(season, game_id):
"""Запускаем частые онлайн-запросы, когда матч идёт/вот-вот."""
global threads_live, CURRENT_THREADS_MODE, stop_event_live
# если уже в лайве — не дублируем
if CURRENT_THREADS_MODE == "live":
return
# на всякий случай гасим офлайн
stop_offline_threads()
stop_event_live.clear()
threads_live = [
threading.Thread(
target=get_data_from_API,
args=(
"pregame",
URLS["pregame"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
300,
stop_event_live,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
300,
stop_event_live,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
300,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
150, # часто
stop_event_live,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"live-status",
URLS["live-status"].format(host=HOST, game_id=game_id),
0.5,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"box-score",
URLS["box-score"].format(host=HOST, game_id=game_id),
0.5,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"play-by-play",
URLS["play-by-play"].format(host=HOST, game_id=game_id),
1,
stop_event_live,
),
daemon=True,
),
]
for t in threads_live:
t.start()
CURRENT_THREADS_MODE = "live"
logger.info("[threads] LIVE threads started")
def stop_live_threads():
"""Гасим только live-треды."""
global threads_live
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
if not threads_live:
logger.info("[threads] LIVE threads stopped (nothing to stop)")
return
logger.info(f"[threads] Stopping {len(threads_live)} LIVE thread(s)...")
stop_event_live.set()
still_alive = []
for t in threads_live:
t.join(timeout=2)
if t.is_alive():
logger.warning(
f"[{current_time}] [threads] LIVE thread is still alive: {t.name}"
)
still_alive.append(t.name)
threads_live = []
if still_alive:
logger.warning(
f"[{current_time}] [threads] Some LIVE threads did not stop: {still_alive}"
)
else:
logger.info("[threads] LIVE threads stopped")
CURRENT_THREADS_MODE = None # 👈 сбрасываем режим
def stop_offline_threads():
"""Гасим только offline-треды."""
global threads_offline
if not threads_offline:
return
stop_event_offline.set()
for t in threads_offline:
t.join(timeout=1)
threads_offline = []
CURRENT_THREADS_MODE = None # 👈 сбрасываем режим
logger.info("[threads] OFFLINE threads stopped")
def has_full_game_ready() -> bool:
game = latest_data.get("game")
if not game:
return False
payload = game.get("data", game)
return (
isinstance(payload, dict)
and isinstance(payload.get("data"), dict)
and isinstance(payload["data"].get("result"), dict)
and "teams" in payload["data"]["result"]
)
# Функция запускаемая в потоках
def get_data_from_API(
name: str,
url: str,
sleep_time: float,
stop_event: threading.Event,
stop_when_live=False,
stop_after_success: bool = False, # 👈 новый флаг
):
did_first_fetch = False
while not stop_event.is_set():
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
if (
stop_when_live
and globals().get("STATUS") == "live"
and has_full_game_ready()
):
logger.info(
f"{[{current_time}]} [{name}] stopping because STATUS='live' and full game is ready"
)
break
try:
value = requests.get(url, timeout=5).json()
did_first_fetch = True # помечаем, что один заход сделали
except json.JSONDecodeError as json_err:
logger.warning(
f"[{current_time}] [{name}] Ошибка парсинга JSON: {json_err}"
)
value = {"error": f"JSON decode error: {json_err}"}
except requests.exceptions.Timeout:
logger.warning(f"[{current_time}] [{name}] Таймаут при запросе {url}")
value = {"error": "timeout"}
except requests.exceptions.RequestException as req_err:
logger.warning(f"[{current_time}] [{name}] Ошибка запроса: {req_err}")
value = {"error": str(req_err)}
except Exception as ex:
logger.warning(f"[{current_time}] [{name}] Неизвестная ошибка: {ex}")
value = {"error": str(ex)}
# Проверяем, нет ли явного статуса ошибки в JSON
if isinstance(value, dict) and str(value.get("status", "")).lower() in (
"error",
"fail",
"no-status",
):
logger.warning(
f"[{current_time}] [{name}] API вернул статус '{value.get('status')}'"
)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
results_q.put({"source": name, "ts": ts, "data": value})
logger.debug(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}")
ok_status = not (
isinstance(value, dict)
and str(value.get("status", "")).lower() in ("error", "fail", "no-status")
)
if stop_after_success and ok_status:
logger.info(
f"[{name}] got successful response → stopping thread (stop_after_success)"
)
return
slept = 0
while slept < sleep_time:
if stop_event.is_set():
break
if (
stop_when_live
and globals().get("STATUS") == "live"
and has_full_game_ready()
):
logger.info(
f"[{name}] stopping during sleep because STATUS='live' and full game is ready"
)
return
time.sleep(1)
slept += 1
# если запрос занял дольше — просто сразу следующую итерацию
# Получение результатов из всех запущенных потоков
def results_consumer():
while not stop_event.is_set():
# ⬇️ проверяем, не пора ли в оффлайн (отложенный переход)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
off_at = globals().get("OFFLINE_SWITCH_AT")
if off_at is not None and time.time() >= off_at:
# делаем переход ТОЛЬКО если ещё не оффлайн
if globals().get("CURRENT_THREADS_MODE") != "offline":
logger.info("[status] switching to OFFLINE (delayed)")
stop_live_threads()
start_offline_threads(SEASON, GAME_ID)
# чтобы не повторять
globals()["OFFLINE_SWITCH_AT"] = None
try:
msg = results_q.get(timeout=0.5)
except queue.Empty:
continue
try:
source = msg.get("source")
payload = msg.get("data") or {}
# универсальный статус (может не быть)
incoming_status = payload.get("status") # может быть None
# print(source, incoming_status)
if source == "game":
# принимаем ТОЛЬКО тот game_id, который держим в PRELOAD_LOCK
try:
if PRELOAD_LOCK:
incoming_gid = extract_game_id_from_payload(payload)
if not incoming_gid or str(incoming_gid) != str(
PRELOADED_GAME_ID
):
logger.debug(
f"results_consumer: skip game (gid={incoming_gid}) due to PRELOAD_LOCK; keep {PRELOADED_GAME_ID}"
)
continue
except Exception as _e:
logger.debug(f"results_consumer: preload lock check error: {_e}")
else:
latest_data[source] = {
"ts": msg["ts"],
"data": incoming_status if incoming_status is not None else payload,
}
# 1) play-by-play
if "play-by-play" in source:
game = latest_data.get("game")
# если игра уже нормальная — приклеиваем плейи
if (
game
and isinstance(game, dict)
and "data" in game
and "result" in game["data"]
):
# у pbp тоже может не быть data/result
if "result" in payload:
game["data"]["result"]["plays"] = payload["result"]
# а вот статус у play-by-play иногда просто "no-status"
# 2) box-score
elif "box-score" in source:
game = latest_data.get("game")
if (
game
and "data" in game
and "result" in game["data"]
and "teams" in game["data"]["result"]
and "result" in payload
and "teams" in payload["result"]
):
# обновляем команды
game["data"]["result"]["game"]["fullScore"] = payload["result"][
"fullScore"
]
game["data"]["result"]["game"][
"score"
] = f'{payload["result"]["teams"][0]["total"]["points"]}:{payload["result"]["teams"][1]["total"]["points"]}'
for team in game["data"]["result"]["teams"]:
if team["teamNumber"] != 0:
box_team = [
t
for t in payload["result"]["teams"]
if t["teamNumber"] == team["teamNumber"]
]
if not box_team:
print("ERROR: box-score team not found")
continue
box_team = box_team[0]
for player in team["starts"]:
box_player = [
p
for p in box_team["starts"]
if p["startNum"] == player["startNum"]
]
if box_player:
player["stats"] = box_player[0]
team["total"] = box_team["total"]
team["startTotal"] = box_team["startTotal"]
team["benchTotal"] = box_team["benchTotal"]
team["maxLeading"] = box_team["maxLeading"]
team["pointsInRow"] = box_team["pointsInRow"]
team["maxPointsInRow"] = box_team["maxPointsInRow"]
elif "live-status" in source:
latest_data[source] = {
"ts": msg["ts"],
"data": payload,
}
try:
ls_data = payload.get("result") or payload
raw_ls_status = (
ls_data.get("status")
or ls_data.get("gameStatus")
or ls_data.get("state")
)
if raw_ls_status:
raw_ls_status_low = str(raw_ls_status).lower()
finished_markers = [
"finished",
"result",
"resultconfirmed",
"ended",
"final",
"game over",
]
# 1) матч ЗАКОНЧЕН → запускаем ОТСРОЧЕННЫЙ переход
if any(m in raw_ls_status_low for m in finished_markers):
now_ts = time.time()
# если ещё не назначали переход — назначим
if globals().get("OFFLINE_SWITCH_AT") is None:
switch_at = now_ts + globals().get(
"OFFLINE_DELAY_SEC", 600
)
globals()["OFFLINE_SWITCH_AT"] = switch_at
# статус тоже обозначим, что он завершён, но ждёт
if (
GAME_START_DT
and GAME_START_DT.date() == datetime.now().date()
):
globals()["STATUS"] = "finished_wait"
globals()["CLEAR_OUTPUT_FOR_VMIX"] = False
else:
globals()["STATUS"] = "finished_wait"
globals()["CLEAR_OUTPUT_FOR_VMIX"] = False
human_time = datetime.fromtimestamp(switch_at).strftime(
"%H:%M:%S"
)
logger.info(
f"[status] match finished → will switch to OFFLINE at {human_time} "
f"(in {globals().get('OFFLINE_DELAY_SEC', 600)}s)"
)
else:
# уже ждём — можно в debug
logger.debug(
"[status] match finished → OFFLINE already scheduled"
)
# 2) матч снова стал онлайном → СБРАСЫВАЕМ отложенный переход
elif (
"online" in raw_ls_status_low or "live" in raw_ls_status_low
):
# если до этого стояла отложка — уберём
globals()[
"CLEAR_OUTPUT_FOR_VMIX"
] = False # 👈 выключаем очистку
if globals().get("OFFLINE_SWITCH_AT") is not None:
logger.info(
"[status] match back to LIVE → cancel scheduled OFFLINE"
)
globals()["OFFLINE_SWITCH_AT"] = None
if globals().get("STATUS") != "live":
logger.info(
"[status] match became LIVE → switch to LIVE threads"
)
globals()["STATUS"] = "live"
start_live_threads(SEASON, GAME_ID)
except Exception as e:
logger.warning(
f"[{current_time}] results_consumer: live-status postprocess error: {e}"
)
else:
if source == "game":
has_game_already = "game" in latest_data and isinstance(
latest_data.get("game"), dict
)
# Полная структура?
is_full = (
isinstance(payload, dict)
and "data" in payload
and isinstance(payload["data"], dict)
and "result" in payload["data"]
and "teams" in payload["data"]["result"]
)
# ⚙️ ЛОГИКА:
# 1) Пока матч НЕ online (STATUS != 'live'): обновляем всегда,
# чтобы /status видел "живость" раз в 5 минут независимо от полноты JSON.
if globals().get("STATUS") != "live":
latest_data["game"] = {"ts": msg["ts"], "data": payload}
logger.debug(
"results_consumer: pre-live game → updated (full=%s)",
is_full,
)
else:
# ✅ если игры ещё НЕТ в кэше — примем ПЕРВЫЙ game даже неполный,
# чтобы box-score/play-by-play могли его дорастить
if is_full or not has_game_already:
latest_data["game"] = {"ts": msg["ts"], "data": payload}
logger.debug(
"results_consumer: LIVE → stored (full=%s, had=%s)",
is_full,
has_game_already,
)
else:
logger.debug(
"results_consumer: LIVE & partial game → keep previous one"
)
continue
else:
latest_data[source] = {
"ts": msg["ts"],
"data": payload,
}
continue
# ... остальная обработка ...
except Exception as e:
logger.warning(f"[{current_time}] results_consumer error: {repr(e)}")
continue
def get_items(data: dict) -> list:
"""
Мелкий хелпер: берём первый список в ответе API.
Многие ручки отдают {"result":[...]} или {"seasons":[...]}.
Если находим список — возвращаем его.
Если нет — возвращаем None (значит, нужно брать весь dict).
"""
for k, v in data.items():
if isinstance(v, list):
return data[k]
return None
def pick_game_for_team(calendar_json):
"""
Возвращает:
game_id: str | None
game_dt: datetime | None
is_today: bool
cal_status: str | None # Scheduled / Online / Result / ResultConfirmed
Логика:
1. если в календаре есть игра КОМАНДЫ на сегодня — берём ЕЁ и возвращаем её gameStatus
2. иначе — берём последнюю прошедшую и тоже возвращаем её gameStatus
"""
items = get_items(calendar_json)
if not items:
return None, None, False, None
today = datetime.now().date()
# 1) сначала — сегодняшняя
for game in reversed(items):
if game["team1"]["name"].lower() != TEAM.lower():
continue
gdt = extract_game_datetime(game)
gdate = (
gdt.date()
if gdt
else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
)
if gdate == today:
cal_status = game["game"].get("gameStatus")
return game["game"]["id"], gdt, True, cal_status
# 2) если на сегодня нет — берём последнюю прошедшую
last_id = None
last_dt = None
last_status = None
for game in reversed(items):
if game["team1"]["name"].lower() != TEAM.lower():
continue
gdt = extract_game_datetime(game)
gdate = (
gdt.date()
if gdt
else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
)
if gdate <= today:
last_id = game["game"]["id"]
last_dt = gdt
last_status = game["game"].get("gameStatus")
break
return last_id, last_dt, False, last_status
def extract_game_datetime(game_item: dict) -> datetime | None:
"""
Из элемента календаря достаём datetime матча.
В календаре есть localDate и часто localTime. Если localTime нет — берём 00:00.
"""
try:
date_str = game_item["game"]["localDate"] # '31.10.2025'
dt_date = datetime.strptime(date_str, "%d.%m.%Y").date()
time_str = game_item["game"].get("defaultZoneTime") # '19:30'
if time_str:
hh, mm = map(int, time_str.split(":"))
dt_time = dtime(hour=hh, minute=mm)
else:
dt_time = dtime(hour=0, minute=0)
return datetime.combine(dt_date, dt_time)
except Exception:
return None
def build_pretty_status_message():
"""
Собирает одно красивое сообщение про текущее состояние онлайна.
Если game ещё нет — шлём хотя бы статусы источников.
"""
lines = []
cgid = get_cached_game_id()
lines.append(f"🏀 <b>{LEAGUE.upper()}</b> • {TEAM}")
lines.append(f"📌 Game ID: <code>{cgid or GAME_ID}</code>")
lines.append(f"🕒 {GAME_START_DT}")
# сначала попробуем собрать нормальный game
game_wrap = latest_data.get("game")
has_game = False
if game_wrap:
raw = game_wrap.get("data") if isinstance(game_wrap, dict) else game_wrap
# raw может быть: dict (полный payload) | dict (уже result) | str ("ok"/"no-status")
result = {}
if isinstance(raw, dict):
# ваш нормальный полный ответ по game имеет структуру: {"data": {"result": {...}}}
# но на всякий случай поддержим и вариант, где сразу {"result": {...}} или уже {"game": ...}
result = (
raw.get("data", {}).get("result", {})
if "data" in raw
else (raw.get("result") or raw)
)
else:
result = {}
game_info = result.get("game") or {}
team1_name = (result.get("team1") or {}).get("name", "Team 1")
team2_name = (result.get("team2") or {}).get("name", "Team 2")
lines.append(f"👥 {team1_name} vs {team2_name}")
score_now = game_info.get("score") or ""
full_score = game_info.get("fullScore") or ""
if score_now:
lines.append(f"🔢 Score: <b>{score_now}</b>")
if isinstance(full_score, str) and full_score:
quarters = full_score.split(",")
q_text = " | ".join(f"Q{i+1} {q}" for i, q in enumerate(quarters) if q)
if q_text:
lines.append(f"🧱 By quarters: {q_text}")
has_game = bool(result)
# live-status отдельно
# ls = latest_data.get("live-status", {})
# ls_raw = ls.get("data") or {}
# ls_status = (
# ls_raw.get("status") or ls_raw.get("gameStatus") or ls_raw.get("state") or "—"
# )
# lines.append(f"🟢 LIVE status: <b>{ls_status}</b>")
ls_wrap = latest_data.get("live-status")
ls_status = ""
if ls_wrap:
raw = ls_wrap.get("data")
if isinstance(raw, dict):
ls_dict = raw.get("result") or raw
ls_status = (
ls_dict.get("status")
or ls_dict.get("gameStatus")
or ls_dict.get("state")
or ""
)
elif isinstance(raw, str):
# API/consumer могли положить просто строку статуса: "ok", "no-status", "error"
ls_status = raw
lines.append(f"🟢 LIVE status: <b>{ls_status}</b>")
# добавим блок по источникам — это как раз “состояние запросов”
sort_order = ["game", "live-status", "box-score", "play-by-play"]
keys = [k for k in sort_order if k in latest_data] + sorted(
[k for k in latest_data if k not in sort_order]
)
src_lines = []
for k in keys:
if "excel" not in k.lower():
d = latest_data.get(k) or {}
ts = d.get("ts", "")
dat = d.get("data")
if isinstance(dat, dict) and "status" in dat:
st = str(dat["status"]).lower()
else:
st = str(dat).lower()
# Эмодзи-кружки для статусов
if any(x in st for x in ["ok", "success", "live", "online"]):
emoji = "🟢"
elif any(
x in st for x in ["error", "fail", "no-status", "none", "timeout"]
):
emoji = "🔴"
else:
emoji = "🟡"
src_lines.append(f"{emoji} <b>{k}</b>: {st} ({ts})")
if src_lines:
lines.append("📡 Sources:")
lines.extend(src_lines)
# даже если game не успел — мы всё равно что-то вернём
return "\n".join(lines)
def status_broadcaster():
"""
Если матч live — сразу шлём статус.
Потом — раз в 5 минут.
Если матч не live — ждём и проверяем снова.
"""
INTERVAL = 300 # 5 минут
last_text = None
first_live_sent = False
while not stop_event.is_set():
# если игра не идёт — спим по чуть-чуть и крутимся
if STATUS not in ("live", "live_soon"):
first_live_sent = False # чтобы при новом лайве снова сразу отправить
time.sleep(5)
continue
# сюда попадаем только если live
text = build_pretty_status_message()
if text and text != last_text:
logger.info(text)
last_text = text
first_live_sent = True
# после первого лайва ждём 5 минут, а до него — 10 секунд
wait_sec = INTERVAL if first_live_sent else 10
for _ in range(wait_sec):
if stop_event.is_set():
break
time.sleep(1)
def get_cached_game_id() -> str | None:
game = latest_data.get("game")
if not game:
return None
payload = game.get("data", game)
if not isinstance(payload, dict):
return None
# структура может быть {"data":{"result":{...}}} или {"result":{...}}
result = (
payload.get("data", {}).get("result")
if "data" in payload
else payload.get("result")
)
if not isinstance(result, dict):
return None
g = result.get("game")
if isinstance(g, dict):
return g.get("id")
return None
def extract_game_id_from_payload(payload: dict) -> str | None:
if not isinstance(payload, dict):
return None
root = payload.get("data") if isinstance(payload.get("data"), dict) else payload
res = root.get("result") if isinstance(root.get("result"), dict) else None
if not isinstance(res, dict):
return None
g = res.get("game")
if isinstance(g, dict):
return g.get("id")
return None
def start_offline_prevgame(season, prev_game_id: str):
"""
Специальный оффлайн для ПРЕДЫДУЩЕЙ игры:
- гасит любые текущие треды
- запускает только 'game' для prev_game_id
- НЕ останавливается после первого 'ok' (stop_after_success=False)
"""
global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data
# всегда переключаемся чисто
stop_live_threads()
stop_offline_threads()
logger.info("[threads] switching to OFFLINE mode (previous game) ...")
stop_event_offline.clear()
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=prev_game_id, lang=LANG),
150, # редкий опрос
stop_event_offline,
False, # stop_when_live
False, # ✅ stop_after_success=False (держим тред)
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST,
league=LEAGUE,
season=season,
game_id=prev_game_id,
lang=LANG,
),
600,
stop_event_offline,
False,
False,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
600,
stop_event_offline,
False,
False,
),
daemon=True,
),
]
for t in threads_offline:
t.start()
CURRENT_THREADS_MODE = "offline"
logger.info(f"[threads] OFFLINE prev-game thread started for {prev_game_id}")
def start_prestart_watcher(game_dt: datetime | None):
"""
Логика на день игры:
1) Немедленно подгружаем ДАННЫЕ ПРОШЛОГО МАТЧА (один раз, оффлайн-поток 'game'),
чтобы программа имела данные до старта.
2) Ровно за 1:15 до начала — СБРАСЫВАЕМ эти данные (останавливаем оффлайн, чистим latest_data).
3) Ровно за 1:10 до начала — ВКЛЮЧАЕМ LIVE-треды.
"""
if not game_dt:
return
# разовое уведомление о "спячке"
global SLEEP_NOTICE_SENT, STATUS, SEASON, GAME_ID
now = datetime.now()
if not SLEEP_NOTICE_SENT and game_dt > now:
logger.info(
"🛌 Тред ушёл в спячку до начала игры.\n"
f"⏰ Матч начинается сегодня в {game_dt.strftime('%H:%M')}."
)
SLEEP_NOTICE_SENT = True
def _runner():
from datetime import time as dtime # для резервного парсинга времени
global STATUS
PRELOAD_LEAD = timedelta(hours=1, minutes=15) # T-1:15 → сброс
LIVE_LEAD = timedelta(hours=1, minutes=10) # T-1:10 → live
RESET_AT = game_dt - PRELOAD_LEAD
LIVE_AT = game_dt - LIVE_LEAD
PRELOAD_MAXWAIT_SEC = 180 # ждём до 3 мин готовности full game при предзагрузке
did_preload = False
did_reset = False
did_live = False
# --- вспомогательное: поиск предыдущей игры команды ДО сегодняшнего матча ---
def _find_prev_game_id(
calendar_json: dict, cutoff_dt: datetime
) -> tuple[str | None, datetime | None]:
items = get_items(calendar_json) or []
prev_id, prev_dt = None, None
team_norm = (TEAM or "").strip().casefold()
for g in reversed(items):
try:
t1 = (g["team1"]["name"] or "").strip().casefold()
if team_norm not in (t1):
continue
except Exception:
continue
print(g)
gdt = extract_game_datetime(g)
if not gdt:
try:
gd = datetime.strptime(
g["game"]["localDate"], "%d.%m.%Y"
).date()
gdt = datetime.combine(gd, dtime(0, 0))
except Exception:
continue
if gdt < cutoff_dt:
prev_id, prev_dt = g["game"]["id"], gdt
break
return prev_id, prev_dt
# --- Шаг 1: сразу включаем оффлайн по ПРЕДЫДУЩЕЙ игре и держим до T-1:15 ---
try:
now = datetime.now()
if now < RESET_AT:
calendar_resp = requests.get(
URLS["calendar"].format(
host=HOST, league=LEAGUE, season=SEASON, lang=LANG
),
timeout=6,
).json()
prev_game_id, prev_game_dt = _find_prev_game_id(calendar_resp, game_dt)
# print(prev_game_id, prev_game_dt)
if prev_game_id and str(prev_game_id) != str(GAME_ID):
logger.info(
f"[preload] старт оффлайна по предыдущей игре {prev_game_id} ({prev_game_dt})"
)
# включаем «замок», чтобы consumer принимал только старую игру
globals()["PRELOAD_LOCK"] = True
globals()["PRELOADED_GAME_ID"] = str(prev_game_id)
globals()["PRELOAD_HOLD_UNTIL"] = RESET_AT.timestamp()
# поднимаем один оффлайн-тред по старой игре (без stop_after_success)
start_offline_prevgame(SEASON, prev_game_id)
did_preload = True
else:
logger.warning("[preload] предыдущая игра не найдена — пропускаем")
else:
logger.info(
"[preload] уже поздно для предзагрузки (прошло T-1:15) — пропуск"
)
except Exception as e:
logger.warning(f"[preload] ошибка предзагрузки прошлой игры: {e}")
# --- Основной цикл ожидания контрольных моментов ---
while not stop_event.is_set():
now = datetime.now()
# если матч уже в другом конечном состоянии — выходим
if STATUS in ("live", "finished", "finished_wait", "finished_today"):
break
# Шаг 2: ровно T-1:15 — сбрасываем предзагруженные данные
if not did_reset and now >= RESET_AT:
logger.info(
f"[reset] {now:%H:%M:%S} → T-1:15: сбрасываем предзагруженные данные"
)
try:
stop_offline_threads() # на всякий
# for key in latest_data:
# latest_data[key] = wipe_json_values(latest_data[key])
# latest_data.clear() # полный сброс кэша
# снять замок предзагрузки
globals()["PRELOAD_LOCK"] = False
globals()["PRELOADED_GAME_ID"] = None
globals()["PRELOAD_HOLD_UNTIL"] = None
logger.info(
"[reset] latest_data очищен; ждём T-1:10 для запуска live"
)
except Exception as e:
logger.warning(f"[reset] ошибка при очистке: {e}")
globals()["CLEAR_OUTPUT_FOR_VMIX"] = True
did_reset = True
# Шаг 3: T-1:10 — включаем live-треды
if not did_live and now >= LIVE_AT:
logger.info(
f"[prestart] {now:%H:%M:%S}, игра в {game_dt:%H:%M}, включаем LIVE threads по правилу T-1:10"
)
STATUS = "live_soon"
globals()[
"CLEAR_OUTPUT_FOR_VMIX"
] = False # можно оставить пустоту до первых живых данных
stop_offline_threads() # на всякий случай
start_live_threads(SEASON, GAME_ID)
did_live = True
break
time.sleep(15)
t = threading.Thread(target=_runner, daemon=True)
t.start()
def get_excel():
return nasio.load_formatted(
user=SYNO_USERNAME,
password=SYNO_PASSWORD,
nas_ip=SYNO_URL,
nas_port="443",
path=SYNO_PATH_EXCEL,
# sheet="TEAMS LEGEND",
)
def excel_worker():
"""
Раз в минуту читает ВСЕ вкладки Excel
и сохраняет их в latest_data с префиксом excel_<sheet_name>.
"""
global latest_data
while not stop_event.is_set():
try:
sheets = get_excel() # <- теперь это dict: {sheet_name: DataFrame}
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
if isinstance(sheets, dict):
for sheet_name, df in sheets.items():
# пропускаем странные объекты
if not hasattr(df, "fillna"):
# logger.warning(f"[excel] Лист '{sheet_name}' не DataFrame")
continue
# ЧИСТИМ NaN и конвертируем
df = df.fillna("")
data_json = df.to_dict(orient="records")
# ключ в latest_data: excel_<имя_вкладки>
key = f"excel_{sheet_name}".replace(" ", "_").replace("-", "_")
latest_data[key] = {
"ts": ts,
"data": data_json,
}
# logger.info("[excel] Все вкладки Excel обновлены")
else:
pass
# logger.warning("[excel] get_excel() вернул не словарь")
except Exception as e:
logger.warning(f"[excel] ошибка при чтении Excel: {e}")
# пауза 60 сек
for _ in range(60):
if stop_event.is_set():
break
time.sleep(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
global STATUS, GAME_ID, SEASON, GAME_START_DT, GAME_TODAY, GAME_SOON, CALENDAR
# 1. проверяем API: seasons
try:
seasons_resp = requests.get(
URLS["seasons"].format(host=HOST, league=LEAGUE)
).json()
season = seasons_resp["items"][0]["season"]
except Exception:
now = datetime.now()
if now.month > 9:
season = now.year + 1
else:
season = now.year
logger.info(f"предположили номер сезона: {season}")
SEASON = season
# 2. берём календарь
try:
calendar = requests.get(
URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG)
).json()
except Exception as ex:
logger.error(f"не получилось проверить работу API. код ошибки: {ex}")
# тут можно вообще не запускать сервер, но оставим как есть
calendar = None
CALENDAR = calendar
# 3. определяем игру
game_id, game_dt, is_today, cal_status = (
pick_game_for_team(calendar) if calendar else (None, None, False, None)
)
GAME_ID = game_id
GAME_START_DT = game_dt
GAME_TODAY = is_today
logger.info(f"Лига: {LEAGUE}\nСезон: {season}\nКоманда: {TEAM}\nGame ID: {game_id}")
thread_excel = threading.Thread(
target=excel_worker,
daemon=True,
)
thread_excel.start()
# 4. запускаем "длинные" потоки (они у тебя и так всегда)
thread_result_consumer = threading.Thread(
target=results_consumer,
daemon=True,
)
thread_result_consumer.start()
thread_status_broadcaster = threading.Thread(
target=status_broadcaster,
daemon=True,
)
thread_status_broadcaster.start()
# 5. решаем, что запускать
if not is_today:
STATUS = "no_game_today"
start_offline_threads(SEASON, GAME_ID)
else:
# игра сегодня
# в любом случае запускаем сторож, если знаем время игры
start_prestart_watcher(game_dt)
if cal_status is None:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
elif cal_status == "Scheduled":
if game_dt:
delta = game_dt - datetime.now()
# если мы уже МЕНЬШЕ чем за 1:10 до игры — сразу в live
if delta <= timedelta(hours=1, minutes=10):
STATUS = "live_soon"
start_live_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
# start_offline_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
# start_offline_threads(SEASON, GAME_ID)
elif cal_status == "Online":
STATUS = "live"
start_live_threads(SEASON, GAME_ID)
elif cal_status in ["Result", "ResultConfirmed"]:
STATUS = "finished_today"
start_offline_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
yield
# -------- shutdown --------
stop_event.set()
stop_live_threads()
stop_offline_threads()
thread_result_consumer.join(timeout=1)
thread_status_broadcaster.join(timeout=1)
thread_excel.join(timeout=1)
app = FastAPI(
lifespan=lifespan,
docs_url=None, # ❌ отключает /docs
redoc_url=None, # ❌ отключает /redoc
openapi_url=None, # ❌ отключает /openapi.json
)
# раздаём /shotmaps как статику из SHOTMAP_DIR
app.mount("/shotmaps", StaticFiles(directory=SHOTMAP_DIR), name="shotmaps")
# @app.get("/shotmaps/{filename}")
# async def get_shotmap(filename: str):
# data = SHOTMAP_CACHE.get(filename)
# if not data:
# # если вдруг перезапустился процесс или такой карты нет
# raise HTTPException(status_code=404, detail="Shotmap not found")
# return Response(content=data, media_type="image/png")
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
71 -> "1:11"
0 -> "0:00"
Любые кривые значения -> "0:00".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
@app.get("/team1")
async def team1():
game = get_latest_game_safe("game")
if not game:
# если данных вообще нет (ещё ни одной игры) — тут реально нечего отдавать
raise HTTPException(status_code=503, detail="game data not ready")
data = await team("team1")
return maybe_clear_for_vmix(data)
@app.get("/team2")
async def team2():
game = get_latest_game_safe("game")
if not game:
raise HTTPException(status_code=503, detail="game data not ready")
data = await team("team2")
return maybe_clear_for_vmix(data)
@app.get("/top_team1")
async def top_team1():
data = await team("team1")
top = await top_sorted_team(data)
return maybe_clear_for_vmix(top)
@app.get("/top_team2")
async def top_team2():
data = await team("team2")
top = await top_sorted_team(data)
return maybe_clear_for_vmix(top)
def _b(v) -> bool:
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return v != 0
if isinstance(v, str):
return v.strip().lower() in ("1", "true", "yes", "on")
return False
def _placeholders(n=5):
return [
{
"NameGFX": "",
"Name1GFX": "",
"Name2GFX": "",
"isOnCourt": False,
"num": "",
"photoGFX": EMPTY_PHOTO_PATH,
}
for _ in range(n)
]
def wipe_json_values(obj):
"""
Рекурсивно заменяет все значения JSON на пустые строки.
Если ключ содержит "photo", заменяет значение на EMPTY_PHOTO_PATH.
"""
# если словарь — обрабатываем ключи
if isinstance(obj, dict):
new_dict = {}
for k, v in obj.items():
if "photo" in str(k).lower():
# ключ содержит photo → отдаём пустую картинку
new_dict[k] = EMPTY_PHOTO_PATH
else:
new_dict[k] = wipe_json_values(v)
return new_dict
# если список — рекурсивно обработать элементы
elif isinstance(obj, list):
return [wipe_json_values(v) for v in obj]
# любое конечное значение → ""
else:
return ""
@app.get("/started_team1")
async def started_team1(sort_by: str = None):
data = await team("team1")
players = await started_team(data) or []
# нормализуем флаги
for p in players:
p["isStart"] = _b(p.get("isStart", False))
p["isOnCourt"] = _b(p.get("isOnCourt", False))
if sort_by and sort_by.strip().lower() == "isstart":
starters = [p for p in players if p["isStart"]]
return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5))
if sort_by and sort_by.strip().lower() == "isoncourt":
on_court = [p for p in players if p["isOnCourt"]]
return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5))
# дефолт — без фильтра, как раньше
return maybe_clear_for_vmix(players)
@app.get("/started_team2")
async def started_team2(sort_by: str = None):
data = await team("team2")
players = await started_team(data) or []
for p in players:
p["isStart"] = _b(p.get("isStart", False))
p["isOnCourt"] = _b(p.get("isOnCourt", False))
if sort_by and sort_by.strip().lower() == "isstart":
starters = [p for p in players if p["isStart"]]
return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5))
if sort_by and sort_by.strip().lower() == "isoncourt":
on_court = [p for p in players if p["isOnCourt"]]
return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5))
return maybe_clear_for_vmix(players)
@app.get("/latest_data")
async def game():
return latest_data
@app.get("/status")
async def status(request: Request):
global STATUS # будем его править, если live-status свежее
def color_for_status(status_value: str) -> str:
"""Подбор цвета статуса в HEX"""
status_value = str(status_value).lower()
if status_value in ["ok", "success", "live", "live_soon", "online"]:
return "#00FF00" # зелёный
elif status_value in ["scheduled", "today_not_started", "upcoming"]:
return "#FFFF00" # жёлтый
elif status_value in [
"result",
"resultconfirmed",
"finished",
"finished_today",
]:
return "#FF0000" # красный
elif status_value in ["no_game_today", "unknown", "none"]:
return "#FFFFFF" # белый
else:
return "#FF7700" # серый (неизвестный статус)
# ✳️ сортируем latest_data в нужном порядке
sort_order = ["game", "live-status", "box-score", "play-by-play"]
sorted_keys = [k for k in sort_order if k in latest_data] + sorted(
[k for k in latest_data if k not in sort_order]
)
# убираем excel_* из списка ключей
sorted_keys = [k for k in sorted_keys if "excel" not in k.lower()]
cached_game_id = get_cached_game_id() or GAME_ID
note = ""
if cached_game_id and GAME_ID and str(cached_game_id) != str(GAME_ID):
note = (
f' <span style="color:#ffb84d;">(предзагружены данные прошлой игры)</span>'
)
data = {
"league": LEAGUE,
"team": TEAM,
"game_id": cached_game_id,
"game_status": STATUS,
"statuses": [
{
"name": TEAM,
"status": STATUS,
"ts": (
GAME_START_DT.strftime("%Y-%m-%d %H:%M") if GAME_START_DT else "N/A"
),
"link": LEAGUE,
"color": color_for_status(STATUS),
}
]
+ [
{
"name": item,
"status": (
latest_data[item]["data"]["status"]
if isinstance(latest_data[item]["data"], dict)
and "status" in latest_data[item]["data"]
else latest_data[item]["data"]
),
"ts": latest_data[item]["ts"],
"link": URLS[item].format(
host=HOST,
league=LEAGUE,
season=SEASON,
lang=LANG,
game_id=cached_game_id,
),
"color": color_for_status(
latest_data[item]["data"]["status"]
if isinstance(latest_data[item]["data"], dict)
and "status" in latest_data[item]["data"]
else latest_data[item]["data"]
),
}
for item in sorted_keys
],
}
accept = request.headers.get("accept", "")
if "text/html" in accept:
status_raw = str(STATUS).lower()
if status_raw in ["live", "online"]:
gs_class = "live"
gs_text = "🟢 LIVE"
elif status_raw in ["live_soon", "today_not_started"]:
gs_class = "live"
gs_text = "🟢 GAME TODAY (soon)"
elif status_raw in ["finished_wait"]:
gs_class = "upcoming"
off_at = OFFLINE_SWITCH_AT
if off_at:
human = datetime.fromtimestamp(off_at).strftime("%H:%M:%S")
gs_text = f"🟡 Game finished, cooling down → OFFLINE at {human}"
else:
gs_text = "🟡 Game finished, cooling down"
elif status_raw in ["finished_today", "finished"]:
gs_class = "finished"
gs_text = "🔴 Game finished"
else:
gs_class = status_raw
gs_text = "⚪ Unknown"
html = f"""
<html>
<head>
<meta http-equiv="refresh" content="1">
<style>
body {{
font-family: "Segoe UI", Roboto, monospace;
background: #0f0f0f;
color: #eee;
padding: 20px;
}}
table {{
border-collapse: collapse;
width: 100%;
margin-top: 10px;
}}
th, td {{
border: 1px solid #333;
padding: 6px 10px;
text-align: left;
}}
th {{
background: #222;
color: #ccc;
}}
tr:nth-child(even) {{ background-color: #1a1a1a; }}
.ok {{ color: #00ff7f; font-weight: bold; }}
.warn {{ color: #ffff66; font-weight: bold; }}
.fail {{ color: #ff4d4d; font-weight: bold; }}
.live {{ color: #00ff7f; font-weight: bold; }}
.finished {{ color: #ff4d4d; font-weight: bold; }}
.unknown {{ color: #cccccc; font-weight: bold; }}
a {{
color: #66b3ff;
text-decoration: none;
}}
a:hover {{ text-decoration: underline; }}
h2 {{ margin-bottom: 5px; }}
.header-info p {{ margin: 2px 0; }}
</style>
</head>
<body>
<h2>📊 Game Status Monitor</h2>
<div class="header-info">
<p><b>League:</b> {LEAGUE}</p>
<p><b>Team:</b> {TEAM}</p>
<p><b>Game ID:</b> {cached_game_id}{note}</p>
<p><b>Game Status:</b> <span class="{gs_class}">{gs_text}</span></p>
</div>
<table>
<tr><th>Name</th><th>Status</th><th>Timestamp</th><th>Link</th></tr>
"""
# ВАЖНО: цикл только добавляет строки, return будет ПОСЛЕ цикла
for s in data["statuses"]:
status_text = str(s["status"]).strip().lower()
if any(
x in status_text
for x in ["ok", "success", "live", "live_soon", "online"]
):
color_class = "ok"
elif any(
x in status_text for x in ["scheduled", "today_not_started", "upcoming"]
):
color_class = "warn"
elif any(
x in status_text
for x in ["result", "resultconfirmed", "finished", "finished_today"]
):
color_class = "fail"
else:
color_class = "unknown"
html += f"""
<tr>
<td>{s["name"]}</td>
<td class="{color_class}">{status_text}</td>
<td>{s["ts"]}</td>
<td><a href="{s["link"]}" target="_blank">{s["link"]}</a></td>
</tr>
"""
# закрываем таблицу и страницу УЖЕ ПОСЛЕ цикла
html += """
</table>
</body>
</html>
"""
return HTMLResponse(content=html, media_type="text/html")
# JSON для API (красиво отформатированный)
formatted = json.dumps(data, indent=4, ensure_ascii=False)
response = Response(content=formatted, media_type="application/json")
response.headers["Refresh"] = "1"
return maybe_clear_for_vmix(response)
@app.get("/scores")
async def scores():
game = get_latest_game_safe("game")
if not game:
# игры ещё нет или пришёл только частичный ответ
# отдаём пустую структуру, чтобы фронт не падал
return [
{"Q": "Q1", "score1": "", "score2": ""},
{"Q": "Q2", "score1": "", "score2": ""},
{"Q": "Q3", "score1": "", "score2": ""},
{"Q": "Q4", "score1": "", "score2": ""},
]
game_data = game["data"] if "data" in game else game
result = game_data.get("result", {})
game_info = result.get("game", {})
full_score = game_info.get("fullScore")
if not full_score:
# поле есть, но ещё пустое/None
return [
{"Q": "Q1", "score1": "", "score2": ""},
{"Q": "Q2", "score1": "", "score2": ""},
{"Q": "Q3", "score1": "", "score2": ""},
{"Q": "Q4", "score1": "", "score2": ""},
]
quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]
score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters]
full_score_list = full_score.split(",")
for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]):
parts = score_str.split(":")
if len(parts) == 2:
score_by_quarter[i]["score1"] = parts[0]
score_by_quarter[i]["score2"] = parts[1]
return maybe_clear_for_vmix(score_by_quarter)
async def top_sorted_team(data):
top_sorted_team = sorted(
(p for p in data if p.get("startRole") in ["Player", ""]),
key=lambda x: (
x.get("pts", 0),
x.get("dreb", 0) + x.get("oreb", 0),
x.get("ast", 0),
x.get("stl", 0),
x.get("blk", 0),
x.get("time", "0:00"),
),
reverse=True,
)
# пустые строки не должны ломать UI процентами фолов/очков
for player in top_sorted_team:
if player.get("num", "") == "":
player["pts"] = ""
player["foul"] = ""
return top_sorted_team
def get_latest_game_safe(name: str):
"""
Безопасно достаём актуальный game из latest_data.
Возвращаем None, если структура ещё не готова или прилетел "плохой" game
(например, с {"status": "no-status"} без data/result).
"""
game = latest_data.get(name)
if not game:
return None
# у нас в latest_data["game"] лежит {"ts": ..., "data": {...}} или сразу {...}
# в consumer мы клали {"ts": ..., "data": payload}, так что берём .get("data")
if "data" in game:
game_data = game["data"]
else:
# на всякий случай, если где-то клали сразу payload
game_data = game
if not isinstance(game_data, dict):
return None
result = game_data.get("result")
if not result:
return None
# если всё ок — вернём в исходном виде (с ts и т.п.)
return game
def _pick_last_avg_and_sum(stats_list: list) -> tuple[dict, dict]:
"""Возвращает (season_sum, season_avg) из seasonStats. Безопасно при пустых данных."""
if not isinstance(stats_list, list) or len(stats_list) == 0:
return {}, {}
# В JSON конец массива: ... {"class":"Sum"}, {"class":"Avg"}
last = stats_list[-1] if stats_list else None
prev = stats_list[-2] if len(stats_list) >= 2 else None
season_avg = (
last.get("stats", {})
if isinstance(last, dict) and str(last.get("class")).lower() == "avg"
else {}
)
season_sum = (
prev.get("stats", {})
if isinstance(prev, dict) and str(prev.get("class")).lower() == "sum"
else {}
)
# Бывают инверсии порядка (на всякий случай): попробуем найти явно
if not season_avg or not season_sum:
for x in reversed(stats_list):
if (
isinstance(x, dict)
and str(x.get("class")).lower() == "avg"
and not season_avg
):
season_avg = x.get("stats", {}) or {}
if (
isinstance(x, dict)
and str(x.get("class")).lower() == "sum"
and not season_sum
):
season_sum = x.get("stats", {}) or {}
if season_avg and season_sum:
break
return season_sum, season_avg
def _pick_career_sum_and_avg(carrier_list: list) -> tuple[dict, dict]:
"""Возвращает (career_sum, career_avg) из carrier. В API встречаются блоки с class: Normal/Sum/Avg."""
if not isinstance(carrier_list, list) or len(carrier_list) == 0:
return {}, {}
career_sum, career_avg = {}, {}
# Ищем явные «Sum» и «Avg»
for x in reversed(carrier_list):
if isinstance(x, dict):
cls = str(x.get("class", "")).lower()
stats = x.get("stats", {}) or {}
if cls == "sum" and not career_sum:
career_sum = stats
elif cls == "avg" and not career_avg:
career_avg = stats
if career_sum and career_avg:
break
# Если «Avg» нет (часто для карьеры бывает только Normal/Sum) — ок, оставим пустым
return career_sum, career_avg
def _as_int(v, default=0):
try:
# в JSON часто строки; пустые строки -> 0
if v in ("", None):
return default
return int(float(v))
except Exception:
return default
def _safe(d: dict) -> dict:
return d if isinstance(d, dict) else {}
async def team(who: str):
"""
Возвращает данные по команде (team1 / team2) из актуального game.
Защищена от ситуации, когда latest_data["game"] ещё не прогрелся
или в него прилетел "плохой" ответ от API.
"""
game = get_latest_game_safe("game")
if not game:
# игра ещё не подгружена или структура кривоватая
raise HTTPException(status_code=503, detail="game data not ready")
full_stat = get_latest_game_safe("pregame-full-stats")
if not full_stat:
# ⚠️ full_stat_data отсутствует — работаем только с game_data
logger.debug(
f"[{who}] full_stat_data not found → continuing with game_data only"
)
full_stat_data = {}
else:
full_stat_data = full_stat["data"] if "data" in full_stat else full_stat
# нормализуем доступ к данным
game_data = game["data"] if "data" in game else game
result = game_data[
"result"
] # здесь уже безопасно, мы проверили в get_latest_game_safe
result_full = full_stat_data.get("result", {}) if full_stat_data else {}
# в result ожидаем "teams"
teams = result.get("teams")
if not teams:
raise HTTPException(status_code=503, detail="game teams not ready")
# выбираем команду
if who == "team1":
payload = next((t for t in teams if t.get("teamNumber") == 1), None)
payload_full = result_full.get("team1PlayersStats") if result_full else []
else:
payload = next((t for t in teams if t.get("teamNumber") == 2), None)
payload_full = result_full.get("team2PlayersStats") if result_full else []
if payload is None:
raise HTTPException(status_code=404, detail=f"{who} not found in game data")
# дальше — твоя исходная логика формирования ответа по команде
# я не знаю весь твой оригинальный код ниже, поэтому вставляю каркас
# и показываю, где нужно аккуратно брать plays/box-score из latest_data
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload.get("starts", [])
team_rows = []
for item in starts:
stats = item.get("stats") or {}
pid = str(item.get("personId"))
full_obj = next(
(p for p in (payload_full or []) if str(p.get("personId")) == pid), None
)
season_sum = season_avg = career_sum = career_avg = {}
if full_obj:
# сезон
season_sum, season_avg = _pick_last_avg_and_sum(
full_obj.get("seasonStats") or []
)
# карьера
career_sum, career_avg = _pick_career_sum_and_avg(
full_obj.get("carrier") or []
)
season_sum = _safe(season_sum)
season_avg = _safe(season_avg)
career_sum = _safe(career_sum)
career_avg = _safe(career_avg)
# Полезные числа для Totals+Live
# live-поля в box-score называются goal1/2/3, shot1/2/3, defReb/offReb и т.п.
g1 = _as_int(stats.get("goal1"))
s1 = _as_int(stats.get("shot1"))
g2 = _as_int(stats.get("goal2"))
s2 = _as_int(stats.get("shot2"))
g3 = _as_int(stats.get("goal3"))
s3 = _as_int(stats.get("shot3"))
# Сезонные суммы из pregame-full-stats
ss_pts = _as_int(season_sum.get("points"))
ss_ast = _as_int(season_sum.get("assist"))
ss_blk = _as_int(season_sum.get("blockShot"))
ss_dreb = _as_int(season_sum.get("defRebound"))
ss_oreb = _as_int(season_sum.get("offRebound"))
ss_reb = _as_int(season_sum.get("rebound"))
ss_stl = _as_int(season_sum.get("steal"))
ss_to = _as_int(season_sum.get("turnover"))
ss_foul = _as_int(season_sum.get("foul"))
ss_sec = _as_int(season_sum.get("second"))
ss_gms = _as_int(season_sum.get("games"))
ss_st = _as_int(season_sum.get("isStarts"))
ss_g1 = _as_int(season_sum.get("goal1"))
ss_s1 = _as_int(season_sum.get("shot1"))
ss_g2 = _as_int(season_sum.get("goal2"))
ss_s2 = _as_int(season_sum.get("shot2"))
ss_g3 = _as_int(season_sum.get("goal3"))
ss_s3 = _as_int(season_sum.get("shot3"))
# Карьерные суммы из pregame-full-stats
car_ss_pts = _as_int(career_sum.get("points"))
car_ss_ast = _as_int(career_sum.get("assist"))
car_ss_blk = _as_int(career_sum.get("blockShot"))
car_ss_dreb = _as_int(career_sum.get("defRebound"))
car_ss_oreb = _as_int(career_sum.get("offRebound"))
car_ss_reb = _as_int(career_sum.get("rebound"))
car_ss_stl = _as_int(career_sum.get("steal"))
car_ss_to = _as_int(career_sum.get("turnover"))
car_ss_foul = _as_int(career_sum.get("foul"))
car_ss_sec = _as_int(career_sum.get("second"))
car_ss_gms = _as_int(career_sum.get("games"))
car_ss_st = _as_int(career_sum.get("isStarts"))
car_ss_g1 = _as_int(career_sum.get("goal1"))
car_ss_s1 = _as_int(career_sum.get("shot1"))
car_ss_g2 = _as_int(career_sum.get("goal2"))
car_ss_s2 = _as_int(career_sum.get("shot2"))
car_ss_g3 = _as_int(career_sum.get("goal3"))
car_ss_s3 = _as_int(career_sum.get("shot3"))
# Totals по сезону, «с учётом текущего матча»:
T_points = ss_pts + _as_int(stats.get("points"))
T_assist = ss_ast + _as_int(stats.get("assist"))
T_block = ss_blk + _as_int(stats.get("block"))
T_dreb = ss_dreb + _as_int(stats.get("defReb"))
T_oreb = ss_oreb + _as_int(stats.get("offReb"))
T_reb = ss_reb + (_as_int(stats.get("defReb")) + _as_int(stats.get("offReb")))
T_steal = ss_stl + _as_int(stats.get("steal"))
T_turn = ss_to + _as_int(stats.get("turnover"))
T_foul = ss_foul + _as_int(stats.get("foul"))
T_sec = ss_sec + _as_int(stats.get("second"))
T_gms = ss_gms + (1 if _as_int(stats.get("second")) > 0 else 0)
T_starts = ss_st + (1 if bool(stats.get("isStart")) else 0)
T_g1 = ss_g1 + g1
T_s1 = ss_s1 + s1
T_g2 = ss_g2 + g2
T_s2 = ss_s2 + s2
T_g3 = ss_g3 + g3
T_s3 = ss_s3 + s3
# Totals по карьере, «с учётом текущего матча»:
car_T_points = car_ss_pts + _as_int(stats.get("points"))
car_T_assist = car_ss_ast + _as_int(stats.get("assist"))
car_T_block = car_ss_blk + _as_int(stats.get("block"))
car_T_dreb = car_ss_dreb + _as_int(stats.get("defReb"))
car_T_oreb = car_ss_oreb + _as_int(stats.get("offReb"))
car_T_reb = car_ss_reb + (
_as_int(stats.get("defReb")) + _as_int(stats.get("offReb"))
)
car_T_steal = car_ss_stl + _as_int(stats.get("steal"))
car_T_turn = car_ss_to + _as_int(stats.get("turnover"))
car_T_foul = car_ss_foul + _as_int(stats.get("foul"))
car_T_sec = car_ss_sec + _as_int(stats.get("second"))
car_T_gms = car_ss_gms + (1 if _as_int(stats.get("second")) > 0 else 0)
car_T_starts = car_ss_st + (1 if bool(stats.get("isStart")) else 0)
car_T_g1 = car_ss_g1 + g1
car_T_s1 = car_ss_s1 + s1
car_T_g2 = car_ss_g2 + g2
car_T_s2 = car_ss_s2 + s2
car_T_g3 = car_ss_g3 + g3
car_T_s3 = car_ss_s3 + s3
# Проценты (без деления на 0)
def _pct(goal, shot):
return f"{round(goal*100/shot, 1)}%" if shot else "0.0%"
# Для «23» используем сумму 2-х и 3-х
T_g23 = T_g2 + T_g3
T_s23 = T_s2 + T_s3
car_T_g23 = car_T_g2 + car_T_g3
car_T_s23 = car_T_s2 + car_T_s3
# print(avg_season, total_season)
row = {
"id": item.get("personId") or "",
"num": item.get("displayNumber"),
"startRole": item.get("startRole"),
"role": item.get("positionName"),
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == (item.get("positionName") or "").lower()
][0]
if any(
r[0].lower() == (item.get("positionName") or "").lower()
for r in role_list
)
else ""
),
"NameGFX": (
f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip()
if item.get("firstName") is not None
and item.get("lastName") is not None
else "Команда"
),
"Name1GFX": (item.get("firstName") or "").strip(),
"Name2GFX": (item.get("lastName") or "").strip(),
"captain": item.get("isCapitan", False),
"age": item.get("age") or 0,
"height": f"{item.get('height')} cm" if item.get("height") else 0,
"weight": f"{item.get('weight')} kg" if item.get("weight") else 0,
"isStart": stats.get("isStart", False),
"isOn": "🏀" if stats.get("isOnCourt") is True else "",
"isOnCourt": stats.get("isOnCourt", False),
"flag": (
"https://flagicons.lipis.dev/flags/4x3/"
+ (
"ru"
if item.get("countryId") is None
and item.get("countryName") == "Russia"
else (
""
if item.get("countryId") is None
else (
(item.get("countryId") or "").lower()
if item.get("countryName") is not None
else ""
)
)
)
+ ".svg"
),
"photoGFX": (
os.path.join(
"D:\\Photos",
result["league"]["abcName"],
result[who]["name"],
f"{item.get('displayNumber')}.png",
)
if item.get("startRole") == "Player"
else ""
),
# live-стата
"pts": _as_int(stats.get("points")),
"pt-2": f"{g2}/{s2}",
"pt-3": f"{g3}/{s3}",
"pt-1": f"{g1}/{s1}",
"fg": f"{g2+g3}/{s2+s3}",
"ast": _as_int(stats.get("assist")),
"stl": _as_int(stats.get("steal")),
"blk": _as_int(stats.get("block")),
"blkVic": _as_int(stats.get("blocked")),
"dreb": _as_int(stats.get("defReb")),
"oreb": _as_int(stats.get("offReb")),
"reb": _as_int(stats.get("defReb")) + _as_int(stats.get("offReb")),
"to": _as_int(stats.get("turnover")),
"foul": _as_int(stats.get("foul")),
"foulT": _as_int(stats.get("foulT")),
"foulD": _as_int(stats.get("foulD")),
"foulC": _as_int(stats.get("foulC")),
"foulB": _as_int(stats.get("foulB")),
"fouled": _as_int(stats.get("foulsOn")),
"plusMinus": _as_int(stats.get("plusMinus")),
"dunk": _as_int(stats.get("dunk")),
"kpi": (
_as_int(stats.get("points"))
+ _as_int(stats.get("defReb"))
+ _as_int(stats.get("offReb"))
+ _as_int(stats.get("assist"))
+ _as_int(stats.get("steal"))
+ _as_int(stats.get("block"))
+ _as_int(stats.get("foulsOn"))
+ (g1 - s1)
+ (g2 - s2)
+ (g3 - s3)
- _as_int(stats.get("turnover"))
- _as_int(stats.get("foul"))
),
"time": format_time(_as_int(stats.get("second"))),
# сезон — средние (из последнего Avg)
"AvgPoints": season_avg.get("points") or "0.0",
"AvgAssist": season_avg.get("assist") or "0.0",
"AvgBlocks": season_avg.get("blockShot") or "0.0",
"AvgDefRebound": season_avg.get("defRebound") or "0.0",
"AvgOffRebound": season_avg.get("offRebound") or "0.0",
"AvgRebound": season_avg.get("rebound") or "0.0",
"AvgSteal": season_avg.get("steal") or "0.0",
"AvgTurnover": season_avg.get("turnover") or "0.0",
"AvgFoul": season_avg.get("foul") or "0.0",
"AvgOpponentFoul": season_avg.get("foulsOnPlayer") or "0.0",
"AvgDunk": season_avg.get("dunk") or "0.0",
"AvgPlayedTime": season_avg.get("playedTime") or "0:00",
"Shot1Percent": season_avg.get("shot1Percent") or "0.0%",
"Shot2Percent": season_avg.get("shot2Percent") or "0.0%",
"Shot3Percent": season_avg.get("shot3Percent") or "0.0%",
"Shot23Percent": season_avg.get("shot23Percent") or "0.0%",
# сезон — Totals (суммы из Sum + live)
"TPoints": T_points,
"TShots1": f"{T_g1}/{T_s1}",
"TShots2": f"{T_g2}/{T_s2}",
"TShots3": f"{T_g3}/{T_s3}",
"TShots23": f"{T_g23}/{T_s23}",
"TShot1Percent": _pct(T_g1, T_s1),
"TShot2Percent": _pct(T_g2, T_s2),
"TShot3Percent": _pct(T_g3, T_s3),
"TShot23Percent": _pct(T_g23, T_s23),
"TAssist": T_assist,
"TBlocks": T_block,
"TDefRebound": T_dreb,
"TOffRebound": T_oreb,
"TRebound": T_reb,
"TSteal": T_steal,
"TTurnover": T_turn,
"TFoul": T_foul,
"TPlayedTime": format_time(T_sec),
"TGameCount": T_gms,
"TStartCount": T_starts,
# карьера — средние (из последнего Avg)
"Career_AvgPoints": career_avg.get("points") or "0.0",
"Career_AvgAssist": career_avg.get("assist") or "0.0",
"Career_AvgBlocks": career_avg.get("blockShot") or "0.0",
"Career_AvgDefRebound": career_avg.get("defRebound") or "0.0",
"Career_AvgOffRebound": career_avg.get("offRebound") or "0.0",
"Career_AvgRebound": career_avg.get("rebound") or "0.0",
"Career_AvgSteal": career_avg.get("steal") or "0.0",
"Career_AvgTurnover": career_avg.get("turnover") or "0.0",
"Career_AvgFoul": career_avg.get("foul") or "0.0",
"Career_AvgOpponentFoul": career_avg.get("foulsOnPlayer") or "0.0",
"Career_AvgDunk": career_avg.get("dunk") or "0.0",
"Career_AvgPlayedTime": career_avg.get("playedTime") or "0:00",
"Career_Shot1Percent": career_avg.get("shot1Percent") or "0.0%",
"Career_Shot2Percent": career_avg.get("shot2Percent") or "0.0%",
"Career_Shot3Percent": career_avg.get("shot3Percent") or "0.0%",
"Career_Shot23Percent": career_avg.get("shot23Percent") or "0.0%",
# карьера — Totals (суммы из Sum + live)
"Career_TPoints": car_T_points,
"Career_TShots1": f"{car_T_g1}/{car_T_s1}",
"Career_TShots2": f"{car_T_g2}/{car_T_s2}",
"Career_TShots3": f"{car_T_g3}/{car_T_s3}",
"Career_TShots23": f"{car_T_g23}/{car_T_s23}",
"Career_TShot1Percent": _pct(car_T_g1, car_T_s1),
"Career_TShot2Percent": _pct(car_T_g2, car_T_s2),
"Career_TShot3Percent": _pct(car_T_g3, car_T_s3),
"Career_TShot23Percent": _pct(car_T_g23, car_T_s23),
"Career_TAssist": car_T_assist,
"Career_TBlocks": car_T_block,
"Career_TDefRebound": car_T_dreb,
"Career_TOffRebound": car_T_oreb,
"Career_TRebound": car_T_reb,
"Career_TSteal": car_T_steal,
"Career_TTurnover": car_T_turn,
"Career_TFoul": car_T_foul,
"Career_TPlayedTime": format_time(car_T_sec),
"Career_TGameCount": car_T_gms,
"Career_TStartCount": car_T_starts,
"startNum": stats.get("startNum"),
"photoShotMapGFX": "",
}
team_rows.append(row)
# добиваем до 12 строк, чтобы UI был ровный
count_player = sum(1 for x in team_rows if x["startRole"] == "Player")
if count_player < 12 and team_rows:
filler_count = (4 if count_player <= 4 else 12) - count_player
template_keys = list(team_rows[0].keys())
for _ in range(filler_count):
empty_row = {}
for key in template_keys:
if key in ["captain", "isStart", "isOnCourt"]:
empty_row[key] = False
elif key in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]:
empty_row[key] = 0
else:
empty_row[key] = ""
team_rows.append(empty_row)
# сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д.
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5,
}
sorted_team = sorted(
team_rows,
key=lambda x: role_priority.get(x.get("startRole", 99), 99),
)
# --- 👇 ДОБАВЛЯЕМ КАРТЫ БРОСКОВ ПО startNum --- #
# Берём play-by-play из текущего game, если он есть
plays = result.get("plays") or []
# Собираем множество startNum текущей команды, чтобы не ловить чужих игроков
team_startnums = {
p.get("startNum")
for p in payload.get("starts", [])
if p.get("startRole") == "Player"
}
# startNum -> список (x, y, is_made)
shots_by_startnum: dict[int, list[tuple[float, float, bool]]] = {}
for ev in plays:
play_code = ev.get("play")
if play_code not in (2, 3, 5, 6):
continue
sn = ev.get("startNum")
if sn not in team_startnums:
continue
x = ev.get("x")
y = ev.get("y")
if x is None or y is None:
continue
sec = ev.get("sec")
period = ev.get("period")
is_made = play_code in (2, 3) # 2,3 — точные, 5,6 — промахи
shots_by_startnum.setdefault(sn, []).append((x, y, is_made, sec, period))
# Рендерим по картинке на каждого startNum
shotmap_paths: dict[int, str] = {}
for sn, points in shots_by_startnum.items():
# timestamp/версия — просто количество бросков этого игрока.
# Увеличилось кол-во бросков → поменялся путь → vMix не возьмёт картинку из кеша.
version = str(len(points))
bib = str(sn) # можно заменить на номер игрока, если нужно
path = get_image(points, bib, version)
shotmap_paths[sn] = path
# Присоединяем пути к игрокам по startNum
for row in sorted_team:
sn = row.get("startNum")
if sn in shotmap_paths:
row["photoShotMapGFX"] = f"{shotmap_paths[sn]}"
else:
# если бросков не было — оставляем пустую строку
row["photoShotMapGFX"] = ""
return sorted_team
async def started_team(data):
return data
def add_new_team_stat(
data: dict,
avg_age: float,
points,
avg_height: float,
timeout_str: str,
timeout_left: int,
) -> dict:
"""
Берёт словарь total по команде (очки, подборы, броски и т.д.),
добавляет:
- проценты попаданий
- средний возраст / рост
- очки старт / бенч
- информацию по таймаутам
и всё приводит к строкам (для UI, чтобы не ловить типы).
Возвращает обновлённый словарь.
"""
def safe_int(v):
try:
return int(v)
except (ValueError, TypeError):
return 0
def format_percent(goal, shot):
goal, shot = safe_int(goal), safe_int(shot)
return f"{round(goal * 100 / shot)}%" if shot else "0%"
goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1"))
goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2"))
goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3"))
def_reb = safe_int(data.get("defReb"))
off_reb = safe_int(data.get("offReb"))
data.update(
{
"pt-1": f"{goal1}/{shot1}",
"pt-2": f"{goal2}/{shot2}",
"pt-3": f"{goal3}/{shot3}",
"fg": f"{goal2 + goal3}/{shot2 + shot3}",
"pt-1_pro": format_percent(goal1, shot1),
"pt-2_pro": format_percent(goal2, shot2),
"pt-3_pro": format_percent(goal3, shot3),
"fg_pro": format_percent(goal2 + goal3, shot2 + shot3),
"Reb": str(def_reb + off_reb),
"avgAge": str(avg_age),
"ptsStart": str(points[0]),
"ptsStart_pro": str(points[1]),
"ptsBench": str(points[2]),
"ptsBench_pro": str(points[3]),
"avgHeight": f"{avg_height} cm",
"timeout_left": str(timeout_left),
"timeout_str": str(timeout_str),
}
)
for k in data:
data[k] = str(data[k])
return data
def time_outs_func(data_pbp):
"""
Считает таймауты для обеих команд и формирует читабельные строки вида:
"2 Time-outs left in 2nd half"
Возвращает:
(строка_для_команды1, остаток1, строка_для_команды2, остаток2)
"""
timeout1 = []
timeout2 = []
for event in data_pbp:
if event.get("play") == 23: # 23 == таймаут
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
def timeout_status(timeout_list, last_event: dict):
period = last_event.get("period", 0)
sec = last_event.get("sec", 0)
if period < 3:
timeout_max = 2
count = sum(1 for t in timeout_list if t.get("period", 0) <= period)
quarter = "1st half"
elif period < 5:
count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period)
quarter = "2nd half"
if period == 4 and sec >= 4800 and count in (0, 1):
timeout_max = 2
else:
timeout_max = 3
else:
timeout_max = 1
count = sum(1 for t in timeout_list if t.get("period", 0) == period)
quarter = f"OverTime {period - 4}"
left = max(0, timeout_max - count)
word = "Time-outs" if left != 1 else "Time-out"
text = f"{left if left != 0 else 'No'} {word} left in {quarter}"
return text, left
if not data_pbp:
return "", 0, "", 0
last_event = data_pbp[-1]
t1_str, t1_left = timeout_status(timeout1, last_event)
t2_str, t2_left = timeout_status(timeout2, last_event)
return t1_str, t1_left, t2_str, t2_left
def add_data_for_teams(new_data):
"""
Считает командные агрегаты:
- средний возраст
- очки со старта vs со скамейки, + их проценты
- средний рост
Возвращает кортеж:
(avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm)
"""
players = [item for item in new_data if item["startRole"] == "Player"]
points_start = 0
points_bench = 0
total_age = 0
total_height = 0
player_count = len(players)
for player in players:
# print(player)
stats = player["stats"]
if stats:
# print(stats)
if stats["isStart"] is True:
points_start += stats["points"]
elif stats["isStart"] is False:
points_bench += stats["points"]
total_age += player["age"]
total_height += player["height"]
total_points = points_start + points_bench
points_start_pro = (
f"{round(points_start * 100 / total_points)}%" if total_points else "0%"
)
points_bench_pro = (
f"{round(points_bench * 100 / total_points)}%" if total_points else "0%"
)
avg_age = round(total_age / player_count, 1) if player_count else 0
avg_height = round(total_height / player_count, 1) if player_count else 0
points = [points_start, points_start_pro, points_bench, points_bench_pro]
return avg_age, points, avg_height
stat_name_list = [
("points", "Очки", "points"),
("pt-1", "Штрафные", "free throws"),
("pt-1_pro", "штрафные, процент", "free throws pro"),
("pt-2", "2-очковые", "2-points"),
("pt-2_pro", "2-очковые, процент", "2-points pro"),
("pt-3", "3-очковые", "3-points"),
("pt-3_pro", "3-очковые, процент", "3-points pro"),
("fg", "очки с игры", "field goals"),
("fg_pro", "Очки с игры, процент", "field goals pro"),
("assist", "Передачи", "assists"),
("pass", "", ""),
("defReb", "подборы в защите", ""),
("offReb", "подборы в нападении", ""),
("Reb", "Подборы", "rebounds"),
("steal", "Перехваты", "steals"),
("block", "Блокшоты", "blocks"),
("blocked", "", ""),
("turnover", "Потери", "turnovers"),
("foul", "Фолы", "fouls"),
("foulsOn", "", ""),
("foulT", "", ""),
("foulD", "", ""),
("foulC", "", ""),
("foulB", "", ""),
("second", "секунды", "seconds"),
("dunk", "данки", "dunks"),
("fastBreak", "", "fast breaks"),
("plusMinus", "+/-", "+/-"),
("avgAge", "", "avg Age"),
("ptsBench", "", "Bench PTS"),
("ptsBench_pro", "", "Bench PTS, %"),
("ptsStart", "", "Start PTS"),
("ptsStart_pro", "", "Start PTS, %"),
("avgHeight", "", "avg height"),
("timeout_left", "", "timeout left"),
("timeout_str", "", "timeout str"),
]
@app.get("/team_stats")
async def team_stats():
teams = latest_data["game"]["data"]["result"]["teams"]
plays = latest_data["game"]["data"]["result"]["plays"]
team_1 = next((t for t in teams if t["teamNumber"] == 1), None)
team_2 = next((t for t in teams if t["teamNumber"] == 2), None)
timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays)
avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1["starts"])
avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2["starts"])
total_1 = add_new_team_stat(
team_1["total"],
avg_age_1,
points_1,
avg_height_1,
timeout_str1,
timeout_left1,
)
total_2 = add_new_team_stat(
team_2["total"],
avg_age_2,
points_2,
avg_height_2,
timeout_str2,
timeout_left2,
)
result_json = []
for key in total_1:
val1 = total_1[key]
val2 = total_2[key]
stat_rus = ""
stat_eng = ""
for metric_name, rus, eng in stat_name_list:
if metric_name == key:
stat_rus, stat_eng = rus, eng
break
result_json.append(
{
"name": key,
"nameGFX_rus": stat_rus,
"nameGFX_eng": stat_eng,
"val1": val1,
"val2": val2,
}
)
return maybe_clear_for_vmix(result_json)
@app.get("/referee")
async def referee():
desired_order = [
"Crew chief",
"Referee 1",
"Referee 2",
"Commissioner",
"Ст.судья",
"Судья 1",
"Судья 2",
"Комиссар",
]
# Найти судей (teamNumber == 0)
team_ref = next(
(
t
for t in latest_data["game"]["data"]["result"]["teams"]
if t["teamNumber"] == 0
),
None,
)
referees_raw = team_ref.get("starts", [])
referees = []
for r in referees_raw:
flag_code = r.get("countryId", "").lower() if r.get("countryName") else ""
referees.append(
{
"displayNumber": r.get("displayNumber", ""),
"positionName": r.get("positionName", ""),
"lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(),
"secondName": r.get("secondName", ""),
"birthday": r.get("birthday", ""),
"age": r.get("age", 0),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg",
}
)
# Сортировка по позиции
referees = sorted(
referees,
key=lambda x: (
desired_order.index(x["positionName"])
if x["positionName"] in desired_order
else len(desired_order)
),
)
return maybe_clear_for_vmix(referees)
@app.get("/team_comparison")
async def team_comparison():
if STATUS not in ["no_game_today", "finished_today"]:
data = latest_data["pregame"]["data"]["result"]
teams = []
for data_team in (data["teamStats1"], data["teamStats2"]):
temp_team = {
"team": data_team["team"]["name"],
"games": data_team["games"],
"points": round(
(data_team["totalStats"]["points"] / data_team["games"]), 1
),
"points_2": round(
(
data_team["totalStats"]["goal2"]
* 100
/ data_team["totalStats"]["shot2"]
),
1,
),
"points_3": round(
(
data_team["totalStats"]["goal3"]
* 100
/ data_team["totalStats"]["shot3"]
),
1,
),
"points_23": round(
(
data_team["totalStats"]["goal23"]
* 100
/ data_team["totalStats"]["shot23"]
),
1,
),
"points_1": round(
(
data_team["totalStats"]["goal1"]
* 100
/ data_team["totalStats"]["shot1"]
),
1,
),
"assists": round(
(data_team["totalStats"]["assist"] / data_team["games"]), 1
),
"rebounds": round(
(
(
data_team["totalStats"]["defRebound"]
+ data_team["totalStats"]["offRebound"]
)
/ data_team["games"]
),
1,
),
"steals": round(
(data_team["totalStats"]["steal"] / data_team["games"]), 1
),
"turnovers": round(
(data_team["totalStats"]["turnover"] / data_team["games"]), 1
),
"blocks": round(
(data_team["totalStats"]["blockShot"] / data_team["games"]), 1
),
"fouls": round(
(data_team["totalStats"]["foul"] / data_team["games"]), 1
),
}
teams.append(temp_team)
return maybe_clear_for_vmix(teams)
else:
return [{"Данных о сравнении команд нет!"}]
@app.get("/standings")
async def regular_standings():
data = latest_data["actual-standings"]["data"]["items"]
for item in data:
# if item["comp"]["name"] == "Regular Season":
if item.get("standings"):
standings_rows = item["standings"]
df = pd.json_normalize(standings_rows)
if "scores" in df.columns:
df = df.drop(columns=["scores"])
if (
"totalWin" in df.columns
and "totalDefeat" in df.columns
and "totalGames" in df.columns
and "totalGoalPlus" in df.columns
and "totalGoalMinus" in df.columns
):
tw = (
pd.to_numeric(df["totalWin"], errors="coerce").fillna(0).astype(int)
)
td = (
pd.to_numeric(df["totalDefeat"], errors="coerce")
.fillna(0)
.astype(int)
)
df["w_l"] = tw.astype(str) + " / " + td.astype(str)
def calc_percent(row):
win = row.get("totalWin", 0)
games = row.get("totalGames", 0)
# гарантируем числа
try:
win = int(win)
except (TypeError, ValueError):
win = 0
try:
games = int(games)
except (TypeError, ValueError):
games = 0
if games == 0 or row["w_l"] == "0 / 0":
return 0
return round(win * 100 / games + 0.000005)
df["procent"] = df.apply(calc_percent, axis=1)
tg_plus = (
pd.to_numeric(df["totalGoalPlus"], errors="coerce")
.fillna(0)
.astype(int)
)
tg_minus = (
pd.to_numeric(df["totalGoalMinus"], errors="coerce")
.fillna(0)
.astype(int)
)
df["plus_minus"] = tg_plus - tg_minus
standings_payload = df.to_dict(orient="records")
return maybe_clear_for_vmix(standings_payload)
@app.get("/live_status")
async def live_status():
# если матч реально идёт/вот-вот — пытаемся отдать то, что есть
if STATUS in ["live", "live_soon"]:
ls = latest_data.get("live-status")
if not ls:
# live-status ещё не прилетел
return maybe_clear_for_vmix(
[{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}]
)
raw = ls.get("data")
# 1) если это уже готовый dict и в нём есть result → как было раньше
if isinstance(raw, dict):
# иногда API кладёт всё прямо в root, иногда внутрь result
if "result" in raw and isinstance(raw["result"], dict):
return maybe_clear_for_vmix([raw["result"]])
else:
# отдадим как есть, но в списке, чтобы фронт не сломать
return maybe_clear_for_vmix([raw])
# 2) если это просто строка статуса ("ok" / "no-status" / "error")
if isinstance(raw, str):
return maybe_clear_for_vmix([{"status": raw}])
# fallback
return maybe_clear_for_vmix(
[{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}]
)
else:
# матч не идёт — как у тебя было
return maybe_clear_for_vmix(
[{"foulsA": 0, "foulsB": 0, "scoreA": 0, "scoreB": 0}]
)
def get_excel_row_for_team(team_name: str) -> dict:
"""
Ищем строку из Excel по имени команды (колонка 'Team').
Читаем из latest_data["excel"]["data"] (список dict'ов).
Возвращаем dict по команде или {} если не нашли / нет Excel.
"""
excel_wrap = latest_data.get("excel_TEAMS_LEGEND")
if not excel_wrap:
return {}
rows = excel_wrap.get("data") or [] # это list[dict] после df.to_dict("records")
team_norm = (team_name or "").strip().casefold()
for row in rows:
name = str(row.get("Team", "")).strip().casefold()
if name == team_norm:
return row
return {}
@app.get("/info")
async def info():
data = latest_data["game"]["data"]["result"]
team1_name = data["team1"]["name"]
team2_name = data["team2"]["name"]
team1_name_short_api = data["team1"]["abcName"]
team2_name_short_api = data["team2"]["abcName"]
team1_logo_api = data["team1"]["logo"]
team2_logo_api = data["team2"]["logo"]
arena_api = data["arena"]["name"]
arena_short_api = data["arena"]["shortName"]
region_api = data["region"]["name"]
date_obj_api = datetime.strptime(data["game"]["localDate"], "%d.%m.%Y")
league_api = data["league"]["abcName"]
league_full_api = data["league"]["name"]
season_api = (
f'{str(data["league"]["season"]-1)}/{str(data["league"]["season"])[2:]}'
)
stadia_api = data["comp"]["name"]
row_team1 = get_excel_row_for_team(team1_name)
row_team2 = get_excel_row_for_team(team2_name)
team1_logo_exl = row_team1.get("TeamLogo", "")
team1_logo_left_exl = row_team1.get("TeamLogo(LEFT-LOOP)", "")
team1_logo_right_exl = row_team1.get("TeamLogo(RIGHT-LOOP)", "")
team1_tla_exl = row_team1.get("TeamTLA", "")
team1_teamstat_exl = row_team1.get("TeamStats", "")
team1_2line_exl = row_team1.get("TeamName2Lines", "")
team2_logo_exl = row_team2.get("TeamLogo", "")
team2_logo_left_exl = row_team2.get("TeamLogo(LEFT-LOOP)", "")
team2_logo_right_exl = row_team2.get("TeamLogo(RIGHT-LOOP)", "")
team2_tla_exl = row_team2.get("TeamTLA", "")
team2_teamstat_exl = row_team2.get("TeamStats", "")
team2_2line_exl = row_team2.get("TeamName2Lines", "")
fon_exl = latest_data.get("excel_INFO")["data"][1]["SELECT TEAM1"]
swape_exl = latest_data.get("excel_INFO")["data"][2]["SELECT TEAM1"]
logo_exl = latest_data.get("excel_INFO")["data"][3]["SELECT TEAM1"]
try:
full_format = date_obj_api.strftime("%A, %-d %B %Y")
short_format = date_obj_api.strftime("%A, %-d %b")
except ValueError:
full_format = date_obj_api.strftime("%A, %#d %B %Y")
short_format = date_obj_api.strftime("%A, %#d %b")
return maybe_clear_for_vmix(
[
{
"team1": team1_name,
"team2": team2_name,
"team1_short_api": team1_name_short_api,
"team2_short_api": team2_name_short_api,
"logo1_api": team1_logo_api,
"logo2_api": team2_logo_api,
"arena_api": arena_api,
"short_arena_api": arena_short_api,
"region_api": region_api,
"league_api": league_api,
"league_full_api": league_full_api,
"season_api": season_api,
"stadia_api": stadia_api,
"date1_api": str(full_format),
"date2_api": str(short_format),
"team1_logo_exl": team1_logo_exl,
"team1_logo_left_exl": team1_logo_left_exl,
"team1_logo_right_exl": team1_logo_right_exl,
"team1_tla_exl": team1_tla_exl,
"team1_teamstat_exl": team1_teamstat_exl,
"team1_2line_exl": team1_2line_exl,
"team2_logo_exl": team2_logo_exl,
"team2_logo_left_exl": team2_logo_left_exl,
"team2_logo_right_exl": team2_logo_right_exl,
"team2_tla_exl": team2_tla_exl,
"team2_teamstat_exl": team2_teamstat_exl,
"team2_2line_exl": team2_2line_exl,
"fon_exl": fon_exl,
"swape_exl": swape_exl,
"logo_exl": logo_exl,
}
]
)
@app.get("/play_by_play")
async def play_by_play():
data = latest_data["game"]["data"]["result"]
data_pbp = data["plays"]
team1_name = data["team1"]["name"]
team2_name = data["team2"]["name"]
team1_startnum = [
i["startNum"]
for i in next(
(t for t in data["teams"] if t["teamNumber"] == 1),
None,
)["starts"]
if i["startRole"] == "Player"
]
team2_startnum = [
i["startNum"]
for i in next(
(t for t in data["teams"] if t["teamNumber"] == 2),
None,
)["starts"]
if i["startRole"] == "Player"
]
# если вообще нет плей-бай-плея — просто отдаём пустой список
if not data_pbp:
return maybe_clear_for_vmix([])
df_data_pbp = pd.DataFrame(data_pbp[::-1])
last_event = data_pbp[-1]
if "play" not in df_data_pbp:
return maybe_clear_for_vmix([])
if (
"live-status" in latest_data
and latest_data["live-status"]["data"] != "Not Found"
):
json_quarter = latest_data["live-status"]["data"]["result"]["period"]
json_second = latest_data["live-status"]["data"]["result"]["second"]
else:
json_quarter = last_event["period"]
json_second = 0
if "3x3" in LEAGUE:
df_data_pbp["play"].replace({2: 1, 3: 2}, inplace=True)
df_goals = df_data_pbp.loc[df_data_pbp["play"].isin([1, 2, 3])].copy()
if df_goals.empty:
return maybe_clear_for_vmix([])
df_goals.loc[df_goals["startNum"].isin(team1_startnum), "score1"] = df_goals["play"]
df_goals.loc[df_goals["startNum"].isin(team2_startnum), "score2"] = df_goals["play"]
df_goals["score_sum1"] = df_goals["score1"].fillna(0).cumsum()
df_goals["score_sum2"] = df_goals["score2"].fillna(0).cumsum()
df_goals["new_sec"] = df_goals["sec"].astype(str).str.slice(0, -1).astype(int)
df_goals["time_now"] = (600 if json_quarter < 5 else 300) - json_second
df_goals["quar"] = json_quarter - df_goals["period"]
# без numpy: diff_time через маски pandas
same_quarter = df_goals["quar"] == 0
other_quarter = ~same_quarter
df_goals.loc[same_quarter, "diff_time"] = (
df_goals.loc[same_quarter, "time_now"] - df_goals.loc[same_quarter, "new_sec"]
)
df_goals.loc[other_quarter, "diff_time"] = (
600 * df_goals.loc[other_quarter, "quar"]
- df_goals.loc[other_quarter, "new_sec"]
+ df_goals.loc[other_quarter, "time_now"]
)
df_goals["diff_time"] = df_goals["diff_time"].astype(int)
df_goals["diff_time_str"] = df_goals["diff_time"].apply(
lambda x: f"{x // 60}:{str(x % 60).zfill(2)}"
)
df_goals["team"] = df_goals.apply(
lambda row: team1_name if not pd.isna(row["score1"]) else team2_name,
axis=1,
)
df_goals["text_rus"] = df_goals.apply(
lambda row: (
f"рывок {int(row['score_sum1'])}-{int(row['score_sum2'])}"
if not pd.isna(row["score1"])
else f"рывок {int(row['score_sum2'])}-{int(row['score_sum1'])}"
),
axis=1,
)
df_goals["text_time_rus"] = df_goals.apply(
lambda row: (
f"рывок {int(row['score_sum1'])}-{int(row['score_sum2'])} за {row['diff_time_str']}"
if not pd.isna(row["score1"])
else f"рывок {int(row['score_sum2'])}-{int(row['score_sum1'])} за {row['diff_time_str']}"
),
axis=1,
)
df_goals["text"] = df_goals.apply(
lambda row: (
f"{team1_name} {int(row['score_sum1'])}-{int(row['score_sum2'])} run"
if not pd.isna(row["score1"])
else f"{team2_name} {int(row['score_sum2'])}-{int(row['score_sum1'])} run"
),
axis=1,
)
df_goals["text_time"] = df_goals.apply(
lambda row: (
f"{team1_name} {int(row['score_sum1'])}-{int(row['score_sum2'])} run in last {row['diff_time_str']}"
if not pd.isna(row["score1"])
else f"{team2_name} {int(row['score_sum2'])}-{int(row['score_sum1'])} run in last {row['diff_time_str']}"
),
axis=1,
)
new_order = ["text", "text_time"] + [
col for col in df_goals.columns if col not in ["text", "text_time"]
]
df_goals = df_goals[new_order]
for _ in ["children", "start", "stop", "hl", "sort", "startNum", "zone", "x", "y"]:
if _ in df_goals.columns:
del df_goals[_]
# 👉 здесь избавляемся от NaN: только для score1/score2
df_goals["score1"] = df_goals["score1"].fillna("")
df_goals["score2"] = df_goals["score2"].fillna("")
# если хочешь вообще никаких NaN во всём JSON — можно так:
# df_goals = df_goals.fillna("")
# print(payload)
payload = df_goals.to_dict(orient="records")
return maybe_clear_for_vmix(payload)
def change_vmix_datasource_urls(xml_data, new_base_url: str) -> bytes:
"""
Ищет все <datasource friendlyName="JSON"> и меняет <url> внутри на new_base_url + endpoint.
"""
# 1. Приводим вход к bytes
if isinstance(xml_data, (bytes, bytearray)):
raw_bytes = bytes(xml_data)
elif isinstance(xml_data, str):
raw_bytes = xml_data.encode("utf-8")
elif isinstance(xml_data, io.IOBase) or hasattr(xml_data, "read"):
# nasio.load_bio, скорее всего, возвращает BytesIO
raw_bytes = xml_data.read()
try:
xml_data.seek(0)
except Exception:
pass
else:
raise TypeError(f"Unsupported xml_data type: {type(xml_data)}")
# 2. Декодируем
text = raw_bytes.decode("utf-8", errors="replace")
# 3. Парсим XML
root = ET.fromstring(text)
# 4. Меняем URL
for ds in root.findall(".//datasource[@friendlyName='JSON']"):
for inst in ds.findall(".//instance"):
url_tag = inst.find(".//state/xml/url")
if url_tag is not None and url_tag.text:
old_url = url_tag.text.strip()
pattern = r"https?\:\/\/\w+\.\w+\.\w{2,}|https?\:\/\/\d{,3}\.\d{,3}\.\d{,3}\.\d{,3}\:\d*"
new_url = re.sub(pattern, new_base_url, old_url, count=0, flags=0)
url_tag.text = new_url
# 5. Сериализуем обратно в bytes
new_xml = ET.tostring(root, encoding="utf-8", method="xml")
return new_xml
@app.get("/vmix")
async def vmix_project():
vmix_bio = nasio.load_bio(
user=SYNO_USERNAME,
password=SYNO_PASSWORD,
nas_ip=SYNO_URL,
nas_port="443",
path=SYNO_PATH_VMIX,
)
# system_name = platform.system()
# if system_name == "Windows":
# pass
# else:
# ❗ На Linux/Synology/Docker — заменяем URL
edited_vmix = change_vmix_datasource_urls(vmix_bio, f"https://{MYHOST}.tvstart.ru")
# 2. гарантируем, что это bytes
if isinstance(edited_vmix, str):
edited_vmix = edited_vmix.encode("utf-8")
return StreamingResponse(
io.BytesIO(edited_vmix),
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="VTB_{MYHOST}.vmix"'},
)
@app.get("/quarter")
async def select_quarter():
return latest_data["excel_QUARTER"]
def resolve_period(ls: dict, game: dict) -> str:
try:
period_num = int(ls.get("period", 0))
except (TypeError, ValueError):
return game.get("period", "")
try:
seconds_left = int(ls.get("second", 0))
except (TypeError, ValueError):
seconds_left = 0
if period_num <= 0:
return game.get("period", "")
score_a = ls.get("scoreA", game.get("score1"))
score_b = ls.get("scoreB", game.get("score2"))
if seconds_left == 0: # период закончился
if period_num == 1:
return "End 1q"
if period_num == 2:
return "HT"
if period_num == 3:
return "End 3q"
if period_num == 4:
return "End 4q" if score_a == score_b else ""
# овертаймы
return f"End OT{period_num - 4}".replace("1", "")
else: # период в процессе
if period_num <= 4:
return f"Q{period_num}"
return f"OT{period_num - 4}".replace("1", "")
@app.get("/games_online")
async def games_online():
if not CALENDAR or "items" not in CALENDAR:
raise HTTPException(status_code=503, detail="calendar data not ready")
today = datetime.now().date()
# today = (datetime.now() + timedelta(days=1)).date()
todays_games = []
final_states = {"result", "resultconfirmed", "finished"}
for item in CALENDAR["items"]:
game = item.get("game") or {}
status_raw = str(game.get("gameStatus", "") or "").lower()
need_refresh = status_raw not in final_states
dt_str = game.get("defaultZoneDateTime") or ""
try:
game_dt = datetime.fromisoformat(dt_str).date()
except ValueError:
continue
if game_dt == today:
row_team1 = get_excel_row_for_team(item["team1"]["name"]) or {}
row_team2 = get_excel_row_for_team(item["team2"]["name"]) or {}
game["team1_xls"] = row_team1.get("TeamName2Lines", "")
game["team1_logo_xls"] = row_team1.get("TeamLogo", "")
game["team2_xls"] = row_team2.get("TeamName2Lines", "")
game["team2_logo_xls"] = row_team2.get("TeamLogo", "")
game_id = game.get("id")
if game_id and need_refresh:
try:
resp = requests.get(
URLS["live-status"].format(host=HOST, game_id=game_id),
timeout=5,
).json()
ls = resp.get("result") or resp
msg = str(ls.get("message") or "").lower()
status = str(ls.get("status") or "").lower()
if msg == "not found" or status == "404":
pass
elif (
ls.get("message") != "Not found"
and str(ls.get("gameStatus")).lower() == "online"
):
game["score1"] = ls.get("scoreA", game.get("score1", ""))
game["score2"] = ls.get("scoreB", game.get("score2", ""))
game["period"] = resolve_period(ls, game)
game["gameStatus"] = ls.get(
"gameStatus", game.get("gameStatus", "")
)
except Exception as ex:
print(ex)
scores = [game.get("score1"), game.get("score2")]
todays_games.append(
{
"gameStatus": game["gameStatus"],
"score1": (
game["score1"] if any((s or 0) > 0 for s in scores) else ""
),
"score2": (
game["score2"] if any((s or 0) > 0 for s in scores) else ""
),
"period": (
game["period"]
if "period" in game and any((s or 0) > 0 for s in scores)
else ""
),
"defaultZoneTime": game["defaultZoneTime"],
"team1": item["team1"]["name"],
"team2": item["team2"]["name"],
"team1_xls": game["team1_xls"],
"team1_logo_xls": game["team1_logo_xls"],
"team2_xls": game["team2_xls"],
"team2_logo_xls": game["team2_logo_xls"],
"mask1": (
"#FFFFFF00" if any((s or 0) > 0 for s in scores) else "#FFFFFF"
),
"mask2": (
"#FFFFFF00"
if (
game["period"]
if "period" in game and any((s or 0) > 0 for s in scores)
else ""
)
== ""
else "#FFFFFF"
),
"mask3": (
"#FFFFFF00"
if (game["period"] if "period" in game else "") != ""
else "#FFFFFF"
),
"split": (
":"
if any((s or 0) > 0 for s in scores)
and (
game["period"]
if "period" in game and any((s or 0) > 0 for s in scores)
else ""
)
== ""
else ""
),
}
)
return todays_games
def get_image(points, bib, count_point):
"""
points: список кортежей (x, y, is_made, sec, period)
x, y — координаты из API, где (0,0) = центр кольца
is_made — True (play 2,3) или False (play 5,6)
bib: startNum/номер игрока
count_point: строка-версия (анти-кэш vMix)
"""
if not points:
return ""
base_image = Image.new("RGBA", (1500, 2800), (0, 0, 0, 0))
width, height = base_image.size
draw = ImageDraw.Draw(base_image)
# === Диапазон координат ===
COURT_WIDTH_UNITS = 150.0 # по X
COURT_LENGTH_UNITS = 280.0 # по Y
scale_x = height / COURT_LENGTH_UNITS
scale_y = width / COURT_WIDTH_UNITS
HOOP_X_PX = 750
HOOP_Y_PX = 157.5
def to_px(x, y):
px = int(HOOP_X_PX - x * scale_x)
py = int(HOOP_Y_PX + y * scale_y)
return px, py
point_radius = 30
# --- шрифт ---
font = ImageFont.load_default()
try:
nasio_font = SYNO_FONT_PATH
if nasio_font:
if isinstance(nasio_font, BytesIO):
nasio_font = nasio_font.getvalue()
if isinstance(nasio_font, (bytes, bytearray)):
font = ImageFont.truetype(BytesIO(nasio_font), 18)
else:
font = ImageFont.truetype(nasio_font, 18)
except Exception as e:
logger.warning(f"[shotmap] не удалось загрузить шрифт: {e}")
for x_raw, y_raw, is_made, sec, period in points:
try:
x = float(x_raw)
y = float(y_raw)
except (TypeError, ValueError):
continue
px, py = to_px(x, y)
# --- выбираем bytes иконки ---
icon_bytes = SYNO_GOAL if is_made else SYNO_MISS
used_icon = False
if isinstance(icon_bytes, (bytes, bytearray)) and icon_bytes:
try:
buf = BytesIO(icon_bytes)
icon = Image.open(buf).convert("RGBA")
size = point_radius * 2
icon = icon.resize((size, size), Image.LANCZOS)
base_image.paste(icon, (px - point_radius, py - point_radius), icon)
used_icon = True
except Exception as e:
logger.warning(
f"[shotmap] не удалось открыть иконку из bytes "
f"(is_made={is_made}, len={len(icon_bytes)}): {e}"
)
else:
logger.warning(
f"[shotmap] icon_bytes пустой или не bytes "
f"(is_made={is_made}, type={type(icon_bytes)})"
)
# --- Fallback: кружок, если картинка не нарисовалась ---
if not used_icon:
bbox = (
px - point_radius,
py - point_radius,
px + point_radius,
py + point_radius,
)
color = (0, 255, 0, 255) if is_made else (255, 0, 0, 255)
draw.ellipse(bbox, fill=color)
# --- подпись Q{period} по центру ---
label = f"Q{period}"
# узнаём размер текста (Pillow 10+ — textbbox, старые — textsize)
try:
bbox = draw.textbbox((0, 0), label, font=font)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
except AttributeError:
# fallback для старых версий Pillow
text_w, text_h = draw.textsize(label, font=font)
# центр иконки = центр текста
center_x, center_y = px, py
text_x = center_x - text_w // 2
text_y = center_y - text_h // 2
# тень
draw.text((text_x + 1, text_y + 1), label, font=font, fill=(0, 0, 0, 255))
# текст
draw.text((text_x, text_y), label, font=font, fill=(255, 255, 255, 255))
# --- сохраняем картинку в оперативную память ---
filename = f"shots_{bib}_{count_point}.png"
buf = BytesIO()
try:
# compress_level=1 — быстрее, чем дефолт
base_image.save(buf, format="PNG", compress_level=1)
except Exception as e:
logger.warning(f"[shotmap] не удалось сохранить shotmap в память: {e}")
return ""
data = buf.getvalue()
SHOTMAP_CACHE[filename] = data # кладём в RAM
# формируем URL для vMix
public_base = f"https://{MYHOST}.tvstart.ru"
public_url = f"{public_base.rstrip('/')}/shotmaps/{filename}"
# logger.info(
# f"[shotmap] generated in-memory shotmap for bib={bib}, ver={count_point} "
# f"-> {filename}, url={public_url}"
# )
return public_url
if __name__ == "__main__":
uvicorn.run(
"get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug"
)