from fastapi import FastAPI, HTTPException, Query from fastapi.responses import Response, JSONResponse, HTMLResponse from fastapi.encoders import jsonable_encoder import pandas as pd import requests, io, os from requests.auth import HTTPBasicAuth from dotenv import load_dotenv from datetime import datetime import uvicorn from threading import Thread, Event, Lock import time from contextlib import asynccontextmanager import numpy as np import nasio # --- Глобальные переменные --- selected_game_id: int | None = None current_tournament_id: int | None = None # будем обновлять при загрузке расписания current_season: str | None = None # будем обновлять при загрузке расписания latest_game_data: dict | None = None # сюда кладём последние данные по матчу latest_game_error: str | None = None _latest_lock = Lock() _stop_event = Event() _worker_thread: Thread | None = None # Загружаем переменные из .env if load_dotenv(dotenv_path="/mnt/khl/.env",verbose=True): print("Добавить в лог что был найден файл окружения!!") pass else: load_dotenv() print("Добавить в лог что не был найден файл окружения!!") api_user = os.getenv("API_USER") api_pass = os.getenv("API_PASS") league = os.getenv("LEAGUE") POLL_SEC = int(os.getenv("GAME_POLL_SECONDS")) SERVER_NAME = os.getenv("SYNO_URL") USER = os.getenv("SYNO_USERNAME") PASSWORD = os.getenv("SYNO_PASSWORD") PATH = "/team-folders/GFX/Hockey/KHL/Soft/MATCH.xlsm" def load_today_schedule(): """Возвращает DataFrame матчей на сегодня с нужными колонками (или пустой DF).""" url_tournaments = "http://stat2tv.khl.ru/tournaments.xml" r = requests.get( url_tournaments, auth=HTTPBasicAuth(api_user, api_pass), verify=False ) df = pd.read_xml(io.StringIO(r.text)) df["startDate"] = pd.to_datetime(df["startDate"], errors="coerce") df["endDate"] = pd.to_datetime(df["endDate"], errors="coerce") now = datetime.now() filtered = df[ (df["level"] == league) & (df["startDate"] <= now) & (df["endDate"] >= now) & (df["seasonPart"] == "regular") ] if filtered.empty: return pd.DataFrame() tournament_id = int(filtered.iloc[0]["id"]) season = str(filtered.iloc[0]["season"]) global current_tournament_id, current_season current_tournament_id = tournament_id current_season = season url_schedule = f"http://stat2tv.khl.ru/{tournament_id}/schedule-{tournament_id}.xml" r = requests.get(url_schedule, auth=HTTPBasicAuth(api_user, api_pass), verify=False) schedule_df = pd.read_xml(io.StringIO(r.text)) # Нужные колонки (скорректируй под реальные имена из XML) needed_columns = ["id", "date", "time", "homeName_en", "visitorName_en", "arena_en", "arena_city_en", "homeCity_en", "visitorCity_en"] exist = [c for c in needed_columns if c in schedule_df.columns] schedule_df = schedule_df[exist].copy() # Преобразуем дату и время schedule_df["date"] = pd.to_datetime(schedule_df["date"], errors="coerce") today = now.date() schedule_today = schedule_df[schedule_df["date"].dt.date == today].copy() # --- Нормализуем время и строим единый datetime для сортировки --- # 1) берём из time только HH:MM (на случай '19:30', '19:30:00', '19:30 MSK', и т.п.) time_clean = ( schedule_today.get("time") .astype(str) .str.extract(r"(?P\d{1,2}:\d{2})", expand=True)["hhmm"] ) # 2) собираем строку "YYYY-MM-DD HH:MM" и парсим в Timestamp date_str = schedule_today["date"].dt.strftime("%Y-%m-%d") kickoff_str = (date_str + " " + time_clean.fillna("")).str.strip() schedule_today["kickoff_dt"] = pd.to_datetime(kickoff_str, errors="coerce") # 3) сортировка: сначала по времени (NaT в конец), потом по id schedule_today = schedule_today.sort_values( by=["kickoff_dt", "id"], ascending=[True, True], na_position="last" ) # 4) человекочитаемая строка даты/времени для таблицы schedule_today["datetime_str"] = schedule_today["kickoff_dt"].dt.strftime( "%d.%m.%Y %H:%M" ) # если time отсутствует и kickoff_dt = NaT — показываем просто дату mask_nat = schedule_today["kickoff_dt"].isna() schedule_today.loc[mask_nat, "datetime_str"] = schedule_today.loc[ mask_nat, "date" ].dt.strftime("%d.%m.%Y") return schedule_today def _build_game_url(tournament_id: int, game_id: int) -> str: # URL по аналогии с расписанием: .../{tournament_id}/json_en/{game_id}.json # Если у тебя другой шаблон — просто поменяй строку ниже. return f"http://stat2tv.khl.ru/{tournament_id}/json_en/{game_id}.json" def _fetch_game_once(tournament_id: int, game_id: int) -> dict: """Один запрос к API матча -> чистый JSON из API.""" url = _build_game_url(tournament_id, game_id) r = requests.get( url, auth=HTTPBasicAuth(api_user, api_pass), verify=False, timeout=10 ) r.raise_for_status() # Пробуем распарсить JSON try: data = r.json() except ValueError: # если это не JSON — вернём текст data = {"raw": r.text} return data def _game_poll_worker(): """Фоновый цикл: опрашивает API для выбранного game_id.""" global latest_game_data, latest_game_error while not _stop_event.is_set(): gid = selected_game_id tid = current_tournament_id if gid and tid: try: data = _fetch_game_once(tid, gid) with _latest_lock: latest_game_data = { "tournament_id": tid, "game_id": gid, "fetched_at": datetime.now().isoformat(), "data": data, } latest_game_error = None except Exception as e: with _latest_lock: latest_game_error = f"{type(e).__name__}: {e}" # Ждём интервал (с возможностью ранней остановки) _stop_event.wait(POLL_SEC) @asynccontextmanager async def lifespan(app: FastAPI): """Запускаем и останавливаем поток чтения данных при старте/остановке приложения.""" global _worker_thread _stop_event.clear() _worker_thread = Thread(target=_game_poll_worker, daemon=True) _worker_thread.start() print("✅ Background thread started") # Отдаём управление FastAPI yield # Останавливаем при завершении _stop_event.set() if _worker_thread.is_alive(): _worker_thread.join(timeout=2) print("🛑 Background thread stopped") app = FastAPI( lifespan=lifespan, docs_url=None, # ❌ отключает /docs redoc_url=None, # ❌ отключает /redoc openapi_url=None # ❌ отключает /openapi.json ) @app.get("/games") async def games(): df = load_today_schedule() if df.empty: return JSONResponse({"message": "Сегодня матчей нет"}) json_schedule = df.to_json(orient="records", force_ascii=False, date_format="iso") return Response(content=json_schedule, media_type="application/json") @app.get("/select") async def select(): df = load_today_schedule() if df.empty: return HTMLResponse( "

Сегодня матчей нет

" ) # Строим строки таблицы rows_html = [] for _, row in df.iterrows(): gid = int(row["id"]) home = row.get("homeName_en", "") away = row.get("visitorName_en", "") when = row.get("datetime_str", "") arena = row.get("arena_en", "") city = row.get("arena_city_en", "") rows_html.append( f""" {gid} {when} {home} {away} {arena} {city} """ ) # ✅ Весь HTML, включая JS, внутри тройных кавычек html = f""" Выбор матча

Матчи сегодня

{''.join(rows_html)}
IDДата/времяХозяеваГостиАренаГород
Ничего не выбрано
""" return HTMLResponse(html) @app.post("/select-game/{game_id}") async def select_game(game_id: int): global selected_game_id, latest_game_data, latest_game_error selected_game_id = game_id # моментально подтянуть первое состояние, если известен турнир if current_tournament_id: try: data = _fetch_game_once(current_tournament_id, selected_game_id) with _latest_lock: latest_game_data = { "tournament_id": current_tournament_id, "game_id": selected_game_id, "fetched_at": datetime.now().isoformat(), "data": data, } latest_game_error = None except Exception as e: with _latest_lock: latest_game_error = f"{type(e).__name__}: {e}" return JSONResponse({"selected_id": selected_game_id}) @app.get("/selected-game") async def get_selected_game(): return JSONResponse({"selected_id": selected_game_id}) @app.get("/game/url") async def game_url(): if not (selected_game_id and current_tournament_id): return JSONResponse({"message": "game_id или tournament_id не задан"}) return JSONResponse({ "url": _build_game_url(current_tournament_id, selected_game_id), "game_id": selected_game_id, "tournament_id": current_tournament_id }) # @app.get("/info") # async def info(): # if selected_game_id: # df = load_today_schedule() @app.get("/data") async def game_data(): with _latest_lock: if latest_game_data: return JSONResponse(latest_game_data) if latest_game_error: return JSONResponse({"error": latest_game_error}, status_code=502) return JSONResponse({"message": "Ещё нет данных. Выберите матч и подождите первое обновление."}) @app.get("/referee") async def referee(): json_data = latest_game_data["data"] data_referees = [ { "number": json_data["game"]["mref1_num"], "fullname": f'{json_data["game"]["mref1"].split()[1]} {json_data["game"]["mref1"].split()[0]}', }, { "number": json_data["game"]["mref2_num"], "fullname": f'{json_data["game"]["mref2"].split()[1]} {json_data["game"]["mref2"].split()[0]}', }, { "number": json_data["game"]["lref1_num"], "fullname": f'{json_data["game"]["lref1"].split()[1]} {json_data["game"]["lref1"].split()[0]}', }, { "number": json_data["game"]["lref2_num"], "fullname": f'{json_data["game"]["lref2"].split()[1]} {json_data["game"]["lref2"].split()[0]}', }, ] return data_referees async def team(who:str): """"who: A - домашняя команда, B - гостевая""" with _latest_lock: lgd = latest_game_data # print(lgd) if not lgd or "data" not in lgd: return [{"details":"Нет данных по матчу!"}] players1_temp = lgd["data"]["players"][who] players1 = [] players1_f = [] players1_d = [] players1_g = [] goaltenders1 = [] for player in players1_temp: players1_temp[player]["mask"] = "#FF0000" # for s in seasonA_json: # if int(players1_temp[player]["id"]) == int(s["id"]): # for key, value in s.items(): # # Создаем новый ключ с префиксом # new_key = f"season_{key}" # players1_temp[player][new_key] = value if players1_temp[player]["ps"] not in ["в", "g"]: data1 = players1_temp[player] if "." in data1["name"]: lastname1, *names1 = data1["name"].split() names_new = " ".join(names1) elif len(data1["name"].split()) == 3: *lastname1, names1 = data1["name"].split() names_new = names1 lastname1 = " ".join(lastname1) else: lastname1, *names1 = data1["name"].split() names_new = " ".join(names1) if players1_temp[player]["ps"] == "н": position = "нападающий" elif players1_temp[player]["ps"] == "з": position = "защитник" elif players1_temp[player]["ps"] == "f": position = "forward" elif players1_temp[player]["ps"] == "d": position = "defenseman" data_with_number = { "number": player, "NameSurnameGFX": names_new + " " + lastname1, "NameGFX": names_new, "SurnameGFX": lastname1, "PositionGFX": position, **data1, } if players1_temp[player]["ps"] == "н": players1_f.append(data_with_number) elif players1_temp[player]["ps"] == "з": players1_d.append(data_with_number) elif players1_temp[player]["ps"] == "f": players1_f.append(data_with_number) elif players1_temp[player]["ps"] == "d": players1_d.append(data_with_number) else: data2 = players1_temp[player] position = "" if players1_temp[player]["ps"] == "в": position = "вратарь" elif players1_temp[player]["ps"] == "g": position = "Goaltender" lastname1, *names1 = data2["name"].split() names_new = " ".join(names1) data_with_number2 = { "number": player, "NameSurnameGFX": names_new + " " + lastname1, "NameGFX": names_new, "SurnameGFX": lastname1, "PositionGFX": position, **data2, } players1_g.append(data_with_number2) goaltenders1.append(data_with_number2) def make_empty(example_list): if not example_list: return {} return {key: "" for key in example_list[0].keys()} empty_d = make_empty(players1_d) empty_f = make_empty(players1_f) empty_g = make_empty(players1_g) # добивка пустыми слотами while len(players1_d) < 9: players1_d.append(empty_d.copy()) while len(players1_f) < 13: players1_f.append(empty_f.copy()) while len(players1_g) < 3: players1_g.append(empty_g.copy()) players1 = players1_d + players1_f + players1_g # print(len(players1)) return players1 @app.get("/team1") async def team1(): return await team("A") @app.get("/team2") async def team2(): return await team("B") # 👉 метка для первой строки (period row) def _period_label(period: str | int) -> str: s = str(period).strip().upper() # овертаймы: OT, OT1, OT2... или числовые >= 4 if s == "OT" or (s.startswith("OT") and s[2:].isdigit()): return "AFTER OVERTIME" if s.isdigit(): n = int(s) if n >= 4: return "AFTER OVERTIME" # 1–3 — с порядковым суффиксом if n % 100 in (11, 12, 13): suffix = "TH" else: suffix = {1: "ST", 2: "ND", 3: "RD"}.get(n % 10, "TH") return f"AFTER {n}{suffix} PERIOD" # например, Total if s == "TOTAL": return "FINAL" return "" # 👉 сортировка периодов: 1,2,3, затем OT/OT2/... def _period_sort_key(k: str) -> tuple[int, int]: s = str(k).strip().upper() # 1–3 — обычные периоды if s.isdigit(): n = int(s) if 1 <= n <= 3: return (n, 0) # 4-й и далее — трактуем как овертаймы (4 -> OT1, 5 -> OT2 ...) return (100, n - 3) # явные овертаймы: OT, OT1, OT2... if s == "OT": return (100, 1) if s.startswith("OT") and s[2:].isdigit(): return (100, int(s[2:])) # неизвестные — в конец return (200, 0) def _sorted_period_keys(teams_periods: dict) -> list[str]: a = teams_periods.get("A", {}) b = teams_periods.get("B", {}) keys = [k for k in a.keys() if k in b] return sorted(keys, key=_period_sort_key) def _current_period_key(payload: dict) -> str | None: keys = _sorted_period_keys(payload.get("teams_periods", {})) return keys[-1] if keys else None # >>> 1) Замените вашу async-функцию get_team_stat на обычную sync: def format_team_stat(team1: dict, team2: dict, period: str | None = None) -> list[dict]: """Форматирует статы двух команд в список записей для GFX (общие ключи + подписи).""" stat_list = [ ("coach_id", "coach id", "ID тренеров"), ("coach_fullname", "coach fullname", "Тренеры"), ("name", "name", "Команды"), ("outs", "Outs", ""), ("pim", "Penalty Minutes", "Минуты штрафа"), ("shots", "Shots on goal", "Броски в створ"), ("goals", "Goals", "Голы"), ("fo", "Face-offs", "Вбрасывания всего"), ("fow", "Face-offs won", "Вбрасывания"), ("fow_pct", "Face-offs won, %", "Выигранные вбрасывания, %"), ("hits", "Hits", "Силовые приемы"), ("bls", "Blocked shots", "Блокированные броски"), ("tka", "Takeaways", "Отборы"), ("toa", "Time on attack", "Время в атаке"), ("tie", "Even Strength Time on Ice", "Время в равных составах"), ("tipp", "Powerplay Time On Ice", "Время при большинстве"), ("tish", "Shorthanded Time On Ice", "Время при меньшинстве"), ("tien", "Emptry Net Time On Ice", "Время с пустыми воротами"), ("gva", "Giveaways", "Потери шайбы"), ("p_intc", "Pass interceptions", "Перехваты передачи"), ] teams = [{k: str(v) for k, v in t.items()} for t in [team1, team2]] keys = list(teams[0].keys()) formatted = [] if period is not None: formatted.append({ "name0": "period", "name1": str(period), "name2": "", "StatParameterGFX": _period_label(period) or "Period" }) for key in keys: row = {"name0": key, "name1": teams[0].get(key, ""), "name2": teams[1].get(key, "")} # подписи for code, eng, _ru in stat_list: if key == code: row["StatParameterGFX"] = eng break formatted.append(row) # постобработка отдельных полей for r in formatted: if r["name0"] == "fow_pct": r["name1"] = str(round(float(r["name1"]))) if r["name1"] else r["name1"] r["name2"] = str(round(float(r["name2"]))) if r["name2"] else r["name2"] if r["name0"] == "coach_fullname": # "Фамилия Имя" -> "Имя Фамилия" если есть пробел def flip(s: str) -> str: parts = s.split() return f"{parts[1]} {parts[0]}" if len(parts) >= 2 else s r["name1"] = flip(r["name1"]) r["name2"] = flip(r["name2"]) return formatted # >>> 2) Вспомогалки для обхода периодов def _iter_period_pairs(teams_periods: dict): """ Итерируем по периодам, отдаём (period_key, A_stats, B_stats). Ключи сортируем по числу, если это цифры. """ a = teams_periods.get("A", {}) b = teams_periods.get("B", {}) def _key(k): try: return int(k) except (TypeError, ValueError): return k for k in sorted(a.keys(), key=_key): if k in b: yield k, a[k], b[k] def _build_all_stats(payload: dict) -> dict: """ Собирает общий блок ('total') и список по периодам ('periods') из json матча. """ total_a = payload["teams"]["A"] total_b = payload["teams"]["B"] result = { "total": format_team_stat(total_a, total_b), "periods": [] } for period_key, a_stat, b_stat in _iter_period_pairs(payload["teams_periods"]): result["periods"].append({ "period": period_key, "stats": format_team_stat(a_stat, b_stat) }) return result @app.get("/teams/stats") async def teams_stats( scope: str = Query("all", pattern="^(all|total|period)$"), n: str | None = Query(None, description="Номер периода (строка или число) при scope=period"), ): """ Все-в-одном: GET /teams/stats?scope=all вернёт { total: [...], periods: [{period: "1", stats:[...]}, ...] } Только общий: GET /teams/stats?scope=total Конкретный период: GET /teams/stats?scope=period&n=2 """ # читаем атомарно снапшот with _latest_lock: lgd = latest_game_data if not lgd or "data" not in lgd or not isinstance(lgd["data"], dict): raise HTTPException(status_code=400, detail="Нет данных по матчу.") payload = lgd["data"] if scope == "total": data = format_team_stat(payload["teams"]["A"], payload["teams"]["B"], period="Total") return JSONResponse({"scope": "total", "data": data}) if scope == "period": # 👉 если n не задан/или просит актуальный — берём последний период wants_current = (n is None) or (str(n).strip().lower() in {"current", "last"}) if wants_current: key = _current_period_key(payload) if key is None: raise HTTPException(status_code=404, detail="Периоды не найдены.") n = key # используем актуальный ключ периода # ключи в исходном json строковые period_key = str(n) a = payload["teams_periods"]["A"].get(period_key) b = payload["teams_periods"]["B"].get(period_key) if a is None or b is None: raise HTTPException(status_code=404, detail=f"Период {period_key} не найден.") return JSONResponse({ "scope": "period", "period": period_key, "is_current": period_key == _current_period_key(payload), "data": format_team_stat(a, b, period=period_key) }) # scope == "all" cur = _current_period_key(payload) return JSONResponse({ "scope": "all", "current_period": cur, "data": _build_all_stats(payload) }) def _norm_name(s: str | None) -> str: """Нормализует название команды для сравнения.""" if not s: return "" return str(s).strip().casefold() @app.get("/info") async def info(): # 1) Проверяем, выбран ли матч global current_season if not selected_game_id: return JSONResponse({"message": "Матч не выбран", "selected_id": None}) # 2) Берём расписание и ищем строку по выбранному ID df = load_today_schedule() if df.empty: return JSONResponse({"message": "Сегодня матчей нет", "selected_id": selected_game_id}) # безопасно приводим id к int и ищем try: row = df.loc[df["id"].astype(int) == int(selected_game_id)].iloc[0] except Exception: return JSONResponse({"message": "Выбранный матч не найден в расписании на сегодня", "selected_id": selected_game_id}, status_code=404) home_name = str(row.get("homeName_en", "")).strip() away_name = str(row.get("visitorName_en", "")).strip() # 3) Подтягиваем справочник команд из Excel (лист TEAMS) teams_df = nasio.load_formatted( user=USER, password=PASSWORD, nas_ip=SERVER_NAME, nas_port="443", path=PATH, sheet="TEAMS" ) # Оставляем только полезные поля (подгони под свой файл) keep = [ "Team", "Logo", "Short", "HexPodl", "HexBase", "HexText" ] keep = [c for c in keep if c in teams_df.columns] teams_df = teams_df.loc[:, keep].copy() # 4) Нормализованные ключи для джоина по имени teams_df["__key"] = teams_df["Team"].apply(_norm_name) def _pick_team_info(name: str) -> dict: key = _norm_name(name) hit = teams_df.loc[teams_df["__key"] == key] if hit.empty: # не нашли точное совпадение — вернём только название return {"Team": name} rec = hit.iloc[0].to_dict() rec.pop("__key", None) # заменим NaN/inf на None, чтобы JSON не падал for k, v in list(rec.items()): if pd.isna(v) or v in (np.inf, -np.inf): rec[k] = None return rec home_info = _pick_team_info(home_name) away_info = _pick_team_info(away_name) date_obj = datetime.strptime(row.get("datetime_str", ""), "%d.%m.%Y %H:%M") try: full_format = date_obj.strftime("%B %-d, %Y") except ValueError: full_format = date_obj.strftime("%B %#d, %Y") payload = [{ "selected_id": int(selected_game_id), "tournament_id": int(current_tournament_id) if current_tournament_id else None, "datetime": str(full_format), "arena": str(row.get("arena_en", "")), "arena_city": str(row.get("arena_city_en", "")), "home": home_info, "home_city": str(row.get("homeCity_en", "")), "away": away_info, "away_city": str(row.get("visitorCity_en", "")), "season": current_season, }] return JSONResponse(content=payload) if __name__ == "__main__": uvicorn.run( "get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug" )