from fastapi import FastAPI
from fastapi.responses import Response, HTMLResponse
from fastapi import HTTPException
from fastapi import Request
from contextlib import asynccontextmanager
import requests
import threading
import time
import queue
import argparse
import uvicorn
import os
import pandas as pd
import json
from datetime import datetime, time as dtime, timedelta
from fastapi.responses import Response
import logging
import logging.config
import platform
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--league", default="vtb")
parser.add_argument("--team", required=True)
parser.add_argument("--lang", default="en")
args = parser.parse_args()
MYHOST = platform.node()
if not os.path.exists("logs"):
os.makedirs("logs")
telegram_bot_token = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY"
# telegram_chat_id = 228977654
# telegram_chat_id = -4803699526
telegram_chat_id = -1003388354193
log_config = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": telegram_bot_token,
"chat_id": telegram_chat_id,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s [{MYHOST.upper()}]\n%(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
LEAGUE = args.league
TEAM = args.team
LANG = args.lang
HOST = "https://pro.russiabasket.org"
STATUS = False
GAME_ID = None
SEASON = None
GAME_START_DT = None # datetime начала матча (локальная из календаря)
GAME_TODAY = False # флаг: игра сегодня
GAME_SOON = False # флаг: игра сегодня и скоро (<1 часа)
OFFLINE_SWITCH_AT = None # timestamp, когда надо уйти в оффлайн
OFFLINE_DELAY_SEC = 600 # 10 минут
SLEEP_NOTICE_SENT = False # 👈 чтобы не слать уведомление повторно
# --- preload lock ---
PRELOAD_LOCK = False # когда True — consumer будет принимать только preloaded game
PRELOADED_GAME_ID = None # ID матча, который мы держим «тёплым»
PRELOAD_HOLD_UNTIL = None # timestamp, до какого момента держим (T-1:15)
# общая очередь
results_q = queue.Queue()
# тут будем хранить последние данные
latest_data = {}
# событие для остановки потоков
stop_event = threading.Event()
# отдельные события для разных наборов потоков
stop_event_live = threading.Event()
stop_event_offline = threading.Event()
# чтобы из consumer можно было их гасить
threads_live = []
threads_offline = []
# какой режим сейчас запущен: "live" / "offline" / None
CURRENT_THREADS_MODE = None
CLEAR_OUTPUT_FOR_VMIX = False
EMPTY_PHOTO_PATH = r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\EMPTY.png"
URLS = {
"seasons": "{host}/api/abc/comps/seasons?Tag={league}",
"actual-standings": "{host}/api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}",
"calendar": "{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000",
"game": "{host}/api/abc/games/game?Id={game_id}&Lang={lang}",
"pregame": "{host}/api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}",
"pregame-full-stats": "{host}/api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}",
"live-status": "{host}/api/abc/games/live-status?id={game_id}",
"box-score": "{host}/api/abc/games/box-score?id={game_id}",
"play-by-play": "{host}/api/abc/games/play-by-play?id={game_id}",
}
def maybe_clear_for_vmix(payload):
"""
Если включён режим очистки — возвращаем payload,
где все значения заменены на "".
Иначе — возвращаем как есть.
"""
if CLEAR_OUTPUT_FOR_VMIX:
return wipe_json_values(payload)
return payload
def start_offline_threads(season, game_id):
"""Запускаем редкие запросы, когда матча нет или он уже сыгран."""
global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data
if CURRENT_THREADS_MODE == "offline":
logger.debug("[threads] already in OFFLINE mode → skip start_offline_threads")
return
stop_live_threads()
stop_offline_threads()
logger.info("[threads] switching to OFFLINE mode ...")
stop_event_offline.clear()
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
300, # опрашиваем раз в секунду/реже
stop_event_offline,
False,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
600,
stop_event_offline,
False,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
600,
stop_event_offline,
False,
True,
),
daemon=True,
),
]
for t in threads_offline:
t.start()
CURRENT_THREADS_MODE = "offline"
logger.info("[threads] OFFLINE threads started (data cleaned)")
def start_live_threads(season, game_id):
"""Запускаем частые онлайн-запросы, когда матч идёт/вот-вот."""
global threads_live, CURRENT_THREADS_MODE, stop_event_live
# если уже в лайве — не дублируем
if CURRENT_THREADS_MODE == "live":
return
# на всякий случай гасим офлайн
stop_offline_threads()
stop_event_live.clear()
threads_live = [
threading.Thread(
target=get_data_from_API,
args=(
"pregame",
URLS["pregame"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
600,
stop_event_live,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
600,
stop_event_live,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
600,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
300, # часто
stop_event_live,
True,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"live-status",
URLS["live-status"].format(host=HOST, game_id=game_id),
0.5,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"box-score",
URLS["box-score"].format(host=HOST, game_id=game_id),
0.5,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"play-by-play",
URLS["play-by-play"].format(host=HOST, game_id=game_id),
1,
stop_event_live,
),
daemon=True,
),
]
for t in threads_live:
t.start()
CURRENT_THREADS_MODE = "live"
logger.info("[threads] LIVE threads started")
def stop_live_threads():
"""Гасим только live-треды."""
global threads_live
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
if not threads_live:
logger.info("[threads] LIVE threads stopped (nothing to stop)")
return
logger.info(f"[threads] Stopping {len(threads_live)} LIVE thread(s)...")
stop_event_live.set()
still_alive = []
for t in threads_live:
t.join(timeout=2)
if t.is_alive():
logger.warning(
f"[{current_time}] [threads] LIVE thread is still alive: {t.name}"
)
still_alive.append(t.name)
threads_live = []
if still_alive:
logger.warning(
f"[{current_time}] [threads] Some LIVE threads did not stop: {still_alive}"
)
else:
logger.info("[threads] LIVE threads stopped")
CURRENT_THREADS_MODE = None # 👈 сбрасываем режим
def stop_offline_threads():
"""Гасим только offline-треды."""
global threads_offline
if not threads_offline:
return
stop_event_offline.set()
for t in threads_offline:
t.join(timeout=1)
threads_offline = []
CURRENT_THREADS_MODE = None # 👈 сбрасываем режим
logger.info("[threads] OFFLINE threads stopped")
def has_full_game_ready() -> bool:
game = latest_data.get("game")
if not game:
return False
payload = game.get("data", game)
return (
isinstance(payload, dict)
and isinstance(payload.get("data"), dict)
and isinstance(payload["data"].get("result"), dict)
and "teams" in payload["data"]["result"]
)
# Функция запускаемая в потоках
def get_data_from_API(
name: str,
url: str,
sleep_time: float,
stop_event: threading.Event,
stop_when_live=False,
stop_after_success: bool = False, # 👈 новый флаг
):
did_first_fetch = False
while not stop_event.is_set():
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
if (
stop_when_live
and globals().get("STATUS") == "live"
and has_full_game_ready()
):
logger.info(
f"{[{current_time}]} [{name}] stopping because STATUS='live' and full game is ready"
)
break
try:
value = requests.get(url, timeout=5).json()
did_first_fetch = True # помечаем, что один заход сделали
except json.JSONDecodeError as json_err:
logger.warning(
f"[{current_time}] [{name}] Ошибка парсинга JSON: {json_err}"
)
value = {"error": f"JSON decode error: {json_err}"}
except requests.exceptions.Timeout:
logger.warning(f"[{current_time}] [{name}] Таймаут при запросе {url}")
value = {"error": "timeout"}
except requests.exceptions.RequestException as req_err:
logger.warning(f"[{current_time}] [{name}] Ошибка запроса: {req_err}")
value = {"error": str(req_err)}
except Exception as ex:
logger.warning(f"[{current_time}] [{name}] Неизвестная ошибка: {ex}")
value = {"error": str(ex)}
# Проверяем, нет ли явного статуса ошибки в JSON
if isinstance(value, dict) and str(value.get("status", "")).lower() in (
"error",
"fail",
"no-status",
):
logger.warning(
f"[{current_time}] [{name}] API вернул статус '{value.get('status')}'"
)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
results_q.put({"source": name, "ts": ts, "data": value})
logger.debug(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}")
ok_status = not (
isinstance(value, dict)
and str(value.get("status", "")).lower() in ("error", "fail", "no-status")
)
if stop_after_success and ok_status:
logger.info(
f"[{name}] got successful response → stopping thread (stop_after_success)"
)
return
# сколько уже заняло
# elapsed = time.time() - start
# сколько надо доспать, чтобы в сумме вышла нужная частота
# to_sleep = sleep_time - elapsed
# print(to_sleep)
# if to_sleep > 0:
# умное ожидание с быстрым выходом при live
slept = 0
while slept < sleep_time:
if stop_event.is_set():
break
if (
stop_when_live
and globals().get("STATUS") == "live"
and has_full_game_ready()
):
logger.info(
f"[{name}] stopping during sleep because STATUS='live' and full game is ready"
)
return
time.sleep(1)
slept += 1
# если запрос занял дольше — просто сразу следующую итерацию
# Получение результатов из всех запущенных потоков
def results_consumer():
while not stop_event.is_set():
# ⬇️ проверяем, не пора ли в оффлайн (отложенный переход)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
off_at = globals().get("OFFLINE_SWITCH_AT")
if off_at is not None and time.time() >= off_at:
# делаем переход ТОЛЬКО если ещё не оффлайн
if globals().get("CURRENT_THREADS_MODE") != "offline":
logger.info("[status] switching to OFFLINE (delayed)")
stop_live_threads()
start_offline_threads(SEASON, GAME_ID)
# чтобы не повторять
globals()["OFFLINE_SWITCH_AT"] = None
try:
msg = results_q.get(timeout=0.5)
except queue.Empty:
continue
try:
source = msg.get("source")
payload = msg.get("data") or {}
# универсальный статус (может не быть)
incoming_status = payload.get("status") # может быть None
# print(source, incoming_status)
if source == "game":
# принимаем ТОЛЬКО тот game_id, который держим в PRELOAD_LOCK
try:
if PRELOAD_LOCK:
incoming_gid = extract_game_id_from_payload(payload)
if not incoming_gid or str(incoming_gid) != str(
PRELOADED_GAME_ID
):
logger.debug(
f"results_consumer: skip game (gid={incoming_gid}) due to PRELOAD_LOCK; keep {PRELOADED_GAME_ID}"
)
continue
except Exception as _e:
logger.debug(f"results_consumer: preload lock check error: {_e}")
else:
latest_data[source] = {
"ts": msg["ts"],
"data": incoming_status if incoming_status is not None else payload,
}
# 1) play-by-play
if "play-by-play" in source:
game = latest_data.get("game")
# если игра уже нормальная — приклеиваем плейи
if (
game
and isinstance(game, dict)
and "data" in game
and "result" in game["data"]
):
# у pbp тоже может не быть data/result
if "result" in payload:
game["data"]["result"]["plays"] = payload["result"]
# а вот статус у play-by-play иногда просто "no-status"
# 2) box-score
elif "box-score" in source:
game = latest_data.get("game")
if (
game
and "data" in game
and "result" in game["data"]
and "teams" in game["data"]["result"]
and "result" in payload
and "teams" in payload["result"]
):
# обновляем команды
game["data"]["result"]["game"]["fullScore"] = payload["result"][
"fullScore"
]
game["data"]["result"]["game"][
"score"
] = f'{payload["result"]["teams"][0]["total"]["points"]}:{payload["result"]["teams"][1]["total"]["points"]}'
for team in game["data"]["result"]["teams"]:
if team["teamNumber"] != 0:
box_team = [
t
for t in payload["result"]["teams"]
if t["teamNumber"] == team["teamNumber"]
]
if not box_team:
print("ERROR: box-score team not found")
continue
box_team = box_team[0]
for player in team["starts"]:
box_player = [
p
for p in box_team["starts"]
if p["startNum"] == player["startNum"]
]
if box_player:
player["stats"] = box_player[0]
team["total"] = box_team["total"]
team["startTotal"] = box_team["startTotal"]
team["benchTotal"] = box_team["benchTotal"]
team["maxLeading"] = box_team["maxLeading"]
team["pointsInRow"] = box_team["pointsInRow"]
team["maxPointsInRow"] = box_team["maxPointsInRow"]
elif "live-status" in source:
latest_data[source] = {
"ts": msg["ts"],
"data": payload,
}
try:
ls_data = payload.get("result") or payload
raw_ls_status = (
ls_data.get("status")
or ls_data.get("gameStatus")
or ls_data.get("state")
)
if raw_ls_status:
raw_ls_status_low = str(raw_ls_status).lower()
finished_markers = [
"finished",
"result",
"resultconfirmed",
"ended",
"final",
"game over",
]
# 1) матч ЗАКОНЧЕН → запускаем ОТСРОЧЕННЫЙ переход
if any(m in raw_ls_status_low for m in finished_markers):
now_ts = time.time()
# если ещё не назначали переход — назначим
if globals().get("OFFLINE_SWITCH_AT") is None:
switch_at = now_ts + globals().get(
"OFFLINE_DELAY_SEC", 600
)
globals()["OFFLINE_SWITCH_AT"] = switch_at
# статус тоже обозначим, что он завершён, но ждёт
if (
GAME_START_DT
and GAME_START_DT.date() == datetime.now().date()
):
globals()["STATUS"] = "finished_wait"
globals()[
"CLEAR_OUTPUT_FOR_VMIX"
] = True # 👈 включаем режим "пустых" данных
else:
globals()["STATUS"] = "finished_wait"
globals()[
"CLEAR_OUTPUT_FOR_VMIX"
] = True # 👈 включаем режим "пустых" данных
human_time = datetime.fromtimestamp(switch_at).strftime(
"%H:%M:%S"
)
logger.info(
f"[status] match finished → will switch to OFFLINE at {human_time} "
f"(in {globals().get('OFFLINE_DELAY_SEC', 600)}s)"
)
else:
# уже ждём — можно в debug
logger.debug(
"[status] match finished → OFFLINE already scheduled"
)
# 2) матч снова стал онлайном → СБРАСЫВАЕМ отложенный переход
elif (
"online" in raw_ls_status_low or "live" in raw_ls_status_low
):
# если до этого стояла отложка — уберём
globals()[
"CLEAR_OUTPUT_FOR_VMIX"
] = False # 👈 выключаем очистку
if globals().get("OFFLINE_SWITCH_AT") is not None:
logger.info(
"[status] match back to LIVE → cancel scheduled OFFLINE"
)
globals()["OFFLINE_SWITCH_AT"] = None
if globals().get("STATUS") != "live":
logger.info(
"[status] match became LIVE → switch to LIVE threads"
)
globals()["STATUS"] = "live"
start_live_threads(SEASON, GAME_ID)
except Exception as e:
logger.warning(
f"[{current_time}] results_consumer: live-status postprocess error: {e}"
)
else:
if source == "game":
has_game_already = "game" in latest_data and isinstance(
latest_data.get("game"), dict
)
# Полная структура?
is_full = (
isinstance(payload, dict)
and "data" in payload
and isinstance(payload["data"], dict)
and "result" in payload["data"]
and "teams" in payload["data"]["result"]
)
# ⚙️ ЛОГИКА:
# 1) Пока матч НЕ online (STATUS != 'live'): обновляем всегда,
# чтобы /status видел "живость" раз в 5 минут независимо от полноты JSON.
if globals().get("STATUS") != "live":
latest_data["game"] = {"ts": msg["ts"], "data": payload}
logger.debug(
"results_consumer: pre-live game → updated (full=%s)",
is_full,
)
else:
# ✅ если игры ещё НЕТ в кэше — примем ПЕРВЫЙ game даже неполный,
# чтобы box-score/play-by-play могли его дорастить
if is_full or not has_game_already:
latest_data["game"] = {"ts": msg["ts"], "data": payload}
logger.debug(
"results_consumer: LIVE → stored (full=%s, had=%s)",
is_full,
has_game_already,
)
else:
logger.debug(
"results_consumer: LIVE & partial game → keep previous one"
)
continue
else:
latest_data[source] = {
"ts": msg["ts"],
"data": payload,
}
continue
# ... остальная обработка ...
except Exception as e:
logger.warning(f"[{current_time}] results_consumer error: {repr(e)}")
continue
def get_items(data: dict) -> list:
"""
Мелкий хелпер: берём первый список в ответе API.
Многие ручки отдают {"result":[...]} или {"seasons":[...]}.
Если находим список — возвращаем его.
Если нет — возвращаем None (значит, нужно брать весь dict).
"""
for k, v in data.items():
if isinstance(v, list):
return data[k]
return None
def pick_game_for_team(calendar_json):
"""
Возвращает:
game_id: str | None
game_dt: datetime | None
is_today: bool
cal_status: str | None # Scheduled / Online / Result / ResultConfirmed
Логика:
1. если в календаре есть игра КОМАНДЫ на сегодня — берём ЕЁ и возвращаем её gameStatus
2. иначе — берём последнюю прошедшую и тоже возвращаем её gameStatus
"""
items = get_items(calendar_json)
if not items:
return None, None, False, None
today = datetime.now().date()
# 1) сначала — сегодняшняя
for game in reversed(items):
if game["team1"]["name"].lower() != TEAM.lower():
continue
gdt = extract_game_datetime(game)
gdate = (
gdt.date()
if gdt
else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
)
if gdate == today:
cal_status = game["game"].get("gameStatus")
return game["game"]["id"], gdt, True, cal_status
# 2) если на сегодня нет — берём последнюю прошедшую
last_id = None
last_dt = None
last_status = None
for game in reversed(items):
if game["team1"]["name"].lower() != TEAM.lower():
continue
gdt = extract_game_datetime(game)
gdate = (
gdt.date()
if gdt
else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
)
if gdate <= today:
last_id = game["game"]["id"]
last_dt = gdt
last_status = game["game"].get("gameStatus")
break
return last_id, last_dt, False, last_status
def extract_game_datetime(game_item: dict) -> datetime | None:
"""
Из элемента календаря достаём datetime матча.
В календаре есть localDate и часто localTime. Если localTime нет — берём 00:00.
"""
try:
date_str = game_item["game"]["localDate"] # '31.10.2025'
dt_date = datetime.strptime(date_str, "%d.%m.%Y").date()
time_str = game_item["game"].get("defaultZoneTime") # '19:30'
if time_str:
hh, mm = map(int, time_str.split(":"))
dt_time = dtime(hour=hh, minute=mm)
else:
dt_time = dtime(hour=0, minute=0)
return datetime.combine(dt_date, dt_time)
except Exception:
return None
def build_pretty_status_message():
"""
Собирает одно красивое сообщение про текущее состояние онлайна.
Если game ещё нет — шлём хотя бы статусы источников.
"""
lines = []
cgid = get_cached_game_id()
lines.append(f"🏀 {LEAGUE.upper()} • {TEAM}")
lines.append(f"📌 Game ID: {cgid or GAME_ID}")
lines.append(f"🕒 {GAME_START_DT}")
# сначала попробуем собрать нормальный game
game_wrap = latest_data.get("game")
has_game = False
if game_wrap:
raw = game_wrap.get("data") if isinstance(game_wrap, dict) else game_wrap
# raw может быть: dict (полный payload) | dict (уже result) | str ("ok"/"no-status")
result = {}
if isinstance(raw, dict):
# ваш нормальный полный ответ по game имеет структуру: {"data": {"result": {...}}}
# но на всякий случай поддержим и вариант, где сразу {"result": {...}} или уже {"game": ...}
result = (
raw.get("data", {}).get("result", {})
if "data" in raw
else (raw.get("result") or raw)
)
else:
result = {}
game_info = result.get("game") or {}
team1_name = (result.get("team1") or {}).get("name", "Team 1")
team2_name = (result.get("team2") or {}).get("name", "Team 2")
lines.append(f"👥 {team1_name} vs {team2_name}")
score_now = game_info.get("score") or ""
full_score = game_info.get("fullScore") or ""
if score_now:
lines.append(f"🔢 Score: {score_now}")
if isinstance(full_score, str) and full_score:
quarters = full_score.split(",")
q_text = " | ".join(f"Q{i+1} {q}" for i, q in enumerate(quarters) if q)
if q_text:
lines.append(f"🧱 By quarters: {q_text}")
has_game = bool(result)
# live-status отдельно
# ls = latest_data.get("live-status", {})
# ls_raw = ls.get("data") or {}
# ls_status = (
# ls_raw.get("status") or ls_raw.get("gameStatus") or ls_raw.get("state") or "—"
# )
# lines.append(f"🟢 LIVE status: {ls_status}")
ls_wrap = latest_data.get("live-status")
ls_status = "—"
if ls_wrap:
raw = ls_wrap.get("data")
if isinstance(raw, dict):
ls_dict = raw.get("result") or raw
ls_status = (
ls_dict.get("status")
or ls_dict.get("gameStatus")
or ls_dict.get("state")
or "—"
)
elif isinstance(raw, str):
# API/consumer могли положить просто строку статуса: "ok", "no-status", "error"
ls_status = raw
lines.append(f"🟢 LIVE status: {ls_status}")
# добавим блок по источникам — это как раз “состояние запросов”
sort_order = ["game", "live-status", "box-score", "play-by-play"]
keys = [k for k in sort_order if k in latest_data] + sorted(
[k for k in latest_data if k not in sort_order]
)
src_lines = []
for k in keys:
d = latest_data.get(k) or {}
ts = d.get("ts", "—")
dat = d.get("data")
if isinstance(dat, dict) and "status" in dat:
st = str(dat["status"]).lower()
else:
st = str(dat).lower()
# Эмодзи-кружки для статусов
if any(x in st for x in ["ok", "success", "live", "online"]):
emoji = "🟢"
elif any(x in st for x in ["error", "fail", "no-status", "none", "timeout"]):
emoji = "🔴"
else:
emoji = "🟡"
src_lines.append(f"{emoji} {k}: {st} ({ts})")
if src_lines:
lines.append("📡 Sources:")
lines.extend(src_lines)
# даже если game не успел — мы всё равно что-то вернём
return "\n".join(lines)
def status_broadcaster():
"""
Если матч live — сразу шлём статус.
Потом — раз в 5 минут.
Если матч не live — ждём и проверяем снова.
"""
INTERVAL = 300 # 5 минут
last_text = None
first_live_sent = False
while not stop_event.is_set():
# если игра не идёт — спим по чуть-чуть и крутимся
if STATUS not in ("live", "live_soon"):
first_live_sent = False # чтобы при новом лайве снова сразу отправить
time.sleep(5)
continue
# сюда попадаем только если live
text = build_pretty_status_message()
if text and text != last_text:
logger.info(text)
last_text = text
first_live_sent = True
# после первого лайва ждём 5 минут, а до него — 10 секунд
wait_sec = INTERVAL if first_live_sent else 10
for _ in range(wait_sec):
if stop_event.is_set():
break
time.sleep(1)
def get_cached_game_id() -> str | None:
game = latest_data.get("game")
if not game:
return None
payload = game.get("data", game)
if not isinstance(payload, dict):
return None
# структура может быть {"data":{"result":{...}}} или {"result":{...}}
result = (
payload.get("data", {}).get("result")
if "data" in payload
else payload.get("result")
)
if not isinstance(result, dict):
return None
g = result.get("game")
if isinstance(g, dict):
return g.get("id")
return None
def extract_game_id_from_payload(payload: dict) -> str | None:
if not isinstance(payload, dict):
return None
root = payload.get("data") if isinstance(payload.get("data"), dict) else payload
res = root.get("result") if isinstance(root.get("result"), dict) else None
if not isinstance(res, dict):
return None
g = res.get("game")
if isinstance(g, dict):
return g.get("id")
return None
def start_offline_prevgame(season, prev_game_id: str):
"""
Специальный оффлайн для ПРЕДЫДУЩЕЙ игры:
- гасит любые текущие треды
- запускает только 'game' для prev_game_id
- НЕ останавливается после первого 'ok' (stop_after_success=False)
"""
global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data
# всегда переключаемся чисто
stop_live_threads()
stop_offline_threads()
logger.info("[threads] switching to OFFLINE mode (previous game) ...")
stop_event_offline.clear()
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=prev_game_id, lang=LANG),
300, # редкий опрос
stop_event_offline,
False, # stop_when_live
False, # ✅ stop_after_success=False (держим тред)
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST,
league=LEAGUE,
season=season,
game_id=prev_game_id,
lang=LANG,
),
600,
stop_event_offline,
False,
False,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
600,
stop_event_offline,
False,
False,
),
daemon=True,
),
]
for t in threads_offline:
t.start()
CURRENT_THREADS_MODE = "offline"
logger.info(f"[threads] OFFLINE prev-game thread started for {prev_game_id}")
def start_prestart_watcher(game_dt: datetime | None):
"""
Логика на день игры:
1) Немедленно подгружаем ДАННЫЕ ПРОШЛОГО МАТЧА (один раз, оффлайн-поток 'game'),
чтобы программа имела данные до старта.
2) Ровно за 1:15 до начала — СБРАСЫВАЕМ эти данные (останавливаем оффлайн, чистим latest_data).
3) Ровно за 1:10 до начала — ВКЛЮЧАЕМ LIVE-треды.
"""
if not game_dt:
return
# разовое уведомление о "спячке"
global SLEEP_NOTICE_SENT, STATUS, SEASON, GAME_ID
now = datetime.now()
if not SLEEP_NOTICE_SENT and game_dt > now:
logger.info(
"🛌 Тред ушёл в спячку до начала игры.\n"
f"⏰ Матч начинается сегодня в {game_dt.strftime('%H:%M')}."
)
SLEEP_NOTICE_SENT = True
def _runner():
from datetime import time as dtime # для резервного парсинга времени
global STATUS
PRELOAD_LEAD = timedelta(hours=1, minutes=15) # T-1:15 → сброс
LIVE_LEAD = timedelta(hours=1, minutes=10) # T-1:10 → live
RESET_AT = game_dt - PRELOAD_LEAD
LIVE_AT = game_dt - LIVE_LEAD
PRELOAD_MAXWAIT_SEC = 180 # ждём до 3 мин готовности full game при предзагрузке
did_preload = False
did_reset = False
did_live = False
# --- вспомогательное: поиск предыдущей игры команды ДО сегодняшнего матча ---
def _find_prev_game_id(
calendar_json: dict, cutoff_dt: datetime
) -> tuple[str | None, datetime | None]:
items = get_items(calendar_json) or []
prev_id, prev_dt = None, None
team_norm = (TEAM or "").strip().casefold()
for g in reversed(items):
try:
t1 = (g["team1"]["name"] or "").strip().casefold()
t2 = (g["team2"]["name"] or "").strip().casefold()
if team_norm not in (t1, t2):
continue
except Exception:
continue
gdt = extract_game_datetime(g)
if not gdt:
try:
gd = datetime.strptime(
g["game"]["localDate"], "%d.%m.%Y"
).date()
gdt = datetime.combine(gd, dtime(0, 0))
except Exception:
continue
if gdt < cutoff_dt:
prev_id, prev_dt = g["game"]["id"], gdt
break
return prev_id, prev_dt
# --- Шаг 1: сразу включаем оффлайн по ПРЕДЫДУЩЕЙ игре и держим до T-1:15 ---
try:
now = datetime.now()
if now < RESET_AT:
calendar_resp = requests.get(
URLS["calendar"].format(
host=HOST, league=LEAGUE, season=SEASON, lang=LANG
),
timeout=6,
).json()
prev_game_id, prev_game_dt = _find_prev_game_id(calendar_resp, game_dt)
if prev_game_id and str(prev_game_id) != str(GAME_ID):
logger.info(
f"[preload] старт оффлайна по предыдущей игре {prev_game_id} ({prev_game_dt})"
)
# включаем «замок», чтобы consumer принимал только старую игру
globals()["PRELOAD_LOCK"] = True
globals()["PRELOADED_GAME_ID"] = str(prev_game_id)
globals()["PRELOAD_HOLD_UNTIL"] = RESET_AT.timestamp()
# поднимаем один оффлайн-тред по старой игре (без stop_after_success)
start_offline_prevgame(SEASON, prev_game_id)
did_preload = True
else:
logger.warning("[preload] предыдущая игра не найдена — пропускаем")
else:
logger.info(
"[preload] уже поздно для предзагрузки (прошло T-1:15) — пропуск"
)
except Exception as e:
logger.warning(f"[preload] ошибка предзагрузки прошлой игры: {e}")
# --- Основной цикл ожидания контрольных моментов ---
while not stop_event.is_set():
now = datetime.now()
# если матч уже в другом конечном состоянии — выходим
if STATUS in ("live", "finished", "finished_wait", "finished_today"):
break
# Шаг 2: ровно T-1:15 — сбрасываем предзагруженные данные
if not did_reset and now >= RESET_AT:
logger.info(
f"[reset] {now:%H:%M:%S} → T-1:15: сбрасываем предзагруженные данные"
)
try:
stop_offline_threads() # на всякий
# for key in latest_data:
# latest_data[key] = wipe_json_values(latest_data[key])
# latest_data.clear() # полный сброс кэша
# снять замок предзагрузки
globals()["PRELOAD_LOCK"] = False
globals()["PRELOADED_GAME_ID"] = None
globals()["PRELOAD_HOLD_UNTIL"] = None
logger.info(
"[reset] latest_data очищен; ждём T-1:10 для запуска live"
)
except Exception as e:
logger.warning(f"[reset] ошибка при очистке: {e}")
globals()["CLEAR_OUTPUT_FOR_VMIX"] = True
did_reset = True
# Шаг 3: T-1:10 — включаем live-треды
if not did_live and now >= LIVE_AT:
logger.info(
f"[prestart] {now:%H:%M:%S}, игра в {game_dt:%H:%M}, включаем LIVE threads по правилу T-1:10"
)
STATUS = "live_soon"
globals()[
"CLEAR_OUTPUT_FOR_VMIX"
] = False # можно оставить пустоту до первых живых данных
stop_offline_threads() # на всякий случай
start_live_threads(SEASON, GAME_ID)
did_live = True
break
time.sleep(15)
t = threading.Thread(target=_runner, daemon=True)
t.start()
@asynccontextmanager
async def lifespan(app: FastAPI):
global STATUS, GAME_ID, SEASON, GAME_START_DT, GAME_TODAY, GAME_SOON
# 1. проверяем API: seasons
try:
seasons_resp = requests.get(
URLS["seasons"].format(host=HOST, league=LEAGUE)
).json()
season = seasons_resp["items"][0]["season"]
except Exception:
now = datetime.now()
if now.month > 9:
season = now.year + 1
else:
season = now.year
logger.info(f"предположили номер сезона: {season}")
SEASON = season
# 2. берём календарь
try:
calendar = requests.get(
URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG)
).json()
except Exception as ex:
logger.error(f"не получилось проверить работу API. код ошибки: {ex}")
# тут можно вообще не запускать сервер, но оставим как есть
calendar = None
# 3. определяем игру
game_id, game_dt, is_today, cal_status = (
pick_game_for_team(calendar) if calendar else (None, None, False, None)
)
GAME_ID = game_id
GAME_START_DT = game_dt
GAME_TODAY = is_today
logger.info(f"Лига: {LEAGUE}\nСезон: {season}\nКоманда: {TEAM}\nGame ID: {game_id}")
# 4. запускаем "длинные" потоки (они у тебя и так всегда)
thread_result_consumer = threading.Thread(
target=results_consumer,
daemon=True,
)
thread_result_consumer.start()
thread_status_broadcaster = threading.Thread(
target=status_broadcaster,
daemon=True,
)
thread_status_broadcaster.start()
# 5. решаем, что запускать
if not is_today:
STATUS = "no_game_today"
start_offline_threads(SEASON, GAME_ID)
else:
# игра сегодня
# в любом случае запускаем сторож, если знаем время игры
start_prestart_watcher(game_dt)
if cal_status is None:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
elif cal_status == "Scheduled":
if game_dt:
delta = game_dt - datetime.now()
# если мы уже МЕНЬШЕ чем за 1:10 до игры — сразу в live
if delta <= timedelta(hours=1, minutes=10):
STATUS = "live_soon"
start_live_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
# start_offline_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
# start_offline_threads(SEASON, GAME_ID)
elif cal_status == "Online":
STATUS = "live"
start_live_threads(SEASON, GAME_ID)
elif cal_status in ["Result", "ResultConfirmed"]:
STATUS = "finished_today"
start_offline_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
yield
# -------- shutdown --------
stop_event.set()
stop_live_threads()
stop_offline_threads()
thread_result_consumer.join(timeout=1)
thread_status_broadcaster.join(timeout=1)
app = FastAPI(
lifespan=lifespan,
docs_url=None, # ❌ отключает /docs
redoc_url=None, # ❌ отключает /redoc
openapi_url=None, # ❌ отключает /openapi.json
)
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
71 -> "1:11"
0 -> "0:00"
Любые кривые значения -> "0:00".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
@app.get("/team1")
async def team1():
game = get_latest_game_safe("game")
if not game:
# если данных вообще нет (ещё ни одной игры) — тут реально нечего отдавать
raise HTTPException(status_code=503, detail="game data not ready")
data = await team("team1")
return maybe_clear_for_vmix(data)
@app.get("/team2")
async def team2():
game = get_latest_game_safe("game")
if not game:
raise HTTPException(status_code=503, detail="game data not ready")
data = await team("team2")
return maybe_clear_for_vmix(data)
@app.get("/top_team1")
async def top_team1():
data = await team("team1")
top = await top_sorted_team(data)
return maybe_clear_for_vmix(top)
@app.get("/top_team2")
async def top_team2():
data = await team("team2")
top = await top_sorted_team(data)
return maybe_clear_for_vmix(top)
def _b(v) -> bool:
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return v != 0
if isinstance(v, str):
return v.strip().lower() in ("1", "true", "yes", "on")
return False
def _placeholders(n=5):
return [
{
"NameGFX": "",
"Name1GFX": "",
"Name2GFX": "",
"isOnCourt": False,
"num": "",
"photoGFX": EMPTY_PHOTO_PATH,
}
for _ in range(n)
]
def wipe_json_values(obj):
"""
Рекурсивно заменяет все значения JSON на пустые строки.
Если ключ содержит "photo", заменяет значение на EMPTY_PHOTO_PATH.
"""
# если словарь — обрабатываем ключи
if isinstance(obj, dict):
new_dict = {}
for k, v in obj.items():
if "photo" in str(k).lower():
# ключ содержит photo → отдаём пустую картинку
new_dict[k] = EMPTY_PHOTO_PATH
else:
new_dict[k] = wipe_json_values(v)
return new_dict
# если список — рекурсивно обработать элементы
elif isinstance(obj, list):
return [wipe_json_values(v) for v in obj]
# любое конечное значение → ""
else:
return ""
@app.get("/started_team1")
async def started_team1(sort_by: str = None):
data = await team("team1")
players = await started_team(data) or []
# нормализуем флаги
for p in players:
p["isStart"] = _b(p.get("isStart", False))
p["isOnCourt"] = _b(p.get("isOnCourt", False))
if sort_by and sort_by.strip().lower() == "isstart":
starters = [p for p in players if p["isStart"]]
return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5))
if sort_by and sort_by.strip().lower() == "isoncourt":
on_court = [p for p in players if p["isOnCourt"]]
return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5))
# дефолт — без фильтра, как раньше
return maybe_clear_for_vmix(players)
@app.get("/started_team2")
async def started_team2(sort_by: str = None):
data = await team("team2")
players = await started_team(data) or []
for p in players:
p["isStart"] = _b(p.get("isStart", False))
p["isOnCourt"] = _b(p.get("isOnCourt", False))
if sort_by and sort_by.strip().lower() == "isstart":
starters = [p for p in players if p["isStart"]]
return maybe_clear_for_vmix(starters[:5] if starters else _placeholders(5))
if sort_by and sort_by.strip().lower() == "isoncourt":
on_court = [p for p in players if p["isOnCourt"]]
return maybe_clear_for_vmix(on_court[:5] if on_court else _placeholders(5))
return maybe_clear_for_vmix(players)
@app.get("/latest_data")
async def game():
return latest_data
@app.get("/status")
async def status(request: Request):
global STATUS # будем его править, если live-status свежее
def color_for_status(status_value: str) -> str:
"""Подбор цвета статуса в HEX"""
status_value = str(status_value).lower()
if status_value in ["ok", "success", "live", "live_soon", "online"]:
return "#00FF00" # зелёный
elif status_value in ["scheduled", "today_not_started", "upcoming"]:
return "#FFFF00" # жёлтый
elif status_value in [
"result",
"resultconfirmed",
"finished",
"finished_today",
]:
return "#FF0000" # красный
elif status_value in ["no_game_today", "unknown", "none"]:
return "#FFFFFF" # белый
else:
return "#FF7700" # серый (неизвестный статус)
# ✳️ сортируем latest_data в нужном порядке
sort_order = ["game", "live-status", "box-score", "play-by-play"]
sorted_keys = [k for k in sort_order if k in latest_data] + sorted(
[k for k in latest_data if k not in sort_order]
)
cached_game_id = get_cached_game_id() or GAME_ID
note = ""
if cached_game_id and GAME_ID and str(cached_game_id) != str(GAME_ID):
note = (
f' (предзагружены данные прошлой игры)'
)
data = {
"league": LEAGUE,
"team": TEAM,
"game_id": cached_game_id,
"game_status": STATUS,
"statuses": [
{
"name": TEAM,
"status": STATUS,
"ts": (
GAME_START_DT.strftime("%Y-%m-%d %H:%M") if GAME_START_DT else "N/A"
),
"link": LEAGUE,
"color": color_for_status(STATUS),
}
]
+ [
{
"name": item,
"status": (
latest_data[item]["data"]["status"]
if isinstance(latest_data[item]["data"], dict)
and "status" in latest_data[item]["data"]
else latest_data[item]["data"]
),
"ts": latest_data[item]["ts"],
"link": URLS[item].format(
host=HOST,
league=LEAGUE,
season=SEASON,
lang=LANG,
game_id=cached_game_id,
),
"color": color_for_status(
latest_data[item]["data"]["status"]
if isinstance(latest_data[item]["data"], dict)
and "status" in latest_data[item]["data"]
else latest_data[item]["data"]
),
}
for item in sorted_keys # ← используем отсортированный порядок
],
}
accept = request.headers.get("accept", "")
if "text/html" in accept:
status_raw = str(STATUS).lower()
# print(status_raw)
if status_raw in ["live", "online"]:
gs_class = "live"
gs_text = "🟢 LIVE"
elif status_raw in ["live_soon", "today_not_started"]:
gs_class = "live"
gs_text = "🟢 GAME TODAY (soon)"
elif status_raw in ["finished_wait"]:
gs_class = "upcoming"
# покажем, что он ДОЖИДАЕТСЯ оффлайна
off_at = OFFLINE_SWITCH_AT
if off_at:
human = datetime.fromtimestamp(off_at).strftime("%H:%M:%S")
gs_text = f"🟡 Game finished, cooling down → OFFLINE at {human}"
else:
human = "N/A"
gs_text = "🟡 Game finished, cooling down"
elif status_raw in ["finished_today", "finished"]:
gs_class = "finished"
gs_text = "🔴 Game finished"
else:
gs_class = status_raw
gs_text = "⚪ Unknown"
html = f"""
League: {LEAGUE}
Team: {TEAM}
Game ID: {cached_game_id}{note}
Game Status: {gs_text}
| Name | Status | Timestamp | Link |
|---|---|---|---|
| {s["name"]} | {status_text} | {s["ts"]} | {s["link"]} |