from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import Response, JSONResponse, HTMLResponse, StreamingResponse from fastapi.encoders import jsonable_encoder import pandas as pd import requests, io, os from requests.auth import HTTPBasicAuth from dotenv import load_dotenv from datetime import datetime import uvicorn from threading import Thread, Event, Lock import time from contextlib import asynccontextmanager import numpy as np import nasio import logging import logging.config import platform import json from pprint import pprint # --- Глобальные переменные --- selected_game_id: int | None = None current_tournament_id: int | None = None # будем обновлять при загрузке расписания current_season: str | None = None # будем обновлять при загрузке расписания latest_game_data: dict | None = None # сюда кладём последние данные по матчу latest_game_error: str | None = None _latest_lock = Lock() _stop_event = Event() _worker_thread: Thread | None = None # Загружаем переменные из .env if load_dotenv(dotenv_path="/mnt/khl/.env", verbose=True): print("Добавить в лог что был найден файл окружения!!") pass else: load_dotenv() print("Добавить в лог что не был найден файл окружения!!") api_user = os.getenv("API_USER") api_pass = os.getenv("API_PASS") league = os.getenv("LEAGUE") POLL_SEC = int(os.getenv("GAME_POLL_SECONDS")) SERVER_NAME = os.getenv("SYNO_URL") USER = os.getenv("SYNO_USERNAME") PASSWORD = os.getenv("SYNO_PASSWORD") PATH = "/team-folders/GFX/Hockey/KHL/Soft/MATCH.xlsm" def load_today_schedule(): """Возвращает DataFrame матчей на сегодня с нужными колонками (или пустой DF).""" url_tournaments = "http://stat2tv.khl.ru/tournaments.xml" r = requests.get( url_tournaments, auth=HTTPBasicAuth(api_user, api_pass), verify=False ) df = pd.read_xml(io.StringIO(r.text)) df["startDate"] = pd.to_datetime(df["startDate"], errors="coerce") df["endDate"] = pd.to_datetime(df["endDate"], errors="coerce") now = datetime.now() filtered = df[ (df["level"] == league) & (df["startDate"] <= now) & (df["endDate"] >= now) & (df["seasonPart"] == "regular") ] if filtered.empty: return pd.DataFrame() tournament_id = int(filtered.iloc[0]["id"]) season = str(filtered.iloc[0]["season"]) global current_tournament_id, current_season current_tournament_id = tournament_id current_season = season url_schedule = f"http://stat2tv.khl.ru/{tournament_id}/schedule-{tournament_id}.xml" r = requests.get(url_schedule, auth=HTTPBasicAuth(api_user, api_pass), verify=False) schedule_df = pd.read_xml(io.StringIO(r.text)) # Нужные колонки (скорректируй под реальные имена из XML) needed_columns = [ "id", "date", "time", "homeName_en", "visitorName_en", "arena_en", "arena_city_en", "homeCity_en", "visitorCity_en", ] exist = [c for c in needed_columns if c in schedule_df.columns] schedule_df = schedule_df[exist].copy() # Преобразуем дату и время schedule_df["date"] = pd.to_datetime(schedule_df["date"], errors="coerce") today = now.date() schedule_today = schedule_df[schedule_df["date"].dt.date == today].copy() # --- Нормализуем время и строим единый datetime для сортировки --- # 1) берём из time только HH:MM (на случай '19:30', '19:30:00', '19:30 MSK', и т.п.) time_clean = ( schedule_today.get("time") .astype(str) .str.extract(r"(?P\d{1,2}:\d{2})", expand=True)["hhmm"] ) # 2) собираем строку "YYYY-MM-DD HH:MM" и парсим в Timestamp date_str = schedule_today["date"].dt.strftime("%Y-%m-%d") kickoff_str = (date_str + " " + time_clean.fillna("")).str.strip() schedule_today["kickoff_dt"] = pd.to_datetime(kickoff_str, errors="coerce") # 3) сортировка: сначала по времени (NaT в конец), потом по id schedule_today = schedule_today.sort_values( by=["kickoff_dt", "id"], ascending=[True, True], na_position="last" ) # 4) человекочитаемая строка даты/времени для таблицы schedule_today["datetime_str"] = schedule_today["kickoff_dt"].dt.strftime( "%d.%m.%Y %H:%M" ) # если time отсутствует и kickoff_dt = NaT — показываем просто дату mask_nat = schedule_today["kickoff_dt"].isna() schedule_today.loc[mask_nat, "datetime_str"] = schedule_today.loc[ mask_nat, "date" ].dt.strftime("%d.%m.%Y") return schedule_today def _build_game_url(tournament_id: int, game_id: int) -> str: # URL по аналогии с расписанием: .../{tournament_id}/json_en/{game_id}.json # Если у тебя другой шаблон — просто поменяй строку ниже. return f"http://stat2tv.khl.ru/{tournament_id}/json_en/{game_id}.json" def _fetch_game_once(tournament_id: int, game_id: int) -> dict: """Один запрос к API матча -> чистый JSON из API.""" url = _build_game_url(tournament_id, game_id) r = requests.get( url, auth=HTTPBasicAuth(api_user, api_pass), verify=False, timeout=10 ) r.raise_for_status() # Пробуем распарсить JSON try: data = r.json() except ValueError: # если это не JSON — вернём текст data = {"raw": r.text} return data def _game_poll_worker(): """Фоновый цикл: опрашивает API для выбранного game_id.""" global latest_game_data, latest_game_error while not _stop_event.is_set(): gid = selected_game_id tid = current_tournament_id if gid and tid: try: data = _fetch_game_once(tid, gid) with _latest_lock: latest_game_data = { "tournament_id": tid, "game_id": gid, "fetched_at": datetime.now().isoformat(), "data": data, } latest_game_error = None except Exception as e: with _latest_lock: latest_game_error = f"{type(e).__name__}: {e}" # Ждём интервал (с возможностью ранней остановки) _stop_event.wait(POLL_SEC) @asynccontextmanager async def lifespan(app: FastAPI): """Запускаем и останавливаем поток чтения данных при старте/остановке приложения.""" global _worker_thread _stop_event.clear() _worker_thread = Thread(target=_game_poll_worker, daemon=True) _worker_thread.start() print("✅ Background thread started") # Отдаём управление FastAPI yield # Останавливаем при завершении _stop_event.set() if _worker_thread.is_alive(): _worker_thread.join(timeout=2) print("🛑 Background thread stopped") app = FastAPI( lifespan=lifespan, docs_url=None, # ❌ отключает /docs redoc_url=None, # ❌ отключает /redoc openapi_url=None, # ❌ отключает /openapi.json ) @app.get("/games") async def games(): df = load_today_schedule() if df.empty: return JSONResponse({"message": "Сегодня матчей нет"}) json_schedule = df.to_json(orient="records", force_ascii=False, date_format="iso") return Response(content=json_schedule, media_type="application/json") @app.get("/select") async def select(): df = load_today_schedule() if df.empty: return HTMLResponse( "

Сегодня матчей нет

" ) # Строим строки таблицы rows_html = [] for _, row in df.iterrows(): gid = int(row["id"]) home = row.get("homeName_en", "") away = row.get("visitorName_en", "") when = row.get("datetime_str", "") arena = row.get("arena_en", "") city = row.get("arena_city_en", "") rows_html.append( f""" {gid} {when} {home} {away} {arena} {city} """ ) # ✅ Весь HTML, включая JS, внутри тройных кавычек html = f""" Выбор матча

Матчи сегодня

{''.join(rows_html)}
IDДата/времяХозяеваГостиАренаГород
Ничего не выбрано
""" return HTMLResponse(html) @app.post("/select-game/{game_id}") async def select_game(game_id: int): global selected_game_id, latest_game_data, latest_game_error selected_game_id = game_id # моментально подтянуть первое состояние, если известен турнир if current_tournament_id: try: data = _fetch_game_once(current_tournament_id, selected_game_id) with _latest_lock: latest_game_data = { "tournament_id": current_tournament_id, "game_id": selected_game_id, "fetched_at": datetime.now().isoformat(), "data": data, } latest_game_error = None except Exception as e: with _latest_lock: latest_game_error = f"{type(e).__name__}: {e}" return JSONResponse({"selected_id": selected_game_id}) @app.get("/selected-game") async def get_selected_game(): return JSONResponse({"selected_id": selected_game_id}) @app.get("/game/url") async def game_url(): if not (selected_game_id and current_tournament_id): return JSONResponse({"message": "game_id или tournament_id не задан"}) return JSONResponse( { "url": _build_game_url(current_tournament_id, selected_game_id), "game_id": selected_game_id, "tournament_id": current_tournament_id, } ) # @app.get("/info") # async def info(): # if selected_game_id: # df = load_today_schedule() @app.get("/data") async def game_data(): with _latest_lock: if latest_game_data: return JSONResponse(latest_game_data) if latest_game_error: return JSONResponse({"error": latest_game_error}, status_code=502) return JSONResponse( {"message": "Ещё нет данных. Выберите матч и подождите первое обновление."} ) @app.get("/referee") async def referee(): json_data = latest_game_data["data"] data_referees = [ { "number": json_data["game"]["mref1_num"], "fullname": f'{json_data["game"]["mref1"].split()[1]} {json_data["game"]["mref1"].split()[0]}', }, { "number": json_data["game"]["mref2_num"], "fullname": f'{json_data["game"]["mref2"].split()[1]} {json_data["game"]["mref2"].split()[0]}', }, { "number": json_data["game"]["lref1_num"], "fullname": f'{json_data["game"]["lref1"].split()[1]} {json_data["game"]["lref1"].split()[0]}', }, { "number": json_data["game"]["lref2_num"], "fullname": f'{json_data["game"]["lref2"].split()[1]} {json_data["game"]["lref2"].split()[0]}', }, ] return data_referees async def team(who: str): """ "who: A - домашняя команда, B - гостевая""" with _latest_lock: lgd = latest_game_data # print(lgd) if not lgd or "data" not in lgd: return [{"details": "Нет данных по матчу!"}] players1_temp = lgd["data"]["players"][who] players1 = [] players1_f = [] players1_d = [] players1_g = [] goaltenders1 = [] for player in players1_temp: players1_temp[player]["mask"] = "#FF0000" # for s in seasonA_json: # if int(players1_temp[player]["id"]) == int(s["id"]): # for key, value in s.items(): # # Создаем новый ключ с префиксом # new_key = f"season_{key}" # players1_temp[player][new_key] = value if players1_temp[player]["ps"] not in ["в", "g"]: data1 = players1_temp[player] if "." in data1["name"]: lastname1, *names1 = data1["name"].split() names_new = " ".join(names1) elif len(data1["name"].split()) == 3: *lastname1, names1 = data1["name"].split() names_new = names1 lastname1 = " ".join(lastname1) else: lastname1, *names1 = data1["name"].split() names_new = " ".join(names1) if players1_temp[player]["ps"] == "н": position = "нападающий" elif players1_temp[player]["ps"] == "з": position = "защитник" elif players1_temp[player]["ps"] == "f": position = "forward" elif players1_temp[player]["ps"] == "d": position = "defenseman" data_with_number = { "number": player, "NameSurnameGFX": names_new + " " + lastname1, "NameGFX": names_new, "SurnameGFX": lastname1, "PositionGFX": position, **data1, } if players1_temp[player]["ps"] == "н": players1_f.append(data_with_number) elif players1_temp[player]["ps"] == "з": players1_d.append(data_with_number) elif players1_temp[player]["ps"] == "f": players1_f.append(data_with_number) elif players1_temp[player]["ps"] == "d": players1_d.append(data_with_number) else: data2 = players1_temp[player] position = "" if players1_temp[player]["ps"] == "в": position = "вратарь" elif players1_temp[player]["ps"] == "g": position = "Goaltender" lastname1, *names1 = data2["name"].split() names_new = " ".join(names1) data_with_number2 = { "number": player, "NameSurnameGFX": names_new + " " + lastname1, "NameGFX": names_new, "SurnameGFX": lastname1, "PositionGFX": position, **data2, } players1_g.append(data_with_number2) goaltenders1.append(data_with_number2) def make_empty(example_list): if not example_list: return {} return {key: "" for key in example_list[0].keys()} empty_d = make_empty(players1_d) empty_f = make_empty(players1_f) empty_g = make_empty(players1_g) # добивка пустыми слотами while len(players1_d) < 9: players1_d.append(empty_d.copy()) while len(players1_f) < 13: players1_f.append(empty_f.copy()) while len(players1_g) < 3: players1_g.append(empty_g.copy()) players1 = players1_d + players1_f + players1_g # print(len(players1)) return players1 @app.get("/team1") async def team1(): return await team("A") @app.get("/team2") async def team2(): return await team("B") # 👉 метка для первой строки (period row) def _period_label(period: str | int) -> str: s = str(period).strip().upper() # овертаймы: OT, OT1, OT2... или числовые >= 4 if s == "OT" or (s.startswith("OT") and s[2:].isdigit()): return "AFTER OVERTIME" if s.isdigit(): n = int(s) if n >= 4: return "AFTER OVERTIME" # 1–3 — с порядковым суффиксом if n % 100 in (11, 12, 13): suffix = "TH" else: suffix = {1: "ST", 2: "ND", 3: "RD"}.get(n % 10, "TH") return f"AFTER {n}{suffix} PERIOD" # например, Total if s == "TOTAL": return "FINAL" return "" # 👉 сортировка периодов: 1,2,3, затем OT/OT2/... def _period_sort_key(k: str) -> tuple[int, int]: s = str(k).strip().upper() # 1–3 — обычные периоды if s.isdigit(): n = int(s) if 1 <= n <= 3: return (n, 0) # 4-й и далее — трактуем как овертаймы (4 -> OT1, 5 -> OT2 ...) return (100, n - 3) # явные овертаймы: OT, OT1, OT2... if s == "OT": return (100, 1) if s.startswith("OT") and s[2:].isdigit(): return (100, int(s[2:])) # неизвестные — в конец return (200, 0) def _sorted_period_keys(teams_periods: dict) -> list[str]: a = teams_periods.get("A", {}) b = teams_periods.get("B", {}) keys = [k for k in a.keys() if k in b] return sorted(keys, key=_period_sort_key) def _current_period_key(payload: dict) -> str | None: keys = _sorted_period_keys(payload.get("teams_periods", {})) return keys[-1] if keys else None # >>> 1) Замените вашу async-функцию get_team_stat на обычную sync: def format_team_stat(team1: dict, team2: dict, period: str | None = None) -> list[dict]: """Форматирует статы двух команд в список записей для GFX (общие ключи + подписи).""" stat_list = [ ("coach_id", "coach id", "ID тренеров"), ("coach_fullname", "coach fullname", "Тренеры"), ("name", "name", "Команды"), ("outs", "Outs", ""), ("pim", "Penalty Minutes", "Минуты штрафа"), ("shots", "Shots on goal", "Броски в створ"), ("goals", "Goals", "Голы"), ("fo", "Face-offs", "Вбрасывания всего"), ("fow", "Face-offs won", "Вбрасывания"), ("fow_pct", "Face-offs won, %", "Выигранные вбрасывания, %"), ("hits", "Hits", "Силовые приемы"), ("bls", "Blocked shots", "Блокированные броски"), ("tka", "Takeaways", "Отборы"), ("toa", "Time on attack", "Время в атаке"), ("tie", "Even Strength Time on Ice", "Время в равных составах"), ("tipp", "Powerplay Time On Ice", "Время при большинстве"), ("tish", "Shorthanded Time On Ice", "Время при меньшинстве"), ("tien", "Emptry Net Time On Ice", "Время с пустыми воротами"), ("gva", "Giveaways", "Потери шайбы"), ("p_intc", "Pass interceptions", "Перехваты передачи"), ] teams = [{k: str(v) for k, v in t.items()} for t in [team1, team2]] keys = list(teams[0].keys()) formatted = [] if period is not None: formatted.append( { "name0": "period", "name1": str(period), "name2": "", "StatParameterGFX": _period_label(period) or "Period", } ) for key in keys: row = { "name0": key, "name1": teams[0].get(key, ""), "name2": teams[1].get(key, ""), } # подписи for code, eng, _ru in stat_list: if key == code: row["StatParameterGFX"] = eng break formatted.append(row) # постобработка отдельных полей for r in formatted: if r["name0"] == "fow_pct": r["name1"] = str(round(float(r["name1"]))) if r["name1"] else r["name1"] r["name2"] = str(round(float(r["name2"]))) if r["name2"] else r["name2"] if r["name0"] == "coach_fullname": # "Фамилия Имя" -> "Имя Фамилия" если есть пробел def flip(s: str) -> str: parts = s.split() return f"{parts[1]} {parts[0]}" if len(parts) >= 2 else s r["name1"] = flip(r["name1"]) r["name2"] = flip(r["name2"]) return formatted # >>> 2) Вспомогалки для обхода периодов def _iter_period_pairs(teams_periods: dict): """ Итерируем по периодам, отдаём (period_key, A_stats, B_stats). Ключи сортируем по числу, если это цифры. """ a = teams_periods.get("A", {}) b = teams_periods.get("B", {}) def _key(k): try: return int(k) except (TypeError, ValueError): return k for k in sorted(a.keys(), key=_key): if k in b: yield k, a[k], b[k] def _build_all_stats(payload: dict) -> dict: """ Собирает общий блок ('total') и список по периодам ('periods') из json матча. """ total_a = payload["teams"]["A"] total_b = payload["teams"]["B"] result = {"total": format_team_stat(total_a, total_b), "periods": []} for period_key, a_stat, b_stat in _iter_period_pairs(payload["teams_periods"]): result["periods"].append( {"period": period_key, "stats": format_team_stat(a_stat, b_stat)} ) return result @app.get("/teams/stats") async def teams_stats( scope: str = Query("all", pattern="^(all|total|period)$"), n: str | None = Query( None, description="Номер периода (строка или число) при scope=period" ), ): """ Все-в-одном: GET /teams/stats?scope=all вернёт { total: [...], periods: [{period: "1", stats:[...]}, ...] } Только общий: GET /teams/stats?scope=total Конкретный период: GET /teams/stats?scope=period&n=2 """ # читаем атомарно снапшот with _latest_lock: lgd = latest_game_data if not lgd or "data" not in lgd or not isinstance(lgd["data"], dict): raise HTTPException(status_code=400, detail="Нет данных по матчу.") payload = lgd["data"] if scope == "total": data = format_team_stat( payload["teams"]["A"], payload["teams"]["B"], period="Total" ) return JSONResponse({"scope": "total", "data": data}) if scope == "period": # 👉 если n не задан/или просит актуальный — берём последний период wants_current = (n is None) or (str(n).strip().lower() in {"current", "last"}) if wants_current: key = _current_period_key(payload) if key is None: raise HTTPException(status_code=404, detail="Периоды не найдены.") n = key # используем актуальный ключ периода # ключи в исходном json строковые period_key = str(n) a = payload["teams_periods"]["A"].get(period_key) b = payload["teams_periods"]["B"].get(period_key) if a is None or b is None: raise HTTPException( status_code=404, detail=f"Период {period_key} не найден." ) return JSONResponse( { "scope": "period", "period": period_key, "is_current": period_key == _current_period_key(payload), "data": format_team_stat(a, b, period=period_key), } ) # scope == "all" cur = _current_period_key(payload) return JSONResponse( {"scope": "all", "current_period": cur, "data": _build_all_stats(payload)} ) def _norm_name(s: str | None) -> str: """Нормализует название команды для сравнения.""" if not s: return "" return str(s).strip().casefold() def _load_buf(): buf = nasio.load_bio(user=USER, password=PASSWORD, nas_ip=SERVER_NAME, nas_port="443", path=PATH) if isinstance(buf, (bytes, bytearray, memoryview)): buf = io.BytesIO(buf) buf.seek(0) return buf @app.get("/info") async def info(format: str = "xlsx", sheet: str = "TEAMS"): # 1) Проверяем, выбран ли матч global current_season if not selected_game_id: return JSONResponse({"message": "Матч не выбран", "selected_id": None}) # 2) Берём расписание и ищем строку по выбранному ID df = load_today_schedule() if df.empty: return JSONResponse( {"message": "Сегодня матчей нет", "selected_id": selected_game_id} ) # безопасно приводим id к int и ищем try: row = df.loc[df["id"].astype(int) == int(selected_game_id)].iloc[0] except Exception: return JSONResponse( { "message": "Выбранный матч не найден в расписании на сегодня", "selected_id": selected_game_id, }, status_code=404, ) home_name = str(row.get("homeName_en", "")).strip() away_name = str(row.get("visitorName_en", "")).strip() # 3) Подтягиваем справочник команд из Excel (лист TEAMS) src = _load_buf() if format == "xlsx": # читаем нужный лист из исходного XLSM df = pd.read_excel(src, sheet_name=sheet, engine="openpyxl") # пишем НОВЫЙ XLSX (без макросов) — это то, что понимает vMix out = io.BytesIO() with pd.ExcelWriter(out, engine="openpyxl") as writer: df.to_excel(writer, sheet_name=sheet, index=False) out.seek(0) return StreamingResponse( out, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={ # стабильное имя файла, чтобы vMix не путался "Content-Disposition": "inline; filename=vmix.xlsx", # отключаем кэш браузера/прокси, vMix сам опрашивает по интервалу "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", }, ) elif format == "csv": df = pd.read_excel(src, sheet_name=sheet, engine="openpyxl") csv_bytes = df.to_csv(index=False, encoding="utf-8-sig").encode("utf-8") return StreamingResponse( io.BytesIO(csv_bytes), media_type="text/csv; charset=utf-8", headers={ "Content-Disposition": "inline; filename=vmix.csv", "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", }, ) elif format == "json": df = pd.read_excel(src, sheet_name=sheet, engine="openpyxl") payload = json.dumps(df.to_dict(orient="records"), ensure_ascii=False) return Response( content=payload, media_type="application/json; charset=utf-8", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", }, ) return Response("Unsupported format", status_code=400) # # Оставляем только полезные поля (подгони под свой файл) # keep = ["Team", "Logo", "Short", "HexPodl", "HexBase", "HexText"] # keep = [c for c in keep if c in teams_df.columns] # teams_df = teams_df.loc[:, keep].copy() # # 4) Нормализованные ключи для джоина по имени # teams_df["__key"] = teams_df["Team"].apply(_norm_name) # def _pick_team_info(name: str) -> dict: # key = _norm_name(name) # hit = teams_df.loc[teams_df["__key"] == key] # if hit.empty: # # не нашли точное совпадение — вернём только название # return {"Team": name} # rec = hit.iloc[0].to_dict() # rec.pop("__key", None) # # заменим NaN/inf на None, чтобы JSON не падал # for k, v in list(rec.items()): # if pd.isna(v) or v in (np.inf, -np.inf): # rec[k] = None # return rec # home_info = _pick_team_info(home_name) # away_info = _pick_team_info(away_name) # date_obj = datetime.strptime(row.get("datetime_str", ""), "%d.%m.%Y %H:%M") # try: # full_format = date_obj.strftime("%B %-d, %Y") # except ValueError: # full_format = date_obj.strftime("%B %#d, %Y") # payload = [ # { # "selected_id": int(selected_game_id), # "tournament_id": ( # int(current_tournament_id) if current_tournament_id else None # ), # "datetime": str(full_format), # "arena": str(row.get("arena_en", "")), # "arena_city": str(row.get("arena_city_en", "")), # "home": home_info, # "home_city": str(row.get("homeCity_en", "")), # "away": away_info, # "away_city": str(row.get("visitorCity_en", "")), # "season": current_season, # } # ] # return JSONResponse(content=payload) # except Exception as ex: # pprint(ex) if __name__ == "__main__": uvicorn.run( "get_data:app", host="0.0.0.0", port=8000, reload=True, log_level="debug" )