1. добавил карту бросков

2. /last_5_games: последние пять игр + 1 следующая игра. в последних пяти играх не учитывается игра, которая загруженна
This commit is contained in:
2025-11-21 16:28:29 +03:00
parent d080faac2f
commit 8c8ea3f9a9

View File

@@ -18,8 +18,16 @@ import xml.etree.ElementTree as ET
import re
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
import warnings
warnings.filterwarnings(
"ignore",
message="Data Validation extension is not supported and will be removed",
category=UserWarning,
module="openpyxl",
)
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--league", default="vtb")
@@ -31,18 +39,20 @@ MYHOST = platform.node()
if not os.path.exists("logs"):
os.makedirs("logs")
def get_fqdn():
system_name = platform.system()
if system_name == "Linux":
hostname = platform.node().lower()
fqdn = f"https://{hostname}.tvstart.ru"
else:
fqdn = "http://127.0.0.1:8000"
return fqdn
FQDN = get_fqdn()
telegram_bot_token = os.getenv("TELEGRAM_TOKEN")
@@ -92,7 +102,7 @@ logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
logger.handlers[2].formatter.use_emoji = True
pprint(f"Локальный файл окружения ={load_dotenv(verbose=True)}")
pprint(f"Локальный файл окружения = {load_dotenv(verbose=True)}")
LEAGUE = args.league
TEAM = args.team
@@ -139,21 +149,7 @@ if isinstance(_syno_goal_raw, BytesIO):
_syno_goal_raw = _syno_goal_raw.getvalue()
SYNO_GOAL = _syno_goal_raw # bytes или None
SHOTMAP_SUBDIR = "shotmaps"
SHOTMAP_DIR = os.path.join(os.getcwd(), "shotmaps")
os.makedirs(SHOTMAP_DIR, exist_ok=True)
try:
for name in os.listdir(SHOTMAP_DIR):
fp = os.path.join(SHOTMAP_DIR, name)
if os.path.isfile(fp):
os.remove(fp)
logger.info(f"[shotmap] очищена папка {SHOTMAP_DIR} при старте")
except Exception as e:
logger.warning(f"[shotmap] не удалось очистить {SHOTMAP_DIR}: {e}")
CALENDAR = None
STATUS = False
GAME_ID = None
SEASON = None
@@ -168,9 +164,6 @@ PRELOAD_LOCK = False # когда True — consumer будет принимат
PRELOADED_GAME_ID = None # ID матча, который мы держим «тёплым»
PRELOAD_HOLD_UNTIL = None # timestamp, до какого момента держим (T-1:15)
# 🔥 кэш картинок в оперативной памяти
SHOTMAP_CACHE: dict[str, bytes] = {}
# общая очередь
results_q = queue.Queue()
# тут будем хранить последние данные
@@ -191,6 +184,13 @@ CURRENT_THREADS_MODE = None
CLEAR_OUTPUT_FOR_VMIX = False
EMPTY_PHOTO_PATH = r"D:\Графика\БАСКЕТБОЛ\VTB League\ASSETS\EMPTY.png"
# 🔥 кэш картинок в оперативной памяти
SHOTMAP_CACHE: dict[str, bytes] = {}
# новое хранилище shotmaps по startNum
SHOTMAPS: dict[int, dict] = {}
SHOTMAPS_LOCK = threading.Lock()
URLS = {
"seasons": "{host}api/abc/comps/seasons?Tag={league}",
@@ -1368,6 +1368,13 @@ async def lifespan(app: FastAPI):
)
thread_status_broadcaster.start()
# новый поток для shotmaps
thread_shotmap = threading.Thread(
target=shotmap_worker,
daemon=True,
)
thread_shotmap.start()
# 5. решаем, что запускать
if not is_today:
STATUS = "no_game_today"
@@ -1414,6 +1421,7 @@ async def lifespan(app: FastAPI):
thread_result_consumer.join(timeout=1)
thread_status_broadcaster.join(timeout=1)
thread_excel.join(timeout=1)
thread_shotmap.join(timeout=1)
app = FastAPI(
@@ -1423,7 +1431,7 @@ app = FastAPI(
openapi_url=None, # ❌ отключает /openapi.json
)
# раздаём /shotmaps как статику из SHOTMAP_DIR
app.mount("/shotmaps", StaticFiles(directory=SHOTMAP_DIR), name="shotmaps")
# app.mount("/shotmaps", StaticFiles(directory=SHOTMAP_DIR), name="shotmaps")
# @app.get("/shotmaps/{filename}")
# async def get_shotmap(filename: str):
@@ -1433,6 +1441,7 @@ app.mount("/shotmaps", StaticFiles(directory=SHOTMAP_DIR), name="shotmaps")
# raise HTTPException(status_code=404, detail="Shotmap not found")
# return Response(content=data, media_type="image/png")
def format_time(seconds: float | int) -> str:
"""
Удобный формат времени для игроков:
@@ -2311,6 +2320,7 @@ async def team(who: str):
"Career_TStartCount": car_T_starts,
"startNum": stats.get("startNum"),
"photoShotMapGFX": "",
"mask": "#FFFFFF",
}
team_rows.append(row)
@@ -2350,6 +2360,8 @@ async def team(who: str):
"kpi",
]:
empty_row[key] = 0
elif key == "mask":
empty_row[key] = "#FFFFFF00"
else:
empty_row[key] = ""
team_rows.append(empty_row)
@@ -2401,26 +2413,19 @@ async def team(who: str):
is_made = play_code in (2, 3) # 2,3 — точные, 5,6 — промахи
shots_by_startnum.setdefault(sn, []).append((x, y, is_made, sec, period))
# Рендерим по картинке на каждого startNum
shotmap_paths: dict[int, str] = {}
for sn, points in shots_by_startnum.items():
# timestamp/версия — просто количество бросков этого игрока.
# Увеличилось кол-во бросков → поменялся путь → vMix не возьмёт картинку из кеша.
version = str(len(points))
bib = str(sn) # можно заменить на номер игрока, если нужно
# --- shotmaps: используем уже готовые пути из глобального SHOTMAPS ---
with SHOTMAPS_LOCK:
shotmaps_snapshot = dict(SHOTMAPS)
path = get_image(points, bib, version)
shotmap_paths[sn] = path
# Присоединяем пути к игрокам по startNum
for row in sorted_team:
sn = row.get("startNum")
if sn in shotmap_paths:
row["photoShotMapGFX"] = f"{shotmap_paths[sn]}"
info = shotmaps_snapshot.get(sn)
if info and info.get("count", 0) > 0:
# сюда кладём путь, который воркер получил из get_image:
# например: "https://host/image/23_shots_7"
row["photoShotMapGFX"] = FQDN + info.get("url", "")
else:
# если бросков не было — оставляем пустую строку
row["photoShotMapGFX"] = ""
row["photoShotMapGFX"] = EMPTY_PHOTO_PATH
return sorted_team
@@ -3377,11 +3382,11 @@ def get_image(points, bib, count_point):
x, y — координаты из API, где (0,0) = центр кольца
is_made — True (play 2,3) или False (play 5,6)
bib: startNum/номер игрока
count_point: строка-версия (анти-кэш vMix)
count_point: строка-версия (количество бросков)
"""
if not points:
return ""
return b""
base_image = Image.new("RGBA", (1500, 2800), (0, 0, 0, 0))
width, height = base_image.size
@@ -3404,19 +3409,19 @@ def get_image(points, bib, count_point):
point_radius = 30
# --- шрифт ---
font = ImageFont.load_default()
try:
nasio_font = SYNO_FONT_PATH
if nasio_font:
if isinstance(nasio_font, BytesIO):
nasio_font = nasio_font.getvalue()
if isinstance(nasio_font, (bytes, bytearray)):
font = ImageFont.truetype(BytesIO(nasio_font), 18)
else:
font = ImageFont.truetype(nasio_font, 18)
except Exception as e:
logger.warning(f"[shotmap] не удалось загрузить шрифт: {e}")
# # --- шрифт ---
# font = ImageFont.load_default()
# try:
# nasio_font = SYNO_FONT_PATH
# if nasio_font:
# if isinstance(nasio_font, BytesIO):
# nasio_font = nasio_font.getvalue()
# if isinstance(nasio_font, (bytes, bytearray)):
# font = ImageFont.truetype(BytesIO(nasio_font), 18)
# else:
# font = ImageFont.truetype(nasio_font, 18)
# except Exception as e:
# logger.warning(f"[shotmap] не удалось загрузить шрифт: {e}")
for x_raw, y_raw, is_made, sec, period in points:
try:
@@ -3463,33 +3468,27 @@ def get_image(points, bib, count_point):
color = (0, 255, 0, 255) if is_made else (255, 0, 0, 255)
draw.ellipse(bbox, fill=color)
# --- подпись Q{period} по центру ---
label = f"Q{period}"
# узнаём размер текста (Pillow 10+ — textbbox, старые — textsize)
try:
bbox = draw.textbbox((0, 0), label, font=font)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
except AttributeError:
# fallback для старых версий Pillow
text_w, text_h = draw.textsize(label, font=font)
# # --- подпись Q{period} по центру ---
# label = f"Q{period}"
# try:
# bbox = draw.textbbox((0, 0), label, font=font)
# text_w = bbox[2] - bbox[0]
# text_h = bbox[3] - bbox[1]
# except AttributeError:
# text_w, text_h = draw.textsize(label, font=font)
# центр иконки = центр текста
center_x, center_y = px, py
text_x = center_x - text_w // 2
text_y = center_y - text_h // 2
# center_x, center_y = px, py
# text_x = center_x - text_w // 2
# text_y = center_y - text_h // 2
# тень
draw.text((text_x + 1, text_y + 1), label, font=font, fill=(0, 0, 0, 255))
# текст
draw.text((text_x, text_y), label, font=font, fill=(255, 255, 255, 255))
# draw.text((text_x + 1, text_y + 1), label, font=font, fill=(255, 255, 255, 255))
# draw.text((text_x, text_y), label, font=font, fill=(0, 0, 0, 255))
# --- сохраняем картинку в оперативную память ---
filename = f"{bib}_shots_{count_point}"
buf = BytesIO()
try:
# compress_level=1 — быстрее, чем дефолт
base_image = base_image.transpose(Image.ROTATE_90)
base_image.save(buf, format="PNG", compress_level=1)
except Exception as e:
logger.warning(f"[shotmap] не удалось сохранить shotmap в память: {e}")
@@ -3498,15 +3497,96 @@ def get_image(points, bib, count_point):
data = buf.getvalue()
SHOTMAP_CACHE[filename] = data # кладём в RAM
# формируем URL для vMix
public_url = f"{FQDN}/image/{filename}"
# logger.info(
# f"[shotmap] generated in-memory shotmap for bib={bib}, ver={count_point} "
# f"-> {filename}, url={public_url}"
# )
# относительный путь для HTTP-эндпоинта
public_url = f"/image/{filename}"
return public_url
def shotmap_worker():
"""
Фоновый поток: следит за latest_data['game'] и пересчитывает карты бросков
по каждому startNum. Картинки лежат только в RAM (SHOTMAP_CACHE/SHOTMAPS).
"""
last_counts: dict[int, int] = {}
while not stop_event.is_set():
try:
game = get_latest_game_safe("game")
if not game:
time.sleep(1)
continue
# в get_latest_game_safe уже нормализована структура,
# но на всякий случай ещё раз берём data/result
game_data = game.get("data") if isinstance(game, dict) else None
if not isinstance(game_data, dict):
time.sleep(1)
continue
result = game_data.get("result") or {}
plays = result.get("plays") or []
# собираем все броски по startNum
shots_by_startnum: dict[int, list[tuple]] = {}
for ev in plays:
play_code = ev.get("play")
# 2,3 — попали; 5,6 — промахи
if play_code not in (2, 3, 5, 6):
continue
sn = ev.get("startNum")
if sn is None:
continue
x = ev.get("x")
y = ev.get("y")
if x is None or y is None:
continue
sec = ev.get("sec")
period = ev.get("period")
is_made = play_code in (2, 3)
shots_by_startnum.setdefault(sn, []).append(
(x, y, is_made, sec, period)
)
# обновляем только тех, у кого поменялось количество бросков
for sn, points in shots_by_startnum.items():
count = len(points)
if count == 0:
continue
if last_counts.get(sn) == count:
# количество бросков и так то же — картинка уже есть
continue
version = str(count)
bib = str(sn)
# get_image:
# - рисует shotmap
# - кладёт PNG bytes в SHOTMAP_CACHE[filename]
# - возвращает полный URL вида f"{FQDN}/image/{filename}"
url = get_image(points, bib, version)
if not url:
continue
with SHOTMAPS_LOCK:
SHOTMAPS[sn] = {
"count": count,
"url": url, # готовый путь, который можно отдавать в vMix
}
last_counts[sn] = count
except Exception as e:
logger.warning(f"[shotmap_worker] error: {e}")
time.sleep(1)
@app.get("/image/{player_id_shots}")
async def get_shotmap_image(player_id_shots: str):
"""
@@ -3520,6 +3600,179 @@ async def get_shotmap_image(player_id_shots: str):
return Response(content=data, media_type="image/png")
@app.get("/last_5_games")
async def last_5_games():
# достаём актуальный game
game = get_latest_game_safe("game")
if not game:
raise HTTPException(status_code=503, detail="game data not ready")
game_data = game["data"] if "data" in game else game
result = game_data.get("result", {}) or {}
team1_info = result.get("team1") or {}
team2_info = result.get("team2") or {}
team1_id = team1_info.get("teamId")
team2_id = team2_info.get("teamId")
team1_name = team1_info.get("name", "")
team2_name = team2_info.get("name", "")
if not team1_id or not team2_id:
raise HTTPException(status_code=503, detail="team ids not ready")
if not CALENDAR or "items" not in CALENDAR:
raise HTTPException(status_code=503, detail="calendar data not ready")
final_states = {"result", "resultconfirmed", "finished"}
# последние N завершённых игр по команде
def collect_last_games_for_team(team_id: int, limit: int = 5):
matches = []
for item in CALENDAR["items"]:
game_info = item.get("game") or {}
if not game_info:
continue
status_raw = str(game_info.get("gameStatus", "") or "").lower()
if status_raw not in final_states:
# пропускаем незавершённые матчи
continue
t1 = item.get("team1") or {}
t2 = item.get("team2") or {}
if team_id not in (t1.get("teamId"), t2.get("teamId")):
continue
# пропускаем текущий матч
gid = game_info.get("id")
if gid is not None and GAME_ID is not None and str(gid) == str(GAME_ID):
continue
dt_str = game_info.get("defaultZoneDateTime") or ""
try:
dt = datetime.fromisoformat(dt_str)
except ValueError:
continue
matches.append({"dt": dt, "item": item})
matches.sort(key=lambda x: x["dt"], reverse=True)
return [m["item"] for m in matches[:limit]]
# считаем W/L для списка матчей команды
def calc_results_list(games: list[dict], team_id: int):
wl_list = []
for item in games:
game_info = item.get("game") or {}
t1 = item.get("team1") or {}
t2 = item.get("team2") or {}
score1 = game_info.get("score1")
score2 = game_info.get("score2")
id1 = t1.get("teamId")
id2 = t2.get("teamId")
is_win = None
if isinstance(score1, (int, float)) and isinstance(score2, (int, float)):
if team_id == id1:
is_win = score1 > score2
elif team_id == id2:
is_win = score2 > score1
wl_list.append("W" if is_win else "L" if is_win is not None else "")
return wl_list[::-1]
# ищем СЛЕДУЮЩИЙ матч (ближайший в будущем) для команды
def find_next_game(team_id: int):
if not CALENDAR or "items" not in CALENDAR:
return {"opponent": "", "date": "", "place": "", "place_ru": ""}
now = datetime.now() # наивное "сейчас"
best = None # {"dt": ..., "opp": ..., "place": "home"/"away"}
for item in CALENDAR["items"]:
game_info = item.get("game") or {}
t1 = item.get("team1") or {}
t2 = item.get("team2") or {}
if team_id not in (t1.get("teamId"), t2.get("teamId")):
continue
dt_str = game_info.get("defaultZoneDateTime") or ""
try:
dt = datetime.fromisoformat(dt_str)
except ValueError:
continue
# убираем tzinfo, чтобы можно было сравнивать с now
if dt.tzinfo is not None:
dt = dt.replace(tzinfo=None)
# только будущие матчи
if dt <= now:
continue
# определяем соперника и место
if team_id == t1.get("teamId"):
opp_name = t2.get("name", "")
place = "home" # команда дома
else:
opp_name = t1.get("name", "")
place = "away" # команда в гостях
if best is None or dt < best["dt"]:
best = {"dt": dt, "opp": opp_name, "place": place}
if not best:
return {"opponent": "", "date": "", "place": "", "place_ru": ""}
place_ru = "дома" if best["place"] == "home" else "в гостях"
return {
"opponent": best["opp"],
"date": best["dt"].strftime("%Y-%m-%d %H:%M"),
"place": best["place"], # "home" / "away"
"place_ru": place_ru, # "дома" / "в гостях"
}
# последние 5 игр и результаты
team1_games = collect_last_games_for_team(team1_id)
team2_games = collect_last_games_for_team(team2_id)
team1_wl = calc_results_list(team1_games, team1_id)
team2_wl = calc_results_list(team2_games, team2_id)
# следующий матч
next1 = find_next_game(team1_id)
next2 = find_next_game(team2_id)
data = [
{
"teamName": team1_name,
"teamId": team1_id,
"team_results": team1_wl,
"nextOpponent": next1["opponent"],
"nextGameDate": next1["date"],
"nextGamePlace": next1["place_ru"], # "дома" / "в гостях"
"nextGameHomeAway": next1["place"], # "home" / "away" (если нужно в логике)
},
{
"teamName": team2_name,
"teamId": team2_id,
"team_results": team2_wl,
"nextOpponent": next2["opponent"],
"nextGameDate": next2["date"],
"nextGamePlace": next2["place_ru"],
"nextGameHomeAway": next2["place"],
},
]
return data
if __name__ == "__main__":
uvicorn.run(
"get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug"