Files
RFB/get_data.py

1484 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Шаблон: мониторинг онлайн-матча по лиге/команде
Функции:
1) Валидация тега (--league), иначе: лог + сообщение в Telegram и завершение.
2) Получение номера последнего сезона из JSON по ссылке. Ошибки -> лог + Telegram.
3) Загрузка расписания по ссылке, поиск игры на сегодня для --team.
- Если сегодня нет матча — берём последний сыгранный (если есть), иначе лог + Telegram.
- Если сегодня есть игра — запускаем поток-монитор:
* статус «онлайн?» проверяем РАЗ В МИНУТУ;
* при статусе онлайн — КАЖДУЮ СЕКУНДУ дергаем три запроса:
box-score, play-by-play, live-status.
* если не онлайн — ждём минуту до следующей проверки.
4) Ежедневная перекладка: каждый следующий день повторно проверяем расписание
для команды. Если матча нет — один раз подгружаем последний сыгранный и ждём следующего дня.
ЗАМЕТКИ:
- Заполни URL_* и функции-экстракторы JSON под свой формат.
- Для Telegram используй переменные окружения TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import os
import sys
import time
import json
import argparse
import logging
import logging.config
import threading
import concurrent.futures
import queue
import platform
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from typing import Any, Dict, List
import tempfile
from pathlib import Path
import requests
# ==========================
# ---- НАСТРОЙКИ/КОНСТАНТЫ
# ==========================
APP_TZ = ZoneInfo("Europe/Moscow")
# Разрешённые теги лиг
ALLOWED_LEAGUES = {
"vtb", # Единая Лига ВТБ
"vtbyouth", # Молодежка ВТБ
"vtb-supercup", # Супер-Кубок ЕЛ ВТБ
"msl", # Высшая лига. Мужчины
"mcup", # Кубок России. Мужчины
"wpremier", # Премьер-Лига. Женщины
"wsl", # Суперлига. Женщины
"whl", # Высшая лига. Женщины
"wcup", # Кубок России. Женщины
"dubl-b", # Дюбл до 19 лет
# добавляй свои…
}
DEFAULT_LEAGUE = "vtb"
DEFAULT_LANG = "en"
# URL-шаблоны (замени на реальные)
HOST = "ref.russiabasket.org"
URL_SEASON = "https://{host}/api/abc/comps/seasons?Tag={league}&Lang={lang}" # вернёт JSON со списком сезонов
URL_SCHEDULE = (
"https://{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000" # расписание лиги (или команды)
)
# Статус конкретной игры (используется для проверки "онлайн?" раз в минуту)
URL_GAME = "https://{host}/api/abc/games/game?Id={game_id}&lang={lang}"
# Быстрые запросы, когда матч онлайн (каждую секунду)
URL_BOX_SCORE = "https://{host}/api/abc/games/box-score?Id={game_id}&lang={lang}"
URL_PLAY_BY_PLAY = "https://{host}/api/abc/games/play-by-play?Id={game_id}&lang={lang}"
URL_LIVE_STATUS = "https://{host}/api/abc/games/live-status?Id={game_id}&lang={lang}"
# Интервалы опроса
STATUS_CHECK_INTERVAL_SEC = 60 # проверять "онлайн?" раз в минуту
ONLINE_FETCH_INTERVAL_SEC = 1 # когда матч онлайн, дергать три запроса каждую секунду
POLL_INTERVAL_OFFLINE_SEC = 300 # резервный интервал сна при ошибках/до старта
TELEGRAM_BOT_TOKEN = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY"
TELEGRAM_CHAT_ID = 228977654
MYHOST = platform.node()
LOG_CONFIG = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": TELEGRAM_BOT_TOKEN,
"chat_id": TELEGRAM_CHAT_ID,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s [{MYHOST.upper()}] %(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(LOG_CONFIG)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
# ==========================
# ---- УТИЛИТЫ
# ==========================
def fetch_json(url: str, params: dict | None = None, session: requests.Session | None = None) -> dict:
"""
GET JSON с таймаутом и внятными ошибками.
Использует переданный session для keep-alive.
"""
sess = session or requests
try:
r = sess.get(url, params=params, timeout=(3.0, 4.0)) # (connect, read)
r.raise_for_status()
return r.json()
except requests.HTTPError as he:
raise RuntimeError(f"HTTP {he.response.status_code} для {url}") from he
except requests.RequestException as re:
raise RuntimeError(f"Сетевой сбой для {url}: {re}") from re
except json.JSONDecodeError as je:
raise RuntimeError(f"Некорректный JSON на {url}: {je}") from je
# ==========================
# ---- ЭКСТРАКТОРЫ ИЗ JSON
# ==========================
def extract_last_season(data: dict) -> str:
"""
Вытаскиваем последний сезон, в списке он первый
"""
try:
seasons = data["items"]
if not seasons:
raise ValueError("пустой список сезонов")
last = seasons[0]
season_id = last["season"]
return str(season_id)
except Exception as e:
raise RuntimeError(f"Не удалось извлечь последний сезон: {e}") from e
def extract_team_schedule_for_season(data: dict, team_code: str) -> list[dict]:
"""
Верни список игр команды. Адаптируй ключи под реальный JSON.
Предполагаем формат игр:
{
"gameId": "12345",
"home": "BOS",
"away": "LAL",
"startTimeUTC": "2025-10-23T18:00:00Z",
"status": "finished|scheduled|inprogress"
}
"""
try:
games = data["items"]
team_games = [
g for g in games if g.get("team1").get("name").lower() == team_code.lower()
]
return team_games
except Exception as e:
raise RuntimeError(f"Не удалось извлечь расписание команды: {e}") from e
def parse_game_start_dt(item: dict) -> datetime:
"""
Достаёт дату/время начала матча из объекта расписания и приводит к APP_TZ.
Приоритет полей:
1) game.defaultZoneDateTime — уже в "дефолтной зоне" лиги (например, +03:00)
2) game.scheduledTime — ISO 8601 с оффсетом (например, 2025-09-30T19:00:00+04:00)
3) game.startTime — если API когда-то его заполняет
4) (fallback) game.localDate + game.localTime — считаем, что это локальное время площадки, задаём tz=APP_TZ
Возвращает aware-datetime в APP_TZ.
"""
g = item.get("game", {}) if "game" in item else item
raw = g.get("defaultZoneDateTime") or g.get("scheduledTime") or g.get("startTime")
if raw:
try:
dt = datetime.fromisoformat(raw) # ISO-8601
return dt.astimezone(APP_TZ)
except Exception as e:
raise RuntimeError(f"Ошибка парсинга ISO времени '{raw}': {e}")
# Fallback: localDate + localTime (пример: "30.09.2025" + "19:00")
ld, lt = g.get("localDate"), g.get("localTime")
if ld and lt:
try:
naive = datetime.strptime(f"{ld} {lt}", "%d.%m.%Y %H:%M")
aware = naive.replace(tzinfo=APP_TZ)
return aware
except Exception as e:
raise RuntimeError(f"Ошибка парсинга localDate/localTime '{ld} {lt}': {e}")
raise RuntimeError("Не найдено ни одного подходящего поля времени (defaultZoneDateTime/scheduledTime/startTime/localDate+localTime).")
def extract_game_status(data: dict) -> str:
"""
Ожидаем JSON вида {"status":"inprogress|scheduled|finished"}
"""
try:
return str(data["result"]["status"]["id"]).lower()
except Exception as e:
raise RuntimeError(f"Не удалось извлечь статус матча: {e}") from e
# ==========================
# ---- ДОП. ЗАПРОСЫ ПРИ ОНЛАЙНЕ
# ==========================
def fetch_box_score(league: str, game_id: str, lang: str, session: requests.Session | None = None) -> dict:
url = URL_BOX_SCORE.format(host=HOST, league=league, game_id=game_id, lang=lang)
return fetch_json(url, session=session)
def fetch_play_by_play(league: str, game_id: str, lang: str, session: requests.Session | None = None) -> dict:
url = URL_PLAY_BY_PLAY.format(host=HOST, league=league, game_id=game_id, lang=lang)
return fetch_json(url, session=session)
def fetch_live_status(league: str, game_id: str, lang: str, session: requests.Session | None = None) -> dict:
url = URL_LIVE_STATUS.format(host=HOST, league=league, game_id=game_id, lang=lang)
return fetch_json(url, session=session)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _get(d: dict | None, *path, default=None):
"""Безопасно достаём вложенные ключи: _get(d, "result", "fullScore", default={})"""
cur = d or {}
for p in path:
if not isinstance(cur, dict) or p not in cur:
return default
cur = cur[p]
return cur
def _dedup_plays(plays: List[dict]) -> List[dict]:
"""
Удаляем дубли по стабильному идентификатору события.
Приоритет: eventId -> id -> (sequence, clock, teamId, type)
"""
seen = set()
out = []
for ev in plays:
if not isinstance(ev, dict):
continue
key = (
ev.get("eventId")
or ev.get("id")
or (ev.get("sequence"), ev.get("clock"), ev.get("teamId"), ev.get("type"))
)
if key in seen:
continue
seen.add(key)
out.append(ev)
# если есть поле sequence/time — отсортируем, чтобы обработчик получал стабильный порядок
out.sort(key=lambda e: (e.get("sequence") is None, e.get("sequence"), e.get("time") or e.get("clock")))
return out
def merge_online_payloads(
game: dict,
box_score: dict | None,
play_by_play: dict | None,
live_status: dict | None,
) -> Dict[str, Any]:
"""
Склеивает онлайн-ответы в единый компактный payload для downstream-обработки.
Ничего не знает о внутренней логике обработки — только нормализует.
"""
# исходные куски
plays_raw: List[dict] = _get(play_by_play, "result", default=[]) or []
score_by_periods = _get(box_score, "result", "scoreByPeriods", default=[]) or []
full_score = _get(box_score, "result", "fullScore", default={}) or {}
teams = _get(box_score, "result", "teams", default={}) or {} # если пригодится в обработчике
players = _get(box_score, "result", "players", default=[]) or []
# live
period = _get(live_status, "result", "period")
clock = _get(live_status, "result", "clock")
status = _get(live_status, "result", "status") # e.g., "inprogress", "ended", "scheduled"
# нормализация/дедуп
plays = _dedup_plays(plays_raw)
merged: Dict[str, Any] = {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {
"boxScoreHas": list((_get(box_score, "result") or {}).keys()),
"pbpLen": len(plays),
},
},
"result": {
# то, что просил: три ключа (плюс ещё полезные поля)
"plays": plays,
"scoreByPeriods": score_by_periods,
"fullScore": full_score,
# добавим live — обработчику пригодится
"period": period,
"clock": clock,
"status": status,
# опционально: передадим команды/игроков, если есть в box score
"teams": teams,
"players": players,
},
}
return merged
# где-то в твоём коде
def process_online_update(merged: dict) -> None:
"""
Здесь — любая твоя логика: обновить JSON-файлы, пересчитать агрегаты,
уведомить подписчиков, обновить кеш и т.д.
"""
# пример:
game_id = merged["meta"]["gameId"]
print(game_id)
# ... твоя обработка ...
def is_already_merged(obj: dict) -> bool:
"""
Проверяем, что объект уже содержит result.plays/fullScore/scoreByPeriods.
Подходит для ответа game (исторический матч).
"""
if not isinstance(obj, dict):
return False
res = obj.get("result") or obj.get("game") or obj # подстрахуемся под разные корни
if not isinstance(res, dict):
return False
r = res.get("result", res) # иногда внутри ещё один "result"
return (
isinstance(r, dict)
and isinstance(r.get("plays", []), list)
and isinstance(r.get("fullScore", {}), dict)
and isinstance(r.get("scoreByPeriods", []), list)
)
def ensure_merged_payload(
game_or_merged: dict | None = None,
*,
box_score: dict | None = None,
play_by_play: dict | None = None,
live_status: dict | None = None,
game_meta: dict | None = None, # например {"id": ..., "league": ...}
) -> dict:
"""
1) Если передан уже-склеенный payload (исторический матч) — нормализуем и возвращаем.
2) Иначе склеиваем из box/pbp/live через merge_online_payloads.
"""
# 1) Уже склеено (история) — просто привести к твоему контракту {meta, ids, result}
if game_or_merged and is_already_merged(game_or_merged):
g = game_or_merged.get("result") or game_or_merged # допускаем разные корни
# print(g)
with open("temp.json", "w", encoding="utf-8") as f:
json.dump(g, f, ensure_ascii=False, indent=2)
merged = {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {"from": "game_api", "pbpLen": len(g.get("plays", []))},
},
"result": g,
}
return merged
# 2) Онлайн-ветка — склеиваем так, как у тебя уже реализовано
if box_score is not None or play_by_play is not None or live_status is not None:
base_game = game_meta or {}
return merge_online_payloads(base_game, box_score, play_by_play, live_status)
# 2b) Fallback: если пришёл "game", но без plays/fullScore/scoreByPeriods — всё равно сохраним
if game_or_merged:
g = game_or_merged.get("result") or game_or_merged
return {
"meta": {
"generatedAt": _now_iso(),
"sourceHints": {"from": "game_api_raw"},
},
"result": g, # положим сырой ответ целиком — чтобы файл гарантированно записался
}
raise ValueError("ensure_merged_payload: не передан ни уже-склеенный game, ни box/pbp/live.")
def atomic_write_json(path: str | Path, data: dict, ensure_dirs: bool = True) -> None:
path = Path(path)
if ensure_dirs:
path.parent.mkdir(parents=True, exist_ok=True)
# атомарная запись: пишем во временный файл и переименовываем
with tempfile.NamedTemporaryFile("w", delete=False, dir=str(path.parent), encoding="utf-8") as tmp:
json.dump(data, tmp, ensure_ascii=False, indent=2)
tmp.flush()
os.fsync(tmp.fileno())
tmp_name = tmp.name
os.replace(tmp_name, path)
def format_time(seconds: float | int) -> str:
"""
Форматирует время в секундах в строку "M:SS".
Args:
seconds (float | int): Количество секунд.
Returns:
str: Время в формате "M:SS".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
def Json_Team_Generation(merged: dict, *, out_dir: str = "static", who: str | None = None):
"""
Единая точка: принимает уже нормализованный merged, делает нужные вычисления (если надо)
и сохраняет в JSON.
"""
# Здесь можно делать любые расчёты/агрегации...
# Пример предохранителя: сортировка плей-бай-плея по sequence
# plays = merged.get("result", {}).get("plays", [])
# if plays and isinstance(plays, list):
# try:
# plays.sort(key=lambda e: (e.get("sequence") is None, e.get("sequence"), e.get("time") or e.get("clock")))
# except Exception:
# pass
# Имя файла
# print(merged)
if who == "team1":
for i in merged["result"]["teams"]:
if i["teamNumber"] == 1:
payload = i
elif who == "team2":
for i in merged["result"]["teams"]:
if i["teamNumber"] == 2:
payload = i
# online = (
# True
# if json_live_status
# and "status" in json_live_status
# and json_live_status["status"] == "Ok"
# and json_live_status["result"]["gameStatus"] == "Online"
# else False
# )
online = False
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload["starts"]
team = []
for item in starts:
player = {
"id": (
item["personId"]
if item["personId"]
else ""
),
"num": item["displayNumber"],
"startRole": item["startRole"],
"role": item["positionName"],
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == item["positionName"].lower()
][0]
if any(
r[0].lower() == item["positionName"].lower()
for r in role_list
)
else ""
),
"NameGFX": (
f"{item['firstName'].strip()} {item['lastName'].strip()}"
if item["firstName"] is not None
and item["lastName"] is not None
else "Команда"
),
"captain": item["isCapitan"],
"age": item["age"] if item["age"] is not None else 0,
"height": f'{item["height"]} cm' if item["height"] else 0,
"weight": f'{item["weight"]} kg' if item["weight"] else 0,
"isStart": (
item["stats"]["isStart"] if item["stats"] else False
),
"isOn": (
"🏀"
if item["stats"] and item["stats"]["isOnCourt"] is True
else ""
),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{'ru' if item['countryId'] is None and item['countryName'] == 'Russia' else '' if item['countryId'] is None else item['countryId'].lower() if item['countryName'] is not None else ''}.svg",
"pts": item["stats"]["points"] if item["stats"] else 0,
"pt-2": (
f"{item['stats']['goal2']}/{item['stats']['shot2']}"
if item["stats"]
else 0
),
"pt-3": (
f"{item['stats']['goal3']}/{item['stats']['shot3']}"
if item["stats"]
else 0
),
"pt-1": (
f"{item['stats']['goal1']}/{item['stats']['shot1']}"
if item["stats"]
else 0
),
"fg": (
f"{item['stats']['goal2'] + item['stats']['goal3']}/{item['stats']['shot2'] + item['stats']['shot3']}"
if item["stats"]
else 0
),
"ast": item["stats"]["assist"] if item["stats"] else 0,
"stl": item["stats"]["steal"] if item["stats"] else 0,
"blk": item["stats"]["block"] if item["stats"] else 0,
"blkVic": item["stats"]["blocked"] if item["stats"] else 0,
"dreb": item["stats"]["defReb"] if item["stats"] else 0,
"oreb": item["stats"]["offReb"] if item["stats"] else 0,
"reb": (
item["stats"]["defReb"] + item["stats"]["offReb"]
if item["stats"]
else 0
),
"to": item["stats"]["turnover"] if item["stats"] else 0,
"foul": item["stats"]["foul"] if item["stats"] else 0,
"foulT": item["stats"]["foulT"] if item["stats"] else 0,
"foulD": item["stats"]["foulD"] if item["stats"] else 0,
"foulC": item["stats"]["foulC"] if item["stats"] else 0,
"foulB": item["stats"]["foulB"] if item["stats"] else 0,
"fouled": item["stats"]["foulsOn"] if item["stats"] else 0,
"plusMinus": item["stats"]["plusMinus"] if item["stats"] else 0,
"dunk": item["stats"]["dunk"] if item["stats"] else 0,
"kpi": (
item["stats"]["points"]
+ item["stats"]["defReb"]
+ item["stats"]["offReb"]
+ item["stats"]["assist"]
+ item["stats"]["steal"]
+ item["stats"]["block"]
+ item["stats"]["foulsOn"]
+ (item["stats"]["goal1"] - item["stats"]["shot1"])
+ (item["stats"]["goal2"] - item["stats"]["shot2"])
+ (item["stats"]["goal3"] - item["stats"]["shot3"])
- item["stats"]["turnover"]
- item["stats"]["foul"]
if item["stats"]
else 0
),
"time": (
format_time(item["stats"]["second"])
if item["stats"]
else "0:00"
),
"pts1q": 0,
"pts2q": 0,
"pts3q": 0,
"pts4q": 0,
"pts1h": 0,
"pts2h": 0,
"Name1GFX": (
item["firstName"].strip() if item["firstName"] else ""
),
"Name2GFX": (
item["lastName"].strip() if item["lastName"] else ""
),
"photoGFX": (
os.path.join(
"D:\\Photos",
merged["result"]["league"]["abcName"],
merged["result"][who]["name"],
# LEAGUE,
# data[who],
f"{item['displayNumber']}.png",
)
if item["startRole"] == "Player"
else ""
),
# "season": text,
"isOnCourt": (
item["stats"]["isOnCourt"] if item["stats"] else False
),
# "AvgPoints": (
# row_player_season_avg["points"]
# if row_player_season_avg
# and row_player_season_avg["points"] != ""
# else "0.0"
# ),
# "AvgAssist": (
# row_player_season_avg["assist"]
# if row_player_season_avg
# and row_player_season_avg["assist"] != ""
# else "0.0"
# ),
# "AvgBlocks": (
# row_player_season_avg["blockShot"]
# if row_player_season_avg
# and row_player_season_avg["blockShot"] != ""
# else "0.0"
# ),
# "AvgDefRebound": (
# row_player_season_avg["defRebound"]
# if row_player_season_avg
# and row_player_season_avg["defRebound"] != ""
# else "0.0"
# ),
# "AvgOffRebound": (
# row_player_season_avg["offRebound"]
# if row_player_season_avg
# and row_player_season_avg["offRebound"] != ""
# else "0.0"
# ),
# "AvgRebound": (
# row_player_season_avg["rebound"]
# if row_player_season_avg
# and row_player_season_avg["rebound"] != ""
# else "0.0"
# ),
# "AvgSteal": (
# row_player_season_avg["steal"]
# if row_player_season_avg
# and row_player_season_avg["steal"] != ""
# else "0.0"
# ),
# "AvgTurnover": (
# row_player_season_avg["turnover"]
# if row_player_season_avg
# and row_player_season_avg["turnover"] != ""
# else "0.0"
# ),
# "AvgFoul": (
# row_player_season_avg["foul"]
# if row_player_season_avg
# and row_player_season_avg["foul"] != ""
# else "0.0"
# ),
# "AvgOpponentFoul": (
# row_player_season_avg["foulsOnPlayer"]
# if row_player_season_avg
# and row_player_season_avg["foulsOnPlayer"] != ""
# else "0.0"
# ),
# "AvgPlusMinus": (
# row_player_season_avg["plusMinus"]
# if row_player_season_avg
# and row_player_season_avg["plusMinus"] != ""
# else "0.0"
# ),
# "AvgDunk": (
# row_player_season_avg["dunk"]
# if row_player_season_avg
# and row_player_season_avg["dunk"] != ""
# else "0.0"
# ),
# "AvgKPI": "0.0",
# "AvgPlayedTime": (
# row_player_season_avg["playedTime"]
# if row_player_season_avg
# and row_player_season_avg["playedTime"] != ""
# else "0:00"
# ),
# "Shot1Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=1
# ),
# "Shot2Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=2
# ),
# "Shot3Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=3
# ),
# "Shot23Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=[2, 3]
# ),
# "TPoints": sum_stat_with_online(
# "points", sum_stat, item["stats"], online
# ),
# "TShots1": calc_total_shots_str(
# sum_stat, item["stats"], online, 1
# ),
# "TShots2": calc_total_shots_str(
# sum_stat, item["stats"], online, 2
# ),
# "TShots3": calc_total_shots_str(
# sum_stat, item["stats"], online, 3
# ),
# "TShots23": calc_total_shots_str(
# sum_stat, item["stats"], online, [2, 3]
# ),
# "TShot1Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=1
# ),
# "TShot2Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=2
# ),
# "TShot3Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=3
# ),
# "TShot23Percent": calc_shot_percent_by_type(
# sum_stat, item["stats"], online, shot_types=[2, 3]
# ),
# "TAssist": sum_stat_with_online(
# "assist", sum_stat, item["stats"], online
# ),
# "TBlocks": sum_stat_with_online(
# "blockShot", sum_stat, item["stats"], online
# ),
# "TDefRebound": sum_stat_with_online(
# "defRebound", sum_stat, item["stats"], online
# ),
# "TOffRebound": sum_stat_with_online(
# "offRebound", sum_stat, item["stats"], online
# ),
# "TRebound": (
# sum_stat_with_online(
# "defRebound", sum_stat, item["stats"], online
# )
# + sum_stat_with_online(
# "offRebound", sum_stat, item["stats"], online
# )
# ),
# "TSteal": sum_stat_with_online(
# "steal", sum_stat, item["stats"], online
# ),
# "TTurnover": sum_stat_with_online(
# "turnover", sum_stat, item["stats"], online
# ),
# "TFoul": sum_stat_with_online(
# "foul", sum_stat, item["stats"], online
# ),
# "TOpponentFoul": sum_stat_with_online(
# "foulsOnPlayer", sum_stat, item["stats"], online
# ),
# "TPlusMinus": 0,
# "TDunk": sum_stat_with_online(
# "dunk", sum_stat, item["stats"], online
# ),
# "TKPI": 0,
# "TPlayedTime": sum_stat["playedTime"] if sum_stat else "0:00",
# "TGameCount": (
# safe_int(sum_stat["games"])
# if sum_stat and sum_stat.get("games") != ""
# else 0
# )
# + (1 if online else 0),
# "TStartCount": (
# safe_int(sum_stat["isStarts"])
# if sum_stat and sum_stat.get("isStarts", 0) != ""
# else 0
# ),
# "CareerTShots1": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, 1
# ),
# "CareerTShots2": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, 2
# ),
# "CareerTShots3": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, 3
# ),
# "CareerTShots23": calc_total_shots_str(
# row_player_career_sum, item["stats"], online, [2, 3]
# ),
# "CareerTShot1Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, 1
# ),
# "CareerTShot2Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, 2
# ),
# "CareerTShot3Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, 3
# ),
# "CareerTShot23Percent": calc_shot_percent_by_type(
# row_player_career_sum, item["stats"], online, [2, 3]
# ),
# "CareerTPoints": sum_stat_with_online(
# "points", row_player_career_sum, item["stats"], online
# ),
# "CareerTAssist": sum_stat_with_online(
# "assist", row_player_career_sum, item["stats"], online
# ),
# "CareerTBlocks": sum_stat_with_online(
# "blockShot", row_player_career_sum, item["stats"], online
# ),
# "CareerTDefRebound": sum_stat_with_online(
# "defRebound", row_player_career_sum, item["stats"], online
# ),
# "CareerTOffRebound": sum_stat_with_online(
# "offRebound", row_player_career_sum, item["stats"], online
# ),
# "CareerTRebound": (
# sum_stat_with_online(
# "defRebound",
# row_player_career_sum,
# item["stats"],
# online,
# )
# + sum_stat_with_online(
# "offRebound",
# row_player_career_sum,
# item["stats"],
# online,
# )
# ),
# "CareerTSteal": sum_stat_with_online(
# "steal", row_player_career_sum, item["stats"], online
# ),
# "CareerTTurnover": sum_stat_with_online(
# "turnover", row_player_career_sum, item["stats"], online
# ),
# "CareerTFoul": sum_stat_with_online(
# "foul", row_player_career_sum, item["stats"], online
# ),
# "CareerTOpponentFoul": sum_stat_with_online(
# "foulsOnPlayer",
# row_player_career_sum,
# item["stats"],
# online,
# ),
# "CareerTPlusMinus": 0, # оставить как есть
# "CareerTDunk": sum_stat_with_online(
# "dunk", row_player_career_sum, item["stats"], online
# ),
# "CareerTPlayedTime": (
# row_player_career_sum["playedTime"]
# if row_player_career_sum
# else "0:00"
# ),
# "CareerTGameCount": sum_stat_with_online(
# "games", row_player_career_sum, item["stats"], online
# )
# + (1 if online else 0),
# "CareerTStartCount": sum_stat_with_online(
# "isStarts", row_player_career_sum, item["stats"], online
# ), # если нужно, можно +1 при старте
# "AvgCarPoints": (
# row_player_career_avg["points"]
# if row_player_career_avg
# and row_player_career_avg["points"] != ""
# else "0.0"
# ),
# "AvgCarAssist": (
# row_player_career_avg["assist"]
# if row_player_career_avg
# and row_player_career_avg["assist"] != ""
# else "0.0"
# ),
# "AvgCarBlocks": (
# row_player_career_avg["blockShot"]
# if row_player_career_avg
# and row_player_career_avg["blockShot"] != ""
# else "0.0"
# ),
# "AvgCarDefRebound": (
# row_player_career_avg["defRebound"]
# if row_player_career_avg
# and row_player_career_avg["defRebound"] != ""
# else "0.0"
# ),
# "AvgCarOffRebound": (
# row_player_career_avg["offRebound"]
# if row_player_career_avg
# and row_player_career_avg["offRebound"] != ""
# else "0.0"
# ),
# "AvgCarRebound": (
# row_player_career_avg["rebound"]
# if row_player_career_avg
# and row_player_career_avg["rebound"] != ""
# else "0.0"
# ),
# "AvgCarSteal": (
# row_player_career_avg["steal"]
# if row_player_career_avg
# and row_player_career_avg["steal"] != ""
# else "0.0"
# ),
# "AvgCarTurnover": (
# row_player_career_avg["turnover"]
# if row_player_career_avg
# and row_player_career_avg["turnover"] != ""
# else "0.0"
# ),
# "AvgCarFoul": (
# row_player_career_avg["foul"]
# if row_player_career_avg
# and row_player_career_avg["foul"] != ""
# else "0.0"
# ),
# "AvgCarOpponentFoul": (
# row_player_career_avg["foulsOnPlayer"]
# if row_player_career_avg
# and row_player_career_avg["foulsOnPlayer"] != ""
# else "0.0"
# ),
# "AvgCarPlusMinus": (
# row_player_career_avg["plusMinus"]
# if row_player_career_avg
# and row_player_career_avg["plusMinus"] != ""
# else "0.0"
# ),
# "AvgCarDunk": (
# row_player_career_avg["dunk"]
# if row_player_career_avg
# and row_player_career_avg["dunk"] != ""
# else "0.0"
# ),
# "AvgCarKPI": "0.0",
# "AvgCarPlayedTime": (
# row_player_career_avg["playedTime"]
# if row_player_career_avg
# and row_player_career_avg["playedTime"] != ""
# else "0:00"
# ),
# "HeadCoachStatsCareer": HeadCoachStatsCareer,
# "HeadCoachStatsTeam": HeadCoachStatsTeam,
# # "PTS_Career_High": get_carrer_high(item["personId"], "points"),
# # "AST_Career_High": get_carrer_high(item["personId"], "assist"),
# # "REB_Career_High": get_carrer_high(item["personId"], "rebound"),
# # "STL_Career_High": get_carrer_high(item["personId"], "steal"),
# # "BLK_Career_High": get_carrer_high(item["personId"], "blockShot"),
}
team.append(player)
count_player = sum(1 for x in team if x["startRole"] == "Player")
# print(count_player)
if count_player < 12:
if team: # Check if team is not empty
empty_rows = [
{
key: (
False
if key in ["captain", "isStart", "isOnCourt"]
else (
0
if key
in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]
else ""
)
)
for key in team[0].keys()
}
for _ in range(
(4 if count_player <= 4 else 12) - count_player
)
]
team.extend(empty_rows)
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5, # на случай неизвестных
}
# print(team)
sorted_team = sorted(
team,
key=lambda x: role_priority.get(
x.get("startRole", 99), 99
), # 99 — по умолчанию
)
out_path = Path(out_dir) / f"{who}.json"
atomic_write_json(out_path, sorted_team)
logging.getLogger("game_watcher").info("Сохранил payload: %s", out_path)
# ==========================
# ---- ДОМЕННАЯ ЛОГИКА
# ==========================
def validate_league_or_die(league: str) -> str:
league = (league or DEFAULT_LEAGUE).lower().strip()
if league not in ALLOWED_LEAGUES:
logger.warning(f"Неверный тег лиги: '{league}'. Допустимо: {sorted(ALLOWED_LEAGUES)}")
sys.exit(2)
return league
def get_last_season_or_die(league: str, lang: str) -> str:
url = URL_SEASON.format(host=HOST, league=league, lang=lang)
try:
data = fetch_json(url)
season = extract_last_season(data)
logging.info(
f"Последний сезон для {league}: {season}"
)
return season
except Exception as e:
logger.warning(f"Не получилось получить последний сезон для {league}: {e}")
sys.exit(3)
def get_team_schedule_or_die(league: str, season: str, team: str, lang: str) -> list[dict]:
url = URL_SCHEDULE.format(host=HOST, league=league, season=season, lang=lang)
try:
data = fetch_json(url)
team_games = extract_team_schedule_for_season(data, team)
if not team_games:
logger.warning(f"Для команды {team} не найдено игр в сезоне {season}.")
return team_games
except Exception as e:
logger.warning(f"Не получилось получить расписание {league}/{season}: {e}")
return []
def pick_today_or_last_played(
team_games: list[dict], now: datetime
) -> tuple[dict | None, dict | None]:
"""
Возвращает (сегодняшняя игра, последний сыгранный матч).
"""
today = now.date()
games_sorted = sorted(team_games, key=parse_game_start_dt)
today_game = None
last_played = None
for g in games_sorted:
start = parse_game_start_dt(g)
status = g.get("game", {}).get("gameStatus", "").lower()
if start.date() == today and today_game is None:
today_game = g
if start <= now and status == "resultconfirmed":
last_played = g
return today_game, last_played
def is_game_online(league: str, game_id: str, lang:str) -> str:
"""
Возвращает статус: inprogress|scheduled|finished (или то, что твой API даёт).
"""
url = URL_GAME.format(host=HOST, league=league, game_id=game_id, lang=lang)
data = fetch_json(url)
return extract_game_status(data)
class PostProcessor:
def __init__(self):
self.q = queue.Queue(maxsize=1)
self._t = threading.Thread(target=self._worker, daemon=True)
self._stop = threading.Event()
self._t.start()
def submit(self, merged):
# кладём только «последний» payload
try:
# если очередь занята, выкидываем старое задание
while True:
self.q.get_nowait()
except queue.Empty:
pass
# не блокируем: если за эту миллисекунду кто-то положил — просто заменим в следующий раз
try:
self.q.put_nowait(merged)
except queue.Full:
pass
def _worker(self):
while not self._stop.is_set():
merged = self.q.get()
try:
Json_Team_Generation(merged, out_dir="static", who="team1")
Json_Team_Generation(merged, out_dir="static", who="team2")
except Exception as e:
logging.exception(f"Postproc failed: {e}")
def stop(self):
self._stop.set()
class OnlinePoller:
def __init__(self, league: str, game_id: str, lang: str, on_update: callable | None = None):
self.league = league
self.game_id = game_id
self.lang = lang
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._log = logging.info("start")
self._on_update = on_update
self._post = PostProcessor()
# 1) Постоянная сессия и пул соединений
self._session = requests.Session()
retry = Retry(
total=2, connect=2, read=2, backoff_factor=0.1,
status_forcelist=(502, 503, 504),
allowed_methods=frozenset(["GET"])
)
adapter = HTTPAdapter(pool_connections=1, pool_maxsize=10, max_retries=retry)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
self._session.headers.update({
"Connection": "keep-alive",
"Accept": "application/json, */*",
"Accept-Encoding": "gzip, deflate, br",
"User-Agent": "game-watcher/1.0"
})
def stop(self):
if self._thread and self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=2)
self._log.info(f"Онлайн-поллер для игры {self.game_id} остановлен.")
self._thread = None
try:
self._session.close()
except Exception:
pass
try:
self._post.stop()
except Exception:
pass
def _run(self):
# Исполнитель для параллельных GET
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
while not self._stop_event.is_set():
started = time.perf_counter()
try:
futures = [
pool.submit(fetch_box_score, self.league, self.game_id, self.lang, self._session),
pool.submit(fetch_play_by_play, self.league, self.game_id, self.lang, self._session),
pool.submit(fetch_live_status, self.league, self.game_id, self.lang, self._session),
]
bs, pbp, ls = (f.result() for f in futures)
merged = ensure_merged_payload(
None,
box_score=bs,
play_by_play=pbp,
live_status=ls,
game_meta={"id": self.game_id, "league": self.league},
)
# внешний коллбек, если задан
if self._on_update:
self._on_update(merged)
# твоя общая обработка + сохранение
self._post.submit(merged)
self._log.debug(
"Обновления online: box-score(%s keys), pbp(%s keys), live-status(%s keys)",
len(bs) if isinstance(bs, dict) else "",
len(pbp) if isinstance(pbp, dict) else "",
len(ls) if isinstance(ls, dict) else "",
)
except Exception as e:
logger.warning(f"Сбой online-поллера для игры {self.game_id}: {e}")
# лёгкая задержка после ошибки, но не «наказание» на целую секунду
time.sleep(0.2)
# Точное выдерживание частоты: «1 цикл в секунду»
elapsed = time.perf_counter() - started
rest = ONLINE_FETCH_INTERVAL_SEC - elapsed
if rest > 0:
# спим только остаток
self._stop_event.wait(rest)
def start(self):
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run,
name=f"poller-{self.game_id}",
daemon=True,
)
self._thread.start()
self._log.info(f"Онлайн-поллер для игры {self.game_id} запущен.")
def monitor_game_loop(league: str, game_id: str, lang:str, stop_event: threading.Event) -> None:
logger.info(f"Старт мониторинга игры {game_id} ({league}).")
poller = OnlinePoller(league, game_id, lang)
was_online = False
while not stop_event.is_set():
try:
status = is_game_online(league, game_id, lang)
is_online = status in {"inprogress", "live", "online"}
is_finished = status in {"resultconfirmed", "result"}
if is_finished:
logger.info(f"Матч {game_id} завершён.\nОстанавливаем мониторинг.")
break
if is_online and not was_online:
logger.info(f"Матч {game_id} перешёл в онлайн.\nЗапускаем быстрый опрос (1 сек).")
poller.start()
elif not is_online and was_online:
logger.info(f"Матч {game_id} вышел из онлайна (или ещё не стартовал).\nОстанавливаем быстрый опрос.")
poller.stop()
was_online = is_online
# Проверяем статус снова через минуту
stop_event.wait(STATUS_CHECK_INTERVAL_SEC)
except Exception as e:
logger.warning(f"Сбой проверки статуса матча {game_id}: {e}")
# При ошибке — не дергаем быстро, подождём немного и повторим
stop_event.wait(POLL_INTERVAL_OFFLINE_SEC)
# Гарантированно остановим быстрый опрос при завершении
poller.stop()
logger.info(f"Мониторинг матча {game_id} остановлен.")
def next_midnight_local(now: datetime) -> datetime:
tomorrow = (now + timedelta(days=1)).date()
return datetime.combine(tomorrow, datetime.min.time(), tzinfo=APP_TZ) + timedelta(minutes=5)
# return now + timedelta(seconds=30)
def daily_rollover_loop(
league: str,
team: str,
lang: str,
season_getter,
schedule_getter,
monitor_mgr,
stop_event: threading.Event,
):
"""
Каждый день в ~00:05 по Europe/Moscow:
- узнаём актуальный сезон
- заново тянем расписание
- выбираем сегодняшнюю игру или последний сыгранный
- при наличии сегодняшней игры — перезапускаем монитор на неё
"""
while not stop_event.is_set():
now = datetime.now(APP_TZ)
wakeup_at = next_midnight_local(now)
print(type(wakeup_at))
seconds = (wakeup_at - now).total_seconds()
logger.info(
# f"Ежедневка: проснусь {datetime.fromisoformat(wakeup_at.isoformat())} (через {int(seconds)} сек)."
f"Ежедневная проверка матча:\nпроснусь {wakeup_at.strftime('%Y-%m-%d %H:%M:%S')} (через {int(seconds)} сек)."
)
if stop_event.wait(seconds):
break
# Выполняем ежедневную проверку
try:
season = season_getter(league, lang)
games = schedule_getter(league, season, team, lang)
if not games:
logger.info(
f"Ежедневная проверка:\nу {team} нет игр в расписании сезона {season}."
)
continue
today_game, last_played = pick_today_or_last_played(
games, datetime.now(APP_TZ)
)
if today_game:
gid = today_game["game"]["id"]
logger.info(
f"Сегодня у {team} есть игра: gameID={gid}. \nПерезапуск мониторинга."
)
monitor_mgr.restart(gid, lang)
elif last_played:
gid = last_played["game"]["id"]
logger.info(
f"Сегодня у {team} нет игры. \nПоследняя сыгранная: gameID={gid}.\nМониторинг НЕ запускаем."
)
else:
logger.info(f"Сегодня у {team} нет игры и нет предыдущих сыгранных.")
except Exception as e:
logger.warning(f"Ошибка ежедневной проверки: {e}")
class MonitorManager:
"""
Управляет потоком мониторинга, чтобы можно было
безопасно перезапускать на новый gameId.
"""
def __init__(self, league: str):
self.league = league
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._lock = threading.Lock()
def restart(self, game_id: str, lang: str):
with self._lock:
self.stop()
self._stop_event = threading.Event()
self._thread = threading.Thread(
target=monitor_game_loop,
args=(self.league, game_id, lang, self._stop_event),
name=f"monitor-{game_id}",
daemon=True,
)
self._thread.start()
def stop(self):
if self._thread and self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=5)
self._thread = None
# ==========================
# ---- MAIN
# ==========================
def main():
# global MYHOST
parser = argparse.ArgumentParser(description="Game watcher")
parser.add_argument("--league", type=str, default=DEFAULT_LEAGUE, help="тег лиги")
parser.add_argument(
"--team", type=str, required=True, help="код/тег команды (например, BOS)"
)
parser.add_argument(
"--lang", type=str, default="en", help="язык получения данных"
)
parser.add_argument(
"--log-level", type=str, default="INFO", help="DEBUG|INFO|WARNING|ERROR"
)
args = parser.parse_args()
print(args)
# logger.info(f"Запуск программы пользователем: {MYHOST}")
logger.info(f"Запуск с параметрами:\nleague={args.league}\nteam={args.team}\nlang={args.lang}")
league = validate_league_or_die(args.league)
team = args.team.lower()
# 1) Узнать последний сезон
season = get_last_season_or_die(league, args.lang)
# 2) Получить расписание для команды
team_games = get_team_schedule_or_die(league, season, team, args.lang)
if not team_games:
logger.warning("Расписание пустое — работа завершена.")
sys.exit(4)
# 3) Найти сегодняшнюю или последнюю сыгранную игру
now = datetime.now(APP_TZ)
today_game, last_played = pick_today_or_last_played(team_games, now)
monitor_mgr = MonitorManager(league=league)
if today_game:
# В исходном расписании предполагалось наличие game.id
game_id = today_game["game"]["id"]
logger.info(
f"Сегодня у {team} есть игра: gameID={game_id}.\nЗапускаю мониторинг."
)
monitor_mgr.restart(game_id, args.lang)
else:
if last_played:
game_id = last_played["game"]["id"]
try:
url = URL_GAME.format(host=HOST, league=league, game_id=game_id, lang=args.lang)
game_json = fetch_json(url)
merged = ensure_merged_payload(
game_json,
game_meta={
"id": game_json.get("result", {}).get("gameId"),
"league": args.league
}
)
Json_Team_Generation(merged, out_dir="static", who="team1")
Json_Team_Generation(merged, out_dir="static", who="team2")
# print(merged)
logger.info(
f"Сегодня у {team} нет игры.\nПоследняя сыгранная: gameID={game_id}.\nМониторинг не запускаю."
)
except Exception as e:
logging.exception(
f"Оффлайн-сохранение для gameID={game_id}\nупало: {e}"
)
else:
logger.info(f"Сегодня у {team} нет игры и нет предыдущих сыгранных.")
# 4) Ежедневная перекладка расписания
stop_event = threading.Event()
rollover_thread = threading.Thread(
target=daily_rollover_loop,
args=(
league,
team,
args.lang,
get_last_season_or_die,
get_team_schedule_or_die,
monitor_mgr,
stop_event,
),
name="daily-rollover",
daemon=True,
)
rollover_thread.start()
# Держим главный поток живым
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Завершение по Ctrl+C…")
finally:
stop_event.set()
monitor_mgr.stop()
rollover_thread.join(timeout=5)
logger.info("Остановлено.")
if __name__ == "__main__":
main()