Files
RFB/get_data.py

2114 lines
82 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Шаблон: мониторинг онлайн-матча по лиге/команде
Функции:
1) Валидация тега (--league), иначе: лог + сообщение в Telegram и завершение.
2) Получение номера последнего сезона из JSON по ссылке. Ошибки -> лог + Telegram.
3) Загрузка расписания по ссылке, поиск игры на сегодня для --team.
- Если сегодня нет матча — берём последний сыгранный (если есть), иначе лог + Telegram.
- Если сегодня есть игра — запускаем поток-монитор:
* статус «онлайн?» проверяем РАЗ В МИНУТУ;
* при статусе онлайн — КАЖДУЮ СЕКУНДУ дергаем три запроса:
box-score, play-by-play, live-status.
* если не онлайн — ждём минуту до следующей проверки.
4) Ежедневная перекладка: каждый следующий день повторно проверяем расписание
для команды. Если матча нет — один раз подгружаем последний сыгранный и ждём следующего дня.
ЗАМЕТКИ:
- Заполни URL_* и функции-экстракторы JSON под свой формат.
- Для Telegram используй переменные окружения TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import os
import sys
import time
import json
import argparse
import logging
import pandas as pd
import logging.config
import threading
import concurrent.futures
import queue
import platform
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from typing import Any, Dict, List
import tempfile
from pathlib import Path
from threading import Event, Lock
import requests
# ==========================
# ---- НАСТРОЙКИ/КОНСТАНТЫ
# ==========================
APP_TZ = ZoneInfo("Europe/Moscow")
# Разрешённые теги лиг
ALLOWED_LEAGUES = {
"vtb", # Единая Лига ВТБ
"vtbyouth", # Молодежка ВТБ
"vtb-supercup", # Супер-Кубок ЕЛ ВТБ
"msl", # Высшая лига. Мужчины
"mcup", # Кубок России. Мужчины
"wpremier", # Премьер-Лига. Женщины
"wsl", # Суперлига. Женщины
"whl", # Высшая лига. Женщины
"wcup", # Кубок России. Женщины
"dubl-b", # Дюбл до 19 лет
# "pr-mezhreg-w13", # Межрегиональные соревнования до 14 лет
# добавляй свои…
}
DEFAULT_LEAGUE = "vtb"
DEFAULT_LANG = "en"
# URL-шаблоны (замени на реальные)
HOST = "ref.russiabasket.org"
URL_SEASON = "https://{host}/api/abc/comps/seasons?Tag={league}&Lang={lang}" # вернёт JSON со списком сезонов
URL_SCHEDULE = "https://{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000" # расписание лиги (или команды)
# Статус конкретной игры (используется для проверки "онлайн?" раз в минуту)
URL_GAME = "https://{host}/api/abc/games/game?Id={game_id}&lang={lang}"
# Быстрые запросы, когда матч онлайн (каждую секунду)
URL_BOX_SCORE = "https://{host}/api/abc/games/box-score?Id={game_id}&lang={lang}"
URL_PLAY_BY_PLAY = "https://{host}/api/abc/games/play-by-play?Id={game_id}&lang={lang}"
URL_LIVE_STATUS = "https://{host}/api/abc/games/live-status?Id={game_id}&lang={lang}"
URL_STANDINGS = "https://{host}/api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}"
# Интервалы опроса
STATUS_CHECK_INTERVAL_SEC = 60 # проверять "онлайн?" раз в минуту
ONLINE_FETCH_INTERVAL_SEC = 1 # когда матч онлайн, дергать три запроса каждую секунду
POLL_INTERVAL_OFFLINE_SEC = 300 # резервный интервал сна при ошибках/до старта
TIMEOUT_DATA_OFF = 600
TELEGRAM_BOT_TOKEN = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY"
# TELEGRAM_CHAT_ID = 228977654
TELEGRAM_CHAT_ID = -4803699526
MYHOST = platform.node()
if not os.path.exists("logs"):
os.makedirs("logs")
LOG_CONFIG = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": TELEGRAM_BOT_TOKEN,
"chat_id": TELEGRAM_CHAT_ID,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s [{MYHOST.upper()}] %(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(LOG_CONFIG)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
# ==========================
# ---- УТИЛИТЫ
# ==========================
def fetch_json(
url: str, params: dict | None = None, session: requests.Session | None = None
) -> dict:
"""
GET JSON с таймаутом и внятными ошибками.
Использует переданный session для keep-alive.
"""
sess = session or requests
try:
r = sess.get(url, params=params, timeout=(3.0, 4.0)) # (connect, read)
r.raise_for_status()
return r.json()
except requests.HTTPError as he:
raise RuntimeError(f"HTTP {he.response.status_code} для {url}") from he
except requests.RequestException as re:
raise RuntimeError(f"Сетевой сбой для {url}: {re}") from re
except json.JSONDecodeError as je:
raise RuntimeError(f"Некорректный JSON на {url}: {je}") from je
# ==========================
# ---- ЭКСТРАКТОРЫ ИЗ JSON
# ==========================
def extract_last_season(data: dict) -> str:
"""
Вытаскиваем последний сезон, в списке он первый
"""
try:
seasons = data["items"]
if not seasons:
raise ValueError("пустой список сезонов")
last = seasons[0]
season_id = last["season"]
return str(season_id)
except Exception as e:
raise RuntimeError(f"Не удалось извлечь последний сезон: {e}") from e
def extract_team_schedule_for_season(data: dict, team_code: str) -> list[dict]:
"""
Верни список игр команды. Адаптируй ключи под реальный JSON.
Предполагаем формат игр:
{
"gameId": "12345",
"home": "BOS",
"away": "LAL",
"startTimeUTC": "2025-10-23T18:00:00Z",
"status": "finished|scheduled|inprogress"
}
"""
try:
games = data["items"]
team_games = [
g for g in games if g.get("team1").get("name").lower() == team_code.lower()
]
return team_games
except Exception as e:
raise RuntimeError(f"Не удалось извлечь расписание команды: {e}") from e
def parse_game_start_dt(item: dict) -> datetime:
"""
Достаёт дату/время начала матча из объекта расписания и приводит к APP_TZ.
Приоритет полей:
1) game.defaultZoneDateTime — уже в "дефолтной зоне" лиги (например, +03:00)
2) game.scheduledTime — ISO 8601 с оффсетом (например, 2025-09-30T19:00:00+04:00)
3) game.startTime — если API когда-то его заполняет
4) (fallback) game.localDate + game.localTime — считаем, что это локальное время площадки, задаём tz=APP_TZ
Возвращает aware-datetime в APP_TZ.
"""
g = item.get("game", {}) if "game" in item else item
raw = g.get("defaultZoneDateTime") or g.get("scheduledTime") or g.get("startTime")
if raw:
try:
dt = datetime.fromisoformat(raw) # ISO-8601
return dt.astimezone(APP_TZ)
except Exception as e:
raise RuntimeError(f"Ошибка парсинга ISO времени '{raw}': {e}")
# Fallback: localDate + localTime (пример: "30.09.2025" + "19:00")
ld, lt = g.get("localDate"), g.get("localTime")
if ld and lt:
try:
naive = datetime.strptime(f"{ld} {lt}", "%d.%m.%Y %H:%M")
aware = naive.replace(tzinfo=APP_TZ)
return aware
except Exception as e:
raise RuntimeError(f"Ошибка парсинга localDate/localTime '{ld} {lt}': {e}")
raise RuntimeError(
"Не найдено ни одного подходящего поля времени (defaultZoneDateTime/scheduledTime/startTime/localDate+localTime)."
)
def extract_game_status(data: dict) -> str:
"""
Ожидаем JSON вида {"status":"inprogress|scheduled|finished"}
"""
try:
return str(data["result"]["status"]["id"]).lower()
except Exception as e:
raise RuntimeError(f"Не удалось извлечь статус матча: {e}") from e
# ==========================
# ---- ДОП. ЗАПРОСЫ ПРИ ОНЛАЙНЕ
# ==========================
def fetch_box_score(
league: str, game_id: str, lang: str, session: requests.Session | None = None
) -> dict:
url = URL_BOX_SCORE.format(host=HOST, league=league, game_id=game_id, lang=lang)
return fetch_json(url, session=session)
def fetch_play_by_play(
league: str, game_id: str, lang: str, session: requests.Session | None = None
) -> dict:
url = URL_PLAY_BY_PLAY.format(host=HOST, league=league, game_id=game_id, lang=lang)
return fetch_json(url, session=session)
def fetch_live_status(
league: str, game_id: str, lang: str, session: requests.Session | None = None
) -> dict:
url = URL_LIVE_STATUS.format(host=HOST, league=league, game_id=game_id, lang=lang)
return fetch_json(url, session=session)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _get(d: dict | None, *path, default=None):
"""Безопасно достаём вложенные ключи: _get(d, "result", "fullScore", default={})"""
cur = d or {}
for p in path:
if not isinstance(cur, dict) or p not in cur:
return default
cur = cur[p]
return cur
def _dedup_plays(plays: List[dict]) -> List[dict]:
"""
Удаляем дубли по стабильному идентификатору события.
Приоритет: eventId -> id -> (sequence, clock, teamId, type)
"""
seen = set()
out = []
for ev in plays:
if not isinstance(ev, dict):
continue
key = (
ev.get("eventId")
or ev.get("id")
or (ev.get("sequence"), ev.get("clock"), ev.get("teamId"), ev.get("type"))
)
if key in seen:
continue
seen.add(key)
out.append(ev)
# если есть поле sequence/time — отсортируем, чтобы обработчик получал стабильный порядок
out.sort(
key=lambda e: (
e.get("sequence") is None,
e.get("sequence"),
e.get("time") or e.get("clock"),
)
)
return out
def merge_online_payloads(
game: dict,
box_score: dict | None,
play_by_play: dict | None,
live_status: dict | None,
) -> Dict[str, Any]:
"""
Склеивает онлайн-ответы в единый компактный payload для downstream-обработки.
Ничего не знает о внутренней логике обработки — только нормализует.
"""
# исходные куски
# plays_raw: List[dict] = _get(play_by_play, "result", default=[]) or []
# score_by_periods = _get(box_score, "result", "scoreByPeriods", default=[]) or []
# full_score = _get(box_score, "result", "fullScore", default={}) or {}
# teams = _get(box_score, "result", "teams", default={}) or {} # если пригодится в обработчике
# players = _get(box_score, "result", "players", default=[]) or []
# box_score = _get(box_score, "result", "teams", default=[]) or []
# fullScore = _get(box_score, "result", "fullScore", default="") or ""
# # live
# live_status = _get(live_status, "result", "live_status")
# period = _get(live_status, "result", "period")
# clock = _get(live_status, "result", "clock")
# status = _get(live_status, "result", "status") # e.g., "inprogress", "ended", "scheduled"
# нормализация/дедуп
# plays = _dedup_plays(plays_raw)
# print(game["teams"])
# print(box_score)
for index_team, team in enumerate(game["teams"][1:]):
box_team = box_score["result"]["teams"][index_team]
for player in team.get("starts", []):
stat = next(
(
s
for s in box_team.get("starts", [])
if s.get("startNum") == player.get("startNum")
),
None,
)
if stat:
player["stats"] = stat
team["total"] = box_team.get("total", {})
game["plays"] = play_by_play.get("result", [])
game["scoreByPeriods"] = box_score["result"].get("scoreByPeriods", [])
game["fullScore"] = box_score["result"].get("fullScore", {})
game["live_status"] = live_status["result"]
# game[""]
merged: Dict[str, Any] = {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {
"boxScoreHas": list((_get(box_score, "result") or {}).keys()),
"pbpLen": "",
},
},
"result": game,
}
return merged
# где-то в твоём коде
def process_online_update(merged: dict) -> None:
"""
Здесь — любая твоя логика: обновить JSON-файлы, пересчитать агрегаты,
уведомить подписчиков, обновить кеш и т.д.
"""
# пример:
game_id = merged["meta"]["gameId"]
print(game_id)
# ... твоя обработка ...
def is_already_merged(obj: dict) -> bool:
"""
Проверяем, что объект уже содержит result.plays/fullScore/scoreByPeriods.
Подходит для ответа game (исторический матч).
"""
if not isinstance(obj, dict):
return False
res = obj.get("result") or obj.get("game") or obj # подстрахуемся под разные корни
if not isinstance(res, dict):
return False
r = res.get("result", res) # иногда внутри ещё один "result"
return (
isinstance(r, dict)
and isinstance(r.get("plays", []), list)
and isinstance(r.get("fullScore", {}), dict)
and isinstance(r.get("scoreByPeriods", []), list)
)
def ensure_merged_payload(
game_or_merged: dict | None = None,
*,
box_score: dict | None = None,
play_by_play: dict | None = None,
live_status: dict | None = None,
game_meta: dict | None = None, # например {"id": ..., "league": ...}
) -> dict:
"""
1) Если передан уже-склеенный payload (исторический матч) — нормализуем и возвращаем.
2) Иначе склеиваем из box/pbp/live через merge_online_payloads.
"""
# 1) Уже склеено (история) — просто привести к твоему контракту {meta, ids, result}
if game_or_merged and is_already_merged(game_or_merged):
g = game_or_merged.get("result") or game_or_merged # допускаем разные корни
# print(g)
with open("temp.json", "w", encoding="utf-8") as f:
json.dump(g, f, ensure_ascii=False, indent=2)
merged = {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {"from": "game_api", "pbpLen": len(g.get("plays", []))},
},
"result": g,
}
return merged
# 2) Онлайн-ветка — склеиваем так, как у тебя уже реализовано
if box_score is not None or play_by_play is not None or live_status is not None:
base_game = game_meta or {}
out_path = Path("static") / "game.json"
with open(out_path, "r", encoding="utf-8") as file:
game = json.load(file)
base_game = game["result"]
# print(base_game)
return merge_online_payloads(base_game, box_score, play_by_play, live_status)
# 2b) Fallback: если пришёл "game", но без plays/fullScore/scoreByPeriods — всё равно сохраним
if game_or_merged:
g = game_or_merged.get("result") or game_or_merged
return {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {"from": "game_api_raw"},
},
"result": g, # положим сырой ответ целиком — чтобы файл гарантированно записался
}
raise ValueError(
"ensure_merged_payload: не передан ни уже-склеенный game, ни box/pbp/live."
)
def atomic_write_json(path: str | Path, data: dict, ensure_dirs: bool = True) -> None:
path = Path(path)
if ensure_dirs:
path.parent.mkdir(parents=True, exist_ok=True)
# атомарная запись: пишем во временный файл и переименовываем
with tempfile.NamedTemporaryFile(
"w", delete=False, dir=str(path.parent), encoding="utf-8"
) as tmp:
json.dump(data, tmp, ensure_ascii=False, indent=2)
tmp.flush()
os.fsync(tmp.fileno())
tmp_name = tmp.name
os.replace(tmp_name, path)
def format_time(seconds: float | int) -> str:
"""
Форматирует время в секундах в строку "M:SS".
Args:
seconds (float | int): Количество секунд.
Returns:
str: Время в формате "M:SS".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
def Json_Team_Generation(
merged: dict, *, out_dir: str = "static", who: str | None = None
) -> None:
"""
Единая точка: принимает уже нормализованный merged, делает нужные вычисления (если надо)
и сохраняет в JSON.
"""
# Здесь можно делать любые расчёты/агрегации...
# Пример предохранителя: сортировка плей-бай-плея по sequence
# plays = merged.get("result", {}).get("plays", [])
# if plays and isinstance(plays, list):
# try:
# plays.sort(key=lambda e: (e.get("sequence") is None, e.get("sequence"), e.get("time") or e.get("clock")))
# except Exception:
# pass
# Имя файла
# print(merged)
# merged =
if who == "team1":
for i in merged["result"]["teams"]:
if i["teamNumber"] == 1:
payload = i
elif who == "team2":
for i in merged["result"]["teams"]:
if i["teamNumber"] == 2:
payload = i
# online = (
# True
# if json_live_status
# and "status" in json_live_status
# and json_live_status["status"] == "Ok"
# and json_live_status["result"]["gameStatus"] == "Online"
# else False
# )
online = False
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload["starts"]
team = []
for item in starts:
player = {
"id": (item["personId"] if item["personId"] else ""),
"num": item["displayNumber"],
"startRole": item["startRole"],
"role": item["positionName"],
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == item["positionName"].lower()
][0]
if any(r[0].lower() == item["positionName"].lower() for r in role_list)
else ""
),
"NameGFX": (
f"{item['firstName'].strip()} {item['lastName'].strip()}"
if item["firstName"] is not None and item["lastName"] is not None
else "Команда"
),
"captain": item["isCapitan"],
"age": item["age"] if item["age"] is not None else 0,
"height": f'{item["height"]} cm' if item["height"] else 0,
"weight": f'{item["weight"]} kg' if item["weight"] else 0,
"isStart": (item["stats"]["isStart"] if item["stats"] else False),
"isOn": (
"🏀" if item["stats"] and item["stats"]["isOnCourt"] is True else ""
),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{'ru' if item['countryId'] is None and item['countryName'] == 'Russia' else '' if item['countryId'] is None else item['countryId'].lower() if item['countryName'] is not None else ''}.svg",
"pts": item["stats"]["points"] if item["stats"] else 0,
"pt-2": (
f"{item['stats']['goal2']}/{item['stats']['shot2']}"
if item["stats"]
else 0
),
"pt-3": (
f"{item['stats']['goal3']}/{item['stats']['shot3']}"
if item["stats"]
else 0
),
"pt-1": (
f"{item['stats']['goal1']}/{item['stats']['shot1']}"
if item["stats"]
else 0
),
"fg": (
f"{item['stats']['goal2'] + item['stats']['goal3']}/{item['stats']['shot2'] + item['stats']['shot3']}"
if item["stats"]
else 0
),
"ast": item["stats"]["assist"] if item["stats"] else 0,
"stl": item["stats"]["steal"] if item["stats"] else 0,
"blk": item["stats"]["block"] if item["stats"] else 0,
"blkVic": item["stats"]["blocked"] if item["stats"] else 0,
"dreb": item["stats"]["defReb"] if item["stats"] else 0,
"oreb": item["stats"]["offReb"] if item["stats"] else 0,
"reb": (
item["stats"]["defReb"] + item["stats"]["offReb"]
if item["stats"]
else 0
),
"to": item["stats"]["turnover"] if item["stats"] else 0,
"foul": item["stats"]["foul"] if item["stats"] else 0,
"foulT": item["stats"]["foulT"] if item["stats"] else 0,
"foulD": item["stats"]["foulD"] if item["stats"] else 0,
"foulC": item["stats"]["foulC"] if item["stats"] else 0,
"foulB": item["stats"]["foulB"] if item["stats"] else 0,
"fouled": item["stats"]["foulsOn"] if item["stats"] else 0,
"plusMinus": item["stats"]["plusMinus"] if item["stats"] else 0,
"dunk": item["stats"]["dunk"] if item["stats"] else 0,
"kpi": (
item["stats"]["points"]
+ item["stats"]["defReb"]
+ item["stats"]["offReb"]
+ item["stats"]["assist"]
+ item["stats"]["steal"]
+ item["stats"]["block"]
+ item["stats"]["foulsOn"]
+ (item["stats"]["goal1"] - item["stats"]["shot1"])
+ (item["stats"]["goal2"] - item["stats"]["shot2"])
+ (item["stats"]["goal3"] - item["stats"]["shot3"])
- item["stats"]["turnover"]
- item["stats"]["foul"]
if item["stats"]
else 0
),
"time": (format_time(item["stats"]["second"]) if item["stats"] else "0:00"),
"pts1q": 0,
"pts2q": 0,
"pts3q": 0,
"pts4q": 0,
"pts1h": 0,
"pts2h": 0,
"Name1GFX": (item["firstName"].strip() if item["firstName"] else ""),
"Name2GFX": (item["lastName"].strip() if item["lastName"] else ""),
"photoGFX": (
os.path.join(
"D:\\Photos",
merged["result"]["league"]["abcName"],
merged["result"][who]["name"],
# LEAGUE,
# data[who],
f"{item['displayNumber']}.png",
)
if item["startRole"] == "Player"
else ""
),
# "season": text,
"isOnCourt": (item["stats"]["isOnCourt"] if item["stats"] else False),
# "AvgPoints": (
# row_player_season_avg["points"]
# if row_player_season_avg
# and row_player_season_avg["points"] != ""
# else "0.0"
# ),
# "AvgAssist": (
# row_player_season_avg["assist"]
# if row_player_season_avg
# and row_player_season_avg["assist"] != ""
# else "0.0"
# ),
# "AvgBlocks": (
# row_player_season_avg["blockShot"]
# if row_player_season_avg
# and row_player_season_avg["blockShot"] != ""
# else "0.0"
# ),
# "AvgDefRebound": (
# row_player_season_avg["defRebound"]
# if row_player_season_avg
# and row_player_season_avg["defRebound"] != ""
# else "0.0"
# ),
# "AvgOffRebound": (
# row_player_season_avg["offRebound"]
# if row_player_season_avg
# and row_player_season_avg["offRebound"] != ""
# else "0.0"
# ),
# "AvgRebound": (
# row_player_season_avg["rebound"]
# if row_player_season_avg
# and row_player_season_avg["rebound"] != ""
# else "0.0"
# ),
# "AvgSteal": (
# row_player_season_avg["steal"]
# if row_player_season_avg
# and row_player_season_avg["steal"] != ""
# else "0.0"
# ),
# "AvgTurnover": (
# row_player_season_avg["turnover"]
# if row_player_season_avg
# and row_player_season_avg["turnover"] != ""
# else "0.0"
# ),
# "AvgFoul": (
# row_player_season_avg["foul"]
# if row_player_season_avg
# and row_player_season_avg["foul"] != ""
# else "0.0"
# ),
# "AvgOpponentFoul": (
# row_player_season_avg["foulsOnPlayer"]
# if row_player_season_avg
# and row_player_season_avg["foulsOnPlayer"] != ""
# else "0.0"
# ),
# "AvgPlusMinus": (
# row_player_season_avg["plusMinus"]
# if row_player_season_avg
# and row_player_season_avg["plusMinus"] != ""
# else "0.0"
# ),
# "AvgDunk": (
# row_player_season_avg["dunk"]
# if row_player_season_avg
# and row_player_season_avg["dunk"] != ""
# else "0.0"
# ),
# "AvgKPI": "0.0",
# "AvgPlayedTime": (
# row_player_season_avg["playedTime"]
# if row_player_season_avg
# and row_player_season_avg["playedTime"] != ""
# else "0:00"
# ),
# "Shot1Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=1
# ),
# "Shot2Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=2
# ),
# "Shot3Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=3
# ),
# "Shot23Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=[2, 3]
# ),
# "TPoints": sum_stat_with_online(
# "points", sum_stat, item["stats"], online
# ),
# "TShots1": calc_total_shots_str(
# sum_stat, item["stats"], online, 1
# ),
# "TShots2": calc_total_shots_str(
# sum_stat, item["stats"], online, 2
# ),
# "TShots3": calc_total_shots_str(
# sum_stat, item["stats"], online, 3
# ),
# "TShots23": calc_total_shots_str(
# sum_stat, item["stats"], online, [2, 3]
# ),
# "TShot1Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=1
# ),
# "TShot2Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=2
# ),
# "TShot3Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=3
# ),
# "TShot23Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=[2, 3]
# ),
# "TAssist": sum_stat_with_online(
# "assist", sum_stat, item["stats"], online
# ),
# "TBlocks": sum_stat_with_online(
# "blockShot", sum_stat, item["stats"], online
# ),
# "TDefRebound": sum_stat_with_online(
# "defRebound", sum_stat, item["stats"], online
# ),
# "TOffRebound": sum_stat_with_online(
# "offRebound", sum_stat, item["stats"], online
# ),
# "TRebound": (
# sum_stat_with_online(
# "defRebound", sum_stat, item["stats"], online
# )
# + sum_stat_with_online(
# "offRebound", sum_stat, item["stats"], online
# )
# ),
# "TSteal": sum_stat_with_online(
# "steal", sum_stat, item["stats"], online
# ),
# "TTurnover": sum_stat_with_online(
# "turnover", sum_stat, item["stats"], online
# ),
# "TFoul": sum_stat_with_online(
# "foul", sum_stat, item["stats"], online
# ),
# "TOpponentFoul": sum_stat_with_online(
# "foulsOnPlayer", sum_stat, item["stats"], online
# ),
# "TPlusMinus": 0,
# "TDunk": sum_stat_with_online(
# "dunk", sum_stat, item["stats"], online
# ),
# "TKPI": 0,
# "TPlayedTime": sum_stat["playedTime"] if sum_stat else "0:00",
# "TGameCount": (
# safe_int(sum_stat["games"])
# if sum_stat and sum_stat.get("games") != ""
# else 0
# )
# + (1 if online else 0),
# "TStartCount": (
# safe_int(sum_stat["isStarts"])
# if sum_stat and sum_stat.get("isStarts", 0) != ""
# else 0
# ),
# "CareerTShots1": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, 1
# ),
# "CareerTShots2": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, 2
# ),
# "CareerTShots3": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, 3
# ),
# "CareerTShots23": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, [2, 3]
# ),
# "CareerTShot1Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, 1
# ),
# "CareerTShot2Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, 2
# ),
# "CareerTShot3Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, 3
# ),
# "CareerTShot23Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, [2, 3]
# ),
# "CareerTPoints": sum_stat_with_online(
# "points", row_player_career_sum, item["stats"], online
# ),
# "CareerTAssist": sum_stat_with_online(
# "assist", row_player_career_sum, item["stats"], online
# ),
# "CareerTBlocks": sum_stat_with_online(
# "blockShot", row_player_career_sum, item["stats"], online
# ),
# "CareerTDefRebound": sum_stat_with_online(
# "defRebound", row_player_career_sum, item["stats"], online
# ),
# "CareerTOffRebound": sum_stat_with_online(
# "offRebound", row_player_career_sum, item["stats"], online
# ),
# "CareerTRebound": (
# sum_stat_with_online(
# "defRebound",
# row_player_career_sum,
# item["stats"],
# online,
# )
# + sum_stat_with_online(
# "offRebound",
# row_player_career_sum,
# item["stats"],
# online,
# )
# ),
# "CareerTSteal": sum_stat_with_online(
# "steal", row_player_career_sum, item["stats"], online
# ),
# "CareerTTurnover": sum_stat_with_online(
# "turnover", row_player_career_sum, item["stats"], online
# ),
# "CareerTFoul": sum_stat_with_online(
# "foul", row_player_career_sum, item["stats"], online
# ),
# "CareerTOpponentFoul": sum_stat_with_online(
# "foulsOnPlayer",
# row_player_career_sum,
# item["stats"],
# online,
# ),
# "CareerTPlusMinus": 0, # оставить как есть
# "CareerTDunk": sum_stat_with_online(
# "dunk", row_player_career_sum, item["stats"], online
# ),
# "CareerTPlayedTime": (
# row_player_career_sum["playedTime"]
# if row_player_career_sum
# else "0:00"
# ),
# "CareerTGameCount": sum_stat_with_online(
# "games", row_player_career_sum, item["stats"], online
# )
# + (1 if online else 0),
# "CareerTStartCount": sum_stat_with_online(
# "isStarts", row_player_career_sum, item["stats"], online
# ), # если нужно, можно +1 при старте
# "AvgCarPoints": (
# row_player_career_avg["points"]
# if row_player_career_avg
# and row_player_career_avg["points"] != ""
# else "0.0"
# ),
# "AvgCarAssist": (
# row_player_career_avg["assist"]
# if row_player_career_avg
# and row_player_career_avg["assist"] != ""
# else "0.0"
# ),
# "AvgCarBlocks": (
# row_player_career_avg["blockShot"]
# if row_player_career_avg
# and row_player_career_avg["blockShot"] != ""
# else "0.0"
# ),
# "AvgCarDefRebound": (
# row_player_career_avg["defRebound"]
# if row_player_career_avg
# and row_player_career_avg["defRebound"] != ""
# else "0.0"
# ),
# "AvgCarOffRebound": (
# row_player_career_avg["offRebound"]
# if row_player_career_avg
# and row_player_career_avg["offRebound"] != ""
# else "0.0"
# ),
# "AvgCarRebound": (
# row_player_career_avg["rebound"]
# if row_player_career_avg
# and row_player_career_avg["rebound"] != ""
# else "0.0"
# ),
# "AvgCarSteal": (
# row_player_career_avg["steal"]
# if row_player_career_avg
# and row_player_career_avg["steal"] != ""
# else "0.0"
# ),
# "AvgCarTurnover": (
# row_player_career_avg["turnover"]
# if row_player_career_avg
# and row_player_career_avg["turnover"] != ""
# else "0.0"
# ),
# "AvgCarFoul": (
# row_player_career_avg["foul"]
# if row_player_career_avg
# and row_player_career_avg["foul"] != ""
# else "0.0"
# ),
# "AvgCarOpponentFoul": (
# row_player_career_avg["foulsOnPlayer"]
# if row_player_career_avg
# and row_player_career_avg["foulsOnPlayer"] != ""
# else "0.0"
# ),
# "AvgCarPlusMinus": (
# row_player_career_avg["plusMinus"]
# if row_player_career_avg
# and row_player_career_avg["plusMinus"] != ""
# else "0.0"
# ),
# "AvgCarDunk": (
# row_player_career_avg["dunk"]
# if row_player_career_avg
# and row_player_career_avg["dunk"] != ""
# else "0.0"
# ),
# "AvgCarKPI": "0.0",
# "AvgCarPlayedTime": (
# row_player_career_avg["playedTime"]
# if row_player_career_avg
# and row_player_career_avg["playedTime"] != ""
# else "0:00"
# ),
# "HeadCoachStatsCareer": HeadCoachStatsCareer,
# "HeadCoachStatsTeam": HeadCoachStatsTeam,
# # "PTS_Career_High": get_carrer_high(item["personId"], "points"),
# # "AST_Career_High": get_carrer_high(item["personId"], "assist"),
# # "REB_Career_High": get_carrer_high(item["personId"], "rebound"),
# # "STL_Career_High": get_carrer_high(item["personId"], "steal"),
# # "BLK_Career_High": get_carrer_high(item["personId"], "blockShot"),
}
team.append(player)
count_player = sum(1 for x in team if x["startRole"] == "Player")
# print(count_player)
if count_player < 12:
if team: # Check if team is not empty
empty_rows = [
{
key: (
False
if key in ["captain", "isStart", "isOnCourt"]
else (
0
if key
in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]
else ""
)
)
for key in team[0].keys()
}
for _ in range((4 if count_player <= 4 else 12) - count_player)
]
team.extend(empty_rows)
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5, # на случай неизвестных
}
# print(team)
sorted_team = sorted(
team,
key=lambda x: role_priority.get(
x.get("startRole", 99), 99
), # 99 — по умолчанию
)
out_path = Path(out_dir) / f"{who}.json"
atomic_write_json(out_path, sorted_team)
logging.info("Сохранил payload: {out_path}")
top_sorted_team = sorted(
filter(lambda x: x["startRole"] in ["Player", ""], sorted_team),
key=lambda x: (
x["pts"],
x["dreb"] + x["oreb"],
x["ast"],
x["stl"],
x["blk"],
x["time"],
),
reverse=True,
)
for item in top_sorted_team:
item["pts"] = "" if item["num"] == "" else item["pts"]
item["foul"] = "" if item["num"] == "" else item["foul"]
out_path = Path(out_dir) / f"top{who.replace('t','T')}.json"
atomic_write_json(out_path, top_sorted_team)
logging.info("Сохранил payload: {out_path}")
started_team = sorted(
filter(
lambda x: x["startRole"] == "Player" and x["isOnCourt"] is True,
sorted_team,
),
key=lambda x: int(x["num"]),
reverse=False,
)
out_path = Path(out_dir) / f"started_{who}.json"
atomic_write_json(out_path, started_team)
logging.info("Сохранил payload: {out_path}")
def time_outs_func(data_pbp: list[dict]) -> tuple[str, int, str, int]:
"""
Вычисляет количество оставшихся таймаутов для обеих команд
и формирует строку состояния.
Args:
data_pbp: Список игровых событий (play-by-play).
Returns:
Кортеж: (строка команды 1, остаток, строка команды 2, остаток)
"""
timeout1 = []
timeout2 = []
for event in data_pbp:
if event.get("play") == 23:
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
def timeout_status(timeout_list: list[dict], last_event: dict) -> tuple[str, int]:
period = last_event.get("period", 0)
sec = last_event.get("sec", 0)
if period < 3:
timeout_max = 2
count = sum(1 for t in timeout_list if t.get("period", 0) <= period)
quarter = "1st half"
elif period < 5:
count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period)
quarter = "2nd half"
if period == 4 and sec >= 4800 and count in (0, 1):
timeout_max = 2
else:
timeout_max = 3
else:
timeout_max = 1
count = sum(1 for t in timeout_list if t.get("period", 0) == period)
quarter = f"OverTime {period - 4}"
left = max(0, timeout_max - count)
word = "Time-outs" if left != 1 else "Time-out"
text = f"{left if left != 0 else 'No'} {word} left in {quarter}"
return text, left
if not data_pbp:
return "", 0, "", 0
last_event = data_pbp[-1]
t1_str, t1_left = timeout_status(timeout1, last_event)
t2_str, t2_left = timeout_status(timeout2, last_event)
return t1_str, t1_left, t2_str, t2_left
def add_data_for_teams(new_data: list[dict]) -> tuple[float, list, float]:
"""
Возвращает усреднённые статистики команды:
- средний возраст
- очки со старта и скамейки + их доли
- средний рост
Args:
new_data (list[dict]): Список игроков с полями "startRole", "stats", "age", "height"
Returns:
tuple: (avg_age: float, points: list, avg_height: float)
"""
players = [item for item in new_data if item.get("startRole") == "Player"]
points_start = 0
points_bench = 0
total_age = 0
total_height = 0
player_count = len(players)
for player in players:
stats = player.get("stats")
if stats:
is_start = stats.get("isStart")
# Очки
if is_start is True:
points_start += stats.get("points", 0)
elif is_start is False:
points_bench += stats.get("points", 0)
# Возраст и рост
total_age += player.get("age", 0) or 0
total_height += player.get("height", 0) or 0
total_points = points_start + points_bench
points_start_pro = (
f"{round(points_start * 100 / total_points)}%" if total_points else "0%"
)
points_bench_pro = (
f"{round(points_bench * 100 / total_points)}%" if total_points else "0%"
)
avg_age = round(total_age / player_count, 1) if player_count else 0
avg_height = round(total_height / player_count, 1) if player_count else 0
points = [points_start, points_start_pro, points_bench, points_bench_pro]
return avg_age, points, avg_height
def add_new_team_stat(
data: dict,
avg_age: float,
points: float,
avg_height: float,
timeout_str: str,
timeout_left: str,
) -> dict:
"""
Добавляет в словарь команды форматированную статистику.
Все значения приводятся к строкам.
Args:
data: Исходная статистика команды.
avg_age: Средний возраст команды (строка).
points: Кортеж из 4 строк: ptsStart, ptsStart_pro, ptsBench, ptsBench_pro.
avg_height: Средний рост (в см).
timeout_str: Строка отображения таймаутов.
timeout_left: Остаток таймаутов.
Returns:
Обновлённый словарь `data` с новыми ключами.
"""
def safe_int(v): # Локальная защита от ValueError/TypeError
try:
return int(v)
except (ValueError, TypeError):
return 0
def format_percent(goal, shot):
goal, shot = safe_int(goal), safe_int(shot)
return f"{round(goal * 100 / shot)}%" if shot else "0%"
goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1"))
goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2"))
goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3"))
def_reb = safe_int(data.get("defReb"))
off_reb = safe_int(data.get("offReb"))
data.update(
{
"pt-1": f"{goal1}/{shot1}",
"pt-2": f"{goal2}/{shot2}",
"pt-3": f"{goal3}/{shot3}",
"fg": f"{goal2 + goal3}/{shot2 + shot3}",
"pt-1_pro": format_percent(goal1, shot1),
"pt-2_pro": format_percent(goal2, shot2),
"pt-3_pro": format_percent(goal3, shot3),
"fg_pro": format_percent(goal2 + goal3, shot2 + shot3),
"Reb": str(def_reb + off_reb),
"avgAge": str(avg_age),
"ptsStart": str(points[0]),
"ptsStart_pro": str(points[1]),
"ptsBench": str(points[2]),
"ptsBench_pro": str(points[3]),
"avgHeight": f"{avg_height} cm",
"timeout_left": str(timeout_left),
"timeout_str": str(timeout_str),
}
)
# Приводим все значения к строкам, если нужно строго для сериализации
for k in data:
data[k] = str(data[k])
return data
stat_name_list = [
("points", "Очки", "points"),
("pt-1", "Штрафные", "free throws"),
("pt-1_pro", "штрафные, процент", "free throws pro"),
("pt-2", "2-очковые", "2-points"),
("pt-2_pro", "2-очковые, процент", "2-points pro"),
("pt-3", "3-очковые", "3-points"),
("pt-3_pro", "3-очковые, процент", "3-points pro"),
("fg", "очки с игры", "field goals"),
("fg_pro", "Очки с игры, процент", "field goals pro"),
("assist", "Передачи", "assists"),
("pass", "", ""),
("defReb", "подборы в защите", ""),
("offReb", "подборы в нападении", ""),
("Reb", "Подборы", "rebounds"),
("steal", "Перехваты", "steals"),
("block", "Блокшоты", "blocks"),
("blocked", "", ""),
("turnover", "Потери", "turnovers"),
("foul", "Фолы", "fouls"),
("foulsOn", "", ""),
("foulT", "", ""),
("foulD", "", ""),
("foulC", "", ""),
("foulB", "", ""),
("second", "секунды", "seconds"),
("dunk", "данки", "dunks"),
("fastBreak", "", "fast breaks"),
("plusMinus", "+/-", "+/-"),
("avgAge", "", "avg Age"),
("ptsBench", "", "Bench PTS"),
("ptsBench_pro", "", "Bench PTS, %"),
("ptsStart", "", "Start PTS"),
("ptsStart_pro", "", "Start PTS, %"),
("avgHeight", "", "avg height"),
("timeout_left", "", "timeout left"),
("timeout_str", "", "timeout str"),
]
def Team_Both_Stat(merged: dict, *, out_dir: str = "static") -> None:
"""
Обновляет файл team_stats.json, содержащий сравнение двух команд.
Аргументы:
stop_event (threading.Event): Событие для остановки цикла.
"""
logger.info("START making json for team statistics")
try:
teams = merged["result"]["teams"]
plays = merged["result"].get("plays", [])
# Разделение команд
team_1 = next((t for t in teams if t["teamNumber"] == 1), None)
team_2 = next((t for t in teams if t["teamNumber"] == 2), None)
if not team_1 or not team_2:
logger.warning("Не найдены обе команды в данных")
# time.sleep()
# Таймауты
timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays)
# Возраст, очки, рост
avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1.get("starts", []))
avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2.get("starts", []))
if not team_1.get("total") or not team_2.get("total"):
logger.debug("Нет total у команд — пропускаю перезапись team_stats.json")
# Форматирование общей статистики (как и было)
total_1 = add_new_team_stat(
team_1["total"],
avg_age_1,
points_1,
avg_height_1,
timeout_str1,
timeout_left1,
)
total_2 = add_new_team_stat(
team_2["total"],
avg_age_2,
points_2,
avg_height_2,
timeout_str2,
timeout_left2,
)
# Финальный JSON
result_json = []
for key in total_1:
val1 = (
int(total_1[key]) if isinstance(total_1[key], float) else total_1[key]
)
val2 = (
int(total_2[key]) if isinstance(total_2[key], float) else total_2[key]
)
stat_rus, stat_eng = "", ""
for s in stat_name_list:
if s[0] == key:
stat_rus, stat_eng = s[1], s[2]
break
result_json.append(
{
"name": key,
"nameGFX_rus": stat_rus,
"nameGFX_eng": stat_eng,
"val1": val1,
"val2": val2,
}
)
out_path = Path(out_dir) / "team_stats.json"
atomic_write_json(out_path, result_json)
logging.info("Сохранил payload: {out_path}")
logger.debug("Успешно записаны данные в team_stats.json")
except Exception as e:
logger.error(f"Ошибка при обработке командной статистики: {e}", exc_info=True)
def Referee(merged: dict, *, out_dir: str = "static") -> None:
"""
Поток, создающий JSON-файл с информацией о судьях матча.
"""
logger.info("START making json for referee")
desired_order = [
"Crew chief",
"Referee 1",
"Referee 2",
"Commissioner",
"Ст.судья",
"Судья 1",
"Судья 2",
"Комиссар",
]
try:
# Найти судей (teamNumber == 0)
team_ref = next(
(t for t in merged["result"]["teams"] if t["teamNumber"] == 0), None
)
if not team_ref:
logger.warning("Не найдена судейская бригада в данных.")
referees_raw = team_ref.get("starts", [])
# print(referees_raw)
referees = []
for r in referees_raw:
flag_code = r.get("countryId", "").lower() if r.get("countryName") else ""
referees.append(
{
"displayNumber": r.get("displayNumber", ""),
"positionName": r.get("positionName", ""),
"lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(),
"secondName": r.get("secondName", ""),
"birthday": r.get("birthday", ""),
"age": r.get("age", 0),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg",
}
)
# Сортировка по позиции
referees = sorted(
referees,
key=lambda x: (
desired_order.index(x["positionName"])
if x["positionName"] in desired_order
else len(desired_order)
),
)
out_path = Path(out_dir) / "referee.json"
atomic_write_json(out_path, referees)
logging.info("Сохранил payload: {out_path}")
except Exception as e:
logger.error(f"Ошибка в Referee потоке: {e}", exc_info=True)
def Scores_Quarter(merged: dict, *, out_dir: str = "static") -> None:
"""
Поток, обновляющий JSON со счётом по четвертям.
"""
logger.info("START making json for scores quarter")
quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]
score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters]
try:
# Сначала пробуем fullScore
full_score_str = merged.get("result", {}).get("game", {}).get("fullScore", "")
if full_score_str:
full_score_list = full_score_str.split(",")
for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]):
parts = score_str.split(":")
if len(parts) == 2:
score_by_quarter[i]["score1"] = parts[0]
score_by_quarter[i]["score2"] = parts[1]
logger.info("Счёт по четвертям получен из fullScore.")
# Если нет fullScore, пробуем scoreByPeriods
elif "scoreByPeriods" in merged.get("result", {}):
periods = merged["result"]["scoreByPeriods"]
for i, score in enumerate(periods[: len(score_by_quarter)]):
score_by_quarter[i]["score1"] = str(score.get("score1", ""))
score_by_quarter[i]["score2"] = str(score.get("score2", ""))
logger.info("Счёт по четвертям получен из scoreByPeriods.")
else:
logger.debug("Нет данных по счёту, сохраняем пустые значения.")
out_path = Path(out_dir) / "scores.json"
atomic_write_json(out_path, score_by_quarter)
logging.info("Сохранил payload: {out_path}")
except Exception as e:
logger.error(f"Ошибка в Scores_Quarter: {e}", exc_info=True)
def status_online_func(merged: dict, *, out_dir: str = "static") -> None:
"""
Получает онлайн-статус игры и возвращает данные + путь к PNG-фолам.
"""
try:
out_path = Path(out_dir) / "live_status.json"
if "live_status" in merged["result"]:
status_data = merged["result"]["live_status"]
atomic_write_json(out_path, [status_data])
else:
logger.warning("Матч не ОНЛАЙН!!!!")
atomic_write_json(
out_path,
[
{
"foulsA": 0,
"foulsB": 0,
}
],
)
logging.info("Сохранил payload: {out_path}")
except Exception as e:
logger.error(f"Ошибка в status_online_func: {e}", exc_info=True)
return None
def Standing_func(league, season, lang, stop_event: threading.Event, out_dir: str = "static") -> None:
logger.info("START making json for standings")
while not stop_event.is_set():
try:
url = URL_STANDINGS.format(host=HOST, league=league, season=season, lang=lang)
data_standings = fetch_json(url)
if data_standings and "items" in data_standings and data_standings["items"]:
standings_temp = data_standings["items"]
for item in standings_temp:
if "standings" in item and item["standings"] != []:
standings_temp = item["standings"]
df = pd.json_normalize(standings_temp)
del df["scores"]
if not df["totalWin"].isna().all():
df["w_l"] = (
df["totalWin"].astype(str)
+ " / "
+ df["totalDefeat"].astype(str)
)
df["procent"] = df.apply(
lambda row: (
0
if row["w_l"] == "0 / 0"
or row["totalGames"] == 0
or pd.isna(row["totalWin"])
else round(
row["totalWin"] * 100 / row["totalGames"]
+ 0.000005
)
),
axis=1,
)
df["plus_minus"] = (
df["totalGoalPlus"] - df["totalGoalMinus"]
)
filepath = os.path.join(
out_dir,
f"standings_{league}_{item['comp']['name'].replace(' ', '_')}.json",
)
df.to_json(
filepath,
orient="records",
force_ascii=False,
indent=4,
)
logger.info("Standings data saved successfully.")
elif "playoffPairs" in item and item["playoffPairs"] != []:
standings_temp = item["playoffPairs"]
df = pd.json_normalize(standings_temp)
filepath = os.path.join(
out_dir,
f"standings_{league}_{item['comp']['name'].replace(' ', '_')}.json",
)
df.to_json(
filepath,
orient="records",
force_ascii=False,
indent=4,
)
logger.info("Standings data saved successfully.")
except Exception as e:
logger.warning(f"Ошибка в турнирном положении: {e}")
stop_event.wait(TIMEOUT_DATA_OFF)
# ==========================
# ---- ДОМЕННАЯ ЛОГИКА
# ==========================
def validate_league_or_die(league: str) -> str:
league = (league or DEFAULT_LEAGUE).lower().strip()
if league not in ALLOWED_LEAGUES:
logger.warning(
f"Неверный тег лиги: '{league}'. Допустимо: {sorted(ALLOWED_LEAGUES)}"
)
sys.exit(2)
return league
def get_last_season_or_die(league: str, lang: str) -> str:
url = URL_SEASON.format(host=HOST, league=league, lang=lang)
try:
data = fetch_json(url)
season = extract_last_season(data)
logging.info(f"Последний сезон для {league}: {season}")
return season
except Exception as e:
logger.warning(f"Не получилось получить последний сезон для {league}: {e}")
sys.exit(3)
def get_team_schedule_or_die(
league: str, season: str, team: str, lang: str
) -> list[dict]:
url = URL_SCHEDULE.format(host=HOST, league=league, season=season, lang=lang)
try:
data = fetch_json(url)
team_games = extract_team_schedule_for_season(data, team)
if not team_games:
logger.warning(f"Для команды {team} не найдено игр в сезоне {season}.")
return team_games
except Exception as e:
logger.warning(f"Не получилось получить расписание {league}/{season}: {e}")
return []
def pick_today_or_last_played(
team_games: list[dict], now: datetime
) -> tuple[dict | None, dict | None]:
"""
Возвращает (сегодняшняя игра, последний сыгранный матч).
"""
today = now.date()
games_sorted = sorted(team_games, key=parse_game_start_dt)
today_game = None
last_played = None
for g in games_sorted:
start = parse_game_start_dt(g)
status = g.get("game", {}).get("gameStatus", "").lower()
if start.date() == today and today_game is None:
today_game = g
if start <= now and status == "resultconfirmed":
last_played = g
return today_game, last_played
def is_game_online(league: str, game_id: str, lang: str) -> str:
"""
Возвращает статус: inprogress|scheduled|finished (или то, что твой API даёт).
"""
url = URL_GAME.format(host=HOST, league=league, game_id=game_id, lang=lang)
data = fetch_json(url)
out_path = Path("static") / "game.json"
atomic_write_json(out_path, data)
return extract_game_status(data)
class PostProcessor:
def __init__(self):
self.q = queue.Queue(maxsize=1)
self._t = threading.Thread(target=self._worker, daemon=True)
self._stop = threading.Event()
self._t.start()
def submit(self, merged):
# кладём только «последний» payload
try:
# если очередь занята, выкидываем старое задание
while True:
self.q.get_nowait()
except queue.Empty:
pass
# не блокируем: если за эту миллисекунду кто-то положил — просто заменим в следующий раз
try:
self.q.put_nowait(merged)
except queue.Full:
pass
def _worker(self):
while not self._stop.is_set():
merged = self.q.get()
try:
Json_Team_Generation(merged, out_dir="static", who="team1")
Json_Team_Generation(merged, out_dir="static", who="team2")
Team_Both_Stat(merged, out_dir="static")
Referee(merged, out_dir="static")
Scores_Quarter(merged, out_dir="static")
status_online_func(merged, out_dir="static")
except Exception as e:
logging.exception(f"Postproc failed: {e}")
def stop(self):
self._stop.set()
class OnlinePoller:
def __init__(
self, league: str, game_id: str, lang: str, on_update: callable | None = None
):
self.league = league
self.game_id = game_id
self.lang = lang
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._log = logging.info("start")
self._on_update = on_update
self._post = PostProcessor()
# 1) Постоянная сессия и пул соединений
self._session = requests.Session()
retry = Retry(
total=2,
connect=2,
read=2,
backoff_factor=0.1,
status_forcelist=(502, 503, 504),
allowed_methods=frozenset(["GET"]),
)
adapter = HTTPAdapter(pool_connections=1, pool_maxsize=10, max_retries=retry)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
self._session.headers.update(
{
"Connection": "keep-alive",
"Accept": "application/json, */*",
"Accept-Encoding": "gzip, deflate, br",
"User-Agent": "game-watcher/1.0",
}
)
def stop(self):
if self._thread and self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=2)
# self._log.info(f"Онлайн-поллер для игры {self.game_id} остановлен.")
self._thread = None
try:
self._session.close()
except Exception:
pass
try:
self._post.stop()
except Exception:
pass
def _run(self):
# Исполнитель для параллельных GET
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
while not self._stop_event.is_set():
started = time.perf_counter()
try:
futures = [
pool.submit(
fetch_box_score,
self.league,
self.game_id,
self.lang,
self._session,
),
pool.submit(
fetch_play_by_play,
self.league,
self.game_id,
self.lang,
self._session,
),
pool.submit(
fetch_live_status,
self.league,
self.game_id,
self.lang,
self._session,
),
]
bs, pbp, ls = (f.result() for f in futures)
merged = ensure_merged_payload(
None,
box_score=bs,
play_by_play=pbp,
live_status=ls,
game_meta={"id": self.game_id, "league": self.league},
)
# print(merged)
# внешний коллбек, если задан
if self._on_update:
self._on_update(merged)
# твоя общая обработка + сохранение
self._post.submit(merged)
logger.debug(
"Обновления online: box-score(%s keys), pbp(%s keys), live-status(%s keys)",
len(bs) if isinstance(bs, dict) else "",
len(pbp) if isinstance(pbp, dict) else "",
len(ls) if isinstance(ls, dict) else "",
)
except Exception as e:
logger.warning(f"Сбой online-поллера для игры {self.game_id}: {e}")
# лёгкая задержка после ошибки, но не «наказание» на целую секунду
time.sleep(0.2)
# Точное выдерживание частоты: «1 цикл в секунду»
elapsed = time.perf_counter() - started
rest = ONLINE_FETCH_INTERVAL_SEC - elapsed
if rest > 0:
# спим только остаток
self._stop_event.wait(rest)
def start(self):
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run,
name=f"poller-{self.game_id}",
daemon=True,
)
self._thread.start()
self._log.info(f"Онлайн-поллер для игры {self.game_id} запущен.")
def monitor_game_loop(
league: str, game_id: str, lang: str, stop_event: threading.Event
) -> None:
logger.info(f"Старт мониторинга игры {game_id} ({league}).")
poller = OnlinePoller(league, game_id, lang)
was_online = False
while not stop_event.is_set():
try:
status = is_game_online(league, game_id, lang)
# print(status)
is_online = status in {"scheduled", "online"}
is_finished = status in {"resultconfirmed", "result"}
if is_finished:
logger.info(f"Матч {game_id} завершён.\nОстанавливаем мониторинг.")
break
if is_online and not was_online:
logger.info(
f"Матч {game_id} перешёл в онлайн.\nЗапускаем быстрый опрос (1 сек)."
)
poller.start()
elif not is_online and was_online:
logger.info(
f"Матч {game_id} вышел из онлайна (или ещё не стартовал).\nОстанавливаем быстрый опрос."
)
poller.stop()
was_online = is_online
# Проверяем статус снова через минуту
stop_event.wait(STATUS_CHECK_INTERVAL_SEC)
except Exception as e:
logger.warning(f"Сбой проверки статуса матча {game_id}: {e}")
# При ошибке — не дергаем быстро, подождём немного и повторим
stop_event.wait(POLL_INTERVAL_OFFLINE_SEC)
# Гарантированно остановим быстрый опрос при завершении
poller.stop()
logger.info(f"Мониторинг матча {game_id} остановлен.")
def next_midnight_local(now: datetime) -> datetime:
tomorrow = (now + timedelta(days=1)).date()
return datetime.combine(tomorrow, datetime.min.time(), tzinfo=APP_TZ) + timedelta(
minutes=5
)
# return now + timedelta(seconds=30)
def daily_rollover_loop(
league: str,
team: str,
lang: str,
season_getter,
schedule_getter,
monitor_mgr,
stop_event: threading.Event,
):
"""
Каждый день в ~00:05 по Europe/Moscow:
- узнаём актуальный сезон
- заново тянем расписание
- выбираем сегодняшнюю игру или последний сыгранный
- при наличии сегодняшней игры — перезапускаем монитор на неё
"""
while not stop_event.is_set():
now = datetime.now(APP_TZ)
wakeup_at = next_midnight_local(now)
seconds = (wakeup_at - now).total_seconds()
logger.info(
# f"Ежедневка: проснусь {datetime.fromisoformat(wakeup_at.isoformat())} (через {int(seconds)} сек)."
f"Ежедневная проверка матча:\nпроснусь {wakeup_at.strftime('%Y-%m-%d %H:%M:%S')} (через {int(seconds)} сек)."
)
if stop_event.wait(seconds):
break
# Выполняем ежедневную проверку
try:
season = season_getter(league, lang)
games = schedule_getter(league, season, team, lang)
if not games:
logger.info(
f"Ежедневная проверка:\nу {team} нет игр в расписании сезона {season}."
)
continue
today_game, last_played = pick_today_or_last_played(
games, datetime.now(APP_TZ)
)
if today_game:
gid = today_game["game"]["id"]
logger.info(
f"Сегодня у {team} есть игра: gameID={gid}. \nПерезапуск мониторинга."
)
monitor_mgr.restart(gid, lang)
elif last_played:
gid = last_played["game"]["id"]
logger.info(
f"Сегодня у {team} нет игры. \nПоследняя сыгранная: gameID={gid}.\nМониторинг НЕ запускаем."
)
else:
logger.info(f"Сегодня у {team} нет игры и нет предыдущих сыгранных.")
except Exception as e:
logger.warning(f"Ошибка ежедневной проверки: {e}")
class MonitorManager:
"""
Управляет потоком мониторинга, чтобы можно было
безопасно перезапускать на новый gameId.
"""
def __init__(self, league: str):
self.league = league
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._lock = threading.Lock()
def restart(self, game_id: str, lang: str):
with self._lock:
self.stop()
self._stop_event = threading.Event()
self._thread = threading.Thread(
target=monitor_game_loop,
args=(self.league, game_id, lang, self._stop_event),
name=f"monitor-{game_id}",
daemon=True,
)
self._thread.start()
def stop(self):
if self._thread and self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=5)
self._thread = None
# ==========================
# ---- MAIN
# ==========================
def main():
# global MYHOST
parser = argparse.ArgumentParser(description="Game watcher")
parser.add_argument("--league", type=str, default=DEFAULT_LEAGUE, help="тег лиги")
parser.add_argument(
"--team", type=str, required=True, help="код/тег команды (например, BOS)"
)
parser.add_argument("--lang", type=str, default="en", help="язык получения данных")
parser.add_argument(
"--log-level", type=str, default="INFO", help="DEBUG|INFO|WARNING|ERROR"
)
args = parser.parse_args()
print(args)
# logger.info(f"Запуск программы пользователем: {MYHOST}")
logger.info(
f"Запуск с параметрами:\nleague={args.league}\nteam={args.team}\nlang={args.lang}"
)
league = validate_league_or_die(args.league)
team = args.team.lower()
# 1) Узнать последний сезон
season = get_last_season_or_die(league, args.lang)
# 2) Получить расписание для команды
team_games = get_team_schedule_or_die(league, season, team, args.lang)
if not team_games:
logger.warning("Расписание пустое — работа завершена.")
sys.exit(4)
# 3) Найти сегодняшнюю или последнюю сыгранную игру
now = datetime.now(APP_TZ)
today_game, last_played = pick_today_or_last_played(team_games, now)
monitor_mgr = MonitorManager(league=league)
if today_game:
# В исходном расписании предполагалось наличие game.id
game_id = today_game["game"]["id"]
logger.info(
f"Сегодня у {team} есть игра: gameID={game_id}.\nЗапускаю мониторинг."
)
monitor_mgr.restart(game_id, args.lang)
else:
if last_played:
game_id = last_played["game"]["id"]
try:
url = URL_GAME.format(
host=HOST, league=league, game_id=game_id, lang=args.lang
)
game_json = fetch_json(url)
merged = ensure_merged_payload(
game_json,
game_meta={
"id": game_json.get("result", {}).get("gameId"),
"league": args.league,
},
)
Json_Team_Generation(merged, out_dir="static", who="team1")
Json_Team_Generation(merged, out_dir="static", who="team2")
Team_Both_Stat(merged, out_dir="static")
Referee(merged, out_dir="static")
Scores_Quarter(merged, out_dir="static")
status_online_func(merged, out_dir="static")
# print(merged)
logger.info(
f"Сегодня у {team} нет игры.\nПоследняя сыгранная: gameID={game_id}.\nМониторинг не запускаю."
)
except Exception as e:
logging.exception(
f"Оффлайн-сохранение для gameID={game_id}\nупало: {e}"
)
else:
logger.info(f"Сегодня у {team} нет игры и нет предыдущих сыгранных.")
# 4) Ежедневная перекладка расписания
stop_event = threading.Event()
rollover_thread = threading.Thread(
target=daily_rollover_loop,
args=(
league,
team,
args.lang,
get_last_season_or_die,
get_team_schedule_or_die,
monitor_mgr,
stop_event,
),
name="daily-rollover",
daemon=True,
)
rollover_thread.start()
# 1.1) турнирная таблица
threads = [
threading.Thread(
target=Standing_func,
args=(league, season, args.lang, stop_event),
name="standings",)]
for t in threads:
t.start()
logger.debug(f"Поток {t.name} запущен.")
# Держим главный поток живым
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Завершение по Ctrl+C…")
stop_event.set()
for t in threads:
t.join()
logger.debug(f"Поток {t.name} завершён.")
finally:
stop_event.set()
monitor_mgr.stop()
rollover_thread.join(timeout=5)
logger.info("Остановлено.")
if __name__ == "__main__":
main()