Files
RFB/get_data_new.py

1696 lines
62 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import os
import json
import tempfile
import argparse
import platform
import logging
import logging.config
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from typing import Any, Dict, List, Tuple, Optional
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# ============================================================================
# 1. Константы / глобальные объекты
# ============================================================================
HOST = "https://ref.russiabasket.org"
# Таймзона, в которой мы считаем время матчей / расписания / сна до завтра
APP_TZ = ZoneInfo("Europe/Moscow")
MYHOST = platform.node()
TELEGRAM_BOT_TOKEN = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY"
# TELEGRAM_CHAT_ID = 228977654
TELEGRAM_CHAT_ID = -4803699526
# Глобальный лок для потокобезопасной записи JSON
_write_lock = threading.Lock()
# Карта всех ручек API, с интервалами опроса в секундах.
URLS = [
{
"name": "seasons",
"url": "{host}/api/abc/comps/seasons?Tag={league}&Lang={lang}",
"interval": 86400, # раз в сутки
},
{
"name": "standings",
"url": "{host}/api/abc/comps/standings?tag={league}&season={season}&lang={lang}",
"interval": 1800, # раз в 30 минут
},
{
"name": "calendar",
"url": "{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000",
"interval": 86400, # раз в сутки
},
{
"name": "game",
"url": "{host}/api/abc/games/game?Id={game_id}&Lang={lang}",
"interval": 600, # раз в 10 минут
},
{
"name": "pregame",
"url": "{host}/api/abc/games/pregame?tag={league}&season={season}&Id={game_id}&Lang={lang}",
"interval": 86400, # раз в сутки
},
{
"name": "pregame-fullstats",
"url": "{host}/api/abc/games/pregame-fullstats?tag={league}&season={season}&id={game_id}&lang={lang}",
"interval": 600, # раз в 10 минут
},
{
"name": "live-status",
"url": "{host}/api/abc/games/live-status?id={game_id}",
"interval": 1, # каждую секунду
},
{
"name": "box-score",
"url": "{host}/api/abc/games/box-score?Id={game_id}",
"interval": 1, # каждую секунду
},
{
"name": "play-by-play",
"url": "{host}/api/abc/games/play-by-play?Id={game_id}",
"interval": 1, # каждую секунду
},
]
# ============================================================================
# 2. Логирование
# ============================================================================
if not os.path.exists("logs"):
os.makedirs("logs")
LOG_CONFIG = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": TELEGRAM_BOT_TOKEN,
"chat_id": TELEGRAM_CHAT_ID,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s [{MYHOST.upper()}] %(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(LOG_CONFIG)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
# ============================================================================
# 3. I/O вспомогательные функции
# ============================================================================
def atomic_write_json(data: Any, name: str, out_dir: str = "static") -> None:
"""
Потокобезопасная запись JSON в static/<name>.json.
Запись делается через временный файл + os.replace, чтобы:
- читатели не получили битый файл во время перезаписи;
- не было гонок между render_loop и poll_game_live.
"""
os.makedirs(out_dir, exist_ok=True)
filename = os.path.join(out_dir, f"{name}.json")
with _write_lock:
with tempfile.NamedTemporaryFile(
"w", delete=False, dir=out_dir, encoding="utf-8"
) as tmp_file:
json.dump(data, tmp_file, ensure_ascii=False, indent=2)
tmp_file.flush()
os.fsync(tmp_file.fileno())
tmp_name = tmp_file.name
os.replace(tmp_name, filename)
def read_local_json(name: str, in_dir: str = "static") -> Optional[dict]:
"""
Безопасно читает static/<name>.json.
Возвращает dict или None.
Не кидает исключение, если файл не существует или был в моменте перезаписи.
Это важно для render_loop, который читает файлы параллельно с poll_game_live.
"""
filename = os.path.join(in_dir, f"{name}.json")
try:
with open(filename, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return None
except json.JSONDecodeError:
# файл мог быть в моменте перезаписи -> пропустим тик
return None
except Exception as ex:
logger.exception(f"read_local_json({name}) error: {ex}")
return None
def _now_iso() -> str:
"""
Возвращает текущее время в ISO-формате UTC ("2025-10-27T12:34:56Z").
Это кладётся в итоговый JSON как метаданные генерации.
"""
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# ============================================================================
# 4. Работа с HTTP / API
# ============================================================================
def create_session() -> requests.Session:
"""
Создаёт requests.Session с ретраями и дефолтными заголовками.
Эту сессию потом используем для всех запросов (в том числе в live-пуле).
"""
session = requests.Session()
retries = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
)
session.mount("https://", HTTPAdapter(max_retries=retries))
session.headers.update(
{
"Connection": "keep-alive",
"Accept": "application/json, */*",
"Accept-Encoding": "gzip, deflate, br",
"User-Agent": "game-watcher/1.0",
}
)
return session
def build_url(name: str, **kwargs) -> str:
"""
Собрать конечный URL по имени эндпоинта из URLS.
Пример:
build_url("standings", host=..., league=..., season=..., lang=...)
"""
template = next((u["url"] for u in URLS if u["name"] == name), None)
if not template:
raise ValueError(f"Unknown URL name: {name}")
return template.format(**kwargs)
def get_json(session: requests.Session, url: str, name: str) -> Any:
"""
Выполняет GET к API, падает, если HTTP != 2xx,
складывает ответ в static/api_<name>.json (сырой ответ API),
и возвращает распарсенный json.
"""
resp = session.get(url, timeout=10)
resp.raise_for_status()
data = resp.json()
atomic_write_json(data, f"api_{name}")
return data
def get_items(data: dict) -> Optional[list]:
"""
Мелкий хелпер: берём первый список в ответе API.
Многие ручки отдают {"result":[...]} или {"seasons":[...]}.
Если находим список — возвращаем его.
Если нет — возвращаем None (значит, нужно брать весь dict).
"""
for k, v in data.items():
if isinstance(v, list):
return data[k]
return None
def fetch_api_data(session: requests.Session, name: str, name_save: str = None, **kwargs) -> Any:
"""
Универсальный обёртчик над API:
- строит URL по имени ручки,
- тянет данные через get_json(),
- ищет "главный" список (get_items),
- возвращает список или весь dict.
Параллельно пишет в static/api_<name>.json (через get_json()).
"""
url = build_url(name, **kwargs)
try:
json_data = get_json(session, url, name_save or name)
if json_data:
items = get_items(json_data)
return items if items is not None else json_data
return None
except Exception as ex:
logger.error(f"{url} | {ex}")
def poll_one_endpoint(
session: requests.Session,
endpoint_name: str,
league: str,
season: str,
game_id: int,
lang: str,
) -> Tuple[str, Any]:
"""
Вызывает конкретный эндпоинт (box-score, live-status, play-by-play и т.д.),
возвращает кортеж (имя_эндпоинта, данные_или_None).
Используется внутри poll_game_live() для параллельного опроса API.
"""
if endpoint_name == "live-status":
data = fetch_api_data(session, "live-status", host=HOST, game_id=game_id)
return endpoint_name, data
if endpoint_name == "box-score":
data = fetch_api_data(session, "box-score", host=HOST, game_id=game_id)
return endpoint_name, data
if endpoint_name == "play-by-play":
data = fetch_api_data(session, "play-by-play", host=HOST, game_id=game_id)
return endpoint_name, data
if endpoint_name == "game":
data = fetch_api_data(
session, "game", host=HOST, game_id=game_id, lang=lang
)
return endpoint_name, data
if endpoint_name == "pregame-fullstats":
data = fetch_api_data(
session,
"pregame-fullstats",
host=HOST,
league=league,
season=season,
game_id=game_id,
lang=lang,
)
return endpoint_name, data
# fallback — "ну вдруг добавим что-то ещё"
data = fetch_api_data(session, endpoint_name, host=HOST, game_id=game_id, lang=lang)
return endpoint_name, data
def get_interval_by_name(name: str) -> int:
"""
Возвращает рекомендуемый интервал опроса эндпоинта в секундах,
как задано в URLS.
"""
for u in URLS:
if u["name"] == name:
return u["interval"]
raise ValueError(f"interval not found for {name}")
# ============================================================================
# 5. Работа с расписанием / статусом матча
# ============================================================================
def parse_game_start_dt(item: dict) -> datetime:
"""
Достаёт дату/время начала матча из объекта календаря и нормализует в APP_TZ.
Источники времени в порядке приоритета:
1. game.defaultZoneDateTime (обычно уже с таймзоной лиги)
2. game.scheduledTime (ISO8601 с оффсетом)
3. game.startTime
4. (fallback) game.localDate + game.localTime (считаем как APP_TZ)
Возвращает timezone-aware datetime в APP_TZ.
"""
g = item.get("game", {}) if "game" in item else item
raw = g.get("defaultZoneDateTime") or g.get("scheduledTime") or g.get("startTime")
if raw:
try:
dt = datetime.fromisoformat(raw) # ISO-8601
return dt.astimezone(APP_TZ)
except Exception as e:
raise RuntimeError(f"Ошибка парсинга ISO времени '{raw}': {e}")
# fallback: localDate + localTime, формата "30.09.2025" + "19:00"
ld, lt = g.get("localDate"), g.get("localTime")
if ld and lt:
try:
naive = datetime.strptime(f"{ld} {lt}", "%d.%m.%Y %H:%M")
aware = naive.replace(tzinfo=APP_TZ)
return aware
except Exception as e:
raise RuntimeError(f"Ошибка парсинга localDate/localTime '{ld} {lt}': {e}")
raise RuntimeError(
"Не найдено ни одного подходящего поля времени (defaultZoneDateTime/"
"scheduledTime/startTime/localDate+localTime)."
)
def get_game_id(
team_games: List[dict],
team: str,
) -> Tuple[Optional[dict], Optional[dict]]:
"""
Находим интересующую нас игру.
Логика (важно):
- считаем, что интересующая нас команда — это team1 (домашняя),
и сравниваем по имени.
- если есть игра сегодня -> это today_game
- иначе берём последнюю уже завершённую игру -> last_played
- возвращаем (today_game, last_played)
Если и того и другого нет -> (None, None).
"""
now = datetime.now(APP_TZ)
today = now.date()
today_game = None
last_played = None
for g in team_games:
start = parse_game_start_dt(g)
status = g.get("game", {}).get("gameStatus", "").lower()
# Игра сегодня для этой команды
if (
start.date() == today
and today_game is None
and g["team1"]["name"].lower() == team.lower()
):
today_game = g
last_played = None
# Последняя завершённая игра (resultconfirmed)
elif (
start <= now
and status == "resultconfirmed"
and g["team1"]["name"].lower() == team.lower()
):
today_game = None
last_played = g
return today_game, last_played
def is_game_live(game_obj: dict) -> bool:
"""
Пытаемся понять, идёт ли матч прямо сейчас.
Правила:
- 'resultconfirmed' / 'finished' / 'result' => матч уже окончен
- 'scheduled' / 'notstarted' / 'draft' => матч ещё не начался
- всё остальное считаем лайвом (в том числе 'online', 'inprogress', и т.п.)
"""
status = (game_obj.get("gameStatus") or "").lower()
if status in ("resultconfirmed", "finished", "result"):
return False
if status in ("scheduled", "notstarted", "draft"):
return False
return True
# ============================================================================
# 6. Лайв-петля: опрос API и поток рендера
# ============================================================================
def poll_game_live(
session: requests.Session,
league: str,
season: str,
game_id: int,
lang: str,
game_meta: dict,
stop_event: threading.Event,
) -> None:
"""
Главный цикл лайва.
Каждые ~0.2 сек:
- решаем, какие эндпоинты давно не опрашивали (live-status, box-score, play-by-play, game),
- параллельно дёргаем их через ThreadPoolExecutor,
- сохраняем результаты в static/api_*.json,
- проверяем статус матча в live-status.
Цикл завершится, когда:
- матч закончен (по live-status),
- календарь говорит, что игра не live,
- или выставлен stop_event (например, оператор нажал Ctrl+C).
"""
slow_endpoints = ["game"] # "pregame-fullstats" можно вернуть по желанию
fast_endpoints = ["live-status", "box-score", "play-by-play"]
last_call = {name: 0 for name in slow_endpoints + fast_endpoints}
with ThreadPoolExecutor(max_workers=5) as executor:
while True:
# внешний стоп: операторская остановка или завершение run_live_loop
if stop_event.is_set():
logger.info(f"[POLL] stop_event set -> break live poll for game {game_id}")
break
now = time.time()
# решаем, какие ручки надо дёрнуть прямо сейчас
to_run = []
for ep in fast_endpoints + slow_endpoints:
interval = get_interval_by_name(ep)
if now - last_call[ep] >= interval:
to_run.append(ep)
futures = []
if to_run:
for ep in to_run:
futures.append(
executor.submit(
poll_one_endpoint,
session,
ep,
league,
season,
game_id,
lang,
)
)
game_finished = False
# собираем результаты параллельных вызовов
for fut in as_completed(futures):
try:
ep_name, data = fut.result()
last_call[ep_name] = now
# проверяем статус лайва
if ep_name == "live-status":
if isinstance(data, dict):
st = (
data.get("result").get("gameStatus")
or ""
).lower()
if st in ("resultconfirmed", "finished", "result"):
logger.info(
f"[POLL] Game {game_id} finished by live-status"
)
game_finished = True
except Exception as e:
logger.exception(f"[POLL] poll endpoint error: {e}")
# страховка: календарь говорит, что матч не лайв -> выходим
if not is_game_live(game_meta):
logger.info(f"[POLL] Game {game_id} no longer live by calendar meta")
break
if game_finished:
break
time.sleep(0.2)
# вторая точка выхода по stop_event после sleep
if stop_event.is_set():
logger.info(f"[POLL] stop_event set after sleep -> break live poll for game {game_id}")
break
def build_render_state() -> dict:
"""
Собирает итоговое состояние матча (merged dict) для графики/внешки.
Читает из api_*.json:
- api_game
- api_live-status
- api_box-score
- api_play-by-play
Обогащает:
- мержит box-score в структуру команд/игроков
- добавляет plays, scoreByPeriods, fullScore, live_status
- добавляет служебные метаданные (generatedAt)
Возвращает словарь:
{
"meta": {...},
"result": {...} # <-- это пойдёт в game.json / ui_state.json и т.д.
}
"""
game_data = read_local_json("api_game")
live_status_data = read_local_json("api_live-status")
box_score_data = read_local_json("api_box-score")
play_by_play_data = read_local_json("api_play-by-play")
# Минимальная защита: если ничего нет, рендер всё равно не должен падать жёстко.
if not game_data or "result" not in game_data:
raise RuntimeError("build_render_state(): api_game/result отсутствует")
game_data = game_data["result"]
# проставляем статистику игроков из box-score внутрь game_data["teams"]
if box_score_data and "result" in box_score_data:
for index_team, team in enumerate(game_data["teams"][1:]):
box_team = box_score_data["result"]["teams"][index_team]
for player in team.get("starts", []):
stat = next(
(
s
for s in box_team.get("starts", [])
if s.get("startNum") == player.get("startNum")
),
None,
)
if stat:
player["stats"] = stat
team["total"] = box_team.get("total", {})
game_data["scoreByPeriods"] = box_score_data["result"].get(
"scoreByPeriods", []
)
game_data["fullScore"] = box_score_data["result"].get("fullScore", {})
# плей-бай-плей и live_status
game_data["plays"] = (play_by_play_data or {}).get("result", [])
if live_status_data and "result" in live_status_data:
game_data["live_status"] = live_status_data["result"]
merged: Dict[str, Any] = {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {
"boxScoreHas": "",
"pbpLen": "",
},
},
"result": game_data,
}
return merged
def render_loop(stop_event: threading.Event, out_name: str = "game") -> None:
"""
Поток рендера.
Пока матч идёт (или пока мы не сказали стоп), крутится так:
- собрал текущее state через build_render_state()
- посчитал командную статистику (Team_Both_Stat)
- посчитал ростер/стартеров/лидеров (Json_Team_Generation)
- посчитал счёт по четвертям (Scores_Quarter)
- всё это положил в static/*.json
Цикл выходит, когда stop_event.is_set() == True.
"""
logger.info("[RENDER_THREAD] start render loop")
while not stop_event.is_set():
try:
state = build_render_state()
Team_Both_Stat(state)
Json_Team_Generation(state, who="team1")
Json_Team_Generation(state, who="team2")
Scores_Quarter(state)
# live_status отдельно, + общий state в <out_name>.json
atomic_write_json([state["result"]["live_status"]], "live_status")
atomic_write_json(state["result"], out_name)
except Exception as ex:
logger.exception(f"[RENDER_THREAD] error while building render state: {ex}")
time.sleep(0.2)
logger.info("[RENDER_THREAD] stop render loop")
def run_live_loop(
league: str,
season: str,
game_id: int,
lang: str,
game_meta: dict,
stop_event: threading.Event,
):
logger.info(
f"[LIVE_THREAD] start live loop for game_id={game_id} (league={league}, season={season})"
)
session = create_session()
# поток рендера
render_thread = threading.Thread(
target=render_loop,
args=(stop_event,),
daemon=False,
)
render_thread.start()
logger.info("[LIVE_THREAD] render thread spawned")
# поток standings
standings_thread = threading.Thread(
target=Standing_func,
args=(session, league, season, lang, stop_event),
daemon=False,
)
standings_thread.start()
logger.info("[LIVE_THREAD] standings thread spawned")
try:
poll_game_live(
session=session,
league=league,
season=season,
game_id=game_id,
lang=lang,
game_meta=game_meta,
stop_event=stop_event,
)
except Exception as e:
logger.exception(f"[LIVE_THREAD] crash in live loop for game_id={game_id}: {e}")
finally:
stop_event.set()
logger.info(f"[LIVE_THREAD] stopping worker threads for game_id={game_id}")
render_thread.join()
standings_thread.join()
logger.info(f"[LIVE_THREAD] stop live loop for game_id={game_id}")
def Referee(merged: dict, *, out_dir: str = "static") -> None:
"""
Поток, создающий JSON-файл с информацией о судьях матча.
"""
logger.info("START making json for referee")
desired_order = [
"Crew chief",
"Referee 1",
"Referee 2",
"Commissioner",
"Ст.судья",
"Судья 1",
"Судья 2",
"Комиссар",
]
try:
# Найти судей (teamNumber == 0)
team_ref = next(
(t for t in merged["result"]["teams"] if t["teamNumber"] == 0), None
)
if not team_ref:
logger.warning("Не найдена судейская бригада в данных.")
referees_raw = team_ref.get("starts", [])
# print(referees_raw)
referees = []
for r in referees_raw:
flag_code = r.get("countryId", "").lower() if r.get("countryName") else ""
referees.append(
{
"displayNumber": r.get("displayNumber", ""),
"positionName": r.get("positionName", ""),
"lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(),
"secondName": r.get("secondName", ""),
"birthday": r.get("birthday", ""),
"age": r.get("age", 0),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg",
}
)
# Сортировка по позиции
referees = sorted(
referees,
key=lambda x: (
desired_order.index(x["positionName"])
if x["positionName"] in desired_order
else len(desired_order)
),
)
out_path = "referee.json"
atomic_write_json(referees, out_path)
logging.info("Сохранил payload: {out_path}")
except Exception as e:
logger.error(f"Ошибка в Referee потоке: {e}", exc_info=True)
# ============================================================================
# 7. Постобработка статистики для вывода
# ============================================================================
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
71 -> "1:11"
0 -> "0:00"
Любые кривые значения -> "0:00".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
def Json_Team_Generation(
merged: dict,
*,
who: Optional[str] = None,
) -> None:
"""
Формирует и записывает несколько JSON-файлов по составу и игрокам команды:
- <who>.json (полный список игроков с метриками)
- topTeam1.json / topTeam2.json (топ-игроки)
- started_team1.json / started_team2.json (игроки на паркете)
Вход:
merged: словарь из build_render_state()
who: "team1" или "team2"
"""
if who == "team1":
payload = next(
(i for i in merged["result"]["teams"] if i["teamNumber"] == 1), None
)
elif who == "team2":
payload = next(
(i for i in merged["result"]["teams"] if i["teamNumber"] == 2), None
)
else:
return
if not payload:
return
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload.get("starts", [])
team_rows = []
for item in starts:
stats = item.get("stats") or {}
# маппинг одной строки игрока
row = {
"id": item.get("personId") or "",
"num": item.get("displayNumber"),
"startRole": item.get("startRole"),
"role": item.get("positionName"),
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == (item.get("positionName") or "").lower()
][0]
if any(
r[0].lower() == (item.get("positionName") or "").lower()
for r in role_list
)
else ""
),
"NameGFX": (
f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip()
if item.get("firstName") is not None
and item.get("lastName") is not None
else "Команда"
),
"captain": item.get("isCapitan", False),
"age": item.get("age") or 0,
"height": f"{item.get('height')} cm" if item.get("height") else 0,
"weight": f"{item.get('weight')} kg" if item.get("weight") else 0,
"isStart": stats.get("isStart", False),
"isOn": "🏀" if stats.get("isOnCourt") is True else "",
"flag": (
"https://flagicons.lipis.dev/flags/4x3/"
+ (
"ru"
if item.get("countryId") is None
and item.get("countryName") == "Russia"
else (
"" if item.get("countryId") is None
else (item.get("countryId") or "").lower()
if item.get("countryName") is not None
else ""
)
)
+ ".svg"
),
"pts": stats.get("points", 0),
"pt-2": f"{stats.get('goal2',0)}/{stats.get('shot2',0)}" if stats else 0,
"pt-3": f"{stats.get('goal3',0)}/{stats.get('shot3',0)}" if stats else 0,
"pt-1": f"{stats.get('goal1',0)}/{stats.get('shot1',0)}" if stats else 0,
"fg": (
f"{stats.get('goal2',0)+stats.get('goal3',0)}/"
f"{stats.get('shot2',0)+stats.get('shot3',0)}"
if stats
else 0
),
"ast": stats.get("assist", 0),
"stl": stats.get("steal", 0),
"blk": stats.get("block", 0),
"blkVic": stats.get("blocked", 0),
"dreb": stats.get("defReb", 0),
"oreb": stats.get("offReb", 0),
"reb": stats.get("defReb", 0) + stats.get("offReb", 0),
"to": stats.get("turnover", 0),
"foul": stats.get("foul", 0),
"foulT": stats.get("foulT", 0),
"foulD": stats.get("foulD", 0),
"foulC": stats.get("foulC", 0),
"foulB": stats.get("foulB", 0),
"fouled": stats.get("foulsOn", 0),
"plusMinus": stats.get("plusMinus", 0),
"dunk": stats.get("dunk", 0),
"kpi": (
stats.get("points", 0)
+ stats.get("defReb", 0)
+ stats.get("offReb", 0)
+ stats.get("assist", 0)
+ stats.get("steal", 0)
+ stats.get("block", 0)
+ stats.get("foulsOn", 0)
+ (stats.get("goal1", 0) - stats.get("shot1", 0))
+ (stats.get("goal2", 0) - stats.get("shot2", 0))
+ (stats.get("goal3", 0) - stats.get("shot3", 0))
- stats.get("turnover", 0)
- stats.get("foul", 0)
),
"time": format_time(stats.get("second", 0)),
"pts1q": 0,
"pts2q": 0,
"pts3q": 0,
"pts4q": 0,
"pts1h": 0,
"pts2h": 0,
"Name1GFX": (item.get("firstName") or "").strip(),
"Name2GFX": (item.get("lastName") or "").strip(),
"photoGFX": (
os.path.join(
"D:\\Photos",
merged["result"]["league"]["abcName"],
merged["result"][who]["name"],
f"{item.get('displayNumber')}.png",
)
if item.get("startRole") == "Player"
else ""
),
"isOnCourt": stats.get("isOnCourt", False),
}
team_rows.append(row)
# добиваем до 12 строк, чтобы UI был ровный
count_player = sum(1 for x in team_rows if x["startRole"] == "Player")
if count_player < 12 and team_rows:
filler_count = (4 if count_player <= 4 else 12) - count_player
template_keys = list(team_rows[0].keys())
for _ in range(filler_count):
empty_row = {}
for key in template_keys:
if key in ["captain", "isStart", "isOnCourt"]:
empty_row[key] = False
elif key in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]:
empty_row[key] = 0
else:
empty_row[key] = ""
team_rows.append(empty_row)
# сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д.
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5,
}
sorted_team = sorted(
team_rows,
key=lambda x: role_priority.get(x.get("startRole", 99), 99),
)
# пишем полный ростер команды
atomic_write_json(sorted_team, who)
logger.info(f"Сохранил payload: {who}.json")
# топ-игроки по очкам/подборам/ассистам и т.д.
top_sorted_team = sorted(
(p for p in sorted_team if p.get("startRole") in ["Player", ""]),
key=lambda x: (
x.get("pts", 0),
x.get("dreb", 0) + x.get("oreb", 0),
x.get("ast", 0),
x.get("stl", 0),
x.get("blk", 0),
x.get("time", "0:00"),
),
reverse=True,
)
# пустые строки не должны ломать UI процентами фолов/очков
for player in top_sorted_team:
if player.get("num", "") == "":
player["pts"] = ""
player["foul"] = ""
top_name = f"top{who.replace('t', 'T')}"
atomic_write_json(top_sorted_team, top_name)
logger.info(f"Сохранил payload: {top_name}.json")
# кто прямо сейчас на площадке
started_team = sorted(
(
p
for p in sorted_team
if p.get("startRole") == "Player" and p.get("isOnCourt") is True
),
key=lambda x: int(x.get("num") or 0),
)
started_name = f"started_{who}"
atomic_write_json(started_team, started_name)
logger.info(f"Сохранил payload: {started_name}.json")
def time_outs_func(data_pbp: List[dict]) -> Tuple[str, int, str, int]:
"""
Считает таймауты для обеих команд и формирует читабельные строки вида:
"2 Time-outs left in 2nd half"
Возвращает:
(строка_для_команды1, остаток1, строка_для_команды2, остаток2)
"""
timeout1 = []
timeout2 = []
for event in data_pbp:
if event.get("play") == 23: # 23 == таймаут
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
def timeout_status(timeout_list: List[dict], last_event: dict) -> Tuple[str, int]:
period = last_event.get("period", 0)
sec = last_event.get("sec", 0)
if period < 3:
timeout_max = 2
count = sum(1 for t in timeout_list if t.get("period", 0) <= period)
quarter = "1st half"
elif period < 5:
count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period)
quarter = "2nd half"
# в концовке 4-й четверти лимит может ужиматься
if period == 4 and sec >= 4800 and count in (0, 1):
timeout_max = 2
else:
timeout_max = 3
else:
timeout_max = 1
count = sum(1 for t in timeout_list if t.get("period", 0) == period)
quarter = f"OverTime {period - 4}"
left = max(0, timeout_max - count)
word = "Time-outs" if left != 1 else "Time-out"
text = f"{left if left != 0 else 'No'} {word} left in {quarter}"
return text, left
if not data_pbp:
return "", 0, "", 0
last_event = data_pbp[-1]
t1_str, t1_left = timeout_status(timeout1, last_event)
t2_str, t2_left = timeout_status(timeout2, last_event)
return t1_str, t1_left, t2_str, t2_left
def add_data_for_teams(new_data: List[dict]) -> Tuple[float, List[Any], float]:
"""
Считает командные агрегаты:
- средний возраст
- очки со старта vs со скамейки, + их проценты
- средний рост
Возвращает кортеж:
(avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm)
"""
players = [item for item in new_data if item.get("startRole") == "Player"]
points_start = 0
points_bench = 0
total_age = 0
total_height = 0
player_count = len(players)
for player in players:
stats = player.get("stats") or {}
if stats:
if stats.get("isStart") is True:
points_start += stats.get("points", 0)
elif stats.get("isStart") is False:
points_bench += stats.get("points", 0)
total_age += player.get("age") or 0
total_height += player.get("height") or 0
total_points = points_start + points_bench
points_start_pro = (
f"{round(points_start * 100 / total_points)}%" if total_points else "0%"
)
points_bench_pro = (
f"{round(points_bench * 100 / total_points)}%" if total_points else "0%"
)
avg_age = round(total_age / player_count, 1) if player_count else 0
avg_height = round(total_height / player_count, 1) if player_count else 0
points = [points_start, points_start_pro, points_bench, points_bench_pro]
return avg_age, points, avg_height
def add_new_team_stat(
data: dict,
avg_age: float,
points: List[Any],
avg_height: float,
timeout_str: str,
timeout_left: int,
) -> dict:
"""
Берёт словарь total по команде (очки, подборы, броски и т.д.),
добавляет:
- проценты попаданий
- средний возраст / рост
- очки старт / бенч
- информацию по таймаутам
и всё приводит к строкам (для UI, чтобы не ловить типы).
Возвращает обновлённый словарь.
"""
def safe_int(v):
try:
return int(v)
except (ValueError, TypeError):
return 0
def format_percent(goal, shot):
goal, shot = safe_int(goal), safe_int(shot)
return f"{round(goal * 100 / shot)}%" if shot else "0%"
goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1"))
goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2"))
goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3"))
def_reb = safe_int(data.get("defReb"))
off_reb = safe_int(data.get("offReb"))
data.update(
{
"pt-1": f"{goal1}/{shot1}",
"pt-2": f"{goal2}/{shot2}",
"pt-3": f"{goal3}/{shot3}",
"fg": f"{goal2 + goal3}/{shot2 + shot3}",
"pt-1_pro": format_percent(goal1, shot1),
"pt-2_pro": format_percent(goal2, shot2),
"pt-3_pro": format_percent(goal3, shot3),
"fg_pro": format_percent(goal2 + goal3, shot2 + shot3),
"Reb": str(def_reb + off_reb),
"avgAge": str(avg_age),
"ptsStart": str(points[0]),
"ptsStart_pro": str(points[1]),
"ptsBench": str(points[2]),
"ptsBench_pro": str(points[3]),
"avgHeight": f"{avg_height} cm",
"timeout_left": str(timeout_left),
"timeout_str": str(timeout_str),
}
)
# всё -> строки (UI не должен думать о типах)
for k in data:
data[k] = str(data[k])
return data
# Статическая таблица "как назвать метрику"
stat_name_list = [
("points", "Очки", "points"),
("pt-1", "Штрафные", "free throws"),
("pt-1_pro", "штрафные, процент", "free throws pro"),
("pt-2", "2-очковые", "2-points"),
("pt-2_pro", "2-очковые, процент", "2-points pro"),
("pt-3", "3-очковые", "3-points"),
("pt-3_pro", "3-очковые, процент", "3-points pro"),
("fg", "очки с игры", "field goals"),
("fg_pro", "Очки с игры, процент", "field goals pro"),
("assist", "Передачи", "assists"),
("pass", "", ""),
("defReb", "подборы в защите", ""),
("offReb", "подборы в нападении", ""),
("Reb", "Подборы", "rebounds"),
("steal", "Перехваты", "steals"),
("block", "Блокшоты", "blocks"),
("blocked", "", ""),
("turnover", "Потери", "turnovers"),
("foul", "Фолы", "fouls"),
("foulsOn", "", ""),
("foulT", "", ""),
("foulD", "", ""),
("foulC", "", ""),
("foulB", "", ""),
("second", "секунды", "seconds"),
("dunk", "данки", "dunks"),
("fastBreak", "", "fast breaks"),
("plusMinus", "+/-", "+/-"),
("avgAge", "", "avg Age"),
("ptsBench", "", "Bench PTS"),
("ptsBench_pro", "", "Bench PTS, %"),
("ptsStart", "", "Start PTS"),
("ptsStart_pro", "", "Start PTS, %"),
("avgHeight", "", "avg height"),
("timeout_left", "", "timeout left"),
("timeout_str", "", "timeout str"),
]
def Team_Both_Stat(merged: dict) -> None:
"""
Формирует сводку по двум командам и пишет её в static/team_stats.json.
Делает:
- считает таймауты для обеих команд,
- считает средний возраст / рост,
- считает очки старт / скамейка,
- добавляет проценты попаданий, подборы и т.д.,
- мапит имена метрик на удобные подписи.
"""
logger.info("START making json for team statistics")
try:
teams = merged["result"]["teams"]
plays = merged["result"].get("plays", [])
# Разделяем команды по teamNumber
team_1 = next((t for t in teams if t["teamNumber"] == 1), None)
team_2 = next((t for t in teams if t["teamNumber"] == 2), None)
if not team_1 or not team_2:
logger.warning("Не найдены обе команды в данных")
return
# Таймауты
timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays)
# Возраст / очки / рост
avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1.get("starts", []))
avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2.get("starts", []))
if not team_1.get("total") or not team_2.get("total"):
logger.debug("Нет total у команд — пропускаю перезапись team_stats.json")
# Добавляем в total агрегаты
total_1 = add_new_team_stat(
team_1["total"],
avg_age_1,
points_1,
avg_height_1,
timeout_str1,
timeout_left1,
)
total_2 = add_new_team_stat(
team_2["total"],
avg_age_2,
points_2,
avg_height_2,
timeout_str2,
timeout_left2,
)
# Готовим список пар "метрика -> команда1 vs команда2"
result_json = []
for key in total_1:
val1 = total_1[key]
val2 = total_2[key]
stat_rus = ""
stat_eng = ""
for metric_name, rus, eng in stat_name_list:
if metric_name == key:
stat_rus, stat_eng = rus, eng
break
result_json.append(
{
"name": key,
"nameGFX_rus": stat_rus,
"nameGFX_eng": stat_eng,
"val1": val1,
"val2": val2,
}
)
atomic_write_json(result_json, "team_stats")
logger.info("Сохранил payload: team_stats.json")
except Exception as e:
logger.error(f"Ошибка при обработке командной статистики: {e}", exc_info=True)
def Scores_Quarter(merged: dict) -> None:
"""
Пишет счёт по четвертям и овертаймам в static/scores.json.
Логика:
- если есть game.result.game.fullScore -> парсим "XX:YY,AA:BB,..."
- иначе используем scoreByPeriods из box-score
"""
logger.info("START making json for scores quarter")
quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]
score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters]
try:
full_score_str = merged.get("result", {}).get("game", {}).get("fullScore", "")
if full_score_str:
# пример: "19:15,20:22,18:18,25:10"
full_score_list = full_score_str.split(",")
for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]):
parts = score_str.split(":")
if len(parts) == 2:
score_by_quarter[i]["score1"] = parts[0]
score_by_quarter[i]["score2"] = parts[1]
logger.info("Счёт по четвертям получен из fullScore.")
elif "scoreByPeriods" in merged.get("result", {}):
periods = merged["result"]["scoreByPeriods"]
for i, score in enumerate(periods[: len(score_by_quarter)]):
score_by_quarter[i]["score1"] = str(score.get("score1", ""))
score_by_quarter[i]["score2"] = str(score.get("score2", ""))
logger.info("Счёт по четвертям получен из scoreByPeriods.")
else:
logger.debug("Нет данных по счёту, сохраняем пустые значения.")
atomic_write_json(score_by_quarter, "scores")
logger.info("Сохранил payload: scores.json")
except Exception as e:
logger.error(f"Ошибка в Scores_Quarter: {e}", exc_info=True)
def Standing_func(
session: requests.Session,
league: str,
season: str,
lang: str,
stop_event: threading.Event,
out_dir: str = "static",
) -> None:
"""
Фоновый поток с турнирной таблицей (standings).
Что делает:
- Периодически (не чаще, чем interval для "standings" в URLS) тянет /standings
для лиги+сезона.
- Для каждой подтаблицы (regular season, playoffs и т.д.) нормализует данные,
досчитывает полезные колонки (W/L, %) и сохраняет в
static/standings_<league>_<compName>.json
- Останавливается, когда поднят stop_event.
Почему отдельный поток?
- standings нам нужна даже во время лайва игры, но не каждую секунду.
- Она не должна блокировать рендер и не должна блокировать poll_game_live.
"""
logger.info("[STANDINGS_THREAD] start standings loop")
# когда мы последний раз успешно обновили standings
last_call_ts = 0
# как часто вообще можно дёргать standings
interval = get_interval_by_name("standings")
while not stop_event.is_set():
now = time.time()
# достаточно рано? если нет — просто подожди немного
if now - last_call_ts < interval:
time.sleep(1)
continue
try:
# тянем свежие данные standings тем же способом, что в get_data_API
data_standings = fetch_api_data(
session,
"standings",
host=HOST,
league=league,
season=season,
lang=lang,
)
# fetch_api_data для standings вернёт либо:
# - dict с "items": [...], либо
# - сам массив items (если get_items нашёл список)
# Мы хотим привести к единому формату, как было в твоём коде.
if not data_standings:
logger.debug("[STANDINGS_THREAD] standings empty")
# не обновляем last_call_ts, чтобы через секунду попытаться снова
time.sleep(1)
continue
# Если data_standings оказался списком, приведём к виду {"items": [...]}:
if isinstance(data_standings, list):
items = data_standings
else:
items = data_standings.get("items") or []
if not items:
logger.debug("[STANDINGS_THREAD] no items in standings")
last_call_ts = now # запрос был успешным, но пустым
continue
# Обрабатываем каждый "item" внутри standings:
for item in items:
comp = item.get("comp", {})
comp_name = (comp.get("name") or "unknown_comp").replace(" ", "_")
# 1) обычная таблица регулярки
if item.get("standings"):
standings_rows = item["standings"]
# pandas нормализация
df = pd.json_normalize(standings_rows)
# убираем поле 'scores', если есть
if "scores" in df.columns:
df = df.drop(columns=["scores"])
# добавляем w_l, procent, plus_minus если есть нужные столбцы
if (
"totalWin" in df.columns
and "totalDefeat" in df.columns
and "totalGames" in df.columns
and "totalGoalPlus" in df.columns
and "totalGoalMinus" in df.columns
):
# W / L
df["w_l"] = (
df["totalWin"].fillna(0).astype(int).astype(str)
+ " / "
+ df["totalDefeat"].fillna(0).astype(int).astype(str)
)
# % побед
def calc_percent(row):
win = row.get("totalWin", 0)
games = row.get("totalGames", 0)
if (
pd.isna(win)
or pd.isna(games)
or games == 0
or (row["w_l"] == "0 / 0")
):
return 0
return round(win * 100 / games + 0.000005)
df["procent"] = df.apply(calc_percent, axis=1)
# +/- по очкам
df["plus_minus"] = (
df["totalGoalPlus"].fillna(0).astype(int)
- df["totalGoalMinus"].fillna(0).astype(int)
)
# готовим питоновский список словарей для атомарной записи
standings_payload = df.to_dict(orient="records")
filename = f"standings_{league}_{comp_name}"
atomic_write_json(standings_payload, filename, out_dir)
logger.info(
f"[STANDINGS_THREAD] saved {filename}.json ({len(standings_payload)} rows)"
)
# 2) плейофф-пары (playoffPairs)
elif item.get("playoffPairs"):
playoff_rows = item["playoffPairs"]
df = pd.json_normalize(playoff_rows)
standings_payload = df.to_dict(orient="records")
filename = f"standings_{league}_{comp_name}"
atomic_write_json(standings_payload, filename, out_dir)
logger.info(
f"[STANDINGS_THREAD] saved {filename}.json (playoffPairs, {len(standings_payload)} rows)"
)
# если ни standings ни playoffPairs — просто пропускаем этот блок
else:
continue
# если всё прошло без исключения — фиксируем время удачного апдейта
last_call_ts = now
except Exception as e:
logger.warning(f"[STANDINGS_THREAD] ошибка в турнирном положении: {e}")
# не жрём CPU впустую
time.sleep(1)
logger.info("[STANDINGS_THREAD] stop standings loop")
# ============================================================================
# 8. Суточный цикл: находим игру, следим в лайве, потом уходим спать
# ============================================================================
def get_data_API(
session: requests.Session,
league: str,
team: str,
lang: str,
stop_event: threading.Event,
) -> None:
"""
Один "дневной прогон" логики:
1. Узнать текущий сезон
2. Обновить standings и calendar
3. Найти игру для нашей команды сегодня (today_game) или последнюю законченную (last_played)
4. Если есть last_played и нет игры сегодня -> просто забираем /game и пишем api_game.json
5. Если есть игра сегодня:
- пишем /game сессии
- если статус игры live -> запускаем run_live_loop() в отдельном потоке
и ждём его завершения (до конца матча)
6. Если нет ничего -> просто логируем и выходим
Эта функция НЕ усыпляет процесс — цикл сна делает main().
"""
# 1. сезоны
json_seasons = fetch_api_data(
session, "seasons", host=HOST, league=league, lang=lang
)
if not json_seasons:
logger.error("Не удалось получить список сезонов")
return
season = json_seasons[0]["season"]
# 2. standings и calendar
fetch_api_data(session, "standings", host=HOST, league=league, season=season, lang=lang)
json_calendar = fetch_api_data(
session, "calendar", host=HOST, league=league, season=season, lang=lang
)
if not json_calendar:
logger.error("Не удалось получить список матчей")
return
# 3. какая игра нас интересует?
today_game, last_played = get_game_id(json_calendar, team)
# 4. уже сыграли, но сегодня не играем -> просто пишем /game последнего матча
if last_played and not today_game:
game_id = last_played["game"]["id"]
logger.info(f"Последний завершённый матч id={game_id}")
fetch_api_data(session, "game", host=HOST, game_id=game_id, lang=lang)
return
# 5. матч сегодня
if today_game:
game_id = today_game["game"]["id"]
logger.info(f"Онлайн матч id={game_id}")
# всегда пишем /game прямо сейчас (обновим api_game.json)
fetch_api_data(session, "game", host=HOST, game_id=game_id, lang=lang)
# если матч идёт — стартуем live
if is_game_live(today_game["game"]):
t = threading.Thread(
target=run_live_loop,
args=(league, season, game_id, lang, today_game["game"], stop_event),
daemon=False,
)
t.start()
logger.info("live thread spawned, waiting for it to finish...")
try:
t.join()
except KeyboardInterrupt:
logger.info("KeyboardInterrupt while waiting live thread -> stop_event")
stop_event.set()
t.join()
logger.info("live thread finished")
return
# 6. ничего подходящего
logger.info("Для этой команды игр сегодня нет и нет завершённой последней игры.")
def main() -> None:
"""
Главный цикл демона.
Работает бесконечно:
- собирает данные на сегодня (get_data_API)
- если нужно, следит за матчем в реальном времени до свистка
- после этого уходит спать до 00:05 следующего дня по APP_TZ
- повторяет
Ctrl+C:
- моментально поднимает stop_event и завершает работу.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--league", required=True)
parser.add_argument("--team", required=True)
parser.add_argument("--lang", default="en")
args = parser.parse_args()
while True:
# на каждый "прогон дня" — своя HTTP-сессия и свой stop_event
session = create_session()
stop_event = threading.Event()
try:
get_data_API(session, args.league, args.team, args.lang, stop_event)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt -> остановка по запросу оператора")
stop_event.set()
break
except Exception as e:
# мы не падаем навсегда, а логируем, чтобы демон продолжил жить
logger.exception(f"main loop crash: {e}")
# === сон до завтрашних 00:05 по APP_TZ ===
now = datetime.now(APP_TZ)
tomorrow = (now + timedelta(days=1)).replace(
hour=0, minute=5, second=0, microsecond=0
)
sleep_seconds = (tomorrow - now).total_seconds()
if sleep_seconds < 0:
# защита, если вдруг текущее время уже после 00:05 и replace() дал прошедшее
tomorrow = (now + timedelta(days=2)).replace(
hour=0, minute=5, second=0, microsecond=0
)
sleep_seconds = (tomorrow - now).total_seconds()
logger.info(
f"Работа завершена. Засыпаем до {tomorrow.strftime('%d.%m %H:%M')} "
f"(~{round(sleep_seconds/3600, 2)} ч)."
)
try:
time.sleep(sleep_seconds)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt во время сна -> выходим.")
break
# идём на новую итерацию while True
# (новая сессия / новый stop_event создаются в начале цикла)
# ============================================================================
# 9. Точка входа
# ============================================================================
if __name__ == "__main__":
main()