Files
KHL/get_data.py

372 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI
from fastapi.responses import Response, JSONResponse, HTMLResponse
import pandas as pd
import requests, io, os
from requests.auth import HTTPBasicAuth
from dotenv import load_dotenv
from datetime import datetime
import uvicorn
from threading import Thread, Event, Lock
import time
from contextlib import asynccontextmanager
# --- Глобальные переменные ---
selected_game_id: int | None = None
current_tournament_id: int | None = None # будем обновлять при загрузке расписания
latest_game_data: dict | None = None # сюда кладём последние данные по матчу
latest_game_error: str | None = None
_latest_lock = Lock()
_stop_event = Event()
_worker_thread: Thread | None = None
# Загружаем переменные из .env
load_dotenv()
api_user = os.getenv("API_USER")
api_pass = os.getenv("API_PASS")
league = os.getenv("LEAGUE")
POLL_SEC = int(os.getenv("GAME_POLL_SECONDS"))
def load_today_schedule():
"""Возвращает DataFrame матчей на сегодня с нужными колонками (или пустой DF)."""
url_tournaments = "http://stat2tv.khl.ru/tournaments.xml"
r = requests.get(
url_tournaments, auth=HTTPBasicAuth(api_user, api_pass), verify=False
)
df = pd.read_xml(io.StringIO(r.text))
df["startDate"] = pd.to_datetime(df["startDate"], errors="coerce")
df["endDate"] = pd.to_datetime(df["endDate"], errors="coerce")
now = datetime.now()
filtered = df[
(df["level"] == league)
& (df["startDate"] <= now)
& (df["endDate"] >= now)
& (df["seasonPart"] == "regular")
]
if filtered.empty:
return pd.DataFrame()
tournament_id = int(filtered.iloc[0]["id"])
global current_tournament_id
current_tournament_id = tournament_id
url_schedule = f"http://stat2tv.khl.ru/{tournament_id}/schedule-{tournament_id}.xml"
r = requests.get(url_schedule, auth=HTTPBasicAuth(api_user, api_pass), verify=False)
schedule_df = pd.read_xml(io.StringIO(r.text))
# Нужные колонки (скорректируй под реальные имена из XML)
needed_columns = ["id", "date", "time", "homeName_en", "visitorName_en", "arena"]
exist = [c for c in needed_columns if c in schedule_df.columns]
schedule_df = schedule_df[exist].copy()
# Преобразуем дату и время
schedule_df["date"] = pd.to_datetime(schedule_df["date"], errors="coerce")
today = now.date()
schedule_today = schedule_df[schedule_df["date"].dt.date == today].copy()
# --- Нормализуем время и строим единый datetime для сортировки ---
# 1) берём из time только HH:MM (на случай '19:30', '19:30:00', '19:30 MSK', и т.п.)
time_clean = (
schedule_today.get("time")
.astype(str)
.str.extract(r"(?P<hhmm>\d{1,2}:\d{2})", expand=True)["hhmm"]
)
# 2) собираем строку "YYYY-MM-DD HH:MM" и парсим в Timestamp
date_str = schedule_today["date"].dt.strftime("%Y-%m-%d")
kickoff_str = (date_str + " " + time_clean.fillna("")).str.strip()
schedule_today["kickoff_dt"] = pd.to_datetime(kickoff_str, errors="coerce")
# 3) сортировка: сначала по времени (NaT в конец), потом по id
schedule_today = schedule_today.sort_values(
by=["kickoff_dt", "id"], ascending=[True, True], na_position="last"
)
# 4) человекочитаемая строка даты/времени для таблицы
schedule_today["datetime_str"] = schedule_today["kickoff_dt"].dt.strftime(
"%d.%m.%Y %H:%M"
)
# если time отсутствует и kickoff_dt = NaT — показываем просто дату
mask_nat = schedule_today["kickoff_dt"].isna()
schedule_today.loc[mask_nat, "datetime_str"] = schedule_today.loc[
mask_nat, "date"
].dt.strftime("%d.%m.%Y")
return schedule_today
def _build_game_url(tournament_id: int, game_id: int) -> str:
# URL по аналогии с расписанием: .../{tournament_id}/json_en/{game_id}.json
# Если у тебя другой шаблон — просто поменяй строку ниже.
return f"http://stat2tv.khl.ru/{tournament_id}/json_en/{game_id}.json"
def _fetch_game_once(tournament_id: int, game_id: int) -> dict:
"""Один запрос к API матча -> чистый JSON из API."""
url = _build_game_url(tournament_id, game_id)
r = requests.get(
url,
auth=HTTPBasicAuth(api_user, api_pass),
verify=False,
timeout=10
)
r.raise_for_status()
# Пробуем распарсить JSON
try:
data = r.json()
except ValueError:
# если это не JSON — вернём текст
data = {"raw": r.text}
return data
def _game_poll_worker():
"""Фоновый цикл: опрашивает API для выбранного game_id."""
global latest_game_data, latest_game_error
while not _stop_event.is_set():
gid = selected_game_id
tid = current_tournament_id
if gid and tid:
try:
data = _fetch_game_once(tid, gid)
with _latest_lock:
latest_game_data = {
"tournament_id": tid,
"game_id": gid,
"fetched_at": datetime.now().isoformat(),
"data": data,
}
latest_game_error = None
except Exception as e:
with _latest_lock:
latest_game_error = f"{type(e).__name__}: {e}"
# Ждём интервал (с возможностью ранней остановки)
_stop_event.wait(POLL_SEC)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Запускаем и останавливаем поток чтения данных при старте/остановке приложения."""
global _worker_thread
_stop_event.clear()
_worker_thread = Thread(target=_game_poll_worker, daemon=True)
_worker_thread.start()
print("✅ Background thread started")
# Отдаём управление FastAPI
yield
# Останавливаем при завершении
_stop_event.set()
if _worker_thread.is_alive():
_worker_thread.join(timeout=2)
print("🛑 Background thread stopped")
app = FastAPI(
lifespan=lifespan,
docs_url=None, # ❌ отключает /docs
redoc_url=None, # ❌ отключает /redoc
openapi_url=None # ❌ отключает /openapi.json
)
@app.get("/games")
async def games():
df = load_today_schedule()
if df.empty:
return JSONResponse({"message": "Сегодня матчей нет"})
json_schedule = df.to_json(orient="records", force_ascii=False, date_format="iso")
return Response(content=json_schedule, media_type="application/json")
@app.get("/select")
async def select():
df = load_today_schedule()
if df.empty:
return HTMLResponse(
"<h2 style='font-family:Inter,system-ui'>Сегодня матчей нет</h2>"
)
# Строим строки таблицы
rows_html = []
for _, row in df.iterrows():
gid = int(row["id"])
home = row.get("homeName_en", "")
away = row.get("visitorName_en", "")
when = row.get("datetime_str", "")
arena = row.get("arena", "")
rows_html.append(
f"""
<tr data-id="{gid}">
<td>{gid}</td>
<td>{when}</td>
<td>{home}</td>
<td>{away}</td>
<td>{arena}</td>
<td><button class="pick">Выбрать</button></td>
</tr>
"""
)
# ✅ Весь HTML, включая JS, внутри тройных кавычек
html = f"""
<!doctype html>
<html lang="ru">
<meta charset="utf-8">
<title>Выбор матча</title>
<style>
body {{ font-family: Inter, system-ui, -apple-system, Segoe UI, Roboto, Arial; background:#0b0f14; color:#e6edf3; margin:0; padding:24px; }}
h1 {{ margin:0 0 16px; font-size:20px; }}
table {{ border-collapse: collapse; width:100%; background:#0f1620; border:1px solid #1f2a36; border-radius:12px; overflow:hidden; }}
thead th {{ text-align:left; padding:12px 14px; font-weight:600; background:#121b26; border-bottom:1px solid #1f2a36; }}
tbody td {{ padding:12px 14px; border-top:1px solid #16212c; }}
tr:hover {{ background:#13202d; }}
tr.selected {{ outline:2px solid #6aa6ff; outline-offset:-2px; background:#132c49; }}
button.pick {{
border:1px solid #2a3a4d; background:#1a2635; padding:8px 12px; border-radius:10px;
color:#e6edf3; cursor:pointer; font-weight:600;
}}
button.pick:hover {{ background:#223243; }}
.status {{ margin-top:14px; opacity:0.9; }}
</style>
<h1>Матчи сегодня</h1>
<table id="tbl">
<thead>
<tr>
<th>ID</th><th>Дата/время</th><th>Хозяева</th><th>Гости</th><th>Арена</th><th></th>
</tr>
</thead>
<tbody>
{''.join(rows_html)}
</tbody>
</table>
<div class="status" id="status">Ничего не выбрано</div>
<script>
const tbl = document.getElementById('tbl');
const status = document.getElementById('status');
let selectedRow = null;
tbl.addEventListener('click', async (e) => {{
const btn = e.target.closest('.pick');
if (!btn) return;
const row = btn.closest('tr');
const gameId = row.getAttribute('data-id');
// визуально выделяем выбранную строку
if (selectedRow) selectedRow.classList.remove('selected');
row.classList.add('selected');
selectedRow = row;
status.textContent = 'Сохраняю выбор...';
try {{
const res = await fetch(`/select-game/${{gameId}}`, {{
method: 'POST'
}});
if (!res.ok) throw new Error('HTTP ' + res.status);
const data = await res.json();
status.textContent = 'Выбран матч ID: ' + data.selected_id;
}} catch (err) {{
status.textContent = 'Ошибка: ' + err.message;
}}
}});
</script>
</html>
"""
return HTMLResponse(html)
@app.post("/select-game/{game_id}")
async def select_game(game_id: int):
global selected_game_id, latest_game_data, latest_game_error
selected_game_id = game_id
# моментально подтянуть первое состояние, если известен турнир
if current_tournament_id:
try:
data = _fetch_game_once(current_tournament_id, selected_game_id)
with _latest_lock:
latest_game_data = {
"tournament_id": current_tournament_id,
"game_id": selected_game_id,
"fetched_at": datetime.now().isoformat(),
"data": data,
}
latest_game_error = None
except Exception as e:
with _latest_lock:
latest_game_error = f"{type(e).__name__}: {e}"
return JSONResponse({"selected_id": selected_game_id})
@app.get("/selected-game")
async def get_selected_game():
return JSONResponse({"selected_id": selected_game_id})
@app.get("/game/url")
async def game_url():
if not (selected_game_id and current_tournament_id):
return JSONResponse({"message": "game_id или tournament_id не задан"})
return JSONResponse({
"url": _build_game_url(current_tournament_id, selected_game_id),
"game_id": selected_game_id,
"tournament_id": current_tournament_id
})
@app.get("/data")
async def game_data():
with _latest_lock:
if latest_game_data:
return JSONResponse(latest_game_data)
if latest_game_error:
return JSONResponse({"error": latest_game_error}, status_code=502)
return JSONResponse({"message": "Ещё нет данных. Выберите матч и подождите первое обновление."})
@app.get("/referee")
async def referee():
json_data = latest_game_data["data"]
referees_id = [
json_data["game"]["mref1_id"],
json_data["game"]["mref2_id"],
json_data["game"]["lref1_id"],
json_data["game"]["lref2_id"],
]
data_referees = [
{
"number": json_data["game"]["mref1_num"],
"fullname": f'{json_data["game"]["mref1"].split()[1]} {json_data["game"]["mref1"].split()[0]}',
},
{
"number": json_data["game"]["mref2_num"],
"fullname": f'{json_data["game"]["mref2"].split()[1]} {json_data["game"]["mref2"].split()[0]}',
},
{
"number": json_data["game"]["lref1_num"],
"fullname": f'{json_data["game"]["lref1"].split()[1]} {json_data["game"]["lref1"].split()[0]}',
},
{
"number": json_data["game"]["lref2_num"],
"fullname": f'{json_data["game"]["lref2"].split()[1]} {json_data["game"]["lref2"].split()[0]}',
},
]
return data_referees
# def team(who:str):
if __name__ == "__main__":
uvicorn.run(
"get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug"
)