Files
RFB/get_data.py

1970 lines
67 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI
from fastapi.responses import Response, HTMLResponse
from fastapi import HTTPException
from fastapi import Request
from contextlib import asynccontextmanager
import requests
import threading
import time
import queue
import argparse
import uvicorn
import os
import pandas as pd
import json
from datetime import datetime, time as dtime, timedelta
from fastapi.responses import Response
import logging
import logging.config
import platform
# передадим параметры через аргументы или глобальные переменные
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--league", default="vtb")
parser.add_argument("--team", required=True)
parser.add_argument("--lang", default="en")
args = parser.parse_args()
MYHOST = platform.node()
if not os.path.exists("logs"):
os.makedirs("logs")
telegram_bot_token = "7639240596:AAH0YtdQoWZSC-_R_EW4wKAHHNLIA0F_ARY"
# TELEGRAM_CHAT_ID = 228977654
telegram_chat_id = -4803699526
log_config = {
"version": 1,
"handlers": {
"telegram": {
"class": "telegram_handler.TelegramHandler",
"level": "INFO",
"token": telegram_bot_token,
"chat_id": telegram_chat_id,
"formatter": "telegram",
},
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "simple",
"filename": f"logs/GFX_{MYHOST}.log",
"encoding": "utf-8",
},
},
"loggers": {
__name__: {"handlers": ["console", "file", "telegram"], "level": "DEBUG"},
},
"formatters": {
"telegram": {
"class": "telegram_handler.HtmlFormatter",
"format": f"%(levelname)s <b>[{MYHOST.upper()}]</b> %(message)s",
"use_emoji": "True",
},
"simple": {
"class": "logging.Formatter",
"format": "%(asctime)s %(levelname)-8s %(funcName)s() - %(message)s",
"datefmt": "%d.%m.%Y %H:%M:%S",
},
},
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
LEAGUE = args.league
TEAM = args.team
LANG = args.lang
HOST = "https://pro.russiabasket.org"
STATUS = False
GAME_ID = None
SEASON = None
GAME_START_DT = None # datetime начала матча (локальная из календаря)
GAME_TODAY = False # флаг: игра сегодня
GAME_SOON = False # флаг: игра сегодня и скоро (<1 часа)
# общая очередь
results_q = queue.Queue()
# тут будем хранить последние данные
latest_data = {}
# событие для остановки потоков
stop_event = threading.Event()
# отдельные события для разных наборов потоков
stop_event_live = threading.Event()
stop_event_offline = threading.Event()
# чтобы из consumer можно было их гасить
threads_live = []
threads_offline = []
# какой режим сейчас запущен: "live" / "offline" / None
CURRENT_THREADS_MODE = None
URLS = {
"seasons": "{host}/api/abc/comps/seasons?Tag={league}",
"actual-standings": "{host}/api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}",
"calendar": "{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000",
"game": "{host}/api/abc/games/game?Id={game_id}&Lang={lang}",
"pregame": "{host}/api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}",
"pregame-full-stats": "{host}/api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}",
"live-status": "{host}/api/abc/games/live-status?id={game_id}",
"box-score": "{host}/api/abc/games/box-score?id={game_id}",
"play-by-play": "{host}/api/abc/games/play-by-play?id={game_id}",
}
def start_offline_threads(season, game_id):
"""Запускаем редкие запросы, когда матча нет или он уже сыгран."""
global threads_offline, CURRENT_THREADS_MODE, stop_event_offline, latest_data
if CURRENT_THREADS_MODE == "offline":
return
stop_live_threads()
# 🔹 очищаем latest_data безопасно, чтобы не ломать структуру
keep_keys = {"game", "pregame", "pregame-full-stats", "actual-standings", "calendar"}
for key in list(latest_data.keys()):
if key not in keep_keys:
del latest_data[key]
stop_event_offline.clear()
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
1, # опрашиваем раз в секунду/реже
stop_event_offline,
),
daemon=True,
),
# 👇 чтобы офлайн всё равно проверял live-status раз в минуту
threading.Thread(
target=get_data_from_API,
args=(
"live-status",
URLS["live-status"].format(host=HOST, game_id=game_id),
1 / 60, # раз в 60 секунд
stop_event_offline,
),
daemon=True,
),
]
for t in threads_offline:
t.start()
CURRENT_THREADS_MODE = "offline"
logger.info("[threads] OFFLINE threads started (data cleaned)")
def start_live_threads(season, game_id):
"""Запускаем частые онлайн-запросы, когда матч идёт/вот-вот."""
global threads_live, CURRENT_THREADS_MODE, stop_event_live
# если уже в лайве — не дублируем
if CURRENT_THREADS_MODE == "live":
return
# на всякий случай гасим офлайн
stop_offline_threads()
stop_event_live.clear()
threads_live = [
threading.Thread(
target=get_data_from_API,
args=(
"pregame",
URLS["pregame"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
0.0016667,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
0.0016667,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
0.0016667,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
0.00016, # часто
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"live-status",
URLS["live-status"].format(host=HOST, game_id=game_id),
1,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"box-score",
URLS["box-score"].format(host=HOST, game_id=game_id),
1,
stop_event_live,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"play-by-play",
URLS["play-by-play"].format(host=HOST, game_id=game_id),
1,
stop_event_live,
),
daemon=True,
),
]
for t in threads_live:
t.start()
CURRENT_THREADS_MODE = "live"
logger.info("[threads] LIVE threads started")
def stop_live_threads():
"""Гасим только live-треды."""
global threads_live
if not threads_live:
return
stop_event_live.set()
for t in threads_live:
t.join(timeout=1)
threads_live = []
logger.info("[threads] LIVE threads stopped")
def stop_offline_threads():
"""Гасим только offline-треды."""
global threads_offline
if not threads_offline:
return
stop_event_offline.set()
for t in threads_offline:
t.join(timeout=1)
threads_offline = []
logger.info("[threads] OFFLINE threads stopped")
# Функция запускаемая в потоках
def get_data_from_API(
name: str, url: str, quantity: float, stop_event: threading.Event
):
if quantity <= 0:
raise ValueError("quantity must be > 0")
sleep_time = 1.0 / quantity # это и есть "раз в N секунд"
while not stop_event.is_set():
start = time.time()
try:
value = requests.get(url, timeout=5).json()
except Exception as ex:
value = {"error": str(ex)}
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
results_q.put({"source": name, "ts": ts, "data": value})
logger.debug(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}")
# сколько уже заняло
elapsed = time.time() - start
# сколько надо доспать, чтобы в сумме вышла нужная частота
to_sleep = sleep_time - elapsed
if to_sleep > 0:
time.sleep(to_sleep)
# если запрос занял дольше — просто сразу следующую итерацию
# Получение результатов из всех запущенных потоков
def results_consumer():
while not stop_event.is_set():
try:
msg = results_q.get(timeout=0.5)
except queue.Empty:
continue
try:
source = msg.get("source")
payload = msg.get("data") or {}
# универсальный статус (может не быть)
incoming_status = payload.get("status") # может быть None
# 1) play-by-play
if "play-by-play" in source:
game = latest_data.get("game")
# если игра уже нормальная — приклеиваем плейи
if (
game
and isinstance(game, dict)
and "data" in game
and "result" in game["data"]
):
# у pbp тоже может не быть data/result
if "result" in payload:
game["data"]["result"]["plays"] = payload["result"]
# а вот статус у play-by-play иногда просто "no-status"
latest_data[source] = {
"ts": msg["ts"],
"data": incoming_status if incoming_status is not None else payload,
}
# 2) box-score
elif "box-score" in source:
game = latest_data.get("game")
if (
game
and "data" in game
and "result" in game["data"]
and "teams" in game["data"]["result"]
and "result" in payload
and "teams" in payload["result"]
):
# обновляем команды
game["data"]["result"]["game"]["fullScore"] = payload["result"][
"fullScore"
]
game["data"]["result"]["game"]["score"] = f'{payload["result"]["teams"][0]["total"]["points"]}:{payload["result"]["teams"][1]["total"]["points"]}'
for team in game["data"]["result"]["teams"]:
if team["teamNumber"] != 0:
box_team = [
t
for t in payload["result"]["teams"]
if t["teamNumber"] == team["teamNumber"]
]
if not box_team:
print("ERROR: box-score team not found")
continue
box_team = box_team[0]
for player in team["starts"]:
box_player = [
p
for p in box_team["starts"]
if p["startNum"] == player["startNum"]
]
if box_player:
player["stats"] = box_player[0]
team["total"] = box_team["total"]
team["startTotal"] = box_team["startTotal"]
team["benchTotal"] = box_team["benchTotal"]
team["maxLeading"] = box_team["maxLeading"]
team["pointsInRow"] = box_team["pointsInRow"]
team["maxPointsInRow"] = box_team["maxPointsInRow"]
# в любом случае сохраняем сам факт, что box-score пришёл
latest_data[source] = {
"ts": msg["ts"],
"data": incoming_status if incoming_status is not None else payload,
}
elif "live-status" in source:
latest_data[source] = {
"ts": msg["ts"],
"data": payload,
}
try:
ls_data = payload.get("result") or payload
raw_ls_status = (
ls_data.get("status")
or ls_data.get("gameStatus")
or ls_data.get("state")
)
if raw_ls_status:
raw_ls_status_low = str(raw_ls_status).lower()
finished_markers = [
"finished",
"result",
"resultconfirmed",
"ended",
"final",
"game over",
]
# матч ЗАКОНЧЕН → гасим live и включаем offline
if any(m in raw_ls_status_low for m in finished_markers):
logger.info("[status] match finished → switch to OFFLINE")
if (
GAME_START_DT
and GAME_START_DT.date() == datetime.now().date()
):
globals()["STATUS"] = "finished_today"
else:
globals()["STATUS"] = "finished"
stop_live_threads()
start_offline_threads(SEASON, GAME_ID)
# матч СТАЛ онлайном (напр., из Scheduled → Online)
elif (
"online" in raw_ls_status_low or "live" in raw_ls_status_low
):
if globals().get("STATUS") not in ["live", "live_soon"]:
logger.info(
"[status] match became LIVE → switch to LIVE threads"
)
globals()["STATUS"] = "live"
start_live_threads(SEASON, GAME_ID)
except Exception as e:
logger.warning(
"results_consumer: live-status postprocess error:", e
)
else:
if source == "game":
has_game_already = "game" in latest_data
# есть ли в ответе ПОЛНАЯ структура
is_full = (
"data" in payload
and isinstance(payload["data"], dict)
and "result" in payload["data"]
)
if is_full:
# полный game — всегда кладём
latest_data["game"] = {
"ts": msg["ts"],
"data": payload,
}
else:
# game неполный
if not has_game_already:
# 👉 раньше game вообще не было — лучше положить хоть что-то
latest_data["game"] = {
"ts": msg["ts"],
"data": payload,
}
else:
# 👉 уже есть какой-то game — неполным НЕ затираем
logger.debug(
"results_consumer: got partial game, keeping previous one"
)
# и обязательно continue/return из этого elif/if
else:
latest_data[source] = {
"ts": msg["ts"],
"data": payload,
}
continue
# ... остальная обработка ...
except Exception as e:
logger.warning("results_consumer error:", repr(e))
continue
def get_items(data: dict) -> list:
"""
Мелкий хелпер: берём первый список в ответе API.
Многие ручки отдают {"result":[...]} или {"seasons":[...]}.
Если находим список — возвращаем его.
Если нет — возвращаем None (значит, нужно брать весь dict).
"""
for k, v in data.items():
if isinstance(v, list):
return data[k]
return None
def pick_game_for_team(calendar_json):
"""
Возвращает:
game_id: str | None
game_dt: datetime | None
is_today: bool
cal_status: str | None # Scheduled / Online / Result / ResultConfirmed
Логика:
1. если в календаре есть игра КОМАНДЫ на сегодня — берём ЕЁ и возвращаем её gameStatus
2. иначе — берём последнюю прошедшую и тоже возвращаем её gameStatus
"""
items = get_items(calendar_json)
if not items:
return None, None, False, None
today = datetime.now().date()
# 1) сначала — сегодняшняя
for game in reversed(items):
if game["team1"]["name"].lower() != TEAM.lower():
continue
gdt = extract_game_datetime(game)
gdate = (
gdt.date()
if gdt
else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
)
if gdate == today:
cal_status = game["game"].get("gameStatus")
return game["game"]["id"], gdt, True, cal_status
# 2) если на сегодня нет — берём последнюю прошедшую
last_id = None
last_dt = None
last_status = None
for game in reversed(items):
if game["team1"]["name"].lower() != TEAM.lower():
continue
gdt = extract_game_datetime(game)
gdate = (
gdt.date()
if gdt
else datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
)
if gdate <= today:
last_id = game["game"]["id"]
last_dt = gdt
last_status = game["game"].get("gameStatus")
break
return last_id, last_dt, False, last_status
def extract_game_datetime(game_item: dict) -> datetime | None:
"""
Из элемента календаря достаём datetime матча.
В календаре есть localDate и часто localTime. Если localTime нет — берём 00:00.
"""
try:
date_str = game_item["game"]["localDate"] # '31.10.2025'
dt_date = datetime.strptime(date_str, "%d.%m.%Y").date()
time_str = game_item["game"].get("defaultZoneTime") # '19:30'
if time_str:
hh, mm = map(int, time_str.split(":"))
dt_time = dtime(hour=hh, minute=mm)
else:
dt_time = dtime(hour=0, minute=0)
return datetime.combine(dt_date, dt_time)
except Exception:
return None
def build_pretty_status_message():
"""
Собирает одно красивое сообщение про текущее состояние онлайна.
Если game ещё нет — шлём хотя бы статусы источников.
"""
lines = []
lines.append(f"\n🏀 <b>{LEAGUE.upper()}</b> • {TEAM}")
lines.append(f"📌 Game ID: <code>{GAME_ID}</code>")
lines.append(f"🕒 {GAME_START_DT}")
# сначала попробуем собрать нормальный game
game_wrap = latest_data.get("game")
has_game = False
if game_wrap:
game_data = game_wrap.get("data") or game_wrap
result = game_data.get("result") or {}
game_info = result.get("game") or {}
team1_name = result["team1"]["name"]
team2_name = result["team2"]["name"]
score_now = game_info.get("score") or ""
full_score = game_info.get("fullScore") or ""
lines.append(f"👥 {team1_name} vs {team2_name}")
if score_now:
lines.append(f"🔢 Score: <b>{score_now}</b>")
if isinstance(full_score, str) and full_score:
quarters = full_score.split(",")
q_text = " | ".join(
f"Q{i+1} {q}" for i, q in enumerate(quarters) if q
)
if q_text:
lines.append(f"🧱 By quarters: {q_text}")
has_game = True
# live-status отдельно
ls = latest_data.get("live-status", {})
ls_raw = ls.get("data") or {}
ls_status = (
ls_raw.get("status")
or ls_raw.get("gameStatus")
or ls_raw.get("state")
or ""
)
lines.append(f"🟢 LIVE status: <b>{ls_status}</b>")
# добавим блок по источникам — это как раз “состояние запросов”
sort_order = ["game", "live-status", "box-score", "play-by-play"]
keys = [k for k in sort_order if k in latest_data] + sorted(
[k for k in latest_data if k not in sort_order]
)
src_lines = []
for k in keys:
d = latest_data.get(k) or {}
ts = d.get("ts", "")
dat = d.get("data")
if isinstance(dat, dict) and "status" in dat:
st = str(dat["status"]).lower()
else:
st = str(dat).lower()
# Эмодзи-кружки для статусов
if any(x in st for x in ["ok", "success", "live", "online"]):
emoji = "🟢"
elif any(x in st for x in ["error", "fail", "no-status", "none", "timeout"]):
emoji = "🔴"
else:
emoji = "🟡"
src_lines.append(f"{emoji} <b>{k}</b>: {st} ({ts})")
if src_lines:
lines.append("📡 Sources:")
lines.extend(src_lines)
# даже если game не успел — мы всё равно что-то вернём
return "\n".join(lines)
def status_broadcaster():
"""
Если матч live — сразу шлём статус.
Потом — раз в 5 минут.
Если матч не live — ждём и проверяем снова.
"""
INTERVAL = 300 # 5 минут
last_text = None
first_live_sent = False
while not stop_event.is_set():
# если игра не идёт — спим по чуть-чуть и крутимся
if STATUS not in ("live", "live_soon"):
first_live_sent = False # чтобы при новом лайве снова сразу отправить
time.sleep(5)
continue
# сюда попадаем только если live
text = build_pretty_status_message()
if text and text != last_text:
logger.info(text)
last_text = text
first_live_sent = True
# после первого лайва ждём 5 минут, а до него — 10 секунд
wait_sec = INTERVAL if first_live_sent else 10
for _ in range(wait_sec):
if stop_event.is_set():
break
time.sleep(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
global STATUS, GAME_ID, SEASON, GAME_START_DT, GAME_TODAY, GAME_SOON
# 1. проверяем API: seasons
try:
seasons_resp = requests.get(
URLS["seasons"].format(host=HOST, league=LEAGUE)
).json()
season = seasons_resp["items"][0]["season"]
except Exception:
now = datetime.now()
if now.month > 9:
season = now.year + 1
else:
season = now.year
logger.info(f"предположили номер сезона: {season}")
SEASON = season
# 2. берём календарь
try:
calendar = requests.get(
URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG)
).json()
except Exception as ex:
logger.error(f"не получилось проверить работу API. код ошибки: {ex}")
# тут можно вообще не запускать сервер, но оставим как есть
calendar = None
# 3. определяем игру
game_id, game_dt, is_today, cal_status = (
pick_game_for_team(calendar) if calendar else (None, None, False, None)
)
GAME_ID = game_id
GAME_START_DT = game_dt
GAME_TODAY = is_today
logger.info(
f"\nЛига: {LEAGUE}\nСезон: {season}\nКоманда: {TEAM}\nGame ID: {game_id}"
)
# 4. запускаем "длинные" потоки (они у тебя и так всегда)
thread_result_consumer = threading.Thread(
target=results_consumer,
daemon=True,
)
thread_result_consumer.start()
thread_status_broadcaster = threading.Thread(
target=status_broadcaster,
daemon=True,
)
thread_status_broadcaster.start()
# 5. Подготовим онлайн и офлайн наборы (как у тебя)
threads_live = [
threading.Thread(
target=get_data_from_API,
args=(
"pregame",
URLS["pregame"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
0.00016,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"live-status",
URLS["live-status"].format(host=HOST, game_id=game_id),
1,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"box-score",
URLS["box-score"].format(host=HOST, game_id=game_id),
1,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"play-by-play",
URLS["play-by-play"].format(host=HOST, game_id=game_id),
1,
stop_event,
),
daemon=True,
),
]
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
1, # реже
stop_event,
),
daemon=True,
)
]
# 5. решаем, что запускать
if not is_today:
STATUS = "no_game_today"
start_offline_threads(SEASON, GAME_ID)
else:
# игра сегодня
if cal_status is None:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
elif cal_status == "Scheduled":
if game_dt:
delta = game_dt - datetime.now()
if delta <= timedelta(hours=1):
STATUS = "live_soon"
start_live_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
elif cal_status == "Online":
STATUS = "live"
start_live_threads(SEASON, GAME_ID)
elif cal_status in ["Result", "ResultConfirmed"]:
STATUS = "finished_today"
start_offline_threads(SEASON, GAME_ID)
else:
STATUS = "today_not_started"
start_offline_threads(SEASON, GAME_ID)
yield
# -------- shutdown --------
stop_event.set()
stop_live_threads()
stop_offline_threads()
thread_result_consumer.join(timeout=1)
thread_status_broadcaster.join(timeout=1)
app = FastAPI(lifespan=lifespan)
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
71 -> "1:11"
0 -> "0:00"
Любые кривые значения -> "0:00".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
@app.get("/team1")
async def team1():
game = get_latest_game_safe()
if not game:
raise HTTPException(status_code=503, detail="game data not ready")
return await team("team1")
@app.get("/team2")
async def team2():
game = get_latest_game_safe()
if not game:
raise HTTPException(status_code=503, detail="game data not ready")
return await team("team2")
@app.get("/top_team1")
async def top_team1():
data = await team("team1")
return await top_sorted_team(data)
@app.get("/top_team2")
async def top_team2():
data = await team("team2")
return await top_sorted_team(data)
@app.get("/started_team1")
async def started_team1():
data = await team("team1")
return await started_team(data)
@app.get("/started_team2")
async def started_team2():
data = await team("team2")
return await started_team(data)
@app.get("/game")
async def game():
return latest_data["game"]
@app.get("/status")
async def status(request: Request):
def color_for_status(status_value: str) -> str:
"""Подбор цвета статуса в HEX"""
status_value = str(status_value).lower()
if status_value in ["ok", "success", "live", "live_soon", "online"]:
return "#00FF00" # зелёный
elif status_value in ["scheduled", "today_not_started", "upcoming"]:
return "#FFFF00" # жёлтый
elif status_value in [
"result",
"resultconfirmed",
"finished",
"finished_today",
]:
return "#FF0000" # красный
elif status_value in ["no_game_today", "unknown", "none"]:
return "#FFFFFF" # белый
else:
return "#808080" # серый (неизвестный статус)
# ✳️ сортируем latest_data в нужном порядке
sort_order = ["game", "live-status", "box-score", "play-by-play"]
sorted_keys = [k for k in sort_order if k in latest_data] + sorted(
[k for k in latest_data if k not in sort_order]
)
data = {
"league": LEAGUE,
"team": TEAM,
"game_id": GAME_ID,
"game_status": STATUS,
"statuses": [
{
"name": TEAM,
"status": STATUS,
"ts": (
GAME_START_DT.strftime("%Y-%m-%d %H:%M") if GAME_START_DT else "N/A"
),
"link": LEAGUE,
"color": color_for_status(STATUS),
}
]
+ [
{
"name": item,
"status": (
latest_data[item]["data"]["status"]
if isinstance(latest_data[item]["data"], dict)
and "status" in latest_data[item]["data"]
else latest_data[item]["data"]
),
"ts": latest_data[item]["ts"],
"link": URLS[item].format(
host=HOST,
league=LEAGUE,
season=SEASON,
lang=LANG,
game_id=GAME_ID,
),
"color": color_for_status(
latest_data[item]["data"]["status"]
if isinstance(latest_data[item]["data"], dict)
and "status" in latest_data[item]["data"]
else latest_data[item]["data"]
),
}
for item in sorted_keys # ← используем отсортированный порядок
],
}
accept = request.headers.get("accept", "")
if "text/html" in accept:
status_raw = str(STATUS).lower()
if status_raw in ["live"]:
gs_class = "live"
gs_text = "🟢 LIVE"
elif status_raw in ["live_soon"]:
gs_class = "live"
gs_text = "🟢 GAME TODAY (soon)"
elif status_raw == "today_not_started":
gs_class = "upcoming"
gs_text = "🟡 Game today, not started"
elif status_raw in ["finished_today", "finished"]:
gs_class = "finished"
gs_text = "🔴 Game finished"
elif status_raw == "no_game_today":
gs_class = "unknown"
gs_text = "⚪ No game today"
else:
gs_class = "unknown"
gs_text = "⚪ UNKNOWN"
html = f"""
<html>
<head>
<meta http-equiv="refresh" content="1">
<style>
body {{
font-family: "Segoe UI", Roboto, monospace;
background: #0f0f0f;
color: #eee;
padding: 20px;
}}
table {{
border-collapse: collapse;
width: 100%;
margin-top: 10px;
}}
th, td {{
border: 1px solid #333;
padding: 6px 10px;
text-align: left;
}}
th {{
background: #222;
color: #ccc;
}}
tr:nth-child(even) {{ background-color: #1a1a1a; }}
.ok {{ color: #00ff7f; font-weight: bold; }}
.fail {{ color: #ff4d4d; font-weight: bold; }}
.live {{ color: #00ff7f; font-weight: bold; }}
.finished {{ color: #ff4d4d; font-weight: bold; }}
.unknown {{ color: #cccccc; font-weight: bold; }}
a {{
color: #66b3ff;
text-decoration: none;
}}
a:hover {{ text-decoration: underline; }}
h2 {{ margin-bottom: 5px; }}
.header-info p {{ margin: 2px 0; }}
</style>
</head>
<body>
<h2>📊 Game Status Monitor</h2>
<div class="header-info">
<p><b>League:</b> {LEAGUE}</p>
<p><b>Team:</b> {TEAM}</p>
<p><b>Game ID:</b> {GAME_ID}</p>
<p><b>Game Status:</b> <span class="{gs_class}">{gs_text}</span></p>
</div>
<table>
<tr><th>Name</th><th>Status</th><th>Timestamp</th><th>Link</th></tr>
"""
for s in data["statuses"]:
status_text = str(s["status"]).strip().lower()
if any(
x in status_text
for x in ["ok", "success", "live", "live_soon", "online"]
):
color_class = "ok"
elif any(
x in status_text for x in ["scheduled", "today_not_started", "upcoming"]
):
color_class = "warn"
elif any(
x in status_text
for x in ["result", "resultconfirmed", "finished", "finished_today"]
):
color_class = "fail"
else:
color_class = "unknown"
html += f"""
<tr>
<td>{s["name"]}</td>
<td class="{color_class}">{status_text}</td>
<td>{s["ts"]}</td>
<td><a href="{s["link"]}" target="_blank">{s["link"]}</a></td>
</tr>
"""
html += """
</table>
</body>
</html>
"""
return HTMLResponse(content=html, media_type="text/html")
# JSON для API (красиво отформатированный)
formatted = json.dumps(data, indent=4, ensure_ascii=False)
response = Response(content=formatted, media_type="application/json")
response.headers["Refresh"] = "1"
return response
@app.get("/scores")
async def scores():
game = get_latest_game_safe()
if not game:
# игры ещё нет или пришёл только частичный ответ
# отдаём пустую структуру, чтобы фронт не падал
return [
{"Q": "Q1", "score1": "", "score2": ""},
{"Q": "Q2", "score1": "", "score2": ""},
{"Q": "Q3", "score1": "", "score2": ""},
{"Q": "Q4", "score1": "", "score2": ""},
]
game_data = game["data"] if "data" in game else game
result = game_data.get("result", {})
game_info = result.get("game", {})
full_score = game_info.get("fullScore")
if not full_score:
# поле есть, но ещё пустое/None
return [
{"Q": "Q1", "score1": "", "score2": ""},
{"Q": "Q2", "score1": "", "score2": ""},
{"Q": "Q3", "score1": "", "score2": ""},
{"Q": "Q4", "score1": "", "score2": ""},
]
quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]
score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters]
full_score_list = full_score.split(",")
for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]):
parts = score_str.split(":")
if len(parts) == 2:
score_by_quarter[i]["score1"] = parts[0]
score_by_quarter[i]["score2"] = parts[1]
return score_by_quarter
async def top_sorted_team(data):
top_sorted_team = sorted(
(p for p in data if p.get("startRole") in ["Player", ""]),
key=lambda x: (
x.get("pts", 0),
x.get("dreb", 0) + x.get("oreb", 0),
x.get("ast", 0),
x.get("stl", 0),
x.get("blk", 0),
x.get("time", "0:00"),
),
reverse=True,
)
# пустые строки не должны ломать UI процентами фолов/очков
for player in top_sorted_team:
if player.get("num", "") == "":
player["pts"] = ""
player["foul"] = ""
return top_sorted_team
def get_latest_game_safe():
"""
Безопасно достаём актуальный game из latest_data.
Возвращаем None, если структура ещё не готова или прилетел "плохой" game
(например, с {"status": "no-status"} без data/result).
"""
game = latest_data.get("game")
if not game:
return None
# у нас в latest_data["game"] лежит {"ts": ..., "data": {...}} или сразу {...}
# в consumer мы клали {"ts": ..., "data": payload}, так что берём .get("data")
if "data" in game:
game_data = game["data"]
else:
# на всякий случай, если где-то клали сразу payload
game_data = game
if not isinstance(game_data, dict):
return None
result = game_data.get("result")
if not result:
return None
# если всё ок — вернём в исходном виде (с ts и т.п.)
return game
async def team(who: str):
"""
Возвращает данные по команде (team1 / team2) из актуального game.
Защищена от ситуации, когда latest_data["game"] ещё не прогрелся
или в него прилетел "плохой" ответ от API.
"""
game = get_latest_game_safe()
if not game:
# игра ещё не подгружена или структура кривоватая
raise HTTPException(status_code=503, detail="game data not ready")
# нормализуем доступ к данным
game_data = game["data"] if "data" in game else game
result = game_data[
"result"
] # здесь уже безопасно, мы проверили в get_latest_game_safe
# в result ожидаем "teams"
teams = result.get("teams")
if not teams:
raise HTTPException(status_code=503, detail="game teams not ready")
# выбираем команду
if who == "team1":
payload = next((t for t in teams if t.get("teamNumber") == 1), None)
else:
payload = next((t for t in teams if t.get("teamNumber") == 2), None)
if payload is None:
raise HTTPException(status_code=404, detail=f"{who} not found in game data")
# дальше — твоя исходная логика формирования ответа по команде
# я не знаю весь твой оригинальный код ниже, поэтому вставляю каркас
# и показываю, где нужно аккуратно брать plays/box-score из latest_data
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload.get("starts", [])
team_rows = []
for item in starts:
stats = item.get("stats") or {}
row = {
"id": item.get("personId") or "",
"num": item.get("displayNumber"),
"startRole": item.get("startRole"),
"role": item.get("positionName"),
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == (item.get("positionName") or "").lower()
][0]
if any(
r[0].lower() == (item.get("positionName") or "").lower()
for r in role_list
)
else ""
),
"NameGFX": (
f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip()
if item.get("firstName") is not None
and item.get("lastName") is not None
else "Команда"
),
"captain": item.get("isCapitan", False),
"age": item.get("age") or 0,
"height": f"{item.get('height')} cm" if item.get("height") else 0,
"weight": f"{item.get('weight')} kg" if item.get("weight") else 0,
"isStart": stats.get("isStart", False),
"isOn": "🏀" if stats.get("isOnCourt") is True else "",
"flag": (
"https://flagicons.lipis.dev/flags/4x3/"
+ (
"ru"
if item.get("countryId") is None
and item.get("countryName") == "Russia"
else (
""
if item.get("countryId") is None
else (
(item.get("countryId") or "").lower()
if item.get("countryName") is not None
else ""
)
)
)
+ ".svg"
),
"pts": stats.get("points", 0),
"pt-2": f"{stats.get('goal2',0)}/{stats.get('shot2',0)}" if stats else 0,
"pt-3": f"{stats.get('goal3',0)}/{stats.get('shot3',0)}" if stats else 0,
"pt-1": f"{stats.get('goal1',0)}/{stats.get('shot1',0)}" if stats else 0,
"fg": (
f"{stats.get('goal2',0)+stats.get('goal3',0)}/"
f"{stats.get('shot2',0)+stats.get('shot3',0)}"
if stats
else 0
),
"ast": stats.get("assist", 0),
"stl": stats.get("steal", 0),
"blk": stats.get("block", 0),
"blkVic": stats.get("blocked", 0),
"dreb": stats.get("defReb", 0),
"oreb": stats.get("offReb", 0),
"reb": stats.get("defReb", 0) + stats.get("offReb", 0),
"to": stats.get("turnover", 0),
"foul": stats.get("foul", 0),
"foulT": stats.get("foulT", 0),
"foulD": stats.get("foulD", 0),
"foulC": stats.get("foulC", 0),
"foulB": stats.get("foulB", 0),
"fouled": stats.get("foulsOn", 0),
"plusMinus": stats.get("plusMinus", 0),
"dunk": stats.get("dunk", 0),
"kpi": (
stats.get("points", 0)
+ stats.get("defReb", 0)
+ stats.get("offReb", 0)
+ stats.get("assist", 0)
+ stats.get("steal", 0)
+ stats.get("block", 0)
+ stats.get("foulsOn", 0)
+ (stats.get("goal1", 0) - stats.get("shot1", 0))
+ (stats.get("goal2", 0) - stats.get("shot2", 0))
+ (stats.get("goal3", 0) - stats.get("shot3", 0))
- stats.get("turnover", 0)
- stats.get("foul", 0)
),
"time": format_time(stats.get("second", 0)),
"pts1q": 0,
"pts2q": 0,
"pts3q": 0,
"pts4q": 0,
"pts1h": 0,
"pts2h": 0,
"Name1GFX": (item.get("firstName") or "").strip(),
"Name2GFX": (item.get("lastName") or "").strip(),
"photoGFX": (
os.path.join(
"D:\\Photos",
LEAGUE.lower(),
result[who]["name"],
f"{item.get('displayNumber')}.png",
)
if item.get("startRole") == "Player"
else ""
),
"isOnCourt": stats.get("isOnCourt", False),
}
team_rows.append(row)
# добиваем до 12 строк, чтобы UI был ровный
count_player = sum(1 for x in team_rows if x["startRole"] == "Player")
if count_player < 12 and team_rows:
filler_count = (4 if count_player <= 4 else 12) - count_player
template_keys = list(team_rows[0].keys())
for _ in range(filler_count):
empty_row = {}
for key in template_keys:
if key in ["captain", "isStart", "isOnCourt"]:
empty_row[key] = False
elif key in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]:
empty_row[key] = 0
else:
empty_row[key] = ""
team_rows.append(empty_row)
# сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д.
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5,
}
sorted_team = sorted(
team_rows,
key=lambda x: role_priority.get(x.get("startRole", 99), 99),
)
return sorted_team
async def started_team(data):
started_team = sorted(
(
p
for p in data
if p.get("startRole") == "Player" and p.get("isOnCourt") is True
),
key=lambda x: int(x.get("num") or 0),
)
return started_team
def add_new_team_stat(
data: dict,
avg_age: float,
points,
avg_height: float,
timeout_str: str,
timeout_left: int,
) -> dict:
"""
Берёт словарь total по команде (очки, подборы, броски и т.д.),
добавляет:
- проценты попаданий
- средний возраст / рост
- очки старт / бенч
- информацию по таймаутам
и всё приводит к строкам (для UI, чтобы не ловить типы).
Возвращает обновлённый словарь.
"""
def safe_int(v):
try:
return int(v)
except (ValueError, TypeError):
return 0
def format_percent(goal, shot):
goal, shot = safe_int(goal), safe_int(shot)
return f"{round(goal * 100 / shot)}%" if shot else "0%"
goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1"))
goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2"))
goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3"))
def_reb = safe_int(data.get("defReb"))
off_reb = safe_int(data.get("offReb"))
data.update(
{
"pt-1": f"{goal1}/{shot1}",
"pt-2": f"{goal2}/{shot2}",
"pt-3": f"{goal3}/{shot3}",
"fg": f"{goal2 + goal3}/{shot2 + shot3}",
"pt-1_pro": format_percent(goal1, shot1),
"pt-2_pro": format_percent(goal2, shot2),
"pt-3_pro": format_percent(goal3, shot3),
"fg_pro": format_percent(goal2 + goal3, shot2 + shot3),
"Reb": str(def_reb + off_reb),
"avgAge": str(avg_age),
"ptsStart": str(points[0]),
"ptsStart_pro": str(points[1]),
"ptsBench": str(points[2]),
"ptsBench_pro": str(points[3]),
"avgHeight": f"{avg_height} cm",
"timeout_left": str(timeout_left),
"timeout_str": str(timeout_str),
}
)
for k in data:
data[k] = str(data[k])
return data
def time_outs_func(data_pbp):
"""
Считает таймауты для обеих команд и формирует читабельные строки вида:
"2 Time-outs left in 2nd half"
Возвращает:
(строка_для_команды1, остаток1, строка_для_команды2, остаток2)
"""
timeout1 = []
timeout2 = []
for event in data_pbp:
if event.get("play") == 23: # 23 == таймаут
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
def timeout_status(timeout_list, last_event: dict):
period = last_event.get("period", 0)
sec = last_event.get("sec", 0)
if period < 3:
timeout_max = 2
count = sum(1 for t in timeout_list if t.get("period", 0) <= period)
quarter = "1st half"
elif period < 5:
count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period)
quarter = "2nd half"
if period == 4 and sec >= 4800 and count in (0, 1):
timeout_max = 2
else:
timeout_max = 3
else:
timeout_max = 1
count = sum(1 for t in timeout_list if t.get("period", 0) == period)
quarter = f"OverTime {period - 4}"
left = max(0, timeout_max - count)
word = "Time-outs" if left != 1 else "Time-out"
text = f"{left if left != 0 else 'No'} {word} left in {quarter}"
return text, left
if not data_pbp:
return "", 0, "", 0
last_event = data_pbp[-1]
t1_str, t1_left = timeout_status(timeout1, last_event)
t2_str, t2_left = timeout_status(timeout2, last_event)
return t1_str, t1_left, t2_str, t2_left
def add_data_for_teams(new_data):
"""
Считает командные агрегаты:
- средний возраст
- очки со старта vs со скамейки, + их проценты
- средний рост
Возвращает кортеж:
(avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm)
"""
players = [item for item in new_data if item["startRole"] == "Player"]
points_start = 0
points_bench = 0
total_age = 0
total_height = 0
player_count = len(players)
for player in players:
# print(player)
stats = player["stats"]
if stats:
# print(stats)
if stats["isStart"] is True:
points_start += stats["points"]
elif stats["isStart"] is False:
points_bench += stats["points"]
total_age += player["age"]
total_height += player["height"]
total_points = points_start + points_bench
points_start_pro = (
f"{round(points_start * 100 / total_points)}%" if total_points else "0%"
)
points_bench_pro = (
f"{round(points_bench * 100 / total_points)}%" if total_points else "0%"
)
avg_age = round(total_age / player_count, 1) if player_count else 0
avg_height = round(total_height / player_count, 1) if player_count else 0
points = [points_start, points_start_pro, points_bench, points_bench_pro]
return avg_age, points, avg_height
stat_name_list = [
("points", "Очки", "points"),
("pt-1", "Штрафные", "free throws"),
("pt-1_pro", "штрафные, процент", "free throws pro"),
("pt-2", "2-очковые", "2-points"),
("pt-2_pro", "2-очковые, процент", "2-points pro"),
("pt-3", "3-очковые", "3-points"),
("pt-3_pro", "3-очковые, процент", "3-points pro"),
("fg", "очки с игры", "field goals"),
("fg_pro", "Очки с игры, процент", "field goals pro"),
("assist", "Передачи", "assists"),
("pass", "", ""),
("defReb", "подборы в защите", ""),
("offReb", "подборы в нападении", ""),
("Reb", "Подборы", "rebounds"),
("steal", "Перехваты", "steals"),
("block", "Блокшоты", "blocks"),
("blocked", "", ""),
("turnover", "Потери", "turnovers"),
("foul", "Фолы", "fouls"),
("foulsOn", "", ""),
("foulT", "", ""),
("foulD", "", ""),
("foulC", "", ""),
("foulB", "", ""),
("second", "секунды", "seconds"),
("dunk", "данки", "dunks"),
("fastBreak", "", "fast breaks"),
("plusMinus", "+/-", "+/-"),
("avgAge", "", "avg Age"),
("ptsBench", "", "Bench PTS"),
("ptsBench_pro", "", "Bench PTS, %"),
("ptsStart", "", "Start PTS"),
("ptsStart_pro", "", "Start PTS, %"),
("avgHeight", "", "avg height"),
("timeout_left", "", "timeout left"),
("timeout_str", "", "timeout str"),
]
@app.get("/team_stats")
async def team_stats():
teams = latest_data["game"]["data"]["result"]["teams"]
plays = latest_data["game"]["data"]["result"]["plays"]
team_1 = next((t for t in teams if t["teamNumber"] == 1), None)
team_2 = next((t for t in teams if t["teamNumber"] == 2), None)
timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays)
avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1["starts"])
avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2["starts"])
total_1 = add_new_team_stat(
team_1["total"],
avg_age_1,
points_1,
avg_height_1,
timeout_str1,
timeout_left1,
)
total_2 = add_new_team_stat(
team_2["total"],
avg_age_2,
points_2,
avg_height_2,
timeout_str2,
timeout_left2,
)
result_json = []
for key in total_1:
val1 = total_1[key]
val2 = total_2[key]
stat_rus = ""
stat_eng = ""
for metric_name, rus, eng in stat_name_list:
if metric_name == key:
stat_rus, stat_eng = rus, eng
break
result_json.append(
{
"name": key,
"nameGFX_rus": stat_rus,
"nameGFX_eng": stat_eng,
"val1": val1,
"val2": val2,
}
)
return result_json
@app.get("/referee")
async def referee():
desired_order = [
"Crew chief",
"Referee 1",
"Referee 2",
"Commissioner",
"Ст.судья",
"Судья 1",
"Судья 2",
"Комиссар",
]
# Найти судей (teamNumber == 0)
team_ref = next(
(
t
for t in latest_data["game"]["data"]["result"]["teams"]
if t["teamNumber"] == 0
),
None,
)
referees_raw = team_ref.get("starts", [])
referees = []
for r in referees_raw:
flag_code = r.get("countryId", "").lower() if r.get("countryName") else ""
referees.append(
{
"displayNumber": r.get("displayNumber", ""),
"positionName": r.get("positionName", ""),
"lastNameGFX": f"{r.get('firstName', '')} {r.get('lastName', '')}".strip(),
"secondName": r.get("secondName", ""),
"birthday": r.get("birthday", ""),
"age": r.get("age", 0),
"flag": f"https://flagicons.lipis.dev/flags/4x3/{flag_code}.svg",
}
)
# Сортировка по позиции
referees = sorted(
referees,
key=lambda x: (
desired_order.index(x["positionName"])
if x["positionName"] in desired_order
else len(desired_order)
),
)
return referees
@app.get("/team_comparison")
async def team_comparison():
if STATUS not in ["no_game_today", "finished_today"]:
data = latest_data["pregame"]["data"]["result"]
teams = []
for data_team in (data["teamStats1"], data["teamStats2"]):
temp_team = {
"team": data_team["team"]["name"],
"games": data_team["games"],
"points": round(
(data_team["totalStats"]["points"] / data_team["games"]), 1
),
"points_2": round(
(
data_team["totalStats"]["goal2"]
* 100
/ data_team["totalStats"]["shot2"]
),
1,
),
"points_3": round(
(
data_team["totalStats"]["goal3"]
* 100
/ data_team["totalStats"]["shot3"]
),
1,
),
"points_23": round(
(
data_team["totalStats"]["goal23"]
* 100
/ data_team["totalStats"]["shot23"]
),
1,
),
"points_1": round(
(
data_team["totalStats"]["goal1"]
* 100
/ data_team["totalStats"]["shot1"]
),
1,
),
"assists": round(
(data_team["totalStats"]["assist"] / data_team["games"]), 1
),
"rebounds": round(
(
(
data_team["totalStats"]["defRebound"]
+ data_team["totalStats"]["offRebound"]
)
/ data_team["games"]
),
1,
),
"steals": round(
(data_team["totalStats"]["steal"] / data_team["games"]), 1
),
"turnovers": round(
(data_team["totalStats"]["turnover"] / data_team["games"]), 1
),
"blocks": round(
(data_team["totalStats"]["blockShot"] / data_team["games"]), 1
),
"fouls": round(
(data_team["totalStats"]["foul"] / data_team["games"]), 1
),
}
teams.append(temp_team)
return teams
else:
return [{"Данных о сравнении команд нет!"}]
@app.get("/standings")
async def regular_standings():
data = latest_data["actual-standings"]["data"]["items"]
for item in data:
if item["comp"]["name"] == "Regular Season":
if item.get("standings"):
standings_rows = item["standings"]
df = pd.json_normalize(standings_rows)
if "scores" in df.columns:
df = df.drop(columns=["scores"])
if (
"totalWin" in df.columns
and "totalDefeat" in df.columns
and "totalGames" in df.columns
and "totalGoalPlus" in df.columns
and "totalGoalMinus" in df.columns
):
tw = (
pd.to_numeric(df["totalWin"], errors="coerce")
.fillna(0)
.astype(int)
)
td = (
pd.to_numeric(df["totalDefeat"], errors="coerce")
.fillna(0)
.astype(int)
)
df["w_l"] = tw.astype(str) + " / " + td.astype(str)
def calc_percent(row):
win = row.get("totalWin", 0)
games = row.get("totalGames", 0)
# гарантируем числа
try:
win = int(win)
except (TypeError, ValueError):
win = 0
try:
games = int(games)
except (TypeError, ValueError):
games = 0
if games == 0 or row["w_l"] == "0 / 0":
return 0
return round(win * 100 / games + 0.000005)
df["procent"] = df.apply(calc_percent, axis=1)
tg_plus = (
pd.to_numeric(df["totalGoalPlus"], errors="coerce")
.fillna(0)
.astype(int)
)
tg_minus = (
pd.to_numeric(df["totalGoalMinus"], errors="coerce")
.fillna(0)
.astype(int)
)
df["plus_minus"] = tg_plus - tg_minus
standings_payload = df.to_dict(orient="records")
return standings_payload
@app.get("/live_status")
async def live_status():
# если матч реально идёт/вот-вот — пытаемся отдать то, что есть
if STATUS in ["live", "live_soon"]:
ls = latest_data.get("live-status")
if not ls:
# live-status ещё не прилетел
return [{"foulsA": 0, "foulsB": 0}]
raw = ls.get("data")
# 1) если это уже готовый dict и в нём есть result → как было раньше
if isinstance(raw, dict):
# иногда API кладёт всё прямо в root, иногда внутрь result
if "result" in raw and isinstance(raw["result"], dict):
return [raw["result"]]
else:
# отдадим как есть, но в списке, чтобы фронт не сломать
return [raw]
# 2) если это просто строка статуса ("ok" / "no-status" / "error")
if isinstance(raw, str):
return [{"status": raw}]
# fallback
return [{"foulsA": 0, "foulsB": 0}]
else:
# матч не идёт — как у тебя было
return [{"foulsA": 0, "foulsB": 0}]
if __name__ == "__main__":
uvicorn.run(
"get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="critical"
)