import os from dotenv import load_dotenv import logging import logging.config from pprint import pprint import pandas as pd from transliterate import translit import requests from time import sleep import datetime import sys from urllib.parse import urlparse from pathlib import PureWindowsPath import telebot from synology_drive_api.drive import SynologyDrive # Загрузка переменных окружения из файла load_dotenv('AF_environment.env') class Config: """Класс для хранения конфигурации""" def __init__(self): # Обязательные переменные окружения self.nas_user = os.getenv('NAS_USER') self.nas_pass = os.getenv('NAS_PASS') self.nas_ip = os.getenv('NAS_IP') self.nas_port = os.getenv('NAS_PORT') self.nas_file = os.getenv('NAS_FILE') self.token = os.getenv('TELEGRAM_TOKEN') self.group_chat = os.getenv('TELEGRAM_GROUP_CHAT') self.nexrender_url = os.getenv('NEXRENDER_URL') # Валидация конфигурации self._validate_config() def _validate_config(self): """Проверяет, что все обязательные переменные окружения установлены""" missing_vars = [] for var, value in vars(self).items(): if value is None: missing_vars.append(var.upper()) if missing_vars: raise ValueError( f"Отсутствуют обязательные переменные окружения: {', '.join(missing_vars)}. " "Пожалуйста, проверьте файл AF_environment.env" ) class JobManager: """Класс для управления задачами рендеринга""" def __init__(self, config): self.config = config self.bot = telebot.TeleBot(config.token) self.PLACEHOLDER = sys.platform == 'win32' if self.PLACEHOLDER: self._init_placeholders() def _init_placeholders(self): """Инициализация заглушек для тестирования""" from random import random, choices def send_job_dumb(data): if random() < 0.8: uid = ''.join(choices('abcdefghijklmnopqrstuvwxyz_', k=8)) return {'uid': uid, 'outname': data['outfile_name']} return None class FakeResp: def __init__(self, state='queued', *args, **kwargs): self.state = state self.status_code = 200 def json(self): return {'state': self.state} def fake_get(*args, **kwargs): rand = random() if rand < 0.8: return FakeResp() elif rand < 0.95: return FakeResp('finished') else: return FakeResp('error') self._send_job_real = self.send_job self.send_job = send_job_dumb self._fake_get = fake_get def setup_logging(self): """Настройка логирования""" LOG_CONFIG = { 'version': 1, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'level': 'DEBUG', 'formatter': 'simple', 'stream': 'ext://sys.stdout' }, 'file': { 'class': 'logging.FileHandler', 'level': 'DEBUG', 'formatter': 'simple', 'encoding': 'utf-8', 'filename': 'AF_script.log' }, }, 'loggers': { __name__: { 'handlers': ['console', 'file'], 'level': 'DEBUG' } }, 'formatters': { 'simple': { 'class': 'logging.Formatter', 'format': '%(asctime)s %(levelname)-8s %(funcName)12s() - %(message)s', 'datefmt': '%d.%m.%Y %H:%M:%S' } } } logging.config.dictConfig(LOG_CONFIG) self.logger = logging.getLogger(__name__) telebot.logger.addHandler(self.logger.handlers[0]) def load_osheet(self, message): """Загрузка файла с Synology Drive""" self.logger.debug('Получение данных') try: synd = SynologyDrive( self.config.nas_user, self.config.nas_pass, self.config.nas_ip, self.config.nas_port, https=True, dsm_version='7' ) self.logger.debug(synd.login()) # Проверка сессии try: self.logger.debug('Попытка загрузки таблицы') bio = synd.download_synology_office_file(self.config.nas_file) self.logger.debug('Успешная загрузка') return bio except Exception as e: self.logger.exception('Ошибка загрузки') self.bot.send_message( message.chat.id, 'Не удалось скачать таблицу', parse_mode='html' ) raise except Exception as e: self.logger.exception('Ошибка авторизации') self.bot.send_message( message.chat.id, 'Не удалось авторизоваться', parse_mode='html' ) raise def get_sheet_data(self, osheet, sheet_name, **kwargs): """Получение данных из листа Excel""" self.logger.debug(f'Чтение листа {sheet_name}') try: sheet = pd.read_excel(osheet, sheet_name=sheet_name, **kwargs) self.logger.debug('Успешное чтение') return sheet except Exception as e: self.logger.exception(f'Ошибка чтения листа {sheet_name}') raise def get_sport_logo(self, sport, pack, message): """Получение логотипа вида спорта""" self.logger.info(f'Получение оформления для {sport}') self.bot.send_message( message.chat.id, f'Ищем оформления для {sport}', parse_mode='html' ) try: d = pack.loc[sport]['LINK'] self.logger.debug(d) if pd.isna(d): self.logger.warning(f'Нет LINK для вида спорта "{sport}"') return '' return d except Exception as e: self.logger.exception(f"Не удалось получить оформление для {sport}") return '' def get_team_logo(self, team, sport, logos, message): """Получение логотипа команды""" self.logger.info(f'Получение логотипа {team}/{sport}') self.bot.send_message( message.chat.id, f'Поиск логотипа {sport}-{team}', parse_mode='html' ) try: d = logos.loc[team, sport]['LINK'] self.logger.debug(d) return d except KeyError: self.logger.warning(f"Нет LINK для {team}/{sport}") return '' except Exception as e: self.logger.exception(f"Ошибка при получении логотипа {sport}") return '' def make_data_dict(self, ds): """Создание словаря с данными""" return { 'date': ds['DATA'], 'time': ds['TIME'], 'channel': ds['CHANEL'], 'sport': ds['SPORT'], 'league': ds['LEAGUE'], 'TEAM A': ds['TEAM A'], 'TEAM B': ds['TEAM B'], 'index': ds.name } def unc2uri(self, unc): """Преобразование UNC пути в URI""" self.logger.debug('Преобразование пути') try: p = urlparse(unc) if len(p.scheme) > 2 or not unc: return unc else: p = PureWindowsPath(unc) return p.as_uri() except Exception as e: self.logger.exception('Ошибка преобразования пути') return unc def send_job(self, data, message): """Отправка задачи на рендеринг""" if self.PLACEHOLDER: return self.send_job_dumb(data) payload = { "template": { "src": "file:///c:/users/virtVmix-2/Downloads/PackShot_Sborka_eng.aepx", "composition": "pack", "outputModule": "Start_h264", "outputExt": "mp4" }, "actions": { "postrender": [ { "module": "@nexrender/action-encode", "preset": "mp4", "output": "encoded.mp4" }, { "module": "@nexrender/action-copy", "input": "encoded.mp4", "output": f"//10.10.35.3/edit/Auto_Anons/{data['outfile_name']}.mp4" } ] }, "assets": [] } # Добавление данных в payload self._add_data_to_payload(payload, data, message) url = self.config.nexrender_url try: r = requests.post(url, json=payload) if r.status_code == 200: res = r.json() uid = res['uid'] return {'uid': uid, 'outname': data['outfile_name']} except Exception as e: self.logger.exception('Ошибка отправки задачи') return None def _add_data_to_payload(self, payload, data, message): """Добавление данных в payload""" # Добавление даты self._add_date_data(payload, data, message) # Добавление времени self._add_time_data(payload, data, message) # Добавление лиги payload['assets'].append({ "type": "data", "layerName": "LEAGUE", "property": "Source Text", "value": data['league'] }) # Добавление спорта if data['sport']: payload['assets'].append({ "type": "data", "layerName": "SPORT", "property": "Source Text", "value": data['sport'] }) # Добавление команд и логотипов self._add_team_data(payload, data, message, 'A') self._add_team_data(payload, data, message, 'B') # Добавление оформления if data['pack']: payload['assets'].append({ "src": data['pack'], "type": "video", "layerName": "TOP" }) def _add_date_data(self, payload, data, message): """Добавление данных о дате""" if data['data'] == 'сегодня': self._add_specific_date_style(payload, data, message, "105", [0, 5]) elif data['data'] == 'завтра': self._add_specific_date_style(payload, data, message, "115", [0, 25]) elif len(data['data']) < 6: self._add_specific_date_style(payload, data, message, "120", [0, 20]) payload['assets'].append({ "type": "data", "layerName": "DATA", "property": "Source Text", "value": data['data'] }) def _add_specific_date_style(self, payload, data, message, font_size, anchor_point): """Добавление стилей для конкретной даты""" payload['assets'].extend([ { "layerName": "DATA", "property": "Source Text.fontSize", "type": "data", "value": font_size }, { "layerName": "DATA", "property": "transform.anchorPoint", "type": "data", "value": anchor_point } ]) self.logger.info(f'Для "{data["data"]}" шрифт установлен {font_size}') self.bot.send_message( message.chat.id, f'Для "{data["data"]}" размер шрифта установлен {font_size}', parse_mode='html' ) self.logger.info(f'Сдвиг "{data["data"]}" на {anchor_point} пикселей') self.bot.send_message( message.chat.id, f'Сдвигаем "{data["data"]}" на {anchor_point} пикселей', parse_mode='html' ) def _add_time_data(self, payload, data, message): """Добавление данных о времени""" if len(data['time_h']) < 2: anchor_point = [40, 0] for layer in ["TIME_H", "TIME_M", "TIME"]: payload['assets'].append({ "layerName": layer, "property": "transform.anchorPoint", "type": "data", "value": anchor_point }) self.logger.info(f'Сдвиг "{data["time_h"]}:{data["time_m"]}" на {anchor_point} пикселей') self.bot.send_message( message.chat.id, f'Сдвигаем "{data["time_h"]}:{data["time_m"]}" на {anchor_point} пикседей', parse_mode='html' ) payload['assets'].extend([ { "type": "data", "layerName": "TIME_H", "property": "Source Text", "value": data['time_h'] }, { "type": "data", "layerName": "TIME_M", "property": "Source Text", "value": data['time_m'] } ]) def _add_team_data(self, payload, data, message, team): """Добавление данных о команде""" team_key = f'team_{team.lower()}' if data[team_key]: payload['assets'].append({ "type": "data", "layerName": f"TEAM_{team}", "property": "Source Text", "value": data[team_key] }) logo_key = f'{team_key}_logo' if data[logo_key]: payload['assets'].append({ "src": data[logo_key], "type": "image", "layerName": f"TEAM_{team}_LOGO" }) logo_res_key = f'{team_key}_logo_res' if data.get(logo_res_key): payload['assets'].append({ "property": "scale", "type": "data", "expression": f"if (width > height) {{max_size = width;}} else {{max_size = height;}} var real_size = {data[logo_res_key][0]}/max_size*100;[real_size,real_size]", "layerName": f"TEAM_{team}_LOGO" }) self.logger.info(f'{data[team_key]} логотип изменен до {data[logo_res_key][0]}') self.bot.send_message( message.chat.id, f'{data[team_key]} масштабирован под {data[logo_res_key][0]} пикселей', parse_mode='html' ) def make_job_dicts(self, dd, pack, logos, message): """Создание задач рендеринга""" self.logger.debug('Начало создания имени') fn = '' data = {} empty_sport = pack.iloc[0].name # Дата if isinstance(dd['date'], str): fn += f"{dd['date'][6:]}{dd['date'][3:5]}{dd['date'][0:2]}" d = dd['date'].split('.') data['data'] = f"{int(d[0])} {['','января','февраля','марта','апреля','мая','июня','июля','августа','сентября','октября','ноября','декабря'][int(d[1])]}" elif isinstance(dd['date'], datetime.date): fn += f"{dd['date'].year}{dd['date'].month:02}{dd['date'].day:02}" data['data'] = f"{dd['date'].day} {['','января','февраля','марта','апреля','мая','июня','июля','августа','сентября','октября','ноября','декабря'][dd['date'].month]}" # Вид спорта и оформление if dd['sport'] != empty_sport: fn += f"_{dd['sport']}" data['sport'] = dd['sport'] data['pack'] = self.unc2uri(self.get_sport_logo(dd['sport'], pack, message)) else: data['sport'] = '' data['pack'] = '' # Лига if dd["league"][-1] == '.': self.logger.debug('Точка в названии лиги!') fn += f'_{dd["league"][:-1]}' data['league'] = dd['league'][:-1] else: data['league'] = dd['league'] fn += f'_{dd["league"]}' # Команды self._process_team_data(dd, data, fn, 'A', logos, message) self._process_team_data(dd, data, fn, 'B', logos, message) # Канал if not pd.isna(dd['channel']): self.logger.debug('Канал установлен ' + dd['channel']) fn += f"_{dd['channel']}" # Финальное форматирование имени файла fn = translit(fn, reversed=True) fn = fn.replace(' ', '-').replace("'", '') data['outfile_name'] = fn # Время if isinstance(dd['time'], str): t = dd['time'].split(':') data['time_h'] = t[0] data['time_m'] = t[1] elif isinstance(dd['time'], datetime.time): data['time_h'] = str(dd['time'].hour) data['time_m'] = str(dd['time'].minute) self.logger.debug('Время ' + data['time_h'] + ':' + data['time_m']) self.logger.debug("Конец создания имени") # Создание задач watch_list = [] watch_list.append(self.send_job(data, message)) if True: # TODO: Заменить на условие, если нужно data['data'] = 'сегодня' data['outfile_name'] = fn + '_Today' watch_list.append(self.send_job(data, message)) data['data'] = 'завтра' data['outfile_name'] = fn + '_Tomorrow' watch_list.append(self.send_job(data, message)) pprint(watch_list) return list(filter(None, watch_list)) def _process_team_data(self, dd, data, fn, team, logos, message): """Обработка данных команды""" team_key = f'team_{team.lower()}' if pd.isna(dd[f'TEAM {team}']): self.logger.info(f'Нет команды {team}') self.bot.send_message( message.chat.id, f'Нет команды {team}', parse_mode='html' ) data[team_key] = '' data[f'{team_key}_logo'] = '' data[f'{team_key}_logo_res'] = '' else: name = dd[f'TEAM {team}'].split('#') fn += f"_{name[0]}" data[f'{team_key}_logo_res'] = name[2:] data[team_key] = name[0] data[f'{team_key}_logo'] = self.unc2uri( self.get_team_logo(dd[f'TEAM {team}'], dd['sport'], logos, message) ) def run(self): """Запуск бота""" @self.bot.message_handler(commands=['help', 'start']) def send_welcome(message): self.bot.send_chat_action(message.chat.id, 'typing') user = f" {message.from_user.username}" if message.from_user.username else '!' sleep(1) self.bot.reply_to( message, f"Привет{user}\nЯ помогу тебе сделать Анонсы!\nВот список команд которые я могу выполнить:\n/ибаш - наибашу обработку и рендер!\n/харе - остановит нах!" ) @self.bot.message_handler(commands=['чёкак', 'status']) def status(message): self.logger.info(f'Статус запрошен {message.from_user.username}') try: r = requests.get(self.config.nexrender_url) if r.status_code == 200: jobs = r.json() s = [{'uid': i['uid'], 'state': i['state']} for i in jobs] queued = sum(1 for job in s if job['state'] in ('queued', 'picked')) if s: self.logger.info(f"{queued} в очереди") self.bot.send_message( message.chat.id, f"В очереди {queued}" ) else: self.logger.info("Нет задач в очереди") self.bot.send_message( message.chat.id, "Нет задач в очереди" ) except Exception as e: self.logger.exception("Ошибка получения статуса") self.bot.send_message( message.chat.id, "Ошибка получения статуса" ) @self.bot.message_handler(commands=['харе', 'stop']) def stop(message): try: r = requests.get(self.config.nexrender_url) if r.status_code == 200: jobs = r.json() s = [{'uid': i['uid'], 'state': i['state']} for i in jobs] cancelled = 0 if s: for job in s: requests.delete(f"{self.config.nexrender_url}/{job['uid']}") cancelled += 1 self.logger.info(f"Отменено {cancelled} задач пользователем {message.from_user.username}") self.bot.send_message( message.chat.id, f"Отменено {cancelled}" ) else: self.logger.info(f"{message.from_user.username} запросил отмену, но нет задач для отмены") self.bot.send_message( message.chat.id, "Нет задач для отмены" ) except Exception as e: self.logger.exception("Ошибка отмены задач") self.bot.send_message( message.chat.id, "Ошибка отмены задач" ) @self.bot.message_handler(commands=['ибаш', 'ibash']) def ibash(message): self.logger.info(f'Запуск задач для {message.from_user.username}') self.bot.send_chat_action(message.chat.id, 'typing') user = message.from_user.username if message.from_user.username else '!' self.bot.send_message( message.chat.id, f"Ну что ж {user}, давай попробуем \nНАИБАШИТЬ!!!", parse_mode='html' ) self.bot.send_chat_action(message.chat.id, 'upload_document') try: osheet = self.load_osheet(message) start = self.get_sheet_data(osheet, 'Start', header=1) start = start[start['STATE'] == False] # Проверка "первая" pack = self.get_sheet_data(osheet, 'SPORT', header=0, index_col='SPORT') pack = pack[pack.index.notna()] logos = self.get_sheet_data(osheet, 'TEAMS', header=0, index_col=[0, 1]) # Очистка старых задач try: r = requests.get(self.config.nexrender_url) if r.status_code == 200: jobs = r.json() for job in jobs: if job['state'] in ('finished', 'error'): requests.delete(f"{self.config.nexrender_url}/{job['uid']}") except Exception as e: self.logger.exception("Ошибка очистки старых задач") self.bot.send_chat_action(message.chat.id, 'record_video') watch_list = [] for i, row in start.iterrows(): dd = self.make_data_dict(row) watch_list += self.make_job_dicts(dd, pack, logos, message) self.logger.info(f"В очереди {len(watch_list)} задач") self.bot.send_message( message.chat.id, f"В очереди {len(watch_list)} задач" ) while watch_list: self.bot.send_chat_action(message.chat.id, 'record_video') sleep(25) for job in watch_list[:]: # Копия списка для итерации try: if self.PLACEHOLDER: r = self._fake_get() else: r = requests.get(f"{self.config.nexrender_url}/{job['uid']}") if r.status_code == 200: state = r.json()['state'] if state == 'finished': watch_list.remove(job) self.logger.info(f"{job['outname']}, {state}, {len(watch_list)} осталось") self.bot.send_message( message.chat.id, f"{job['outname']}, готов, {len(watch_list)} осталось выполнить", parse_mode='html' ) elif state == 'error': watch_list.remove(job) self.logger.warning(f"{job}, {state}, {len(watch_list)} осталось") self.bot.send_message( message.chat.id, f"!!!{job}, {state}, {len(watch_list)} осталось выполнить", parse_mode='html' ) except Exception as e: self.logger.exception(f"Ошибка проверки статуса задачи {job['uid']}") self.bot.send_message(message.chat.id, 'Пойду спать :)') except Exception as e: self.logger.exception("Ошибка выполнения команды ибаш") self.bot.send_message( message.chat.id, "Произошла ошибка при обработке команды" ) self.logger.info('Запуск бота') self.bot.infinity_polling() self.logger.info('Завершение работы бота') if __name__ == '__main__': try: # Проверяем, что файл окружения существует if not os.path.exists('AF_environment.env'): raise FileNotFoundError( "Файл окружения AF_environment.env не найден. " ) config = Config() job_manager = JobManager(config) job_manager.setup_logging() job_manager.run() except FileNotFoundError as e: logging.error(str(e)) print(str(e)) sys.exit(1) except ValueError as e: logging.error(f"Ошибка конфигурации: {e}") print(f"Ошибка конфигурации: {e}") sys.exit(1) except Exception as e: logging.error(f"Неожиданная ошибка: {e}", exc_info=True) print(f"Неожиданная ошибка: {e}") sys.exit(1)