Files
KHL/get_data.py

979 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<<<<<<< HEAD
from fastapi import FastAPI, HTTPException, Query
=======
from fastapi import FastAPI, HTTPException, Query, Request
>>>>>>> c60caaa8aaad763cf7605dcd2c4502b8dfc3be84
from fastapi.responses import Response, JSONResponse, HTMLResponse, StreamingResponse
from fastapi.encoders import jsonable_encoder
import pandas as pd
import requests, io, os
from requests.auth import HTTPBasicAuth
from dotenv import load_dotenv
from datetime import datetime
import uvicorn
from threading import Thread, Event, Lock
import time
from contextlib import asynccontextmanager
import numpy as np
import nasio
import logging
import logging.config
import platform
<<<<<<< HEAD
=======
import json
>>>>>>> c60caaa8aaad763cf7605dcd2c4502b8dfc3be84
from pprint import pprint
# --- Глобальные переменные ---
selected_game_id: int | None = None
current_tournament_id: int | None = None # будем обновлять при загрузке расписания
current_season: str | None = None # будем обновлять при загрузке расписания
latest_game_data: dict | None = None # сюда кладём последние данные по матчу
latest_game_error: str | None = None
_latest_lock = Lock()
_stop_event = Event()
_worker_thread: Thread | None = None
<<<<<<< HEAD
# Загружаем переменные из .env
if load_dotenv(dotenv_path="/mnt/khl/.env", verbose=True):
print("Добавить в лог что был найден файл окружения!!")
pass
else:
load_dotenv()
print("Добавить в лог что не был найден файл окружения!!")
=======
pprint(f"Локальный файл окружения ={load_dotenv(verbose=True)}")
>>>>>>> c60caaa8aaad763cf7605dcd2c4502b8dfc3be84
api_user = os.getenv("API_USER")
api_pass = os.getenv("API_PASS")
league = os.getenv("LEAGUE")
api_base_url = os.getenv("API_BASE_URL")
POLL_SEC = int(os.getenv("GAME_POLL_SECONDS"))
SERVER_NAME = os.getenv("SYNO_URL")
USER = os.getenv("SYNO_USERNAME")
PASSWORD = os.getenv("SYNO_PASSWORD")
PATH = f'{os.getenv("SYNO_PATH")}MATCH.xlsm'
def load_today_schedule():
"""Возвращает DataFrame матчей на сегодня с нужными колонками (или пустой DF)."""
url_tournaments = f"{api_base_url}tournaments.xml"
r = requests.get(
url_tournaments, auth=HTTPBasicAuth(api_user, api_pass), verify=False
)
df = pd.read_xml(io.StringIO(r.text))
df["startDate"] = pd.to_datetime(df["startDate"], errors="coerce")
df["endDate"] = pd.to_datetime(df["endDate"], errors="coerce")
now = datetime.now()
filtered = df[
(df["level"] == league)
& (df["startDate"] <= now)
& (df["endDate"] >= now)
& (df["seasonPart"] == "regular")
]
if filtered.empty:
return pd.DataFrame()
tournament_id = int(filtered.iloc[0]["id"])
season = str(filtered.iloc[0]["season"])
global current_tournament_id, current_season
current_tournament_id = tournament_id
current_season = season
url_schedule = f"{api_base_url}{tournament_id}/schedule-{tournament_id}.xml"
r = requests.get(url_schedule, auth=HTTPBasicAuth(api_user, api_pass), verify=False)
schedule_df = pd.read_xml(io.StringIO(r.text))
# Нужные колонки (скорректируй под реальные имена из XML)
needed_columns = [
"id",
"date",
"time",
"homeName_en",
"visitorName_en",
"arena_en",
"arena_city_en",
"homeCity_en",
"visitorCity_en",
]
exist = [c for c in needed_columns if c in schedule_df.columns]
schedule_df = schedule_df[exist].copy()
# Преобразуем дату и время
schedule_df["date"] = pd.to_datetime(schedule_df["date"], errors="coerce")
today = now.date()
schedule_today = schedule_df[schedule_df["date"].dt.date == today].copy()
# --- Нормализуем время и строим единый datetime для сортировки ---
# 1) берём из time только HH:MM (на случай '19:30', '19:30:00', '19:30 MSK', и т.п.)
time_clean = (
schedule_today.get("time")
.astype(str)
.str.extract(r"(?P<hhmm>\d{1,2}:\d{2})", expand=True)["hhmm"]
)
# 2) собираем строку "YYYY-MM-DD HH:MM" и парсим в Timestamp
date_str = schedule_today["date"].dt.strftime("%Y-%m-%d")
kickoff_str = (date_str + " " + time_clean.fillna("")).str.strip()
schedule_today["kickoff_dt"] = pd.to_datetime(kickoff_str, errors="coerce")
# 3) сортировка: сначала по времени (NaT в конец), потом по id
schedule_today = schedule_today.sort_values(
by=["kickoff_dt", "id"], ascending=[True, True], na_position="last"
)
# 4) человекочитаемая строка даты/времени для таблицы
schedule_today["datetime_str"] = schedule_today["kickoff_dt"].dt.strftime(
"%d.%m.%Y %H:%M"
)
# если time отсутствует и kickoff_dt = NaT — показываем просто дату
mask_nat = schedule_today["kickoff_dt"].isna()
schedule_today.loc[mask_nat, "datetime_str"] = schedule_today.loc[
mask_nat, "date"
].dt.strftime("%d.%m.%Y")
return schedule_today
def _build_game_url(tournament_id: int, game_id: int) -> str:
# URL по аналогии с расписанием: .../{tournament_id}/json_en/{game_id}.json
# Если у тебя другой шаблон — просто поменяй строку ниже.
return f"{api_base_url}{tournament_id}/json_en/{game_id}.json"
def _fetch_game_once(tournament_id: int, game_id: int) -> dict:
"""Один запрос к API матча -> чистый JSON из API."""
url = _build_game_url(tournament_id, game_id)
r = requests.get(
url, auth=HTTPBasicAuth(api_user, api_pass), verify=False, timeout=10
)
r.raise_for_status()
# Пробуем распарсить JSON
try:
data = r.json()
except ValueError:
# если это не JSON — вернём текст
data = {"raw": r.text}
return data
def _game_poll_worker():
"""Фоновый цикл: опрашивает API для выбранного game_id."""
global latest_game_data, latest_game_error
while not _stop_event.is_set():
gid = selected_game_id
tid = current_tournament_id
if gid and tid:
try:
data = _fetch_game_once(tid, gid)
with _latest_lock:
latest_game_data = {
"tournament_id": tid,
"game_id": gid,
"fetched_at": datetime.now().isoformat(),
"data": data,
}
latest_game_error = None
except Exception as e:
with _latest_lock:
latest_game_error = f"{type(e).__name__}: {e}"
# Ждём интервал (с возможностью ранней остановки)
_stop_event.wait(POLL_SEC)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Запускаем и останавливаем поток чтения данных при старте/остановке приложения."""
global _worker_thread
_stop_event.clear()
_worker_thread = Thread(target=_game_poll_worker, daemon=True)
_worker_thread.start()
print("✅ Background thread started")
# Отдаём управление FastAPI
yield
# Останавливаем при завершении
_stop_event.set()
if _worker_thread.is_alive():
_worker_thread.join(timeout=2)
print("🛑 Background thread stopped")
app = FastAPI(
lifespan=lifespan,
docs_url=None, # ❌ отключает /docs
redoc_url=None, # ❌ отключает /redoc
openapi_url=None, # ❌ отключает /openapi.json
)
@app.get("/games")
async def games():
df = load_today_schedule()
if df.empty:
return JSONResponse({"message": "Сегодня матчей нет"})
json_schedule = df.to_json(orient="records", force_ascii=False, date_format="iso")
return Response(content=json_schedule, media_type="application/json")
@app.get("/select")
async def select():
df = load_today_schedule()
if df.empty:
return HTMLResponse(
"<h2 style='font-family:Inter,system-ui'>Сегодня матчей нет</h2>"
)
# Строим строки таблицы
rows_html = []
for _, row in df.iterrows():
gid = int(row["id"])
home = row.get("homeName_en", "")
away = row.get("visitorName_en", "")
when = row.get("datetime_str", "")
arena = row.get("arena_en", "")
city = row.get("arena_city_en", "")
rows_html.append(
f"""
<tr data-id="{gid}">
<td>{gid}</td>
<td>{when}</td>
<td>{home}</td>
<td>{away}</td>
<td>{arena}</td>
<td>{city}</td>
<td><button class="pick">Выбрать</button></td>
</tr>
"""
)
# ✅ Весь HTML, включая JS, внутри тройных кавычек
html = f"""
<!doctype html>
<html lang="ru">
<meta charset="utf-8">
<title>Выбор матча</title>
<style>
body {{ font-family: Inter, system-ui, -apple-system, Segoe UI, Roboto, Arial; background:#0b0f14; color:#e6edf3; margin:0; padding:24px; }}
h1 {{ margin:0 0 16px; font-size:20px; }}
table {{ border-collapse: collapse; width:100%; background:#0f1620; border:1px solid #1f2a36; border-radius:12px; overflow:hidden; }}
thead th {{ text-align:left; padding:12px 14px; font-weight:600; background:#121b26; border-bottom:1px solid #1f2a36; }}
tbody td {{ padding:12px 14px; border-top:1px solid #16212c; }}
tr:hover {{ background:#13202d; }}
tr.selected {{ outline:2px solid #6aa6ff; outline-offset:-2px; background:#132c49; }}
button.pick {{
border:1px solid #2a3a4d; background:#1a2635; padding:8px 12px; border-radius:10px;
color:#e6edf3; cursor:pointer; font-weight:600;
}}
button.pick:hover {{ background:#223243; }}
.status {{ margin-top:14px; opacity:0.9; }}
</style>
<h1>Матчи сегодня</h1>
<table id="tbl">
<thead>
<tr>
<th>ID</th><th>Дата/время</th><th>Хозяева</th><th>Гости</th><th>Арена</th><th>Город</th><th></th>
</tr>
</thead>
<tbody>
{''.join(rows_html)}
</tbody>
</table>
<div class="status" id="status">Ничего не выбрано</div>
<script>
const tbl = document.getElementById('tbl');
const status = document.getElementById('status');
let selectedRow = null;
tbl.addEventListener('click', async (e) => {{
const btn = e.target.closest('.pick');
if (!btn) return;
const row = btn.closest('tr');
const gameId = row.getAttribute('data-id');
// визуально выделяем выбранную строку
if (selectedRow) selectedRow.classList.remove('selected');
row.classList.add('selected');
selectedRow = row;
status.textContent = 'Сохраняю выбор...';
try {{
const res = await fetch(`/select-game/${{gameId}}`, {{
method: 'POST'
}});
if (!res.ok) throw new Error('HTTP ' + res.status);
const data = await res.json();
status.textContent = 'Выбран матч ID: ' + data.selected_id;
}} catch (err) {{
status.textContent = 'Ошибка: ' + err.message;
}}
}});
</script>
</html>
"""
return HTMLResponse(html)
@app.post("/select-game/{game_id}")
async def select_game(game_id: int):
global selected_game_id, latest_game_data, latest_game_error
selected_game_id = game_id
# моментально подтянуть первое состояние, если известен турнир
if current_tournament_id:
try:
data = _fetch_game_once(current_tournament_id, selected_game_id)
with _latest_lock:
latest_game_data = {
"tournament_id": current_tournament_id,
"game_id": selected_game_id,
"fetched_at": datetime.now().isoformat(),
"data": data,
}
latest_game_error = None
except Exception as e:
with _latest_lock:
latest_game_error = f"{type(e).__name__}: {e}"
return JSONResponse({"selected_id": selected_game_id})
@app.get("/selected-game")
async def get_selected_game():
return JSONResponse({"selected_id": selected_game_id})
@app.get("/game/url")
async def game_url():
if not (selected_game_id and current_tournament_id):
return JSONResponse({"message": "game_id или tournament_id не задан"})
return JSONResponse(
{
"url": _build_game_url(current_tournament_id, selected_game_id),
"game_id": selected_game_id,
"tournament_id": current_tournament_id,
}
)
# @app.get("/info")
# async def info():
# if selected_game_id:
# df = load_today_schedule()
@app.get("/data")
async def game_data():
with _latest_lock:
if latest_game_data:
return JSONResponse(latest_game_data)
if latest_game_error:
return JSONResponse({"error": latest_game_error}, status_code=502)
return JSONResponse(
{"message": "Ещё нет данных. Выберите матч и подождите первое обновление."}
)
@app.get("/referee")
async def referee():
json_data = latest_game_data["data"]
data_referees = [
{
"number": json_data["game"]["mref1_num"],
"fullname": f'{json_data["game"]["mref1"].split()[1]} {json_data["game"]["mref1"].split()[0]}',
},
{
"number": json_data["game"]["mref2_num"],
"fullname": f'{json_data["game"]["mref2"].split()[1]} {json_data["game"]["mref2"].split()[0]}',
},
{
"number": json_data["game"]["lref1_num"],
"fullname": f'{json_data["game"]["lref1"].split()[1]} {json_data["game"]["lref1"].split()[0]}',
},
{
"number": json_data["game"]["lref2_num"],
"fullname": f'{json_data["game"]["lref2"].split()[1]} {json_data["game"]["lref2"].split()[0]}',
},
]
return data_referees
async def team(who: str):
""" "who: A - домашняя команда, B - гостевая"""
with _latest_lock:
lgd = latest_game_data
# print(lgd)
if not lgd or "data" not in lgd:
return [{"details": "Нет данных по матчу!"}]
players1_temp = lgd["data"]["players"][who]
players1 = []
players1_f = []
players1_d = []
players1_g = []
goaltenders1 = []
for player in players1_temp:
players1_temp[player]["mask"] = "#FF0000"
# for s in seasonA_json:
# if int(players1_temp[player]["id"]) == int(s["id"]):
# for key, value in s.items():
# # Создаем новый ключ с префиксом
# new_key = f"season_{key}"
# players1_temp[player][new_key] = value
if players1_temp[player]["ps"] not in ["в", "g"]:
data1 = players1_temp[player]
if "." in data1["name"]:
lastname1, *names1 = data1["name"].split()
names_new = " ".join(names1)
elif len(data1["name"].split()) == 3:
*lastname1, names1 = data1["name"].split()
names_new = names1
lastname1 = " ".join(lastname1)
else:
lastname1, *names1 = data1["name"].split()
names_new = " ".join(names1)
if players1_temp[player]["ps"] == "н":
position = "нападающий"
elif players1_temp[player]["ps"] == "з":
position = "защитник"
elif players1_temp[player]["ps"] == "f":
position = "forward"
elif players1_temp[player]["ps"] == "d":
position = "defenseman"
data_with_number = {
"number": player,
"NameSurnameGFX": names_new + " " + lastname1,
"NameGFX": names_new,
"SurnameGFX": lastname1,
"PositionGFX": position,
**data1,
}
if players1_temp[player]["ps"] == "н":
players1_f.append(data_with_number)
elif players1_temp[player]["ps"] == "з":
players1_d.append(data_with_number)
elif players1_temp[player]["ps"] == "f":
players1_f.append(data_with_number)
elif players1_temp[player]["ps"] == "d":
players1_d.append(data_with_number)
else:
data2 = players1_temp[player]
position = ""
if players1_temp[player]["ps"] == "в":
position = "вратарь"
elif players1_temp[player]["ps"] == "g":
position = "Goaltender"
lastname1, *names1 = data2["name"].split()
names_new = " ".join(names1)
data_with_number2 = {
"number": player,
"NameSurnameGFX": names_new + " " + lastname1,
"NameGFX": names_new,
"SurnameGFX": lastname1,
"PositionGFX": position,
**data2,
}
players1_g.append(data_with_number2)
goaltenders1.append(data_with_number2)
def make_empty(example_list):
if not example_list:
return {}
return {key: "" for key in example_list[0].keys()}
empty_d = make_empty(players1_d)
empty_f = make_empty(players1_f)
empty_g = make_empty(players1_g)
# добивка пустыми слотами
while len(players1_d) < 9:
players1_d.append(empty_d.copy())
while len(players1_f) < 13:
players1_f.append(empty_f.copy())
while len(players1_g) < 3:
players1_g.append(empty_g.copy())
players1 = players1_d + players1_f + players1_g
# print(len(players1))
return players1
@app.get("/team1")
async def team1():
return await team("A")
@app.get("/team2")
async def team2():
return await team("B")
# 👉 метка для первой строки (period row)
def _period_label(period: str | int) -> str:
s = str(period).strip().upper()
# овертаймы: OT, OT1, OT2... или числовые >= 4
if s == "OT" or (s.startswith("OT") and s[2:].isdigit()):
return "AFTER OVERTIME"
if s.isdigit():
n = int(s)
if n >= 4:
return "AFTER OVERTIME"
# 13 — с порядковым суффиксом
if n % 100 in (11, 12, 13):
suffix = "TH"
else:
suffix = {1: "ST", 2: "ND", 3: "RD"}.get(n % 10, "TH")
return f"AFTER {n}{suffix} PERIOD"
# например, Total
if s == "TOTAL":
return "FINAL"
return ""
# 👉 сортировка периодов: 1,2,3, затем OT/OT2/...
def _period_sort_key(k: str) -> tuple[int, int]:
s = str(k).strip().upper()
# 13 — обычные периоды
if s.isdigit():
n = int(s)
if 1 <= n <= 3:
return (n, 0)
# 4-й и далее — трактуем как овертаймы (4 -> OT1, 5 -> OT2 ...)
return (100, n - 3)
# явные овертаймы: OT, OT1, OT2...
if s == "OT":
return (100, 1)
if s.startswith("OT") and s[2:].isdigit():
return (100, int(s[2:]))
# неизвестные — в конец
return (200, 0)
def _sorted_period_keys(teams_periods: dict) -> list[str]:
a = teams_periods.get("A", {})
b = teams_periods.get("B", {})
keys = [k for k in a.keys() if k in b]
return sorted(keys, key=_period_sort_key)
def _current_period_key(payload: dict) -> str | None:
keys = _sorted_period_keys(payload.get("teams_periods", {}))
return keys[-1] if keys else None
# >>> 1) Замените вашу async-функцию get_team_stat на обычную sync:
def format_team_stat(team1: dict, team2: dict, period: str | None = None) -> list[dict]:
"""Форматирует статы двух команд в список записей для GFX (общие ключи + подписи)."""
stat_list = [
("coach_id", "coach id", "ID тренеров"),
("coach_fullname", "coach fullname", "Тренеры"),
("name", "name", "Команды"),
("outs", "Outs", ""),
("pim", "Penalty Minutes", "Минуты штрафа"),
("shots", "Shots on goal", "Броски в створ"),
("goals", "Goals", "Голы"),
("fo", "Face-offs", "Вбрасывания всего"),
("fow", "Face-offs won", "Вбрасывания"),
("fow_pct", "Face-offs won, %", "Выигранные вбрасывания, %"),
("hits", "Hits", "Силовые приемы"),
("bls", "Blocked shots", "Блокированные броски"),
("tka", "Takeaways", "Отборы"),
("toa", "Time on attack", "Время в атаке"),
("tie", "Even Strength Time on Ice", "Время в равных составах"),
("tipp", "Powerplay Time On Ice", "Время при большинстве"),
("tish", "Shorthanded Time On Ice", "Время при меньшинстве"),
("tien", "Emptry Net Time On Ice", "Время с пустыми воротами"),
("gva", "Giveaways", "Потери шайбы"),
("p_intc", "Pass interceptions", "Перехваты передачи"),
]
teams = [{k: str(v) for k, v in t.items()} for t in [team1, team2]]
keys = list(teams[0].keys())
formatted = []
if period is not None:
formatted.append(
{
"name0": "period",
"name1": str(period),
"name2": "",
"StatParameterGFX": _period_label(period) or "Period",
}
)
for key in keys:
row = {
"name0": key,
"name1": teams[0].get(key, ""),
"name2": teams[1].get(key, ""),
}
# подписи
for code, eng, _ru in stat_list:
if key == code:
row["StatParameterGFX"] = eng
break
formatted.append(row)
# постобработка отдельных полей
for r in formatted:
if r["name0"] == "fow_pct":
r["name1"] = str(round(float(r["name1"]))) if r["name1"] else r["name1"]
r["name2"] = str(round(float(r["name2"]))) if r["name2"] else r["name2"]
if r["name0"] == "coach_fullname":
# "Фамилия Имя" -> "Имя Фамилия" если есть пробел
def flip(s: str) -> str:
parts = s.split()
return f"{parts[1]} {parts[0]}" if len(parts) >= 2 else s
r["name1"] = flip(r["name1"])
r["name2"] = flip(r["name2"])
return formatted
# >>> 2) Вспомогалки для обхода периодов
def _iter_period_pairs(teams_periods: dict):
"""
Итерируем по периодам, отдаём (period_key, A_stats, B_stats).
Ключи сортируем по числу, если это цифры.
"""
a = teams_periods.get("A", {})
b = teams_periods.get("B", {})
def _key(k):
try:
return int(k)
except (TypeError, ValueError):
return k
for k in sorted(a.keys(), key=_key):
if k in b:
yield k, a[k], b[k]
def _build_all_stats(payload: dict) -> dict:
"""
Собирает общий блок ('total') и список по периодам ('periods') из json матча.
"""
total_a = payload["teams"]["A"]
total_b = payload["teams"]["B"]
result = {"total": format_team_stat(total_a, total_b), "periods": []}
for period_key, a_stat, b_stat in _iter_period_pairs(payload["teams_periods"]):
result["periods"].append(
{"period": period_key, "stats": format_team_stat(a_stat, b_stat)}
)
return result
@app.get("/teams/stats")
async def teams_stats(
scope: str = Query("all", pattern="^(all|total|period)$"),
n: str | None = Query(
None, description="Номер периода (строка или число) при scope=period"
),
):
"""
Все-в-одном: GET /teams/stats?scope=all
вернёт { total: [...], periods: [{period: "1", stats:[...]}, ...] }
Только общий: GET /teams/stats?scope=total
Конкретный период: GET /teams/stats?scope=period&n=2
"""
# читаем атомарно снапшот
with _latest_lock:
lgd = latest_game_data
if not lgd or "data" not in lgd or not isinstance(lgd["data"], dict):
raise HTTPException(status_code=400, detail="Нет данных по матчу.")
payload = lgd["data"]
if scope == "total":
data = format_team_stat(
payload["teams"]["A"], payload["teams"]["B"], period="Total"
)
return JSONResponse({"scope": "total", "data": data})
if scope == "period":
# 👉 если n не задан/или просит актуальный — берём последний период
wants_current = (n is None) or (str(n).strip().lower() in {"current", "last"})
if wants_current:
key = _current_period_key(payload)
if key is None:
raise HTTPException(status_code=404, detail="Периоды не найдены.")
n = key # используем актуальный ключ периода
# ключи в исходном json строковые
period_key = str(n)
a = payload["teams_periods"]["A"].get(period_key)
b = payload["teams_periods"]["B"].get(period_key)
if a is None or b is None:
raise HTTPException(
status_code=404, detail=f"Период {period_key} не найден."
)
return JSONResponse(
{
"scope": "period",
"period": period_key,
"is_current": period_key == _current_period_key(payload),
"data": format_team_stat(a, b, period=period_key),
}
)
# scope == "all"
cur = _current_period_key(payload)
return JSONResponse(
{"scope": "all", "current_period": cur, "data": _build_all_stats(payload)}
)
def _norm_name(s: str | None) -> str:
"""Нормализует название команды для сравнения."""
if not s:
return ""
return str(s).strip().casefold()
<<<<<<< HEAD
@app.get("/info")
def info():
=======
def _load_buf():
buf = nasio.load_bio(user=USER, password=PASSWORD,
nas_ip=SERVER_NAME, nas_port="443", path=PATH)
if isinstance(buf, (bytes, bytearray, memoryview)):
buf = io.BytesIO(buf)
buf.seek(0)
return buf
@app.get("/info")
async def info(format: str = "xlsx", sheet: str = "TEAMS"):
>>>>>>> c60caaa8aaad763cf7605dcd2c4502b8dfc3be84
# 1) Проверяем, выбран ли матч
global current_season
if not selected_game_id:
return JSONResponse({"message": "Матч не выбран", "selected_id": None})
# 2) Берём расписание и ищем строку по выбранному ID
df = load_today_schedule()
if df.empty:
return JSONResponse(
{"message": "Сегодня матчей нет", "selected_id": selected_game_id}
)
# безопасно приводим id к int и ищем
try:
row = df.loc[df["id"].astype(int) == int(selected_game_id)].iloc[0]
except Exception:
return JSONResponse(
{
"message": "Выбранный матч не найден в расписании на сегодня",
"selected_id": selected_game_id,
},
status_code=404,
)
home_name = str(row.get("homeName_en", "")).strip()
away_name = str(row.get("visitorName_en", "")).strip()
# 3) Подтягиваем справочник команд из Excel (лист TEAMS)
<<<<<<< HEAD
try:
binary_bytes: bytes = nasio.load_bio(
user=USER,
password=PASSWORD,
nas_ip=SERVER_NAME,
nas_port="443",
path=PATH,
# sheet="TEAMS"
)
buf = io.BytesIO(binary_bytes)
headers = {
"Content-Length": str(len(binary_bytes)),
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
# можно подсунуть имя файла (если парсеру это важно)
"Content-Disposition": 'inline; filename="MATCH.xlsm"',
}
return StreamingResponse(
buf,
media_type="application/vnd.ms-excel.sheet.macroEnabled.12",
headers=headers,
)
# Оставляем только полезные поля (подгони под свой файл)
keep = ["Team", "Logo", "Short", "HexPodl", "HexBase", "HexText"]
keep = [c for c in keep if c in teams_df.columns]
teams_df = teams_df.loc[:, keep].copy()
# 4) Нормализованные ключи для джоина по имени
teams_df["__key"] = teams_df["Team"].apply(_norm_name)
def _pick_team_info(name: str) -> dict:
key = _norm_name(name)
hit = teams_df.loc[teams_df["__key"] == key]
if hit.empty:
# не нашли точное совпадение — вернём только название
return {"Team": name}
rec = hit.iloc[0].to_dict()
rec.pop("__key", None)
# заменим NaN/inf на None, чтобы JSON не падал
for k, v in list(rec.items()):
if pd.isna(v) or v in (np.inf, -np.inf):
rec[k] = None
return rec
home_info = _pick_team_info(home_name)
away_info = _pick_team_info(away_name)
date_obj = datetime.strptime(row.get("datetime_str", ""), "%d.%m.%Y %H:%M")
try:
full_format = date_obj.strftime("%B %-d, %Y")
except ValueError:
full_format = date_obj.strftime("%B %#d, %Y")
payload = [
{
"selected_id": int(selected_game_id),
"tournament_id": (
int(current_tournament_id) if current_tournament_id else None
),
"datetime": str(full_format),
"arena": str(row.get("arena_en", "")),
"arena_city": str(row.get("arena_city_en", "")),
"home": home_info,
"home_city": str(row.get("homeCity_en", "")),
"away": away_info,
"away_city": str(row.get("visitorCity_en", "")),
"season": current_season,
}
]
return JSONResponse(content=payload)
except Exception as ex:
pprint(ex)
=======
src = _load_buf()
if format == "xlsx":
# читаем нужный лист из исходного XLSM
df = pd.read_excel(src, sheet_name=sheet, engine="openpyxl")
# пишем НОВЫЙ XLSX (без макросов) — это то, что понимает vMix
out = io.BytesIO()
with pd.ExcelWriter(out, engine="openpyxl") as writer:
df.to_excel(writer, sheet_name=sheet, index=False)
out.seek(0)
return StreamingResponse(
out,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
# стабильное имя файла, чтобы vMix не путался
"Content-Disposition": "inline; filename=vmix.xlsx",
# отключаем кэш браузера/прокси, vMix сам опрашивает по интервалу
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
},
)
elif format == "csv":
df = pd.read_excel(src, sheet_name=sheet, engine="openpyxl")
csv_bytes = df.to_csv(index=False, encoding="utf-8-sig").encode("utf-8")
return StreamingResponse(
io.BytesIO(csv_bytes),
media_type="text/csv; charset=utf-8",
headers={
"Content-Disposition": "inline; filename=vmix.csv",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
},
)
elif format == "json":
df = pd.read_excel(src, sheet_name=sheet, engine="openpyxl")
payload = json.dumps(df.to_dict(orient="records"), ensure_ascii=False)
return Response(
content=payload,
media_type="application/json; charset=utf-8",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
},
)
return Response("Unsupported format", status_code=400)
# # Оставляем только полезные поля (подгони под свой файл)
# keep = ["Team", "Logo", "Short", "HexPodl", "HexBase", "HexText"]
# keep = [c for c in keep if c in teams_df.columns]
# teams_df = teams_df.loc[:, keep].copy()
# # 4) Нормализованные ключи для джоина по имени
# teams_df["__key"] = teams_df["Team"].apply(_norm_name)
# def _pick_team_info(name: str) -> dict:
# key = _norm_name(name)
# hit = teams_df.loc[teams_df["__key"] == key]
# if hit.empty:
# # не нашли точное совпадение — вернём только название
# return {"Team": name}
# rec = hit.iloc[0].to_dict()
# rec.pop("__key", None)
# # заменим NaN/inf на None, чтобы JSON не падал
# for k, v in list(rec.items()):
# if pd.isna(v) or v in (np.inf, -np.inf):
# rec[k] = None
# return rec
# home_info = _pick_team_info(home_name)
# away_info = _pick_team_info(away_name)
# date_obj = datetime.strptime(row.get("datetime_str", ""), "%d.%m.%Y %H:%M")
# try:
# full_format = date_obj.strftime("%B %-d, %Y")
# except ValueError:
# full_format = date_obj.strftime("%B %#d, %Y")
# payload = [
# {
# "selected_id": int(selected_game_id),
# "tournament_id": (
# int(current_tournament_id) if current_tournament_id else None
# ),
# "datetime": str(full_format),
# "arena": str(row.get("arena_en", "")),
# "arena_city": str(row.get("arena_city_en", "")),
# "home": home_info,
# "home_city": str(row.get("homeCity_en", "")),
# "away": away_info,
# "away_city": str(row.get("visitorCity_en", "")),
# "season": current_season,
# }
# ]
# return JSONResponse(content=payload)
# except Exception as ex:
# pprint(ex)
>>>>>>> c60caaa8aaad763cf7605dcd2c4502b8dfc3be84
if __name__ == "__main__":
uvicorn.run(
"get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug"
)