Files
TIMERS/timer_saratov.py
Юрий Черненко a59a299d73 поправил таймер для Саратова:
1. если идет таймут, то на 24 секундах не отображается больше 24 секунд
2. когда заканчивается четверть, то в титр не приходят 2:00 или 15:00 (время перерыва) какое то время (хватит чтобы снять спокойно верхний счет)
3. таймер работает даже если vMix отключен (в случае перезагрузки vMix не нужно ребутать таймер)
4. в случае если подвиснет MOXA в получении данных, то соединение то должно переподключиться (нужно проверить)
5. убрал в парсинге сплит по FF 52, и добавил сплит, если пакет больше 18 байт, то сплит по 18 байту и дальше парсинг (проверил на старых и новых логах, все ок)
2025-11-11 23:44:48 +03:00

328 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import json
from socket import *
import logging
import os
import time
import binascii
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
# Полностью отключаем внутренние ретраи urllib3/requests
adapter = HTTPAdapter(
max_retries=Retry(total=0, connect=0, read=0, redirect=0, status=0)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.headers.update({"Connection": "keep-alive"})
SETTING = "19200,None,8,1,None,Enable,RS-485(2 wire)"
HOST = "192.168.127.254"
PORT = 1993
_last_timer_text = None
_hold_zero_active = False
_hold_zero_t0 = 0.0
_HOLD_ZERO_SEC = 1.0 # длительность удержания 0:00 (сек)
_VMIX_URL = "http://127.0.0.1:8088/API/"
_VMIX_INPUT = "SCOREBUG"
_VMIX_COOLDOWN = 5.0 # сек: пауза после неудачи
_vmix_suppress_until = 0.0 # монотоник-время до которого не шлём
_vmix_online = None # None/True/False — для логов «online/offline» один раз
# --- MOXA reconnect/watchdog ---
_SOCK_RECV_TIMEOUT = 2.0 # сек: таймаут чтения сокета
_IDLE_RECONNECT = 5.0 # сек: если давно нет данных — переподключиться
_RETRY_BACKOFF = 1.0 # сек: начальный бэкофф между попытками
_RETRY_BACKOFF_MAX = 10.0 # сек: верхняя граница бэкоффа
_last_rx_mono = 0.0 # монотоник-время последнего полученного байта/кадра
def _hexspace(bs: bytes) -> str:
return " ".join(f"{b:02X}" for b in bs)
FRAME_SIZE = 18
SYNC = 0xA0 # первый байт кадра (из лога видно 0xA0)
TAIL = 0x00 # 18-й байт должен быть 0x00
_rx_buf = bytearray()
def send_data(name: str, value):
"""
«Мягкая» отправка в vMix. Любые сетевые ошибки подавляются,
чтобы парсер продолжал работать. Возвращает True/False.
"""
global _vmix_suppress_until, _vmix_online
# Не спамим, если недавно был фейл
now = time.monotonic()
if now < _vmix_suppress_until:
return False
try:
kind = name.split(".")[1]
except Exception:
kind = "Text"
par = "SetText" if kind == "Text" else "SetImage"
params = {
"Function": par,
"Input": _VMIX_INPUT,
"SelectedName": name,
"Value": value,
}
try:
# Короткий таймаут, чтобы не блокировать парсер
resp = session.get(_VMIX_URL, params=params, timeout=0.25)
# если раньше считали оффлайн — сообщим, что теперь онлайн
if _vmix_online is not True:
print("[vMix] online")
_vmix_online = True
return True
except requests.exceptions.RequestException as e:
# Перешли в оффлайн — логнём один раз и включим cooldown
if _vmix_online is not False:
msg = f"[vMix] offline: {e.__class__.__name__}: {e}"
print(msg)
_vmix_online = False
_vmix_suppress_until = now + _VMIX_COOLDOWN
return False
def _process_frame(frame: bytes):
# frame — ровно 18 байт
edata = _hexspace(frame)
# print("edata:", edata)
items = edata.split()
# индексы как у тебя
minutes = int(items[5], 16)
seconds_raw = int(items[6], 16)
# --- нормализация секунд ---
if seconds_raw < 60:
seconds = seconds_raw
timer_str_calc = f"{minutes}:{seconds:02d}"
else:
seconds = seconds_raw - 128
timer_str_calc = f"{minutes}.{seconds}"
# --- анти-дребезг 0.0 / 2:00 ---
global _hold_zero_active, _hold_zero_t0, _last_timer_text
now = time.monotonic()
# флаг: текущее время == 0:00
is_zero_time = minutes == 0 and seconds == 0
# флаг: внезапно появилось 2:00 (рывок после сирены)
is_two_zero = minutes == 2 and seconds == 0
if is_zero_time:
# удерживаем 0.0
_hold_zero_active = True
_hold_zero_t0 = now
timer_str = "0.0"
elif _hold_zero_active and is_two_zero:
# игнорируем «рывок» 2:00 и продолжаем показывать 0.0
timer_str = "0.0"
# сбрасываем удержание, если прошло больше HOLD_ZERO_SEC
if now - _hold_zero_t0 > _HOLD_ZERO_SEC:
_hold_zero_active = False
else:
# обычный случай
if _hold_zero_active and (now - _hold_zero_t0 > _HOLD_ZERO_SEC):
_hold_zero_active = False
timer_str = timer_str_calc
seconds_24_byte = int(items[7], 16)
timer_attack = ""
if seconds_24_byte == 0xFF:
# Специальное значение: не показывать
timer_attack = ""
elif seconds_24_byte > 0x7F:
# Режим десятых: значение в десятых после -128 (0..127 => 0.0..12.7 с)
t_dec = seconds_24_byte - 0x80 # десятые секунды
# правило ">24 c" тут не актуально: максимум 12.7 с, значит всегда показываем
timer_attack = f"{t_dec // 10}.{t_dec % 10}"
else:
# Режим целых секунд (0..127 c) — показываем только если ≤ 24 c
if seconds_24_byte <= 24:
timer_attack = str(seconds_24_byte)
else:
timer_attack = "" # > 24 c — не отображать
quarter = int(items[14][1], 16)
score1 = int(items[8], 16)
score2 = int(items[9], 16)
data = {
"TIMER.Text": timer_str,
"24sec.Text": timer_attack,
"SCORE1.Text": score1,
"SCORE2.Text": score2,
}
send_data("TIMER.Text", data["TIMER.Text"])
_last_timer_text = timer_str
send_data("24sec.Text", data["24sec.Text"])
send_data("SCORE1.Text", data["SCORE1.Text"])
send_data("SCORE2.Text", data["SCORE2.Text"])
x = [int(x, 16) for x in items]
print(f"{edata}, timer: {timer_str}, attack: {timer_attack}, Q: {quarter}, {x}")
def parse_new(chunk: bytes):
global _rx_buf, _last_rx_mono
if chunk == b"\xff" or not chunk:
return
_last_rx_mono = time.monotonic() # получили порцию данных — сброс idle
_rx_buf.extend(chunk)
# пытаемся вычленять кадры, пока хватает данных
while len(_rx_buf) >= FRAME_SIZE:
# если начало не синхронно — смещаемся к следующему SYNC (0xA0)
if _rx_buf[0] != SYNC:
# ищем ближайший возможный старт
try:
pos = _rx_buf.index(SYNC)
# отбросим мусор до sync
del _rx_buf[:pos]
except ValueError:
# sync не нашли — чистим всё, ждём ещё
_rx_buf.clear()
return
# если данных меньше кадра — ждём
if len(_rx_buf) < FRAME_SIZE:
return
frame = _rx_buf[:FRAME_SIZE]
# валидация «хвоста» кадра (18-й байт == 0x00)
if frame[FRAME_SIZE - 1] != TAIL:
# старт совпал, но не попали в границу кадра — сдвигаемся на байт
del _rx_buf[0]
continue
# опционально: простая проверка «RU» на позициях 9-10 (0x52 0x55) из твоего лога
# if not (frame[8] == 0x52 and frame[9] == 0x55):
# del _rx_buf[0]
# continue
# есть валидный кадр — обрабатываем
_process_frame(bytes(frame))
# удаляем его из буфера
del _rx_buf[:FRAME_SIZE]
def read_logs():
with open(
# r"C:\Code\timer\LOGS\timer_SARATOV_full.log",
r"C:\Code\timer\LOGS\timer_SARATOV copy.log",
# r"C:\Users\soule\Downloads\Telegram Desktop\timer_SARATOV.log",
"r",
) as file:
for line in file:
parts = line.strip().split(" DEBUG ")
if len(parts) == 2:
timestamp = parts[0]
data = eval(parts[1])
parse_new(data)
time.sleep(0.025)
def _connect_socket():
"""Создаёт и настраивает TCP-сокет до MOXA."""
s = socket(AF_INET, SOCK_STREAM)
# небольшой общий таймаут на операции, чтобы не подвисать
s.settimeout(_SOCK_RECV_TIMEOUT)
# опционально: keepalive (на Windows просто включает флаг)
try:
s.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
except OSError:
pass
s.connect((HOST, PORT))
# приветствие, как у тебя
hello = b"hello"
s.send(hello)
return s
def run_client():
"""Главный цикл: читает данные, парсит, в случае проблем — переподключается."""
global _last_rx_mono, _rx_buf
backoff = _RETRY_BACKOFF
s = None
_rx_buf = bytearray()
_last_rx_mono = time.monotonic()
while True:
try:
if s is None:
# попытка подключения
print(f"[MOXA] connecting to {HOST}:{PORT} ...")
s = _connect_socket()
print("[MOXA] connected")
backoff = _RETRY_BACKOFF # сброс бэкоффа после успеха
_last_rx_mono = time.monotonic()
# если слишком давно не приходят данные — форсим реконнект
if time.monotonic() - _last_rx_mono > _IDLE_RECONNECT:
raise TimeoutError(f"idle {time.monotonic() - _last_rx_mono:.1f}s")
# читаем порциями (таймаут на сокете стоит)
try:
chunk = s.recv(1024)
if not chunk:
# 0 байт => соединение закрыто удалённой стороной
raise ConnectionResetError("socket closed by peer")
# отдаём парсеру
logging.debug(chunk)
parse_new(chunk)
except timeout:
# это socket.timeout — просто проверим idle на следующей итерации
pass
except (ConnectionError, OSError, TimeoutError) as e:
# Любая ошибка — мягко закрываем и ждём перед повтором
if s is not None:
try:
s.close()
except Exception:
pass
s = None
print(f"[MOXA] reconnect in {backoff:.1f}s due to: {e}")
time.sleep(backoff)
backoff = min(backoff * 2, _RETRY_BACKOFF_MAX)
continue
def main():
if not os.path.isdir("LOGS"):
os.mkdir("LOGS")
LogFileName = f"LOGS/timer_SARATOV.log"
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s",
filename=LogFileName,
filemode="w",
)
try:
run_client()
except KeyboardInterrupt:
sys.exit(1)
if __name__ == "__main__":
try:
main()
# read_logs()
except KeyboardInterrupt:
sys.exit(1)