Files
RFB/get_data.py

897 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI
from contextlib import asynccontextmanager
import requests
from datetime import datetime
import threading
import time
import queue
import argparse
import uvicorn
from pprint import pprint
import os
# передадим параметры через аргументы или глобальные переменные
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--league", default="vtb")
parser.add_argument("--team", required=True)
parser.add_argument("--lang", default="en")
args = parser.parse_args()
LEAGUE = args.league
TEAM = args.team
LANG = args.lang
HOST = "https://ref.russiabasket.org"
STATUS = False
URLS = {
"seasons": "{host}/api/abc/comps/seasons?Tag={league}",
"actual-standings": "{host}/api/abc/comps/actual-standings?tag={league}&season={season}&lang={lang}",
"calendar": "{host}/api/abc/comps/calendar?Tag={league}&Season={season}&Lang={lang}&MaxResultCount=1000",
"game": "{host}/api/abc/games/game?Id={game_id}&Lang={lang}",
"pregame": "{host}/api/abc/games/pregame?tag={league}&season={season}&id={game_id}&lang={lang}",
"pregame-full-stats": "{host}/api/abc/games/pregame-full-stats?tag={league}&season={season}&id={game_id}&lang={lang}",
"live-status": "{host}/api/abc/games/live-status?id={game_id}",
"box-score": "{host}/api/abc/games/box-score?id={game_id}",
"play-by-play": "{host}/api/abc/games/play-by-play?id={game_id}",
}
# общая очередь
results_q = queue.Queue()
# тут будем хранить последние данные
latest_data = {}
# событие для остановки потоков
stop_event = threading.Event()
# Функция запускаемая в потоках
def get_data_from_API(
name: str, url: str, quantity: float, stop_event: threading.Event
):
if quantity <= 0:
raise ValueError("quantity must be > 0")
sleep_time = 1.0 / quantity # это и есть "раз в N секунд"
while not stop_event.is_set():
start = time.time()
try:
value = requests.get(url).json()
except Exception as ex:
value = {"error": str(ex)}
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
results_q.put({"source": name, "ts": ts, "data": value})
print(f"[{ts}] name: {name}, status: {value.get('status', 'no-status')}")
# сколько уже заняло
elapsed = time.time() - start
# сколько надо доспать, чтобы в сумме вышла нужная частота
to_sleep = sleep_time - elapsed
if to_sleep > 0:
time.sleep(to_sleep)
# если запрос занял дольше — просто сразу следующую итерацию
# Получение результатов из всех запущенных потоков
def results_consumer():
while not stop_event.is_set():
try:
msg = results_q.get(timeout=0.5)
except queue.Empty:
continue
if "play-by-play" in msg["source"]:
latest_data["game"]["data"]["result"]["plays"] = msg["data"]["result"]
elif "box-score" in msg["source"]:
if "game" in latest_data:
for team in latest_data["game"]["data"]["result"]["teams"]:
if team["teamNumber"] != 0:
box_team = [
t
for t in msg["data"]["result"]["teams"]
if t["teamNumber"] == team["teamNumber"]
]
if not box_team:
print("EROORRRRR")
next # log
box_team = box_team[0]
for player in team["starts"]:
box_player = [
p
for p in box_team["starts"]
if p["startNum"] == player["startNum"]
]
if box_player:
player["stats"] = box_player[0]
team["total"] = box_team["total"]
team["startTotal"] = box_team["startTotal"]
team["benchTotal"] = box_team["benchTotal"]
team["maxLeading"] = box_team["maxLeading"]
team["pointsInRow"] = box_team["pointsInRow"]
team["maxPointsInRow"] = box_team["maxPointsInRow"]
else:
latest_data[msg["source"]] = {
"ts": msg["ts"],
"data": msg["data"],
}
def get_items(data: dict) -> list:
"""
Мелкий хелпер: берём первый список в ответе API.
Многие ручки отдают {"result":[...]} или {"seasons":[...]}.
Если находим список — возвращаем его.
Если нет — возвращаем None (значит, нужно брать весь dict).
"""
for k, v in data.items():
if isinstance(v, list):
return data[k]
return None
def get_game_id(data):
"""
получаем GAME_ID для домашней команды. Если матча сегодня нет, то берем последний.
"""
items = get_items(data)
for game in items[::-1]:
if game["team1"]["name"].lower() == TEAM.lower():
game_date = datetime.strptime(game["game"]["localDate"], "%d.%m.%Y").date()
if game_date == datetime.now().date():
game_id = game["game"]["id"]
print(f"Получили актуальный id: {game_id} для {TEAM}")
return game_id, True
elif game_date < datetime.now().date():
game_id = game["game"]["id"]
print(f"Получили старый id: {game_id} для {TEAM}")
return game_id, False
else:
print(f"Не смогли найти игру для команды {TEAM}") # DEBUG
@asynccontextmanager
async def lifespan(app: FastAPI):
global STATUS
# -------- startup --------
# 1. определим сезон (как у тебя)
try:
season = requests.get(URLS["seasons"].format(host=HOST, league=LEAGUE)).json()[
"items"
][0]["season"]
except Exception:
# WARNING
now = datetime.now()
if now.month > 9:
season = now.year + 1
else:
season = now.year
print("не удалось получить последний сезон.") # WARNING
try:
calendar = requests.get(
URLS["calendar"].format(host=HOST, league=LEAGUE, season=season, lang=LANG)
).json()
except Exception as ex:
print(f"не получилось проверить работу API. код ошибки: {ex}") # ERROR
exit(1)
game_id, STATUS = get_game_id(calendar)
# 2. поднимем потоки
threads_long = [
threading.Thread(
target=get_data_from_API,
args=(
"pregame",
URLS["pregame"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"pregame-full-stats",
URLS["pregame-full-stats"].format(
host=HOST, league=LEAGUE, season=season, game_id=game_id, lang=LANG
),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"actual-standings",
URLS["actual-standings"].format(
host=HOST, league=LEAGUE, season=season, lang=LANG
),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=results_consumer,
daemon=True,
),
]
threads_live = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
0.0016667,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"live-status",
URLS["live-status"].format(host=HOST, game_id=game_id),
5,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"box-score",
URLS["box-score"].format(host=HOST, game_id=game_id),
5,
stop_event,
),
daemon=True,
),
threading.Thread(
target=get_data_from_API,
args=(
"play-by-play",
URLS["play-by-play"].format(host=HOST, game_id=game_id),
5,
stop_event,
),
daemon=True,
),
]
threads_offline = [
threading.Thread(
target=get_data_from_API,
args=(
"game",
URLS["game"].format(host=HOST, game_id=game_id, lang=LANG),
1,
stop_event,
),
daemon=True,
)
]
for t in threads_long:
t.start()
if STATUS:
for t in threads_live:
t.start()
else:
for t in threads_offline:
t.start()
# отдаём управление FastAPI
yield
# -------- shutdown --------
stop_event.set()
for t in threads_long:
t.join(timeout=1)
if STATUS:
for t in threads_live:
t.join(timeout=1)
else:
for t in threads_offline:
t.join(timeout=1)
app = FastAPI(lifespan=lifespan)
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
71 -> "1:11"
0 -> "0:00"
Любые кривые значения -> "0:00".
"""
try:
total_seconds = int(float(seconds))
minutes = total_seconds // 60
sec = total_seconds % 60
return f"{minutes}:{sec:02}"
except (ValueError, TypeError):
return "0:00"
@app.get("/team1.json")
async def team1():
return await team("team1")
@app.get("/team2.json")
async def team2():
return await team("team2")
@app.get("/top_team1.json")
async def top_team1():
data = await team("team1")
return await top_sorted_team(data)
@app.get("/top_team2.json")
async def top_team2():
data = await team("team2")
return await top_sorted_team(data)
@app.get("/started_team1.json")
async def started_team1():
data = await team("team1")
return await started_team(data)
@app.get("/started_team2.json")
async def started_team2():
data = await team("team2")
return await started_team(data)
@app.get("/game.json")
async def game():
return latest_data["game"]
@app.get("/status.json")
async def status():
return [
{
"name": item,
"status": latest_data[item]["data"]["status"],
"ts": latest_data[item]["ts"],
}
for item in latest_data
]
@app.get("/scores.json")
async def scores():
quarters = ["Q1", "Q2", "Q3", "Q4", "OT1", "OT2", "OT3", "OT4"]
score_by_quarter = [{"Q": q, "score1": "", "score2": ""} for q in quarters]
full_score_list = latest_data["game"]["data"]["result"]["game"]["fullScore"].split(",")
for i, score_str in enumerate(full_score_list[: len(score_by_quarter)]):
parts = score_str.split(":")
if len(parts) == 2:
score_by_quarter[i]["score1"] = parts[0]
score_by_quarter[i]["score2"] = parts[1]
return score_by_quarter
async def top_sorted_team(data):
top_sorted_team = sorted(
(p for p in data if p.get("startRole") in ["Player", ""]),
key=lambda x: (
x.get("pts", 0),
x.get("dreb", 0) + x.get("oreb", 0),
x.get("ast", 0),
x.get("stl", 0),
x.get("blk", 0),
x.get("time", "0:00"),
),
reverse=True,
)
# пустые строки не должны ломать UI процентами фолов/очков
for player in top_sorted_team:
if player.get("num", "") == "":
player["pts"] = ""
player["foul"] = ""
return top_sorted_team
async def team(who: str):
"""
Формирует и записывает несколько JSON-файлов по составу и игрокам команды:
- <who>.json (полный список игроков с метриками)
- topTeam1.json / topTeam2.json (топ-игроки)
- started_team1.json / started_team2.json (игроки на паркете)
Вход:
merged: словарь из build_render_state()
who: "team1" или "team2"
"""
if who == "team1":
payload = next(
(
i
for i in latest_data["game"]["data"]["result"]["teams"]
if i["teamNumber"] == 1
),
None,
)
elif who == "team2":
payload = next(
(
i
for i in latest_data["game"]["data"]["result"]["teams"]
if i["teamNumber"] == 2
),
None,
)
role_list = [
("Center", "C"),
("Guard", "G"),
("Forward", "F"),
("Power Forward", "PF"),
("Small Forward", "SF"),
("Shooting Guard", "SG"),
("Point Guard", "PG"),
("Forward-Center", "FC"),
]
starts = payload["starts"]
team_rows = []
for item in starts:
stats = item["stats"]
row = {
"id": item.get("personId") or "",
"num": item.get("displayNumber"),
"startRole": item.get("startRole"),
"role": item.get("positionName"),
"roleShort": (
[
r[1]
for r in role_list
if r[0].lower() == (item.get("positionName") or "").lower()
][0]
if any(
r[0].lower() == (item.get("positionName") or "").lower()
for r in role_list
)
else ""
),
"NameGFX": (
f"{(item.get('firstName') or '').strip()} {(item.get('lastName') or '').strip()}".strip()
if item.get("firstName") is not None
and item.get("lastName") is not None
else "Команда"
),
"captain": item.get("isCapitan", False),
"age": item.get("age") or 0,
"height": f"{item.get('height')} cm" if item.get("height") else 0,
"weight": f"{item.get('weight')} kg" if item.get("weight") else 0,
"isStart": stats["isStart"],
"isOn": "🏀" if stats["isOnCourt"] is True else "",
"flag": (
"https://flagicons.lipis.dev/flags/4x3/"
+ (
"ru"
if item.get("countryId") is None
and item.get("countryName") == "Russia"
else (
""
if item.get("countryId") is None
else (
(item.get("countryId") or "").lower()
if item.get("countryName") is not None
else ""
)
)
)
+ ".svg"
),
"pts": stats["points"],
"pt-2": f"{stats['goal2']}/{stats['shot2']}" if stats else 0,
"pt-3": f"{stats['goal3']}/{stats['shot3']}" if stats else 0,
"pt-1": f"{stats['goal1']}/{stats['shot1']}" if stats else 0,
"fg": (
f"{stats['goal2']+stats['goal3']}/" f"{stats['shot2']+stats['shot3']}"
if stats
else 0
),
"ast": stats["assist"],
"stl": stats["steal"],
"blk": stats["block"],
"blkVic": stats["blocked"],
"dreb": stats["defReb"],
"oreb": stats["offReb"],
"reb": stats["defReb"] + stats["offReb"],
"to": stats["turnover"],
"foul": stats["foul"],
"foulT": stats["foulT"],
"foulD": stats["foulD"],
"foulC": stats["foulC"],
"foulB": stats["foulB"],
"fouled": stats["foulsOn"],
"plusMinus": stats["plusMinus"],
"dunk": stats["dunk"],
"kpi": (
stats["points"]
+ stats["defReb"]
+ stats["offReb"]
+ stats["assist"]
+ stats["steal"]
+ stats["block"]
+ stats["foulsOn"]
+ (stats["goal1"] - stats["shot1"])
+ (stats["goal2"] - stats["shot2"])
+ (stats["goal3"] - stats["shot3"])
- stats["turnover"]
- stats["foul"]
),
"time": format_time(stats["second"]),
"pts1q": 0,
"pts2q": 0,
"pts3q": 0,
"pts4q": 0,
"pts1h": 0,
"pts2h": 0,
"Name1GFX": (item.get("firstName") or "").strip(),
"Name2GFX": (item.get("lastName") or "").strip(),
"photoGFX": (
os.path.join(
"D:\\Photos",
latest_data["game"]["data"]["result"]["league"]["abcName"],
latest_data["game"]["data"]["result"][who]["name"],
f'{item.get("displayNumber")}.png',
)
if item.get("startRole") == "Player"
else ""
),
"isOnCourt": stats["isOnCourt"],
}
team_rows.append(row)
# добиваем до 12 строк, чтобы UI был ровный
count_player = sum(1 for x in team_rows if x["startRole"] == "Player")
if count_player < 12 and team_rows:
filler_count = (4 if count_player <= 4 else 12) - count_player
template_keys = list(team_rows[0].keys())
for _ in range(filler_count):
empty_row = {}
for key in template_keys:
if key in ["captain", "isStart", "isOnCourt"]:
empty_row[key] = False
elif key in [
"id",
"pts",
"weight",
"height",
"age",
"ast",
"stl",
"blk",
"blkVic",
"dreb",
"oreb",
"reb",
"to",
"foul",
"foulT",
"foulD",
"foulC",
"foulB",
"fouled",
"plusMinus",
"dunk",
"kpi",
]:
empty_row[key] = 0
else:
empty_row[key] = ""
team_rows.append(empty_row)
# сортируем игроков по типу роли: сначала "Player", потом "", потом "Coach" и т.д.
role_priority = {
"Player": 0,
"": 1,
"Coach": 2,
"Team": 3,
None: 4,
"Other": 5,
}
sorted_team = sorted(
team_rows,
key=lambda x: role_priority.get(x.get("startRole", 99), 99),
)
return sorted_team
async def started_team(data):
started_team = sorted(
(
p
for p in data
if p.get("startRole") == "Player" and p.get("isOnCourt") is True
),
key=lambda x: int(x.get("num") or 0),
)
return started_team
def add_new_team_stat(
data: dict,
avg_age: float,
points,
avg_height: float,
timeout_str: str,
timeout_left: int,
) -> dict:
"""
Берёт словарь total по команде (очки, подборы, броски и т.д.),
добавляет:
- проценты попаданий
- средний возраст / рост
- очки старт / бенч
- информацию по таймаутам
и всё приводит к строкам (для UI, чтобы не ловить типы).
Возвращает обновлённый словарь.
"""
def safe_int(v):
try:
return int(v)
except (ValueError, TypeError):
return 0
def format_percent(goal, shot):
goal, shot = safe_int(goal), safe_int(shot)
return f"{round(goal * 100 / shot)}%" if shot else "0%"
goal1, shot1 = safe_int(data.get("goal1")), safe_int(data.get("shot1"))
goal2, shot2 = safe_int(data.get("goal2")), safe_int(data.get("shot2"))
goal3, shot3 = safe_int(data.get("goal3")), safe_int(data.get("shot3"))
def_reb = safe_int(data.get("defReb"))
off_reb = safe_int(data.get("offReb"))
data.update(
{
"pt-1": f"{goal1}/{shot1}",
"pt-2": f"{goal2}/{shot2}",
"pt-3": f"{goal3}/{shot3}",
"fg": f"{goal2 + goal3}/{shot2 + shot3}",
"pt-1_pro": format_percent(goal1, shot1),
"pt-2_pro": format_percent(goal2, shot2),
"pt-3_pro": format_percent(goal3, shot3),
"fg_pro": format_percent(goal2 + goal3, shot2 + shot3),
"Reb": str(def_reb + off_reb),
"avgAge": str(avg_age),
"ptsStart": str(points[0]),
"ptsStart_pro": str(points[1]),
"ptsBench": str(points[2]),
"ptsBench_pro": str(points[3]),
"avgHeight": f"{avg_height} cm",
"timeout_left": str(timeout_left),
"timeout_str": str(timeout_str),
}
)
for k in data:
data[k] = str(data[k])
return data
def time_outs_func(data_pbp):
"""
Считает таймауты для обеих команд и формирует читабельные строки вида:
"2 Time-outs left in 2nd half"
Возвращает:
(строка_для_команды1, остаток1, строка_для_команды2, остаток2)
"""
timeout1 = []
timeout2 = []
for event in data_pbp:
if event.get("play") == 23: # 23 == таймаут
if event.get("startNum") == 1:
timeout1.append(event)
elif event.get("startNum") == 2:
timeout2.append(event)
def timeout_status(timeout_list, last_event: dict):
period = last_event.get("period", 0)
sec = last_event.get("sec", 0)
if period < 3:
timeout_max = 2
count = sum(1 for t in timeout_list if t.get("period", 0) <= period)
quarter = "1st half"
elif period < 5:
count = sum(1 for t in timeout_list if 3 <= t.get("period", 0) <= period)
quarter = "2nd half"
if period == 4 and sec >= 4800 and count in (0, 1):
timeout_max = 2
else:
timeout_max = 3
else:
timeout_max = 1
count = sum(1 for t in timeout_list if t.get("period", 0) == period)
quarter = f"OverTime {period - 4}"
left = max(0, timeout_max - count)
word = "Time-outs" if left != 1 else "Time-out"
text = f"{left if left != 0 else 'No'} {word} left in {quarter}"
return text, left
if not data_pbp:
return "", 0, "", 0
last_event = data_pbp[-1]
t1_str, t1_left = timeout_status(timeout1, last_event)
t2_str, t2_left = timeout_status(timeout2, last_event)
return t1_str, t1_left, t2_str, t2_left
def add_data_for_teams(new_data):
"""
Считает командные агрегаты:
- средний возраст
- очки со старта vs со скамейки, + их проценты
- средний рост
Возвращает кортеж:
(avg_age, [start_pts, start%, bench_pts, bench%], avg_height_cm)
"""
players = [item for item in new_data if item["startRole"] == "Player"]
points_start = 0
points_bench = 0
total_age = 0
total_height = 0
player_count = len(players)
for player in players:
# print(player)
stats = player["stats"]
if stats:
# print(stats)
if stats["isStart"] is True:
points_start += stats["points"]
elif stats["isStart"] is False:
points_bench += stats["points"]
total_age += player["age"]
total_height += player["height"]
total_points = points_start + points_bench
points_start_pro = (
f"{round(points_start * 100 / total_points)}%" if total_points else "0%"
)
points_bench_pro = (
f"{round(points_bench * 100 / total_points)}%" if total_points else "0%"
)
avg_age = round(total_age / player_count, 1) if player_count else 0
avg_height = round(total_height / player_count, 1) if player_count else 0
points = [points_start, points_start_pro, points_bench, points_bench_pro]
return avg_age, points, avg_height
stat_name_list = [
("points", "Очки", "points"),
("pt-1", "Штрафные", "free throws"),
("pt-1_pro", "штрафные, процент", "free throws pro"),
("pt-2", "2-очковые", "2-points"),
("pt-2_pro", "2-очковые, процент", "2-points pro"),
("pt-3", "3-очковые", "3-points"),
("pt-3_pro", "3-очковые, процент", "3-points pro"),
("fg", "очки с игры", "field goals"),
("fg_pro", "Очки с игры, процент", "field goals pro"),
("assist", "Передачи", "assists"),
("pass", "", ""),
("defReb", "подборы в защите", ""),
("offReb", "подборы в нападении", ""),
("Reb", "Подборы", "rebounds"),
("steal", "Перехваты", "steals"),
("block", "Блокшоты", "blocks"),
("blocked", "", ""),
("turnover", "Потери", "turnovers"),
("foul", "Фолы", "fouls"),
("foulsOn", "", ""),
("foulT", "", ""),
("foulD", "", ""),
("foulC", "", ""),
("foulB", "", ""),
("second", "секунды", "seconds"),
("dunk", "данки", "dunks"),
("fastBreak", "", "fast breaks"),
("plusMinus", "+/-", "+/-"),
("avgAge", "", "avg Age"),
("ptsBench", "", "Bench PTS"),
("ptsBench_pro", "", "Bench PTS, %"),
("ptsStart", "", "Start PTS"),
("ptsStart_pro", "", "Start PTS, %"),
("avgHeight", "", "avg height"),
("timeout_left", "", "timeout left"),
("timeout_str", "", "timeout str"),
]
@app.get("/team_stats.json")
async def team_stats():
teams = latest_data["game"]["data"]["result"]["teams"]
plays = latest_data["game"]["data"]["result"]["plays"]
team_1 = next((t for t in teams if t["teamNumber"] == 1), None)
team_2 = next((t for t in teams if t["teamNumber"] == 2), None)
timeout_str1, timeout_left1, timeout_str2, timeout_left2 = time_outs_func(plays)
avg_age_1, points_1, avg_height_1 = add_data_for_teams(team_1["starts"])
avg_age_2, points_2, avg_height_2 = add_data_for_teams(team_2["starts"])
total_1 = add_new_team_stat(
team_1["total"],
avg_age_1,
points_1,
avg_height_1,
timeout_str1,
timeout_left1,
)
total_2 = add_new_team_stat(
team_2["total"],
avg_age_2,
points_2,
avg_height_2,
timeout_str2,
timeout_left2,
)
result_json = []
for key in total_1:
val1 = total_1[key]
val2 = total_2[key]
stat_rus = ""
stat_eng = ""
for metric_name, rus, eng in stat_name_list:
if metric_name == key:
stat_rus, stat_eng = rus, eng
break
result_json.append(
{
"name": key,
"nameGFX_rus": stat_rus,
"nameGFX_eng": stat_eng,
"val1": val1,
"val2": val2,
}
)
return result_json
if __name__ == "__main__":
uvicorn.run("get_data:app", host="0.0.0.0", port=8000, reload=True)