Files
RFB/visual.py
2025-10-22 09:47:44 +00:00

1798 lines
66 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import socket
import platform
import numpy as np
import pandas as pd
import streamlit as st
import sys
import requests
from streamlit_autorefresh import st_autorefresh
import errno
import time
import re
from datetime import datetime, timedelta, timezone
st.set_page_config(
page_title="Баскетбол",
page_icon="🏀",
layout="wide",
initial_sidebar_state="expanded",
menu_items={"About": "версия 2.0 от 08.10.2025"},
)
REMOVE_PADDING_FROM_SIDES = """
<style>
.block-container {
padding-top: 0rem;
padding-bottom: 0rem;
}
</style>
"""
st.markdown(REMOVE_PADDING_FROM_SIDES, unsafe_allow_html=True)
st_autorefresh()
def schedule_daily_restart():
"""Перезапуск get_season_and_schedule каждый день в 00:05 (только не на Windows)."""
if not sys.platform.startswith("win"):
while True:
now = datetime.now()
next_run = (now + timedelta(days=1)).replace(
hour=0, minute=5, second=0, microsecond=0
)
sleep_time = (next_run - now).total_seconds()
time.sleep(sleep_time)
st.cache_data.clear()
# Функции для стилизации
def highlight_max(data):
# Преобразуем данные к числовому типу, заменяя некорректные значения на NaN
numeric_data = pd.to_numeric(data, errors="coerce")
max_value = numeric_data.max() if pd.notna(numeric_data.max()) else None
return [
"background-color: green" if pd.notna(v) and v == max_value and v > 0 else ""
for v in numeric_data
]
def color_win(s):
return [
(
"background-color: ForestGreen"
if v == True
else "background-color: #FF4B4B" if v == False else None
)
for v in s
]
def highlight_grey(s):
return ["background-color: grey"] * len(s) if s.foul == 5 else [""] * len(s)
def highlight_foul(s):
return [
(
"background-color: orange"
if v == 4
else "background-color: red" if v == 5 else ""
)
for v in s
]
def load_json_data(filepath):
"""
Загружает данные из JSON файла и кэширует их.
Возвращает None, если файл не удается прочитать.
"""
try:
with open(filepath, "r", encoding="utf-8") as file:
return json.load(file)
except (json.JSONDecodeError, FileNotFoundError):
return None
# Функция для обработки данных одной команды
def process_team_data(team_json, columns_to_include):
team_data = pd.json_normalize(team_json)
# Оставляем только нужные колонки
team_data = team_data[:12][columns_to_include]
# Обработка height и weight
for column in ["height", "weight"]:
if column in team_data.columns:
team_data[column] = team_data[column].apply(
lambda value: "" if value == 0 else value
)
return team_data
def process_player_data(team_json, player_index):
team_data = pd.json_normalize(team_json)
player_data = team_data.iloc[player_index]
season_total = {
"name": "Season Total",
"game_count": str(player_data["TGameCount"]),
"start_count": str(player_data["TStartCount"]),
"pts": str(player_data["TPoints"]),
"pt-2": str(player_data["TShots2"]),
"pt-3": str(player_data["TShots3"]),
"pt-1": str(player_data["TShots1"]),
"fg": str(player_data["TShots23"]),
"ast": str(player_data["TAssist"]),
"stl": str(player_data["TSteal"]),
"blk": str(player_data["TBlocks"]),
"dreb": str(player_data["TDefRebound"]),
"oreb": str(player_data["TOffRebound"]),
"reb": str(player_data["TRebound"]),
# "to": str(player_data["TTurnover"]),
# "foul": str(player_data["TFoul"]),
"fouled": str(player_data["TOpponentFoul"]),
"dunk": str(player_data["TDunk"]),
"time": str(player_data["TPlayedTime"]),
}
season_avg = {
"name": "Season Average",
"game_count": "",
"start_count": "",
"pts": str(player_data["AvgPoints"]),
"pt-2": str(player_data["Shot2Percent"]),
"pt-3": str(player_data["Shot3Percent"]),
"pt-1": str(player_data["Shot1Percent"]),
"fg": str(player_data["Shot23Percent"]),
"ast": str(player_data["AvgAssist"]),
"stl": str(player_data["AvgSteal"]),
"blk": str(player_data["AvgBlocks"]),
"dreb": str(player_data["AvgDefRebound"]),
"oreb": str(player_data["AvgOffRebound"]),
"reb": str(player_data["AvgRebound"]),
# "to": str(player_data["AvgTurnover"]),
# "foul": str(player_data["AvgFoul"]),
"fouled": str(player_data["AvgOpponentFoul"]),
"dunk": str(player_data["AvgDunk"]),
"time": str(player_data["AvgPlayedTime"]),
}
career_total = {
"name": "Career Total",
"game_count": str(player_data["CareerTGameCount"]),
"start_count": str(player_data["CareerTStartCount"]),
"pts": str(player_data["CareerTPoints"]),
"pt-2": str(player_data["CareerTShots2"]),
"pt-3": str(player_data["CareerTShots3"]),
"pt-1": str(player_data["CareerTShots1"]),
"fg": str(player_data["CareerTShots23"]),
"ast": str(player_data["CareerTAssist"]),
"stl": str(player_data["CareerTSteal"]),
"blk": str(player_data["CareerTBlocks"]),
"dreb": str(player_data["CareerTDefRebound"]),
"oreb": str(player_data["CareerTOffRebound"]),
"reb": str(player_data["CareerTRebound"]),
# "to": str(player_data["CareerTTurnover"]),
# "foul": str(player_data["CareerTFoul"]),
"fouled": str(player_data["CareerTOpponentFoul"]),
"dunk": str(player_data["CareerTDunk"]),
"time": str(player_data["CareerTPlayedTime"]),
}
return [season_total, season_avg, career_total], player_data
config = {
"flag": st.column_config.ImageColumn("flag"),
"roleShort": st.column_config.TextColumn("R", width=27),
"num": st.column_config.TextColumn("#", width=27),
"NameGFX": st.column_config.TextColumn(width=170),
"isOn": st.column_config.TextColumn("🏀", width=27),
"pts": st.column_config.NumberColumn("PTS", width=27),
"pt-2": st.column_config.TextColumn("2-PT", width=45),
"pt-3": st.column_config.TextColumn("3-PT", width=45),
"pt-1": st.column_config.TextColumn("FT", width=45),
"fg": st.column_config.TextColumn("FG", width=45),
"ast": st.column_config.NumberColumn("AS", width=27),
"stl": st.column_config.NumberColumn("ST", width=27),
"blk": st.column_config.NumberColumn("BL", width=27),
"blkVic": st.column_config.NumberColumn("BV", width=27),
"dreb": st.column_config.NumberColumn("DR", width=27),
"oreb": st.column_config.NumberColumn("OR", width=27),
"reb": st.column_config.NumberColumn("R", width=27),
"to": st.column_config.NumberColumn("TO", width=27),
"foul": st.column_config.NumberColumn("F", width=27),
"fouled": st.column_config.NumberColumn("Fed", width=27),
"plusMinus": st.column_config.NumberColumn("+/-", width=27),
"dunk": st.column_config.NumberColumn("DUNK", width=27),
"kpi": st.column_config.NumberColumn("KPI", width=27),
"time": st.column_config.TextColumn("TIME"),
"game_count": st.column_config.TextColumn("G", width=27),
"start_count": st.column_config.TextColumn("S", width=27),
"q_pts": st.column_config.TextColumn("PTS", width=27),
"q_ast": st.column_config.TextColumn("AS", width=27),
"q_stl": st.column_config.TextColumn("ST", width=27),
"q_blk": st.column_config.TextColumn("BL", width=27),
"q_reb": st.column_config.TextColumn("R", width=27),
"q_rnk": st.column_config.TextColumn("KPI", width=27),
"q_f": st.column_config.TextColumn("F", width=27),
"q_f_on": st.column_config.TextColumn("Fed", width=27),
"q_to": st.column_config.TextColumn("TO", width=27),
"q_time": st.column_config.TextColumn("TIME"),
"q_pt2": st.column_config.TextColumn("2-PT", width=45),
"q_pt3": st.column_config.TextColumn("3-PT", width=45),
"q_pt23": st.column_config.TextColumn("FG", width=45),
"q_ft": st.column_config.TextColumn("FT", width=45),
}
config_season = {
"flag": st.column_config.ImageColumn("flag"),
"roleShort": st.column_config.TextColumn("R", width=27),
"num": st.column_config.TextColumn("#", width=27),
"NameGFX": st.column_config.TextColumn(width=170),
"isOn": st.column_config.TextColumn("🏀", width=27),
"pts": st.column_config.TextColumn("PTS", width=40),
"pt-2": st.column_config.TextColumn("2-PT", width=60),
"pt-3": st.column_config.TextColumn("3-PT", width=60),
"pt-1": st.column_config.TextColumn("FT", width=60),
"fg": st.column_config.TextColumn("FG", width=60),
"ast": st.column_config.TextColumn("AS", width=40),
"stl": st.column_config.TextColumn("ST", width=40),
"blk": st.column_config.TextColumn("BL", width=40),
"blkVic": st.column_config.TextColumn("BV", width=40),
"dreb": st.column_config.TextColumn("DR", width=40),
"oreb": st.column_config.TextColumn("OR", width=40),
"reb": st.column_config.TextColumn("R", width=40),
"to": st.column_config.TextColumn("TO", width=40),
"foul": st.column_config.TextColumn("F", width=40),
"fouled": st.column_config.TextColumn("Fed", width=40),
"plusMinus": st.column_config.TextColumn("+/-", width=40),
"dunk": st.column_config.TextColumn("DUNK", width=40),
"kpi": st.column_config.TextColumn("KPI", width=40),
"time": st.column_config.TextColumn("TIME"),
"game_count": st.column_config.TextColumn("G", width=40),
"start_count": st.column_config.TextColumn("S", width=40),
}
if "player1" not in st.session_state:
st.session_state.player1 = None
if "player2" not in st.session_state:
st.session_state.player2 = None
myhost = platform.node()
if sys.platform.startswith("win"): # было: if platform == "win32":
FOLDER_JSON = "JSON"
else:
FOLDER_JSON = "static"
def get_ip_address():
try:
# Попытка получить IP-адрес с использованием внешнего сервиса
# Может потребоваться подключение к интернету
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_address = s.getsockname()[0]
except socket.error:
# Если не удалось получить IP-адрес через внешний сервис,
# используем метод для локального получения IP
ip_address = socket.gethostbyname(socket.gethostname())
return ip_address
def _is_meaningful_payload(obj) -> bool:
# Нулевые/битые/пустые — не считаем осмысленными
if obj is None:
return False
# Пустая коллекция — не осмысленно
if isinstance(obj, (list, dict, set, tuple)):
return len(obj) > 0
# Любой другой тип (str/числа) считаем осмысленным только если не пустая строка
if isinstance(obj, str):
return obj.strip() != ""
return True
def load_data_from_json(filepath):
"""
Читает JSON и обновляет session_state ТОЛЬКО если данные осмысленные.
Иначе оставляет предыдущий кэш нетронутым (stale-if-error/stale-if-empty).
"""
directory = FOLDER_JSON
os.makedirs(directory, exist_ok=True)
filepath_full = os.path.join(directory, f"{filepath}.json")
# print(filepath)
# print(filepath_full)
# вычисление ключа
# ip = get_ip_address()
# host = ip_check.get(ip, {}).get("host") or ""
host = _ipcheck()
base = os.path.basename(filepath_full).replace(".json", "")
key = base.replace(f"{host}", "", 1)
new_payload = load_json_data(filepath_full) # может быть None/битое/пустое
# if "team1_" in filepath_full:
# print(filepath_full)
# print(new_payload)
# Если новый payload осмысленный — обновляем кэш и "last_good_*"
if _is_meaningful_payload(new_payload):
st.session_state[key] = new_payload
st.session_state.setdefault("_last_good", {})
st.session_state["_last_good"][key] = new_payload
return
# Иначе: если раньше уже был хороший — восстанавливаем его и НЕ трогаем
if "_last_good" in st.session_state and key in st.session_state["_last_good"]:
# ничего не делаем — оставляем текущее значение как есть
return
# Иначе (нет ни нового, ни прошлого хорошего) — вообще не создаём ключ
# чтобы ниже по коду .get(...) вернул None и UI ничего не нарисовал
# (при желании можно явно "очищать": st.session_state.pop(key, None))
def _ipcheck() -> str:
"""Возвращает префикс для имени файла по IP.
Если IP локальный или host не найден — возвращает пустую строку.
"""
try:
ip_str = get_ip_address()
except Exception:
ip_str = None
ip_map = globals().get("ip_check") or {}
if ip_str and isinstance(ip_map, dict):
host = (ip_map.get(ip_str) or {}).get("host")
if host:
return f"{host}_"
return ""
def read_match_id_json(path="match_id.json", attempts=10, delay=0.2):
"""Надёжное чтение match_id.json с ретраями при EBUSY/битом JSON."""
d = delay
for i in range(attempts):
try:
if not os.path.isfile(path):
return {}
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
# файл переписывают — подождём и попробуем снова
time.sleep(d)
d = min(d * 1.6, 2.0)
except OSError as e:
# EBUSY (errno=16) — подождём и ещё раз
if getattr(e, "errno", None) == errno.EBUSY:
time.sleep(d)
d = min(d * 1.6, 2.0)
continue
# иные ошибки — пробрасываем дальше
raise
print("Не удалось прочитать match_id.json после нескольких попыток; возвращаю {}")
return {}
def rewrite_file(filename: str, data: dict, directory: str = "JSON") -> None:
"""
Перезаписывает JSON-файл с заданными данными.
Если запуск локальный (локальный IP), префикс host в имени файла не используется.
Если IP не локальный и есть словарь ip_check с хостами — добавим префикс host_.
"""
# Если глобальная константа задана — используем её
try:
directory = FOLDER_JSON # type: ignore[name-defined]
except NameError:
# иначе используем аргумент по умолчанию/переданный
pass
os.makedirs(directory, exist_ok=True)
host_prefix = _ipcheck()
filepath = os.path.join(directory, f"{host_prefix}{filename}.json")
# print(filepath) # оставил как у тебя; можно заменить на logger.debug при желании
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Ошибка при записи файла {filepath}: {e}")
ip_check = read_match_id_json("match_id.json") or {}
prefix = _ipcheck()
load_data_from_json(f"{prefix}game_online")
cached_game_online = st.session_state.get("game_online")
load_data_from_json(f"{prefix}team1")
cached_team1 = st.session_state.get("team1")
load_data_from_json(f"{prefix}team2")
cached_team2 = st.session_state.get("team2")
load_data_from_json(f"{prefix}referee")
cached_referee = st.session_state.get("referee")
# standings — может не быть тега/файла
league_tag = None
if isinstance(cached_game_online, dict):
league_tag = ((cached_game_online.get("result") or {}).get("league") or {}).get(
"tag"
)
comp_name = (
((cached_game_online.get("result") or {}).get("comp") or {})
.get("name")
.replace(" ", "_")
)
if league_tag:
load_data_from_json(f"{prefix}standings_{league_tag}_{comp_name}")
cached_standings = (
st.session_state.get(f"standings_{league_tag}_{comp_name}") if league_tag else None
)
load_data_from_json(f"{prefix}scores_quarter")
cached_scores_quarter = st.session_state.get("scores_quarter")
load_data_from_json(f"{prefix}play_by_play")
cached_play_by_play = st.session_state.get("play_by_play")
load_data_from_json(f"{prefix}team_stats")
cached_team_stats = st.session_state.get("team_stats")
load_data_from_json(f"{prefix}scores")
cached_scores = st.session_state.get("scores") or [] # важно!
load_data_from_json(f"{prefix}live_status")
cached_live_status = st.session_state.get("live_status")
load_data_from_json(f"{prefix}schedule")
cached_schedule = st.session_state.get("schedule")
load_data_from_json(f"{prefix}team_comparison")
cached_team_comparison = st.session_state.get("team_comparison")
def _is_empty_like(x) -> bool:
if x is None:
return True
if isinstance(x, pd.DataFrame):
return x.empty
try:
from pandas.io.formats.style import Styler
if isinstance(x, Styler):
return getattr(x, "data", pd.DataFrame()).empty
except Exception:
pass
if isinstance(x, (list, tuple, dict, set)):
return len(x) == 0
return False
def safe_show(func, *args, **kwargs):
for a in args:
if _is_empty_like(a):
return None
for k, v in kwargs.items():
if k.lower() in (
"height",
"width",
"use_container_width",
"unsafe_allow_html",
"on_select",
"selection_mode",
"column_config",
"hide_index",
"border",
"delta_color",
"key",
):
continue
if _is_empty_like(v):
return None
if "height" in kwargs:
h = kwargs.get("height")
if h is None:
kwargs.pop("height")
else:
try:
h = int(h)
if h < 0:
kwargs.pop("height")
else:
kwargs["height"] = h
except Exception:
kwargs.pop("height")
try:
return func(*args, **kwargs)
except Exception as e:
st.warning(f"⚠️ Ошибка при отображении: {e}")
return None
# ======== /SAFE WRAPPER ========
def ensure_state(key: str, default=None):
# Инициализирует ключ один раз и возвращает значение
return st.session_state.setdefault(key, default)
period_max = 0
if isinstance(cached_play_by_play, list) and isinstance(cached_game_online, dict):
plays = (cached_game_online.get("result") or {}).get("plays") or []
if plays:
df_data_pbp = pd.DataFrame(cached_play_by_play)
if not df_data_pbp.empty and "period" in df_data_pbp.columns:
period_max = int(df_data_pbp.iloc[0]["period"])
count_quarter = [
f"Четверть {i}" if i < 5 else f"Овертайм {i-4}"
for i in range(1, period_max + 1)
]
for i in range(1, period_max + 1):
key_team1 = f"team1_{i}"
key_team2 = f"team2_{i}"
load_data_from_json(key_team1)
load_data_from_json(key_team2)
_q1 = st.session_state.get(key_team1) # может быть None — это ок
_q2 = st.session_state.get(key_team2)
timeout1 = []
timeout2 = []
if isinstance(cached_game_online, dict):
result = cached_game_online.get("result") or {}
plays = result.get("plays") or []
timeout1, timeout2 = [], []
for event in plays:
if isinstance(event, dict) and event.get("play") == 23:
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
col1, col4, col2, col5, col3 = st.columns([1, 5, 3, 5, 1])
t1 = result.get("team1") or {}
t2 = result.get("team2") or {}
if t1.get("logo"):
col1.image(t1["logo"], width=100)
team1_name = t1.get("name") or ""
team2_name = t2.get("name") or ""
if team1_name or team2_name:
col2.markdown(
f"<h2 style='text-align: center'>{team1_name}{team2_name}</h2>",
unsafe_allow_html=True,
)
col2.markdown(
f"<h3 style='text-align: center'>{result['game']['localDate']} {result['game']['defaultZoneTime']}</h3>",
unsafe_allow_html=True,
)
if t2.get("logo"):
col3.image(t2["logo"], width=100)
col4_1, col4_2, col4_3 = col4.columns((1, 1, 1))
col5_1, col5_2, col5_3 = col5.columns((1, 1, 1))
# Points метрики безопасно
val1 = val2 = None
if isinstance(cached_team_stats, list) and len(cached_team_stats) > 0:
v1 = cached_team_stats[0].get("val1")
v2 = cached_team_stats[0].get("val2")
if v1 is not None and v2 is not None:
val1, val2 = int(v1), int(v2)
delta_color_1 = "off" if val1 == val2 else "normal"
col4_1.metric("Points", v1, val1 - val2, delta_color_1)
col5_3.metric("Points", v2, val2 - val1, delta_color_1)
col4_3.metric("TimeOuts", len(timeout1))
col5_1.metric("TimeOuts", len(timeout2))
if isinstance(cached_live_status, list) and cached_live_status:
foulsA = (cached_live_status[0] or {}).get("foulsA")
foulsB = (cached_live_status[0] or {}).get("foulsB")
if foulsA is not None:
col4_2.metric("Fouls", foulsA)
if foulsB is not None:
col5_2.metric("Fouls", foulsB)
if isinstance(cached_game_online, dict) and (
(cached_game_online.get("result") or {}).get("plays") or []
):
col_1_col = [f"col_1_{i}" for i in range(1, period_max + 1)]
col_2_col = [f"col_2_{i}" for i in range(1, period_max + 1)]
count_q = 0
score_by_quarter_1 = [
x.get("score1")
for x in cached_scores
if isinstance(x, dict) and x.get("score1") not in ("", None)
]
score_by_quarter_2 = [
x.get("score2")
for x in cached_scores
if isinstance(x, dict) and x.get("score2") not in ("", None)
]
if score_by_quarter_1:
col_1_col = col4.columns([1 for _ in range(len(score_by_quarter_1))])
col_2_col = col5.columns([1 for _ in range(len(score_by_quarter_2))])
for q1, q2, col1_i, col2_i in zip(
score_by_quarter_1, score_by_quarter_2, col_1_col, col_2_col
):
count_q += 1
name_q = f"OT{count_q-4}" if count_q > 4 else f"Q{count_q}"
try:
delta_color = "off" if int(q1) == int(q2) else "normal"
col1_i.metric(name_q, q1, int(q1) - int(q2), delta_color, border=True)
col2_i.metric(name_q, q2, int(q2) - int(q1), delta_color, border=True)
except (ValueError, TypeError):
# если кривые данные в JSON, просто пропустим
pass
(
tab_temp_1,
tab_temp_2,
tab_temp_3,
tab_temp_4,
tab_temp_5,
tab_temp_6,
tab_pbp,
tab_temp_7,
tab_temp_8,
tab_schedule,
tab_online,
) = st.tabs(
[
"Игроки",
"Команды",
"Судьи",
"Турнирная таблица",
"Статистика четвертей",
"Ход игры",
"События игры",
"Статистика по четвертям",
"Milestones",
"Прошедшие/будущие матчи",
"Сегодня",
]
)
def check_milestone(value, milestone_type, name, num, where):
milestone_checks = {
"PTS": "point",
"AST": "assist",
"BLK": "block",
"REB": "rebound",
"DREB": "defensive rebound",
"OREB": "offensive rebound",
"STL": "steal",
"GAMES": "game",
}
if milestone_type == "GAMES":
list_data = [*range(50, 5100, 50)]
if int(value) % 100 in [49, 99]:
diff = [l - int(value) for l in list_data]
positive_numbers = [num for num in diff if num > -1]
count = 0
for i in diff:
count += 1
if i == min(positive_numbers):
break
full_word = [
word
for w, word in milestone_checks.items()
if w.lower() == milestone_type.lower()
][0]
# print(positive_numbers)
if min(positive_numbers) != 0:
word = full_word if min(positive_numbers) == 1 else f"{full_word}s"
if where == "season":
where = "in this season"
elif where == "league":
where = "in career VTB"
string_value = f"{name} needs {min(positive_numbers)} {word} to reach {list_data[count-1]} {full_word}s {where}"
else:
string_value = ""
return {
"NameGFX": f"{name} ({num})",
"type": milestone_type.upper(),
"value": value,
"string_value": string_value,
}
else:
list_data = [*range(100, 5100, 100)]
if (int(value) % 100) >= 90 or (int(value) % 1000) in list_data:
diff = [l - int(value) for l in list_data]
positive_numbers = [num for num in diff if num > -1]
count = 0
for i in diff:
count += 1
if i == min(positive_numbers):
break
# print(positive_numbers)
full_word = [
word
for w, word in milestone_checks.items()
if w.lower() == milestone_type.lower()
][0]
# print(positive_numbers)
if min(positive_numbers) != 0:
word = full_word if min(positive_numbers) == 1 else f"{full_word}s"
if where == "season":
where = "in this season"
elif where == "league":
where = "in career VTB"
string_value = f"{name} needs {min(positive_numbers)} {word} to reach {list_data[count-1]} {full_word}s {where}"
else:
string_value = ""
return {
"NameGFX": f"{name} ({num})",
"type": milestone_type.upper(),
"value": value,
"string_value": string_value,
}
return None
def milestones(data):
new_data_season = []
new_data_career = []
for d in data:
if d["startRole"] == "Player":
milestone_checks = {
"PTS": d["TPoints"],
"AST": d["TAssist"],
"BLK": d["TBlocks"],
"REB": d["TRebound"],
"DREB": d["TDefRebound"],
"OREB": d["TOffRebound"],
"STL": d["TSteal"],
"GAMES": d["TGameCount"],
}
milestone_career_checks = {
"PTS": d["CareerTPoints"],
"AST": d["CareerTAssist"],
"BLK": d["CareerTBlocks"],
"REB": d["CareerTRebound"],
"DREB": d["CareerTDefRebound"],
"OREB": d["CareerTOffRebound"],
"STL": d["CareerTSteal"],
"GAMES": d["CareerTGameCount"],
}
for milestone_type, value in milestone_checks.items():
milestone_data = check_milestone(
value, milestone_type, d["NameGFX"], d["num"], "season"
)
if milestone_data:
new_data_season.append(milestone_data)
for milestone_type_car, value_car in milestone_career_checks.items():
milestone_data_car = check_milestone(
value_car, milestone_type_car, d["NameGFX"], d["num"], "league"
)
if milestone_data_car:
new_data_career.append(milestone_data_car)
return new_data_season, new_data_career
columns_game = [
"num",
# "roleShort",
"NameGFX",
"isOn",
# "flag",
"pts",
"pt-2",
"pt-3",
"pt-1",
"fg",
"ast",
"stl",
"blk",
# "blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
# "fouled",
"plusMinus",
"dunk",
"kpi",
"time",
]
if cached_team1 and cached_team2:
team1_data = process_team_data(cached_team1, columns_game)
team2_data = process_team_data(cached_team2, columns_game)
# Стилизация данных
team1_styled = (
team1_data.style.apply(highlight_grey, axis=1)
.apply(highlight_foul, subset="foul")
.apply(highlight_max, subset="pts")
.apply(highlight_max, subset="kpi")
)
team2_styled = (
team2_data.style.apply(highlight_grey, axis=1)
.apply(highlight_foul, subset="foul")
.apply(highlight_max, subset="pts")
.apply(highlight_max, subset="kpi")
)
def get_player_all_game(player_data_1):
try:
directory = FOLDER_JSON # type: ignore[name-defined]
except NameError:
# иначе используем аргумент по умолчанию/переданный
pass
os.makedirs(directory, exist_ok=True)
host_prefix = _ipcheck()
filename = player_data_1["id"]
filepath = os.path.join(directory, f"{host_prefix}{filename}.json")
# print(filepath) # оставил как у тебя; можно заменить на logger.debug при желании
try:
with open(filepath, "r", encoding="utf-8") as f:
players_game = json.load(f)
except Exception:
pass
df_player_game = pd.json_normalize(players_game)
# Отфильтровать строки, где class == "Normal"
# print(player_data_1)
df_filtered = df_player_game[df_player_game["class"] == "Normal"].copy()
# Преобразуем game.gameDate в datetime
df_filtered["game.gameDate"] = pd.to_datetime(
df_filtered["game.gameDate"], errors="coerce", dayfirst=True
)
# Удалим строки без корректной даты (если такие есть)
# df_filtered = df_filtered.dropna(subset=["game.gameDate"])
# Сортировка от последнего матча к первому
df_filtered = df_filtered.sort_values(by="game.gameDate", ascending=False)
# Указать нужные колонки для вывода
columns_to_show = [
"season",
"game.gameDate",
"game.team1Name",
"game.team2Name",
"game.score",
"stats.points",
"stats.shot2Percent",
"stats.shot3Percent",
"stats.shot23Percent",
"stats.shot1Percent",
"stats.assist",
"stats.steal",
"stats.blockShot",
"stats.defRebound",
"stats.offRebound",
"stats.rebound",
"stats.turnover",
"stats.foul",
"stats.playedTime",
"stats.plusMinus",
]
numeric_cols = [
"stats.points",
"stats.assist",
"stats.steal",
"stats.blockShot",
"stats.rebound",
]
# df_filtered[numeric_cols] = df_filtered[numeric_cols].apply(
# pd.to_numeric, errors="coerce"
# )
df_filtered[numeric_cols] = df_filtered[numeric_cols].apply(pd.to_numeric, errors="coerce")
df_filtered[numeric_cols] = df_filtered[numeric_cols].round(0).astype("Int64")
styled = (
df_filtered[columns_to_show]
.style
.apply(highlight_max, subset=["stats.points"])
.apply(highlight_max, subset=["stats.assist"])
.apply(highlight_max, subset=["stats.steal"])
.apply(highlight_max, subset=["stats.blockShot"])
.apply(highlight_max, subset=["stats.rebound"])
)
return styled
# Вывод данных
col_player1, col_player2 = tab_temp_1.columns((5, 5))
event1 = col_player1.dataframe(
team1_styled,
column_config=config,
hide_index=True,
height=460,
on_select="rerun",
selection_mode=[
"single-row",
],
)
event2 = col_player2.dataframe(
team2_styled,
column_config=config,
hide_index=True,
height=460,
on_select="rerun",
selection_mode=[
"single-row",
],
)
if event1.selection and event1.selection.get("rows"):
selected_index1 = event1.selection["rows"][0]
st.session_state["player1"] = (
selected_index1 # Сохранение состояния в session_state
)
else:
st.session_state["player1"] = None
if st.session_state["player1"] is not None:
selected_player_1, player_data_1 = process_player_data(
cached_team1, st.session_state["player1"]
)
if player_data_1["num"]:
z, a, q, b, c, d, e = col_player1.columns((1, 6, 1, 1, 1, 1, 1))
z.metric("Номер", player_data_1["num"], border=False)
a.metric("Игрок", player_data_1["NameGFX"], border=False)
q.image(player_data_1["flag"])
b.metric("Амплуа", player_data_1["roleShort"], border=False)
c.metric("Возраст", player_data_1["age"], border=False)
d.metric("Рост", player_data_1["height"].split()[0], border=False)
e.metric("Вес", player_data_1["weight"].split()[0], border=False)
col_player1.dataframe(
selected_player_1,
column_config=config_season,
hide_index=True,
)
col_player1.dataframe(get_player_all_game(player_data_1))
if event2.selection and event2.selection.get("rows"):
selected_index2 = event2.selection["rows"][0]
st.session_state["player2"] = (
selected_index2 # Сохранение состояния в session_state
)
else:
st.session_state["player2"] = None
if st.session_state["player2"] is not None:
selected_player_2, player_data_2 = process_player_data(
cached_team2, st.session_state["player2"]
)
if player_data_2["num"]:
z, a, q, b, c, d, e = col_player2.columns((1, 6, 1, 1, 1, 1, 1))
z.metric("Номер", player_data_2["num"], border=False)
a.metric("Игрок", player_data_2["NameGFX"], border=False)
q.image(player_data_2["flag"])
b.metric("Амплуа", player_data_2["roleShort"], border=False)
c.metric("Возраст", player_data_2["age"], border=False)
d.metric("Рост", player_data_2["height"].split()[0], border=False)
e.metric("Вес", player_data_2["weight"].split()[0], border=False)
col_player2.dataframe(
selected_player_2,
column_config=config_season,
hide_index=True,
)
col_player2.dataframe(get_player_all_game(player_data_2))
team_col1, team_col2 = tab_temp_2.columns((5, 5))
if isinstance(cached_team_stats, list) and len(cached_team_stats) >= 34:
cached_team_stats_new = [
cached_team_stats[0],
*cached_team_stats[25:29],
cached_team_stats[7],
cached_team_stats[33],
*cached_team_stats[9:11],
*cached_team_stats[15:17],
]
team_col1.title("Статистика за матч")
team_col1.dataframe(cached_team_stats_new, height=500)
if isinstance(cached_team_comparison, list):
team_col2.title("Статистика за сезон")
# задаём желаемый порядок ключей
order = [
"points",
"points_1",
"points_2",
"points_3",
"points_23",
"assists",
"rebounds",
"steals",
"blocks",
"turnovers",
"fouls",
"games",
# "team",
]
# формируем результат с учётом порядка
result = [
{
"name": key,
"val1": cached_team_comparison[0][key],
"val2": cached_team_comparison[1][key],
}
for key in order
if key in cached_team_comparison[0]
]
team_col2.dataframe(result, width="stretch", height=500)
if isinstance(cached_referee, (list, pd.DataFrame)):
tab_temp_3.dataframe(
cached_referee,
height=600,
width="content",
column_config={"flag": st.column_config.ImageColumn("flag")},
)
column_config_ref = {
"flag": st.column_config.ImageColumn(
"flag",
),
}
def highlight_teams(s):
try:
if s.iloc[0] in (
cached_game_online["result"]["team1"]["teamId"],
cached_game_online["result"]["team2"]["teamId"],
):
return ["background-color: #FF4B4B"] * len(s)
else:
return [""] * len(s)
except NameError:
return [""] * len(s)
if cached_standings:
df_st = pd.json_normalize(cached_standings)
def highlight_teams(s):
try:
t1 = (
((cached_game_online or {}).get("result") or {})
.get("team1", {})
.get("teamId")
)
t2 = (
((cached_game_online or {}).get("result") or {})
.get("team2", {})
.get("teamId")
)
if s.iloc[0] in (t1, t2):
return ["background-color: #FF4B4B"] * len(s)
except Exception:
pass
return [""] * len(s)
styled = df_st.style.apply(highlight_teams, axis=1)
tab_temp_4.dataframe(
styled,
column_config={"logo": st.column_config.ImageColumn("logo")},
hide_index=True,
height=610,
)
if isinstance(cached_scores_quarter, list) and len(cached_scores_quarter) >= 2:
column_config = {}
for quarter in ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]:
column_name = f"score_avg{quarter}"
column_config[column_name] = st.column_config.NumberColumn(
column_name, format="%.1f"
)
columns_quarters_name = ["Q1", "Q2", "Q3", "Q4"]
columns_quarters_name_ot = ["OT1", "OT2", "OT3", "OT4"]
columns_quarters = tab_temp_5.columns((1, 1, 1, 1))
# Основные четверти
for index, col in enumerate(columns_quarters):
q = columns_quarters_name[index]
df_col = [
{
"team": cached_scores_quarter[0].get("team"),
"W": cached_scores_quarter[0].get(f"win{q}"),
"L": cached_scores_quarter[0].get(f"lose{q}"),
"D": cached_scores_quarter[0].get(f"draw{q}"),
"PTS": cached_scores_quarter[0].get(f"score{q}"),
"AVG": cached_scores_quarter[0].get(f"score_avg{q}"),
},
{
"team": cached_scores_quarter[1].get("team"),
"W": cached_scores_quarter[1].get(f"win{q}"),
"L": cached_scores_quarter[1].get(f"lose{q}"),
"D": cached_scores_quarter[1].get(f"draw{q}"),
"PTS": cached_scores_quarter[1].get(f"score{q}"),
"AVG": cached_scores_quarter[1].get(f"score_avg{q}"),
},
]
col.write(q)
col.dataframe(df_col)
# Овертаймы
for index, col in enumerate(columns_quarters):
q = columns_quarters_name_ot[index]
df_col = [
{
"team": cached_scores_quarter[0].get("team"),
"W": cached_scores_quarter[0].get(f"win{q}"),
"L": cached_scores_quarter[0].get(f"lose{q}"),
"D": cached_scores_quarter[0].get(f"draw{q}"),
"PTS": cached_scores_quarter[0].get(f"score{q}"),
"AVG": cached_scores_quarter[0].get(f"score_avg{q}"),
},
{
"team": cached_scores_quarter[1].get("team"),
"W": cached_scores_quarter[1].get(f"win{q}"),
"L": cached_scores_quarter[1].get(f"lose{q}"),
"D": cached_scores_quarter[1].get(f"draw{q}"),
"PTS": cached_scores_quarter[1].get(f"score{q}"),
"AVG": cached_scores_quarter[1].get(f"score_avg{q}"),
},
]
col.write(q)
col.dataframe(df_col)
if isinstance(cached_play_by_play, list) and isinstance(cached_game_online, dict):
plays = (cached_game_online.get("result") or {}).get("plays") or []
if plays:
tab_temp_6.table(cached_play_by_play)
def _ensure_columns(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
"""Гарантирует наличие всех столбцов в df, отсутствующие заполняет NaN/0 по типу."""
for c in cols:
if c not in df.columns:
# для числовых метрик по четверти логично ставить 0
df[c] = 0
# Переупорядочим столбцы
return df[cols]
def _safe_dataframe(
container, df: pd.DataFrame, cols_for_st: list[str], height_per_row: int = 38
):
df = _ensure_columns(df, cols_for_st)
rows = len(df)
# Стилизуем безопасно (если вдруг highlight_max упадёт — показываем без стиля)
try:
styled = (
df.style.apply(highlight_max, subset="q_pts") if not df.empty else df.style
)
except Exception:
styled = df.style
kwargs = dict(column_config=config, hide_index=True)
# height только если он действительно нужен
if rows > 10:
kwargs["height"] = height_per_row * rows
# или: kwargs["height"] = "stretch" # если хотите растягивать блок
container.dataframe(styled, **kwargs)
# 1) Безопасная проверка наличия плей-данных
has_plays = bool(
cached_game_online and cached_game_online.get("result", {}).get("plays")
)
if has_plays:
# 2) Готовим вкладки по количеству периодов
# Если у вас уже есть count_quarter — используйте его. Иначе берём из period_max.
# === REPLACE: безопасное определение количества периодов и создание вкладок ===
def _safe_int(x, default=None):
try:
v = int(x)
return v if v is not None else default
except Exception:
return default
def _max_period_from_plays(cached_game_online) -> int:
plays = (cached_game_online or {}).get("result", {}).get("plays") or []
periods = []
for item in plays:
try:
v = int(item.get("period"))
periods.append(v)
except Exception:
# пропускаем пустые/неконвертируемые значения
continue
return max(periods) if periods else 0
count_quarter = _max_period_from_plays(cached_game_online)
# 3) попытка по ключам team1_/team2_ в session_state (например, team1_1..team1_4)
if count_quarter is None or count_quarter <= 0:
import re
mx = 0
for k in st.session_state.keys():
m1 = re.fullmatch(r"team1_(\d+)", k)
m2 = re.fullmatch(r"team2_(\d+)", k)
if m1:
mx = max(mx, int(m1.group(1)))
if m2:
mx = max(mx, int(m2.group(1)))
count_quarter = mx if mx > 0 else None
# 4) дефолт, если ничего не получилось
if count_quarter is None or count_quarter <= 0:
# если совсем нет данных — не создаём вкладки и показываем инфо
tab_temp_7.info(
"Нет корректного числа периодов для отображения таблиц по четвертям."
)
count_quarter = 0
# 5) создаём вкладки, только если их хотя бы одна
if count_quarter > 0:
tab_labels = [f"{i+1}Q" for i in range(count_quarter)]
columns_quarter = tab_temp_7.tabs(tab_labels)
else:
columns_quarter = [] # чтобы ниже цикл просто не выполнялся
columns_quarter_for_st = [
"num",
"NameGFX",
"q_pts",
"q_pt2",
"q_pt3",
"q_ft",
"q_pt23",
"q_ast",
"q_stl",
"q_blk",
"q_reb",
"q_to",
"q_f",
"q_f_on",
"q_rnk",
"q_time",
]
# 3) Рендер таблиц по четвертям
for i in range(count_quarter):
# Каждая вкладка -> 2 колонки
try:
col_quarter1, col_quarter2 = columns_quarter[i].columns((5, 5))
except IndexError:
# На случай несоответствия размеров
st.warning(f"Недостаточно вкладок для периода #{i+1}. Пропуск.")
continue
# --- Team 1 ---
key_team1 = f"team1_{i+1}"
try:
load_data_from_json(key_team1)
except Exception as e:
tab_temp_7.warning(f"Не удалось загрузить данные для {key_team1}: {e}")
continue
data_team1 = st.session_state.get(key_team1, [])
# print(data_team1)
df_team1 = (
pd.DataFrame(data_team1)
if isinstance(data_team1, (list, tuple, dict))
else pd.DataFrame()
)
# _safe_dataframe(col_quarter1, df_team1, columns_quarter_for_st)
# --- Team 2 ---
key_team2 = f"team2_{i+1}"
try:
load_data_from_json(key_team2)
except Exception as e:
tab_temp_7.warning(f"Не удалось загрузить данные для {key_team2}: {e}")
continue
data_team2 = st.session_state.get(key_team2, [])
df_team2 = (
pd.DataFrame(data_team2)
if isinstance(data_team2, (list, tuple, dict))
else pd.DataFrame()
)
# _safe_dataframe(col_quarter2, df_team2, columns_quarter_for_st)
tab_temp_7.warning("В процессе разработки!!!!!!")
# 4) Блок с milestones по командам (если есть кэш)
if cached_team1 and cached_team2:
try:
data_team_season_1, data_team_career_1 = milestones(cached_team1)
data_team_season_2, data_team_career_2 = milestones(cached_team2)
except Exception as e:
st.warning(f"Не удалось получить milestones: {e}")
else:
# На случай, если функции вернули не DataFrame
if not isinstance(data_team_season_1, pd.DataFrame):
data_team_season_1 = pd.DataFrame(data_team_season_1 or {})
if not isinstance(data_team_career_1, pd.DataFrame):
data_team_career_1 = pd.DataFrame(data_team_career_1 or {})
if not isinstance(data_team_season_2, pd.DataFrame):
data_team_season_2 = pd.DataFrame(data_team_season_2 or {})
if not isinstance(data_team_career_2, pd.DataFrame):
data_team_career_2 = pd.DataFrame(data_team_career_2 or {})
tab7_col1, tab7_col2 = tab_temp_8.columns((5, 5))
tab7_col1.title("Сезон")
tab7_col1.dataframe(data_team_season_1)
tab7_col2.title("Сезон")
tab7_col2.dataframe(data_team_season_2)
tab7_col1.title("Карьера")
tab7_col1.dataframe(data_team_career_1)
tab7_col2.title("Карьера")
tab7_col2.dataframe(data_team_career_2)
else:
tab_temp_7.info("Нет онлайн-плей-данных для отображения.")
def schedule_selected_team(team_id, data, game_id, selected, away_team_id):
columns = [
"game.localDate",
"team1.name",
"team1.logo",
"game.score",
"team2.logo",
"team2.name",
"game.fullScore",
"win",
"team1.teamId",
"team2.teamId",
]
df_schedule_new = data.loc[
# (data["game.id"] < game_id)
# &
(
(data["team1.teamId"].isin([team_id]))
| (data["team2.teamId"].isin([team_id]))
)
]
df_schedule_new.loc[:, "game.fullScore"] = df_schedule_new[
"game.fullScore"
].str.split(",")
conditions = [
(df_schedule_new["team1.teamId"] == team_id)
& (df_schedule_new["game.score1"] > df_schedule_new["game.score2"]),
(df_schedule_new["team1.teamId"] == team_id)
& (df_schedule_new["game.score1"] < df_schedule_new["game.score2"]),
(df_schedule_new["team2.teamId"] == team_id)
& (df_schedule_new["game.score2"] > df_schedule_new["game.score1"]),
(df_schedule_new["team2.teamId"] == team_id)
& (df_schedule_new["game.score2"] < df_schedule_new["game.score1"]),
]
values = [True, False, True, False]
df_schedule_new = df_schedule_new.copy()
df_schedule_new.loc[:, "win"] = np.select(conditions, values, default=None)
mask = pd.Series(True, index=df_schedule_new.index)
# Проверяем каждое выбранное условие и объединяем с маской
if selected:
if "Дома" in selected:
mask &= df_schedule_new["team1.teamId"] == team_id
if "В гостях" in selected:
mask &= df_schedule_new["team2.teamId"] == team_id
if "Выигрыши" in selected:
mask &= df_schedule_new["win"] == True
if "Поражения" in selected:
mask &= df_schedule_new["win"] == False
if "Друг с другом" in selected:
mask &= df_schedule_new["team1.teamId"].isin(
[away_team_id, team_id]
) & df_schedule_new["team2.teamId"].isin([away_team_id, team_id])
return df_schedule_new[columns].loc[mask]
def get_in(d, path, default=None):
cur = d
for key in path:
if not isinstance(cur, dict):
return default
cur = cur.get(key, default)
if cur is default:
return default
return cur
if tab_schedule:
if cached_schedule and "items" in cached_schedule:
cached_schedule = cached_schedule["items"]
pd_schedule = pd.json_normalize(cached_schedule)
game_online = st.session_state.get("game_online")
# print(game_online)
team1_id = get_in(game_online, ["result", "team1", "teamId"])
team1_name = get_in(game_online, ["result", "team1", "name"])
team2_id = get_in(game_online, ["result", "team2", "teamId"])
team2_name = get_in(game_online, ["result", "team2", "name"])
game_id = get_in(game_online, ["result", "game", "id"])
col1_schedule, col2_schedule = tab_schedule.columns((5, 5))
options = ["Дома", "В гостях", "Выигрыши", "Поражения", "Друг с другом"]
selection1 = col1_schedule.segmented_control(
"Фильтр", options, selection_mode="multi", key="1"
)
selection2 = col2_schedule.segmented_control(
"Фильтр", options, selection_mode="multi", key="2"
)
team1_data = schedule_selected_team(
team1_id, pd_schedule, game_id, selection1, team2_id
)
# print(team1_data)
team2_data = schedule_selected_team(
team2_id, pd_schedule, game_id, selection2, team1_id
)
def highlight_two_teams(s):
# print(s)
try:
if s.loc["team1.teamId"] in (
team1_id,
team2_id,
) and s.loc["team2.teamId"] in (
team1_id,
team2_id,
):
return ["background-color: #FF4B4B"] * len(s)
else:
return [""] * len(s)
except NameError:
return [""] * len(s)
column_config = {
"team1.name": st.column_config.TextColumn("Команда1", width=150),
"team2.name": st.column_config.TextColumn("Команда2", width=150),
"game.score": st.column_config.TextColumn(
"Счёт",
),
"game.localDate": st.column_config.TextColumn(
"Дата",
),
"game.fullScore": st.column_config.Column(
"Счёт по четвертям", width="medium"
),
"team1.logo": st.column_config.ImageColumn("Лого1", width=50),
"team2.logo": st.column_config.ImageColumn("Лого2", width=50),
}
count_game_1 = len(team1_data)
count_game_2 = len(team2_data)
team1_data = team1_data.style.apply(highlight_two_teams, axis=1).apply(
color_win, subset="win"
)
team2_data = team2_data.style.apply(highlight_two_teams, axis=1).apply(
color_win, subset="win"
)
height1 = 38 * max(count_game_1, 10)
height2 = 38 * max(count_game_2, 10)
col1_schedule.dataframe(
team1_data,
hide_index=True,
height=int(min(height1, 1200)),
column_config=column_config,
)
col2_schedule.dataframe(
team2_data,
hide_index=True,
height=int(min(height2, 1200)),
column_config=column_config,
)
with tab_online:
if isinstance(cached_schedule, list):
current_date = pd.to_datetime(datetime.now().date())
df_online = pd.json_normalize(cached_schedule)
df_online["game.localDate"] = pd.to_datetime(
df_online["game.localDate"], format="%d.%m.%Y", errors="coerce"
)
df_filtered = df_online[df_online["game.localDate"] == current_date].copy()
names = {
"Avtodor": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\AVTODOR.png",
"BETCITY PARMA": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\BETCITY_PARMA_ENG.png",
"CSKA": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\CSKA_BLACK_BACK_ENG.png",
"Enisey": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\ENISEY_ENG.png",
"Lokomotiv Kuban": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\LOKOMOTIV KUBAN.png",
"MBA-MAI": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\MBA-MAI.png",
"Pari Nizhny Novgorod": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\PARI_NN_ENG.png",
"Samara": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\SAMARA_BLACK_BACK.png",
"UNICS": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\UNICS_BLACK_BACK.png",
"Uralmash": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\URALMASH_ENG.png",
"Zenit": r"D:\Графика\БАСКЕТБОЛ\VTB League\TEAM LOGOS\LOGOS 2025-2026\ZENIT.png",
}
df_filtered["logo1"] = df_filtered["team1.name"].map(names)
df_filtered["logo2"] = df_filtered["team2.name"].map(names)
# Словарь для быстрого поиска по gameId
live_data_map = {}
# Собираем live-данные по каждому game.id
for _, row in df_filtered.iterrows():
game_id = row["game.id"]
try:
json_data = requests.get(
f"https://pro.russiabasket.org/api/abc/games/live-status?Id={game_id}&Lang=en",
).json()
except Exception as ex:
# json_data = {
# "period": None,
# "timeToGo": 0.0,
# }
print(ex)
# Берём содержимое result (словарь с gameId и данными)
result = json_data.get("result", {})
if result and "gameId" in result:
live_data_map[result["gameId"]] = result
# Создаём колонки для live-данных
df_filtered["live_period"] = None
df_filtered["live_timeToGo"] = None
df_filtered["live_scoreA"] = None
df_filtered["live_scoreB"] = None
df_filtered["live_status"] = None
df_filtered["period"] = None
# Заполняем колонки, где game.id совпадает с gameId
for idx, row in df_filtered.iterrows():
game_id = row["game.id"]
if game_id in live_data_map:
live_info = live_data_map[game_id]
df_filtered.at[idx, "live_period"] = live_info.get("period")
df_filtered.at[idx, "live_timeToGo"] = live_info.get("timeToGo")
df_filtered.at[idx, "live_scoreA"] = live_info.get("scoreA")
df_filtered.at[idx, "live_scoreB"] = live_info.get("scoreB")
df_filtered.at[idx, "live_status"] = live_info.get("gameStatus")
try:
if live_info.get("timeToGo") != "":
if float(live_info.get("timeToGo")) == 0.0:
if live_info.get("period") == 2:
df_filtered.at[idx, "period"] = "HT"
elif live_info.get("period") in [1, 3]:
df_filtered.at[idx, "period"] = (
f"END {live_info.get('period')}Q"
)
elif float(live_info.get("timeToGo")) > 0.0:
df_filtered.at[idx, "period"] = (
f"{live_info.get('period')}Q"
)
else:
df_filtered.at[idx, "period"] = ""
except Exception as ex:
df_filtered.at[idx, "period"] = ""
print(ex)
df_filtered["live_scoreA"] = pd.to_numeric(
df_filtered["live_scoreA"], errors="coerce"
).fillna(pd.to_numeric(df_filtered["game.score1"], errors="coerce"))
df_filtered["live_scoreB"] = pd.to_numeric(
df_filtered["live_scoreB"], errors="coerce"
).fillna(pd.to_numeric(df_filtered["game.score2"], errors="coerce"))
# Вывод нужных колонок
columns = [
"game.id",
"game.gameStatus",
"status.displayName",
"game.defaultZoneTime",
"team1.name",
"team2.name",
"game.score1",
"game.score2",
"live_scoreA",
"live_scoreB",
"live_period",
"live_timeToGo",
"live_status",
"period",
"team1.logo",
"team2.logo",
"logo1",
"logo2",
]
existing_columns = [c for c in columns if c in df_filtered.columns]
df_sel = df_filtered.loc[:, existing_columns].copy()
data = df_sel.to_dict(orient="records") # список словарей
rewrite_file("online", data)
st.dataframe(
df_filtered[existing_columns],
column_config={
"team1.logo": st.column_config.ImageColumn("team1.logo"),
"team2.logo": st.column_config.ImageColumn("team2.logo"),
},
width="content",
)
def get_play_info(play):
# Ищем в списке play_type_id элемент, у которого PlayTypeID совпадает с play
for item in play_type_id:
if item["PlayTypeID"] == play:
return item["PlayInfoSite"]
return None # Если совпадение не найдено
def get_player_name(start_num):
# Ищем в списке teams_temp элемент, у которого startNum совпадает с temp_data_pbp["startNum"]
for player in teams_temp:
if player["startNum"] == start_num:
return f"{player['firstName']} {player['lastName']}"
return None # Если совпадение не найдено
def get_event_time(row):
if row != 0:
time_str = 6000 - row
if time_str == 0:
time_str = "0:00"
else:
time_str = time_str // 10
time_str = f"{time_str // 60}:{str(time_str % 60).zfill(2)}"
return time_str
# Безопасная загрузка PlayTypeID
try:
with open("PlayTypeID.json", "r", encoding="utf-8") as f:
play_type_id = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
play_type_id = []
teams_section = ((cached_game_online or {}).get("result") or {}).get("teams") or {}
# Если teams_section — список (например, [{"starts": [...]}, {...}])
if isinstance(teams_section, list):
if len(teams_section) >= 2:
starts1 = next(
(t.get("starts") for t in teams_section if t["teamNumber"] == 1), None
)
starts2 = next(
(t.get("starts") for t in teams_section if t["teamNumber"] == 2), None
)
else:
starts1 = []
starts2 = []
# Если teams_section — словарь (обычно {"1": {...}, "2": {...}})
elif isinstance(teams_section, dict):
starts1 = (teams_section.get(1) or teams_section.get("1") or {}).get("starts") or []
starts2 = (teams_section.get(2) or teams_section.get("2") or {}).get("starts") or []
else:
starts1 = []
starts2 = []
teams_temp = sorted(
[x for x in starts1 if isinstance(x, dict)], key=lambda x: x.get("playerNumber", 0)
) + sorted(
[x for x in starts2 if isinstance(x, dict)], key=lambda x: x.get("playerNumber", 0)
)
list_fullname = [None] + [
f"({x.get('displayNumber')}) {x.get('firstName','')} {x.get('lastName','')}".strip()
for x in teams_temp
if x.get("startRole") == "Player"
]
def get_play_info(play):
for item in play_type_id:
if isinstance(item, dict) and item.get("PlayTypeID") == play:
return item.get("PlayInfoSite")
return None
def get_player_name(start_num):
for player in teams_temp:
if player.get("startNum") == start_num:
return f"{player.get('firstName','')} {player.get('lastName','')}".strip()
return None
def get_event_time(row):
if isinstance(row, (int, float)) and row != 0:
time_val = 6000 - int(row)
if time_val <= 0:
return "0:00"
time_val //= 10
return f"{time_val // 60}:{str(time_val % 60).zfill(2)}"
return None
with tab_pbp:
plays = ((cached_game_online or {}).get("result") or {}).get("plays") or []
if plays:
temp_data_pbp = pd.DataFrame(plays)
col1_pbp, col2_pbp = tab_pbp.columns((3, 4))
option_player = col1_pbp.selectbox("Выбрать игрока", list_fullname)
options_pbp = ["1 очко", "2 очка", "3 очка"]
selection_pbp = col1_pbp.segmented_control(
"Фильтр", options_pbp, selection_mode="multi", key=3
)
if not temp_data_pbp.empty and "period" in temp_data_pbp.columns:
options_quarter = [
(f"{i+1} четверть" if i + 1 < 5 else f"{i-3} овертайм")
for i in range(int(temp_data_pbp["period"].max()))
]
selection_quarter = col1_pbp.segmented_control(
"Выбор четверти", options_quarter, selection_mode="multi", key=4
)
temp_data_pbp["info"] = temp_data_pbp["play"].map(get_play_info)
temp_data_pbp["who"] = temp_data_pbp["startNum"].map(get_player_name)
temp_data_pbp["time"] = temp_data_pbp["sec"].map(get_event_time)
mask1 = pd.Series(True, index=temp_data_pbp.index)
if option_player:
# безопасный поиск startNum
for x in teams_temp:
display = f"({x.get('displayNumber')}) {x.get('firstName','')} {x.get('lastName','')}".strip()
if display == option_player:
mask1 &= temp_data_pbp["startNum"] == x.get("startNum")
break
if selection_pbp:
plays_mapping = {"1 очко": 1, "2 очка": 2, "3 очка": 3}
selected_plays = [
plays_mapping[p] for p in selection_pbp if p in plays_mapping
]
mask1 &= temp_data_pbp["play"].isin(selected_plays)
if selection_quarter:
select_quart = [
i + 1
for i, q in enumerate(options_quarter)
if q in selection_quarter
]
mask1 &= temp_data_pbp["period"].isin(select_quart)
filtered_data_pbp = temp_data_pbp[mask1]
count_pbp = len(filtered_data_pbp)
column_pbp = ["num", "info", "who", "period", "time"]
column_config_pbp = {
"info": st.column_config.TextColumn(width="medium"),
"who": st.column_config.TextColumn(width="large"),
}
col2_pbp.dataframe(
filtered_data_pbp[column_pbp],
column_config=column_config_pbp,
hide_index=True,
height=(38 * count_pbp if count_pbp > 10 else "auto"),
)
else:
tab_pbp.info("Данных play-by-play нет.")