Files
RFB/get_data.py

2139 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import os, tempfile
import json
import tempfile
import argparse
import platform
import logging
import logging.config
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from typing import Any, Dict, List, Tuple, Optional
from pathlib import Path
import pandas as pd
import numpy as np
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# ============================================================================
# 1. Константы / глобальные объекты
# ============================================================================
HOST = "https://ref.russiabasket.org"
# Таймзона, в которой мы считаем время матчей / расписания / сна до завтра
APP_TZ = ZoneInfo("Europe/Moscow")
MYHOST = platform.node()
TELEGRAM_BOT_TOKEN = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY"
# TELEGRAM_CHAT_ID = 228977654
TELEGRAM_CHAT_ID = -4803699526
# Глобальный лок для потокобезопасной записи JSON
_write_lock_api = threading.Lock()
_write_lock_out = threading.Lock()
_pregame_done_for_game = {}
# Карта всех ручек API, с интервалами опроса в секундах.
URLS = [
{
"name": "seasons",
"url": "{host}/api/abc/comps/seasons?Tag={league}&Lang={lang}",
"interval": 86400, # раз в сутки
},
{
"name": "standings",
"url": "{host}/api/abc/comps/standings?tag={league}&season={season}&lang={lang}",
"interval": 1800, # раз в 30 минут
},
{
"name": "calendar",
"url": "{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000",
"interval": 86400, # раз в сутки
},
{
"name": "game",
"url": "{host}/api/abc/games/game?Id={game_id}&Lang={lang}",
"interval": 60, # раз в 10 минут
},
{
"name": "pregame",
"url": "{host}/api/abc/games/pregame?tag={league}&season={season}&Id={game_id}&Lang={lang}",
"interval": 86400, # раз в сутки
},
{
"name": "pregame-fullstats",
"url": "{host}/api/abc/games/pregame-fullstats?tag={league}&season={season}&id={game_id}&lang={lang}",
"interval": 600, # раз в 10 минут
},
{
"name": "live-status",
"url": "{host}/api/abc/games/live-status?id={game_id}",
"interval": 1, # каждую секунду
},
{
"name": "box-score",
"url": "{host}/api/abc/games/box-score?Id={game_id}",
"interval": 1, # каждую секунду
},
{
"name": "play-by-play",
"url": "{host}/api/abc/games/play-by-play?Id={game_id}",
"interval": 1, # каждую секунду
},
]
# ============================================================================
# 2. Логирование
# ============================================================================
if not os.path.exists("logs"):
os.makedirs("logs")
LOG_CONFIG = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": TELEGRAM_BOT_TOKEN,
"chat_id": TELEGRAM_CHAT_ID,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s [{MYHOST.upper()}] %(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(LOG_CONFIG)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
# ============================================================================
# 3. I/O вспомогательные функции
# ============================================================================
def _select_lock(path: str):
filename = os.path.basename(path)
# все сырые файлы мы называем api_*.json (api_game.json, api_box-score.json, ...)
if filename.startswith("api_"):
return _write_lock_api
return _write_lock_out
def atomic_write_json(path: str, data: Any) -> None:
"""
Безопасно записывает JSON:
1. Сериализуем в память (без локов).
2. Под коротким локом - пишем tmp и делаем os.replace().
"""
# 1. Готовим данные заранее
# ensure_ascii=False чтобы не терять юникод, indent=None чтобы не раздувать файл
payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
full_path = "static/" + path + ".json"
try:
with open(full_path, "r", encoding="utf-8") as f:
if f.read() == payload:
return # ничего не поменялось -> не пишем, не fsync'им
except FileNotFoundError:
pass
target = Path(full_path)
tmp_fd, tmp_path = tempfile.mkstemp(
dir=target.parent,
prefix=target.name + ".tmp.",
text=True,
)
os.close(tmp_fd) # мы будем писать сами
lock = _select_lock(full_path)
with lock:
# 2a. Записываем полностью во временный файл
with open(tmp_path, "w", encoding="utf-8") as f:
f.write(payload)
f.flush()
os.fsync(f.fileno())
# 2b. Атомарно подменяем
os.replace(tmp_path, target)
def read_local_json(name: str, in_dir: str = "static") -> Optional[dict]:
"""
Безопасно читает static/<name>.json.
Возвращает dict или None.
Не кидает исключение, если файл не существует или был в моменте перезаписи.
Это важно для render_loop, который читает файлы параллельно с poll_game_live.
"""
filename = os.path.join(in_dir, f"{name}.json")
try:
with open(filename, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return None
except json.JSONDecodeError:
# файл мог быть в моменте перезаписи -> пропустим тик
return None
except Exception as ex:
logger.exception(f"read_local_json({name}) error: {ex}")
return None
def _now_iso() -> str:
"""
Возвращает текущее время в ISO-формате UTC ("2025-10-27T12:34:56Z").
Это кладётся в итоговый JSON как метаданные генерации.
"""
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# ============================================================================
# 4. Работа с HTTP / API
# ============================================================================
def create_session() -> requests.Session:
"""
Создаёт requests.Session с ретраями и дефолтными заголовками.
Эту сессию потом используем для всех запросов (в том числе в live-пуле).
"""
session = requests.Session()
retries = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
)
session.mount("https://", HTTPAdapter(max_retries=retries))
session.headers.update(
{
"Connection": "keep-alive",
"Accept": "application/json, */*",
"Accept-Encoding": "gzip, deflate, br",
"User-Agent": "game-watcher/1.0",
}
)
return session
def build_url(name: str, **kwargs) -> str:
"""
Собрать конечный URL по имени эндпоинта из URLS.
Пример:
build_url("standings", host=..., league=..., season=..., lang=...)
"""
template = next((u["url"] for u in URLS if u["name"] == name), None)
if not template:
raise ValueError(f"Unknown URL name: {name}")
return template.format(**kwargs)
def get_json(session: requests.Session, url: str, name: str) -> Any:
"""
Выполняет GET к API, падает, если HTTP != 2xx,
складывает ответ в static/api_<name>.json (сырой ответ API),
и возвращает распарсенный json.
"""
resp = session.get(url, timeout=10)
resp.raise_for_status()
data = resp.json()
atomic_write_json(f"api_{name}", data)
return data
def get_items(data: dict) -> Optional[list]:
"""
Мелкий хелпер: берём первый список в ответе API.
Многие ручки отдают {"result":[...]} или {"seasons":[...]}.
Если находим список — возвращаем его.
Если нет — возвращаем None (значит, нужно брать весь dict).
"""
for k, v in data.items():
if isinstance(v, list):
return data[k]
return None
def fetch_api_data(
session: requests.Session, name: str, name_save: str = None, **kwargs
) -> Any:
"""
Универсальный обёртчик над API:
- строит URL по имени ручки,
- тянет данные через get_json(),
- ищет "главный" список (get_items),
- возвращает список или весь dict.
Параллельно пишет в static/api_<name>.json (через get_json()).
"""
url = build_url(name, **kwargs)
try:
json_data = get_json(session, url, name_save or name)
if json_data:
items = get_items(json_data)
return items if items is not None else json_data
return None
except Exception as ex:
logger.error(f"{url} | {ex}")
def poll_one_endpoint(
session: requests.Session,
endpoint_name: str,
league: str,
season: str,
game_id: int,
lang: str,
) -> Tuple[str, Any]:
"""
Вызывает конкретный эндпоинт (box-score, live-status, play-by-play и т.д.),
возвращает кортеж (имя_эндпоинта, данные_или_None).
Используется внутри poll_game_live() для параллельного опроса API.
"""
if endpoint_name == "live-status":
data = fetch_api_data(session, "live-status", host=HOST, game_id=game_id)
return endpoint_name, data
if endpoint_name == "box-score":
data = fetch_api_data(session, "box-score", host=HOST, game_id=game_id)
return endpoint_name, data
if endpoint_name == "play-by-play":
data = fetch_api_data(session, "play-by-play", host=HOST, game_id=game_id)
return endpoint_name, data
if endpoint_name == "game":
data = fetch_api_data(session, "game", host=HOST, game_id=game_id, lang=lang)
return endpoint_name, data
if endpoint_name == "pregame-fullstats":
data = fetch_api_data(
session,
"pregame-fullstats",
host=HOST,
league=league,
season=season,
game_id=game_id,
lang=lang,
)
return endpoint_name, data
# fallback — "ну вдруг добавим что-то ещё"
data = fetch_api_data(session, endpoint_name, host=HOST, game_id=game_id, lang=lang)
return endpoint_name, data
def get_interval_by_name(name: str) -> int:
"""
Возвращает рекомендуемый интервал опроса эндпоинта в секундах,
как задано в URLS.
"""
for u in URLS:
if u["name"] == name:
return u["interval"]
raise ValueError(f"interval not found for {name}")
# ============================================================================
# 5. Работа с расписанием / статусом матча
# ============================================================================
def parse_game_start_dt(item: dict) -> datetime:
"""
Достаёт дату/время начала матча из объекта календаря и нормализует в APP_TZ.
Источники времени в порядке приоритета:
1. game.defaultZoneDateTime (обычно уже с таймзоной лиги)
2. game.scheduledTime (ISO8601 с оффсетом)
3. game.startTime
4. (fallback) game.localDate + game.localTime (считаем как APP_TZ)
Возвращает timezone-aware datetime в APP_TZ.
"""
g = item.get("game", {}) if "game" in item else item
raw = g.get("defaultZoneDateTime") or g.get("scheduledTime") or g.get("startTime")
if raw:
try:
dt = datetime.fromisoformat(raw) # ISO-8601
return dt.astimezone(APP_TZ)
except Exception as e:
raise RuntimeError(f"Ошибка парсинга ISO времени '{raw}': {e}")
# fallback: localDate + localTime, формата "30.09.2025" + "19:00"
ld, lt = g.get("localDate"), g.get("localTime")
if ld and lt:
try:
naive = datetime.strptime(f"{ld} {lt}", "%d.%m.%Y %H:%M")
aware = naive.replace(tzinfo=APP_TZ)
return aware
except Exception as e:
raise RuntimeError(f"Ошибка парсинга localDate/localTime '{ld} {lt}': {e}")
raise RuntimeError(
"Не найдено ни одного подходящего поля времени (defaultZoneDateTime/"
"scheduledTime/startTime/localDate+localTime)."
)
def get_game_id(
team_games: List[dict],
team: str,
) -> Tuple[Optional[dict], Optional[dict]]:
"""
Находим интересующую нас игру.
Логика (важно):
- считаем, что интересующая нас команда — это team1 (домашняя),
и сравниваем по имени.
- если есть игра сегодня -> это today_game
- иначе берём последнюю уже завершённую игру -> last_played
- возвращаем (today_game, last_played)
Если и того и другого нет -> (None, None).
"""
now = datetime.now(APP_TZ)
today = now.date()
today_game = None
last_played = None
for g in team_games:
start = parse_game_start_dt(g)
status = g.get("game", {}).get("gameStatus", "").lower()
# Игра сегодня для этой команды
if (
start.date() == today
and today_game is None
and g["team1"]["name"].lower() == team.lower()
):
today_game = g
last_played = None
# Последняя завершённая игра (resultconfirmed)
elif (
start <= now
and status == "resultconfirmed"
and g["team1"]["name"].lower() == team.lower()
):
today_game = None
last_played = g
return today_game, last_played
def is_game_live(game_obj: dict) -> bool:
"""
Пытаемся понять, идёт ли матч прямо сейчас.
Правила:
- 'resultconfirmed' / 'finished' / 'result' => матч уже окончен
- 'scheduled' / 'notstarted' / 'draft' => матч ещё не начался
- всё остальное считаем лайвом (в том числе 'online', 'inprogress', и т.п.)
"""
status = (game_obj.get("gameStatus") or "").lower()
if status in ("resultconfirmed", "finished", "result"):
return False
if status in ("notstarted", "draft"):
return False
return True
def classify_game_state_from_status(status_raw: str) -> str:
"""
Делит статус игры на три фазы:
- "finished" -> матч точно завершён
- "upcoming" -> матч ещё не начался, но он сегодня
- "live" -> матч идёт
Используется в get_data_API(), чтобы решить, что делать дальше.
"""
status = (status_raw or "").lower()
if status in ("resultconfirmed", "finished", "result"):
return "finished"
if status in ("", "notstarted", "draft"):
return "upcoming"
# всё остальное считаем лайвом
return "live"
# ============================================================================
# 6. Лайв-петля: опрос API и поток рендера
# ============================================================================
def poll_game_live(
session: requests.Session,
league: str,
season: str,
game_id: int,
lang: str,
game_meta: dict,
stop_event: threading.Event,
) -> None:
slow_endpoints = ["game"] # "pregame-fullstats" можно вернуть по желанию
fast_endpoints = ["live-status", "box-score", "play-by-play"]
last_call = {name: 0 for name in slow_endpoints + fast_endpoints}
with ThreadPoolExecutor(max_workers=5) as executor:
while True:
if stop_event.is_set():
logger.info(
f"[POLL] stop_event set -> break live poll for game {game_id}"
)
break
now = time.time()
to_run = []
for ep in fast_endpoints + slow_endpoints:
interval = get_interval_by_name(ep)
if now - last_call[ep] >= interval:
to_run.append(ep)
futures = []
if to_run:
for ep in to_run:
futures.append(
executor.submit(
poll_one_endpoint,
session,
ep,
league,
season,
game_id,
lang,
)
)
game_finished = False
for fut in as_completed(futures):
try:
ep_name, data = fut.result()
last_call[ep_name] = now
if ep_name == "live-status":
# data может быть:
# {"status":"404","message":"Not found","result":None}
# или {"status":"200","result":{"gameStatus":"Online", ...}}
ls_result = (
data.get("result") if isinstance(data, dict) else None
)
game_status = ""
if isinstance(ls_result, dict):
game_status = (ls_result.get("gameStatus") or "").lower()
if game_status in ("resultconfirmed", "finished", "result"):
logger.info(
f"[POLL] Game {game_id} finished by live-status"
)
game_finished = True
except Exception as e:
# логируем, но не роняем поток
logger.exception(f"[POLL] poll endpoint error: {e}")
# страховка: календарь говорит, что матч не лайв -> выходим
if not is_game_live(game_meta):
logger.info(f"[POLL] Game {game_id} no longer live by calendar meta")
break
if game_finished:
break
time.sleep(0.2)
if stop_event.is_set():
logger.info(
f"[POLL] stop_event set after sleep -> break live poll for game {game_id}"
)
break
def build_render_state() -> dict:
"""
Собирает итоговое состояние матча (merged dict) для графики/внешки.
Возвращает минимально возможный state, даже если часть данных ещё не доступна.
"""
game_data_raw = read_local_json("api_game")
live_status_data = read_local_json("api_live-status")
box_score_data = read_local_json("api_box-score")
play_by_play_data = read_local_json("api_play-by-play")
# Без api_game у нас вообще нет каркаса матча -> это критично
if not game_data_raw or "result" not in game_data_raw:
raise RuntimeError("build_render_state(): api_game/result отсутствует")
game_data = game_data_raw["result"]
# Защитимся от отсутствия ключевых структур
# Убедимся, что есть поля, которые ждёт остальной код
game_data.setdefault("teams", [])
game_data.setdefault("team1", {})
game_data.setdefault("team2", {})
game_data.setdefault("game", {})
# plays - список событий
game_data["plays"] = (play_by_play_data or {}).get("result", []) or []
# live_status - текущее состояние периода/секунды
if live_status_data and isinstance(live_status_data.get("result"), dict):
game_data["live_status"] = live_status_data["result"]
else:
# дадим заготовку, чтобы Play_By_Play не падал
game_data["live_status"] = {
"period": 1,
"second": 0,
"gameStatus": game_data.get("game", {}).get("gameStatus", ""),
}
# box-score -> статы игроков и команд
if (
box_score_data
and isinstance(box_score_data.get("result"), dict)
and "teams" in box_score_data["result"]
and box_score_data["result"]["teams"] is not None
):
for index_team, team in enumerate(game_data.get("teams", [])[1:]):
# box_team может отсутствовать, если индексы не совпали или сервер отдал None
box_teams_list = box_score_data["result"]["teams"]
if (
isinstance(box_teams_list, list)
and index_team < len(box_teams_list)
and box_teams_list[index_team] is not None
):
box_team = box_teams_list[index_team]
else:
box_team = {}
# переносим статы игроков
for player in team.get("starts", []):
stat = None
if isinstance(box_team.get("starts"), list):
stat = next(
(
s
for s in box_team["starts"]
if s.get("startNum") == player.get("startNum")
),
None,
)
if stat:
player["stats"] = stat
# total по команде
team["total"] = (
box_team.get("total", {}) if isinstance(box_team, dict) else {}
)
# периоды и общий счёт
if isinstance(box_score_data["result"], dict):
game_data["scoreByPeriods"] = (
box_score_data["result"].get("scoreByPeriods") or []
)
game_data["fullScore"] = box_score_data["result"].get("fullScore") or {}
else:
# если box-score нет ещё:
game_data.setdefault("scoreByPeriods", [])
game_data.setdefault("fullScore", {})
# а ещё надо, чтобы у каждой команды было хотя бы .total = {}
for team in game_data.get("teams", []):
team.setdefault("total", {})
for starter in team.get("starts", []):
starter.setdefault("stats", {})
merged: Dict[str, Any] = {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {
"boxScoreHas": "yes" if box_score_data else "no",
"pbpLen": str(len(game_data["plays"])),
},
},
"result": game_data,
}
return merged
def render_loop(stop_event: threading.Event, out_name: str = "game") -> None:
logger.info("[RENDER_THREAD] start render loop")
with ThreadPoolExecutor(max_workers=6) as pool:
while not stop_event.is_set():
try:
try:
state = build_render_state()
except Exception as build_err:
logger.debug(f"[RENDER_THREAD] build_render_state not ready: {build_err}")
time.sleep(0.2)
continue
tasks = {
"team_stats": pool.submit(Team_Both_Stat, state),
"team1": pool.submit(Json_Team_Generation, state, who="team1"),
"team2": pool.submit(Json_Team_Generation, state, who="team2"),
"scores": pool.submit(Scores_Quarter, state),
"referee": pool.submit(Referee, state),
"pbp": pool.submit(Play_By_Play, state),
}
# аккуратно собрать исключения, но не умереть целиком
for name, fut in tasks.items():
try:
fut.result()
except Exception as e:
logger.debug(f"[RENDER_THREAD] skip {name}: {e}")
# остальное можно сделать синхронно, это быстро
try:
live_status_to_write = []
rs = state.get("result", {})
if isinstance(rs, dict) and "live_status" in rs:
live_status_to_write = [rs["live_status"]]
atomic_write_json("live_status", live_status_to_write)
except Exception as e:
logger.debug(f"[RENDER_THREAD] skip live_status write: {e}")
try:
atomic_write_json(out_name, state.get("result", {}))
except Exception as e:
logger.debug(f"[RENDER_THREAD] skip {out_name}.json write: {e}")
except Exception as ex:
logger.exception(f"[RENDER_THREAD] unexpected error: {ex}")
time.sleep(1)
logger.info("[RENDER_THREAD] stop render loop")
def run_live_loop(
league: str,
season: str,
game_id: int,
lang: str,
game_meta: dict,
stop_event: threading.Event,
):
logger.info(
f"[LIVE_THREAD] start live loop for game_id={game_id} (league={league}, season={season})"
)
session = create_session()
# поток рендера
render_thread = threading.Thread(
target=render_loop,
args=(stop_event,),
daemon=False,
)
render_thread.start()
logger.info("[LIVE_THREAD] render thread spawned")
try:
poll_game_live(
session=session,
league=league,
season=season,
game_id=game_id,
lang=lang,
game_meta=game_meta,
stop_event=stop_event,
)
except Exception as e:
logger.exception(f"[LIVE_THREAD] crash in live loop for game_id={game_id}: {e}")
finally:
stop_event.set()
logger.info(f"[LIVE_THREAD] stopping worker threads for game_id={game_id}")
render_thread.join()
logger.info(f"[LIVE_THREAD] stop live loop for game_id={game_id}")
def Referee(merged: dict, *, out_dir: str = "static") -> None:
"""
Поток, создающий JSON-файл с информацией о судьях матча.
"""
logger.info("START making json for referee")
desired_order = [
"Crew chief",
"Referee 1",
"Referee 2",
"Commissioner",
"Ст.судья",
"Судья 1",
"Судья 2",
"Комиссар",
]
try:
# Найти судей (teamNumber == 0)
team_ref = next(
(t for t in merged["result"]["teams"] if t["teamNumber"] == 0), None
)
if not team_ref:
logger.warning("Не найдена судейская бригада в данных.")
referees_raw = team_ref.get("starts", [])
referees = []
for r in referees_raw:
flag_code = r.get("countryId", "").lower() if r.get("countryName") else ""
referees.append(
{
"displayNumber": r.get("displayNumber", ""),
"positionName": r.get("positionName", ""),
"lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(),
"secondName": r.get("secondName", ""),
"birthday": r.get("birthday", ""),
"age": r.get("age", 0),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg",
}
)
# Сортировка по позиции
referees = sorted(
referees,
key=lambda x: (
desired_order.index(x["positionName"])
if x["positionName"] in desired_order
else len(desired_order)
),
)
out_path = "referee"
atomic_write_json(out_path, referees)
logging.info("Сохранил payload: {out_path}")
except Exception as e:
logger.error(f"Ошибка в Referee потоке: {e}", exc_info=True)
def Play_By_Play(data: dict) -> None:
"""
Поток, обновляющий JSON-файл с последовательностью бросков в матче.
"""
logger.info("START making json for play-by-play")
try:
game_data = data["result"] if "result" in data else data
if not game_data:
logger.debug("game_online_data отсутствует")
return
teams = game_data["teams"]
team1_data = next((i for i in teams if i.get("teamNumber") == 1), None)
team2_data = next((i for i in teams if i.get("teamNumber") == 2), None)
if not team1_data or not team2_data:
logger.warning("Не удалось получить команды из game_online_data")
return
team1_name = game_data["team1"]["name"]
team2_name = game_data["team2"]["name"]
team1_startnum = [
p["startNum"]
for p in team1_data.get("starts", [])
if p.get("startRole") == "Player"
]
team2_startnum = [
p["startNum"]
for p in team2_data.get("starts", [])
if p.get("startRole") == "Player"
]
plays = game_data.get("plays", [])
if not plays:
logger.debug("нет данных в play-by-play")
return
# Получение текущего времени игры
json_live_status = (
data["result"]["live_status"]
if "result" in data and "live_status" in data["result"]
else None
)
last_event = plays[-1]
if json_live_status is None:
period = last_event.get("period", 1)
second = 0
else:
period = (json_live_status or {}).get("result", {}).get("period", 1)
second = (json_live_status or {}).get("result", {}).get("second", 0)
# Создание DataFrame из событий
df = pd.DataFrame(plays[::-1])
df_goals = df[df["play"].isin([1, 2, 3])].copy()
if df_goals.empty:
logger.debug("нет данных о голах в play-by-play")
return
# Расчёты по очкам и времени
df_goals["score1"] = (
df_goals["startNum"].isin(team1_startnum) * df_goals["play"]
)
df_goals["score2"] = (
df_goals["startNum"].isin(team2_startnum) * df_goals["play"]
)
df_goals["score_sum1"] = df_goals["score1"].fillna(0).cumsum()
df_goals["score_sum2"] = df_goals["score2"].fillna(0).cumsum()
df_goals["new_sec"] = (
pd.to_numeric(df_goals["sec"], errors="coerce").fillna(0).astype(int) // 10
)
df_goals["time_now"] = (600 if period < 5 else 300) - second
df_goals["quar"] = period - df_goals["period"]
df_goals["diff_time"] = np.where(
df_goals["quar"] == 0,
df_goals["time_now"] - df_goals["new_sec"],
(600 * df_goals["quar"] - df_goals["new_sec"]) + df_goals["time_now"],
)
df_goals["diff_time_str"] = df_goals["diff_time"].apply(
lambda x: (f"{x // 60}:{str(x % 60).zfill(2)}" if isinstance(x, int) else x)
)
# Текстовые поля
def generate_text(row, with_time=False, is_rus=False):
s1, s2 = int(row["score_sum1"]), int(row["score_sum2"])
team = (
team1_name
if not pd.isna(row["score1"]) and row["score1"] != 0
else team2_name
)
# правильный порядок счёта в зависимости от команды
if team == team1_name:
score = f"{s1}-{s2}"
else:
score = f"{s2}-{s1}"
time_str = (
f" за {row['diff_time_str']}"
if is_rus
else f" in last {row['diff_time_str']}"
)
prefix = "рывок" if is_rus else "run"
return f"{team} {score} {prefix}{time_str if with_time else ''}"
df_goals["text_rus"] = df_goals.apply(
lambda r: generate_text(r, is_rus=True, with_time=False), axis=1
)
df_goals["text_time_rus"] = df_goals.apply(
lambda r: generate_text(r, is_rus=True, with_time=True), axis=1
)
df_goals["text"] = df_goals.apply(
lambda r: generate_text(r, is_rus=False, with_time=False), axis=1
)
df_goals["text_time"] = df_goals.apply(
lambda r: generate_text(r, is_rus=False, with_time=True), axis=1
)
df_goals["team"] = df_goals["score1"].apply(
lambda x: team1_name if not pd.isna(x) and x != 0 else team2_name
)
# Удаление лишнего
drop_cols = [
"children",
"start",
"stop",
"hl",
"sort",
"startNum",
"zone",
"x",
"y",
]
df_goals.drop(columns=drop_cols, inplace=True, errors="ignore")
# Порядок колонок
main_cols = ["text", "text_time"]
all_cols = main_cols + [col for col in df_goals.columns if col not in main_cols]
df_goals = df_goals[all_cols]
# Сохранение JSON
directory = "static"
os.makedirs(directory, exist_ok=True)
filepath = os.path.join(directory, "play_by_play.json")
df_goals.to_json(filepath, orient="records", force_ascii=False)
logger.debug("Успешно положил данные об play-by-play в файл")
except Exception as e:
logger.error(f"Ошибка в Play_By_Play: {e}", exc_info=True)
def render_once_after_game(
session: requests.Session,
league: str,
season: str,
game_id: int,
lang: str,
out_name: str = "game",
) -> None:
"""
Одноразовая генерация всех выходных json-файлов (team_stats.json,
team1.json, team2.json, scores.json, live_status.json, game.json и т.д.)
без запуска вечного render_loop.
Что делает:
- один раз тянет /game из API (по game_id)
- собирает полный state (build_render_state)
- считает статистику команд, игроков, судей и счёт по четвертям
- сохраняет все соответствующие JSON в static/
Используется, когда матч уже завершён (finished/resultconfirmed)
или нет лайва.
"""
try:
logger.info(f"[RENDER_ONCE] Fetching final game snapshot for game_id={game_id}")
# один запрос к API (ручка "game")
state = fetch_api_data(
session,
"game",
host=HOST,
game_id=game_id,
lang=lang,
)
Team_Both_Stat(state)
Json_Team_Generation(state, who="team1")
Json_Team_Generation(state, who="team2")
Scores_Quarter(state)
Referee(state)
Play_By_Play(state)
atomic_write_json(out_name, state["result"])
logger.info("[RENDER_ONCE] финальные json сохранены успешно")
except Exception as ex:
logger.exception(f"[RENDER_ONCE] error while building final state: {ex}")
# ============================================================================
# 7. Постобработка статистики для вывода
# ============================================================================
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
71 -> "1:11"
0 -> "0:00"
Любые кривые значения -> "0:00".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
def Json_Team_Generation(
merged: dict,
*,
who: Optional[str] = None,
) -> None:
"""
Формирует и записывает несколько JSON-файлов по составу и игрокам команды:
- <who>.json (полный список игроков с метриками)
- topTeam1.json / topTeam2.json (топ-игроки)
- started_team1.json / started_team2.json (игроки на паркете)
Вход:
merged: словарь из build_render_state()
who: "team1" или "team2"
"""
if who == "team1":
payload = next(
(i for i in merged["result"]["teams"] if i["teamNumber"] == 1), None
)
elif who == "team2":
payload = next(
(i for i in merged["result"]["teams"] if i["teamNumber"] == 2), None
)
else:
return
if not payload:
return
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload.get("starts", [])
team_rows = []
for item in starts:
stats = item.get("stats") or {}
row = {
"id": item.get("personId") or "",
"num": item.get("displayNumber"),
"startRole": item.get("startRole"),
"role": item.get("positionName"),
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == (item.get("positionName") or "").lower()
][0]
if any(
r[0].lower() == (item.get("positionName") or "").lower()
for r in role_list
)
else ""
),
"NameGFX": (
f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip()
if item.get("firstName") is not None
and item.get("lastName") is not None
else "Команда"
),
"captain": item.get("isCapitan", False),
"age": item.get("age") or 0,
"height": f"{item.get('height')} cm" if item.get("height") else 0,
"weight": f"{item.get('weight')} kg" if item.get("weight") else 0,
"isStart": stats.get("isStart", False),
"isOn": "🏀" if stats.get("isOnCourt") is True else "",
"flag": (
"https://flagicons.lipis.dev/flags/4x3/"
+ (
"ru"
if item.get("countryId") is None
and item.get("countryName") == "Russia"
else (
""
if item.get("countryId") is None
else (
(item.get("countryId") or "").lower()
if item.get("countryName") is not None
else ""
)
)
)
+ ".svg"
),
"pts": stats.get("points", 0),
"pt-2": f"{stats.get('goal2',0)}/{stats.get('shot2',0)}" if stats else 0,
"pt-3": f"{stats.get('goal3',0)}/{stats.get('shot3',0)}" if stats else 0,
"pt-1": f"{stats.get('goal1',0)}/{stats.get('shot1',0)}" if stats else 0,
"fg": (
f"{stats.get('goal2',0)+stats.get('goal3',0)}/"
f"{stats.get('shot2',0)+stats.get('shot3',0)}"
if stats
else 0
),
"ast": stats.get("assist", 0),
"stl": stats.get("steal", 0),
"blk": stats.get("block", 0),
"blkVic": stats.get("blocked", 0),
"dreb": stats.get("defReb", 0),
"oreb": stats.get("offReb", 0),
"reb": stats.get("defReb", 0) + stats.get("offReb", 0),
"to": stats.get("turnover", 0),
"foul": stats.get("foul", 0),
"foulT": stats.get("foulT", 0),
"foulD": stats.get("foulD", 0),
"foulC": stats.get("foulC", 0),
"foulB": stats.get("foulB", 0),
"fouled": stats.get("foulsOn", 0),
"plusMinus": stats.get("plusMinus", 0),
"dunk": stats.get("dunk", 0),
"kpi": (
stats.get("points", 0)
+ stats.get("defReb", 0)
+ stats.get("offReb", 0)
+ stats.get("assist", 0)
+ stats.get("steal", 0)
+ stats.get("block", 0)
+ stats.get("foulsOn", 0)
+ (stats.get("goal1", 0) - stats.get("shot1", 0))
+ (stats.get("goal2", 0) - stats.get("shot2", 0))
+ (stats.get("goal3", 0) - stats.get("shot3", 0))
- stats.get("turnover", 0)
- stats.get("foul", 0)
),
"time": format_time(stats.get("second", 0)),
"pts1q": 0,
"pts2q": 0,
"pts3q": 0,
"pts4q": 0,
"pts1h": 0,
"pts2h": 0,
"Name1GFX": (item.get("firstName") or "").strip(),
"Name2GFX": (item.get("lastName") or "").strip(),
"photoGFX": (
os.path.join(
"D:\\Photos",
merged["result"]["league"]["abcName"],
merged["result"][who]["name"],
f"{item.get('displayNumber')}.png",
)
if item.get("startRole") == "Player"
else ""
),
"isOnCourt": stats.get("isOnCourt", False),
}
team_rows.append(row)
# добиваем до 12 строк, чтобы UI был ровный
count_player = sum(1 for x in team_rows if x["startRole"] == "Player")
if count_player < 12 and team_rows:
filler_count = (4 if count_player <= 4 else 12) - count_player
template_keys = list(team_rows[0].keys())
for _ in range(filler_count):
empty_row = {}
for key in template_keys:
if key in ["captain", "isStart", "isOnCourt"]:
empty_row[key] = False
elif key in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]:
empty_row[key] = 0
else:
empty_row[key] = ""
team_rows.append(empty_row)
# сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д.
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5,
}
sorted_team = sorted(
team_rows,
key=lambda x: role_priority.get(x.get("startRole", 99), 99),
)
# пишем полный ростер команды
atomic_write_json(who, sorted_team)
logger.info(f"Сохранил payload: {who}.json")
# топ-игроки по очкам/подборам/ассистам и т.д.
top_sorted_team = sorted(
(p for p in sorted_team if p.get("startRole") in ["Player", ""]),
key=lambda x: (
x.get("pts", 0),
x.get("dreb", 0) + x.get("oreb", 0),
x.get("ast", 0),
x.get("stl", 0),
x.get("blk", 0),
x.get("time", "0:00"),
),
reverse=True,
)
# пустые строки не должны ломать UI процентами фолов/очков
for player in top_sorted_team:
if player.get("num", "") == "":
player["pts"] = ""
player["foul"] = ""
top_name = f"top{who.replace('t', 'T')}"
atomic_write_json(top_name, top_sorted_team)
logger.info(f"Сохранил payload: {top_name}.json")
# кто прямо сейчас на площадке
started_team = sorted(
(
p
for p in sorted_team
if p.get("startRole") == "Player" and p.get("isOnCourt") is True
),
key=lambda x: int(x.get("num") or 0),
)
started_name = f"started_{who}"
atomic_write_json(started_name, started_team)
logger.info(f"Сохранил payload: {started_name}.json")
def time_outs_func(data_pbp: List[dict]) -> Tuple[str, int, str, int]:
"""
Считает таймауты для обеих команд и формирует читабельные строки вида:
"2 Time-outs left in 2nd half"
Возвращает:
(строка_для_команды1, остаток1, строка_для_команды2, остаток2)
"""
timeout1 = []
timeout2 = []
for event in data_pbp:
if event.get("play") == 23: # 23 == таймаут
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
def timeout_status(timeout_list: List[dict], last_event: dict) -> Tuple[str, int]:
period = last_event.get("period", 0)
sec = last_event.get("sec", 0)
if period < 3:
timeout_max = 2
count = sum(1 for t in timeout_list if t.get("period", 0) <= period)
quarter = "1st half"
elif period < 5:
count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period)
quarter = "2nd half"
if period == 4 and sec >= 4800 and count in (0, 1):
timeout_max = 2
else:
timeout_max = 3
else:
timeout_max = 1
count = sum(1 for t in timeout_list if t.get("period", 0) == period)
quarter = f"OverTime {period - 4}"
left = max(0, timeout_max - count)
word = "Time-outs" if left != 1 else "Time-out"
text = f"{left if left != 0 else 'No'} {word} left in {quarter}"
return text, left
if not data_pbp:
return "", 0, "", 0
last_event = data_pbp[-1]
t1_str, t1_left = timeout_status(timeout1, last_event)
t2_str, t2_left = timeout_status(timeout2, last_event)
return t1_str, t1_left, t2_str, t2_left
def add_data_for_teams(new_data: List[dict]) -> Tuple[float, List[Any], float]:
"""
Считает командные агрегаты:
- средний возраст
- очки со старта vs со скамейки, + их проценты
- средний рост
Возвращает кортеж:
(avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm)
"""
players = [item for item in new_data if item.get("startRole") == "Player"]
points_start = 0
points_bench = 0
total_age = 0
total_height = 0
player_count = len(players)
for player in players:
stats = player.get("stats") or {}
if stats:
if stats.get("isStart") is True:
points_start += stats.get("points", 0)
elif stats.get("isStart") is False:
points_bench += stats.get("points", 0)
total_age += player.get("age") or 0
total_height += player.get("height") or 0
total_points = points_start + points_bench
points_start_pro = (
f"{round(points_start * 100 / total_points)}%" if total_points else "0%"
)
points_bench_pro = (
f"{round(points_bench * 100 / total_points)}%" if total_points else "0%"
)
avg_age = round(total_age / player_count, 1) if player_count else 0
avg_height = round(total_height / player_count, 1) if player_count else 0
points = [points_start, points_start_pro, points_bench, points_bench_pro]
return avg_age, points, avg_height
def add_new_team_stat(
data: dict,
avg_age: float,
points: List[Any],
avg_height: float,
timeout_str: str,
timeout_left: int,
) -> dict:
"""
Берёт словарь total по команде (очки, подборы, броски и т.д.),
добавляет:
- проценты попаданий
- средний возраст / рост
- очки старт / бенч
- информацию по таймаутам
и всё приводит к строкам (для UI, чтобы не ловить типы).
Возвращает обновлённый словарь.
"""
def safe_int(v):
try:
return int(v)
except (ValueError, TypeError):
return 0
def format_percent(goal, shot):
goal, shot = safe_int(goal), safe_int(shot)
return f"{round(goal * 100 / shot)}%" if shot else "0%"
goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1"))
goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2"))
goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3"))
def_reb = safe_int(data.get("defReb"))
off_reb = safe_int(data.get("offReb"))
data.update(
{
"pt-1": f"{goal1}/{shot1}",
"pt-2": f"{goal2}/{shot2}",
"pt-3": f"{goal3}/{shot3}",
"fg": f"{goal2 + goal3}/{shot2 + shot3}",
"pt-1_pro": format_percent(goal1, shot1),
"pt-2_pro": format_percent(goal2, shot2),
"pt-3_pro": format_percent(goal3, shot3),
"fg_pro": format_percent(goal2 + goal3, shot2 + shot3),
"Reb": str(def_reb + off_reb),
"avgAge": str(avg_age),
"ptsStart": str(points[0]),
"ptsStart_pro": str(points[1]),
"ptsBench": str(points[2]),
"ptsBench_pro": str(points[3]),
"avgHeight": f"{avg_height} cm",
"timeout_left": str(timeout_left),
"timeout_str": str(timeout_str),
}
)
for k in data:
data[k] = str(data[k])
return data
stat_name_list = [
("points", "Очки", "points"),
("pt-1", "Штрафные", "free throws"),
("pt-1_pro", "штрафные, процент", "free throws pro"),
("pt-2", "2-очковые", "2-points"),
("pt-2_pro", "2-очковые, процент", "2-points pro"),
("pt-3", "3-очковые", "3-points"),
("pt-3_pro", "3-очковые, процент", "3-points pro"),
("fg", "очки с игры", "field goals"),
("fg_pro", "Очки с игры, процент", "field goals pro"),
("assist", "Передачи", "assists"),
("pass", "", ""),
("defReb", "подборы в защите", ""),
("offReb", "подборы в нападении", ""),
("Reb", "Подборы", "rebounds"),
("steal", "Перехваты", "steals"),
("block", "Блокшоты", "blocks"),
("blocked", "", ""),
("turnover", "Потери", "turnovers"),
("foul", "Фолы", "fouls"),
("foulsOn", "", ""),
("foulT", "", ""),
("foulD", "", ""),
("foulC", "", ""),
("foulB", "", ""),
("second", "секунды", "seconds"),
("dunk", "данки", "dunks"),
("fastBreak", "", "fast breaks"),
("plusMinus", "+/-", "+/-"),
("avgAge", "", "avg Age"),
("ptsBench", "", "Bench PTS"),
("ptsBench_pro", "", "Bench PTS, %"),
("ptsStart", "", "Start PTS"),
("ptsStart_pro", "", "Start PTS, %"),
("avgHeight", "", "avg height"),
("timeout_left", "", "timeout left"),
("timeout_str", "", "timeout str"),
]
def Team_Both_Stat(merged: dict) -> None:
"""
Формирует сводку по двум командам и пишет её в static/team_stats.json.
"""
logger.info("START making json for team statistics")
try:
teams = merged["result"]["teams"]
plays = merged["result"].get("plays", [])
team_1 = next((t for t in teams if t["teamNumber"] == 1), None)
team_2 = next((t for t in teams if t["teamNumber"] == 2), None)
if not team_1 or not team_2:
logger.warning("Не найдены обе команды в данных")
return
timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays)
avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1.get("starts", []))
avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2.get("starts", []))
if not team_1.get("total") or not team_2.get("total"):
logger.debug("Нет total у команд — пропускаю перезапись team_stats.json")
total_1 = add_new_team_stat(
team_1["total"],
avg_age_1,
points_1,
avg_height_1,
timeout_str1,
timeout_left1,
)
total_2 = add_new_team_stat(
team_2["total"],
avg_age_2,
points_2,
avg_height_2,
timeout_str2,
timeout_left2,
)
result_json = []
for key in total_1:
val1 = total_1[key]
val2 = total_2[key]
stat_rus = ""
stat_eng = ""
for metric_name, rus, eng in stat_name_list:
if metric_name == key:
stat_rus, stat_eng = rus, eng
break
result_json.append(
{
"name": key,
"nameGFX_rus": stat_rus,
"nameGFX_eng": stat_eng,
"val1": val1,
"val2": val2,
}
)
atomic_write_json("team_stats", result_json)
logger.info("Сохранил payload: team_stats.json")
except Exception as e:
logger.error(f"Ошибка при обработке командной статистики: {e}", exc_info=True)
def Pregame_data_json(data: dict) -> None:
teams = []
for data_team in (data["teamStats1"], data["teamStats2"]):
temp_team = {
"team": data_team["team"]["name"],
"games": data_team["games"],
"points": round(
(data_team["totalStats"]["points"] / data_team["games"]), 1
),
"points_2": round(
(
data_team["totalStats"]["goal2"]
* 100
/ data_team["totalStats"]["shot2"]
),
1,
),
"points_3": round(
(
data_team["totalStats"]["goal3"]
* 100
/ data_team["totalStats"]["shot3"]
),
1,
),
"points_23": round(
(
data_team["totalStats"]["goal23"]
* 100
/ data_team["totalStats"]["shot23"]
),
1,
),
"points_1": round(
(
data_team["totalStats"]["goal1"]
* 100
/ data_team["totalStats"]["shot1"]
),
1,
),
"assists": round(
(data_team["totalStats"]["assist"] / data_team["games"]), 1
),
"rebounds": round(
(
(
data_team["totalStats"]["defRebound"]
+ data_team["totalStats"]["offRebound"]
)
/ data_team["games"]
),
1,
),
"steals": round((data_team["totalStats"]["steal"] / data_team["games"]), 1),
"turnovers": round(
(data_team["totalStats"]["turnover"] / data_team["games"]), 1
),
"blocks": round(
(data_team["totalStats"]["blockShot"] / data_team["games"]), 1
),
"fouls": round((data_team["totalStats"]["foul"] / data_team["games"]), 1),
}
teams.append(temp_team)
atomic_write_json("team_comparison", teams)
logger.info("Сохранил payload: team_comparison.json")
def Pregame_data(pregame_raw: dict, game_stub: dict) -> None:
"""
Обработка предматчевых данных (Pregame).
Вызывается один раз ДО начала матча, если матч сегодня.
Вход:
pregame_raw -> ответ от /pregame (dict или None)
game_stub -> today_game["game"] из календаря (минимальная информация о матче)
"""
try:
out = {
"game": game_stub or {},
"pregame": (
pregame_raw.get("result")
if isinstance(pregame_raw, dict)
else pregame_raw
),
"generatedAt": _now_iso(),
}
Pregame_data_json(out["pregame"])
# сохраняем файл
# atomic_write_json(out["pregame"], "pregame")
# logger.info("Сохранил payload: pregame.json")
except Exception as e:
logger.error(f"Ошибка в Pregame_data: {e}", exc_info=True)
def Scores_Quarter(merged: dict) -> None:
"""
Пишет счёт по четвертям и овертаймам в static/scores.json.
"""
logger.info("START making json for scores quarter")
quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]
score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters]
try:
full_score_str = merged.get("result", {}).get("game", {}).get("fullScore", "")
if full_score_str:
full_score_list = full_score_str.split(",")
for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]):
parts = score_str.split(":")
if len(parts) == 2:
score_by_quarter[i]["score1"] = parts[0]
score_by_quarter[i]["score2"] = parts[1]
logger.info("Счёт по четвертям получен из fullScore.")
elif "scoreByPeriods" in merged.get("result", {}):
periods = merged["result"]["scoreByPeriods"]
for i, score in enumerate(periods[: len(score_by_quarter)]):
score_by_quarter[i]["score1"] = str(score.get("score1", ""))
score_by_quarter[i]["score2"] = str(score.get("score2", ""))
logger.info("Счёт по четвертям получен из scoreByPeriods.")
else:
logger.debug("Нет данных по счёту, сохраняем пустые значения.")
atomic_write_json("scores", score_by_quarter)
logger.info("Сохранил payload: scores.json")
except Exception as e:
logger.error(f"Ошибка в Scores_Quarter: {e}", exc_info=True)
def Standing_func(
session: requests.Session,
league: str,
season: str,
lang: str,
stop_event: threading.Event,
out_dir: str = "static",
) -> None:
"""
Фоновый поток с турнирной таблицей (standings).
"""
logger.info("[STANDINGS_THREAD] start standings loop")
last_call_ts = 0
json_seasons = fetch_api_data(
session, "seasons", host=HOST, league=league, lang=lang
)
season = json_seasons[0]["season"]
interval = get_interval_by_name("standings")
while not stop_event.is_set():
now = time.time()
if now - last_call_ts < interval:
time.sleep(1)
continue
try:
data_standings = fetch_api_data(
session,
"standings",
host=HOST,
league=league,
season=season,
lang=lang,
)
if not data_standings:
logger.debug("[STANDINGS_THREAD] standings empty")
time.sleep(1)
continue
if isinstance(data_standings, list):
items = data_standings
else:
items = data_standings.get("items") or []
if not items:
logger.debug("[STANDINGS_THREAD] no items in standings")
last_call_ts = now
continue
for item in items:
comp = item.get("comp", {})
comp_name = (
(comp.get("name") or "unknown_comp")
.replace(" ", "_")
.replace("|", "")
)
if item.get("standings"):
standings_rows = item["standings"]
df = pd.json_normalize(standings_rows)
if "scores" in df.columns:
df = df.drop(columns=["scores"])
if (
"totalWin" in df.columns
and "totalDefeat" in df.columns
and "totalGames" in df.columns
and "totalGoalPlus" in df.columns
and "totalGoalMinus" in df.columns
):
tw = (
pd.to_numeric(df["totalWin"], errors="coerce")
.fillna(0)
.astype(int)
)
td = (
pd.to_numeric(df["totalDefeat"], errors="coerce")
.fillna(0)
.astype(int)
)
df["w_l"] = tw.astype(str) + " / " + td.astype(str)
def calc_percent(row):
win = row.get("totalWin", 0)
games = row.get("totalGames", 0)
# гарантируем числа
try:
win = int(win)
except (TypeError, ValueError):
win = 0
try:
games = int(games)
except (TypeError, ValueError):
games = 0
if games == 0 or row["w_l"] == "0 / 0":
return 0
return round(win * 100 / games + 0.000005)
df["procent"] = df.apply(calc_percent, axis=1)
tg_plus = (
pd.to_numeric(df["totalGoalPlus"], errors="coerce")
.fillna(0)
.astype(int)
)
tg_minus = (
pd.to_numeric(df["totalGoalMinus"], errors="coerce")
.fillna(0)
.astype(int)
)
df["plus_minus"] = tg_plus - tg_minus
standings_payload = df.to_dict(orient="records")
filename = f"standings_{league}_{comp_name}"
atomic_write_json(filename, standings_payload)
logger.info(f"[STANDINGS_THREAD] сохранил {filename}.json")
elif item.get("playoffPairs"):
playoff_rows = item["playoffPairs"]
df = pd.json_normalize(playoff_rows)
standings_payload = df.to_dict(orient="records")
filename = f"standings_{league}_{comp_name}"
atomic_write_json(filename, standings_payload)
logger.info(
f"[STANDINGS_THREAD] saved {filename}.json (playoffPairs, {len(standings_payload)} rows)"
)
else:
continue
last_call_ts = now
except Exception as e:
logger.warning(f"[STANDINGS_THREAD] ошибка в турнирном положении: {e}")
time.sleep(1)
logger.info("[STANDINGS_THREAD] stop standings loop")
# ============================================================================
# 8. Суточный цикл: находим игру, следим в лайве, потом уходим спать
# ============================================================================
def get_data_API(
session: requests.Session,
league: str,
team: str,
lang: str,
stop_event: threading.Event,
) -> str:
"""
Один "дневной прогон" логики.
Возвращает day_state:
- "upcoming" -> матч сегодня (домашний), но ещё не начался
- "live_done" -> был live_loop и завершился
- "finished_now" -> матч уже завершён, отрендерили финал
- "no_game" -> сегодня игры нет вообще
- "error" -> что-то упало по дороге
"""
# 1. сезоны
json_seasons = fetch_api_data(
session, "seasons", host=HOST, league=league, lang=lang
)
if not json_seasons:
logger.error("Не удалось получить список сезонов")
return "error"
season = json_seasons[0]["season"]
# 2. standings и calendar
fetch_api_data(
session,
"standings",
host=HOST,
league=league,
season=season,
lang=lang,
)
json_calendar = fetch_api_data(
session,
"calendar",
host=HOST,
league=league,
season=season,
lang=lang,
)
if not json_calendar:
logger.error("Не удалось получить список матчей")
return "error"
# 3. определяем игру
today_game, last_played = get_game_id(json_calendar, team)
print(today_game, last_played)
# Ветка А: есть завершённая игра, но сегодня нет матча
if last_played and not today_game:
game_id = last_played["game"]["id"]
logger.info(f"Последний завершённый матч id={game_id}")
render_once_after_game(session, league, season, game_id, lang)
return "finished_now"
# Ветка Б: матч сегодня есть (для домашней команды)
if today_game:
game_id = today_game["game"]["id"]
logger.info(f"Онлайн матч id={game_id}")
# Обновляем api_* файлы сразу
fetch_api_data(session, "game", host=HOST, game_id=game_id, lang=lang)
fetch_api_data(session, "box-score", host=HOST, game_id=game_id)
fetch_api_data(session, "play-by-play", host=HOST, game_id=game_id)
live_status_raw = fetch_api_data(
session, "live-status", host=HOST, game_id=game_id
)
# Определяем состояние матча
status_calendar = today_game["game"].get("gameStatus", "")
status_live = ""
# print(live_status_raw)
if isinstance(live_status_raw, dict):
# бывают ответы вида {"status":"404","message":"Not found","result":None}
ls_result = live_status_raw.get("result")
if isinstance(ls_result, dict):
status_live = ls_result.get("gameStatus", "")
# если result == None -> просто считаем, что live-статуса нет (ещё не начался)
effective_status = status_live or status_calendar
phase = classify_game_state_from_status(effective_status)
if phase == "live":
# матч идёт → запускаем live_loop блокирующе
t = threading.Thread(
target=run_live_loop,
args=(league, season, game_id, lang, today_game["game"], stop_event),
daemon=False,
)
t.start()
logger.info("[get_data_API] live thread spawned, waiting for it to finish")
try:
t.join()
except KeyboardInterrupt:
logger.info(
"[get_data_API] KeyboardInterrupt while waiting live thread -> stop_event"
)
stop_event.set()
t.join()
logger.info("[get_data_API] live thread finished")
return "live_done"
if phase == "upcoming":
logger.info(
f"Матч {game_id} сегодня, но ещё не начался (status={effective_status}). Подготовка pregame."
)
# дергаем pregame только один раз за матч
if not _pregame_done_for_game.get(game_id):
try:
pregame_raw = fetch_api_data(
session,
"pregame",
host=HOST,
league=league,
season=season,
game_id=game_id,
lang=lang,
)
Pregame_data(
pregame_raw=pregame_raw,
game_stub=today_game["game"],
)
_pregame_done_for_game[game_id] = True
logger.info(
f"[get_data_API] pregame данные собраны для game_id={game_id}"
)
except Exception as e:
logger.exception(
f"[get_data_API] ошибка при подготовке pregame для {game_id}: {e}"
)
# матч сегодня, ждём старта
return "upcoming"
if phase == "finished":
# матч уже закончен → однократно пререндерили всё и можем спать
render_once_after_game(session, league, season, game_id, lang)
return "finished_now"
# на всякий случай (если API дал что-то новое)
logger.info(
f"[get_data_API] Неожиданная фаза '{phase}', status={effective_status}. Считаем как 'upcoming'."
)
return "upcoming"
# Ветка В: нет матча сегодня, нет последнего завершённого
logger.info("Для этой команды игр сегодня нет и нет завершённой последней игры.")
return "no_game"
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--league", default="vtb")
parser.add_argument("--team", required=True)
parser.add_argument("--lang", default="en")
args = parser.parse_args()
stop_event = threading.Event()
standings_session = create_session()
standings_thread = threading.Thread(
target=Standing_func,
args=(standings_session, args.league, None, args.lang, stop_event),
daemon=False,
)
standings_thread.start()
logger.info("[MAIN] standings thread started (global)")
while True:
session = create_session()
try:
day_state = get_data_API(
session, args.league, args.team, args.lang, stop_event
)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt -> останавливаем всё")
stop_event.set()
break
except Exception as e:
logger.exception(f"main loop crash: {e}")
day_state = "error"
now = datetime.now(APP_TZ)
if day_state == "upcoming":
# матч сегодня, но ещё не начался → НЕ спим до завтра.
time.sleep(120) # 2 минуты опроса статуса до старта
continue
if day_state == "error":
# что-то пошло не так → подожди минуту и попробуем ещё раз
time.sleep(60)
continue
# сюда мы попадаем если:
# - live_done (лайв отработался до конца)
# - finished_now (матч уже был, всё посчитали)
# - no_game (сегодня матчей вообще нет)
# -> можно лечь до завтра 00:05
tomorrow = (now + timedelta(days=1)).replace(
hour=0, minute=5, second=0, microsecond=0
)
sleep_seconds = (tomorrow - now).total_seconds()
if sleep_seconds < 0:
tomorrow = (now + timedelta(days=2)).replace(
hour=0, minute=5, second=0, microsecond=0
)
sleep_seconds = (tomorrow - now).total_seconds()
hours_left = int(sleep_seconds // 3600)
mins_left = int((sleep_seconds % 3600) // 60)
logger.info(
f"Работа за день завершена. Засыпаем до {tomorrow.strftime('%d.%m %H:%M')} "
f"(~{hours_left}.{mins_left} ч)."
)
try:
time.sleep(sleep_seconds)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt во время сна -> выходим")
stop_event.set()
break
stop_event.set()
standings_thread.join()
logger.info("[MAIN] standings thread stopped, shutdown complete")
# ============================================================================
# 9. Точка входа
# ============================================================================
if __name__ == "__main__":
main()