346 lines
15 KiB
Python
346 lines
15 KiB
Python
# serial_to_vmix.py
|
||
# pip install pyserial requests
|
||
|
||
import time
|
||
import argparse
|
||
import socket
|
||
import urllib.parse
|
||
import requests
|
||
import serial
|
||
from serial.serialutil import SerialException
|
||
|
||
# ---------------------------
|
||
# 1) ВАША функция отправки в vMix
|
||
# Замените содержимое по своему желанию.
|
||
# ---------------------------
|
||
# --- ВСТАВЬТЕ ВАШ serial_to_vmix.py вместо заглушки parse() ---
|
||
|
||
import re
|
||
from dataclasses import dataclass, asdict
|
||
|
||
session = requests.Session()
|
||
session.headers.update({"Connection": "keep-alive"})
|
||
|
||
VMIX_HOST = "http://127.0.0.1:8088" # адрес vMix Web Controller
|
||
|
||
# куда писать поля в vMix — укажите свои Inputs/поля титров
|
||
# 1) В блоке VMIX_TARGETS добавьте цель для секундника:
|
||
VMIX_TARGETS = {
|
||
"main_time": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "TIMER.Text", "SelectedIndex": None},
|
||
"shot_clock": {"Function": "SetText", "Input": "SCOREBUG","SelectedName": "24sec.Text", "SelectedIndex": None}, # ⬅️ ДОБАВЛЕНО
|
||
"quarter": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": None, "SelectedIndex": None},
|
||
"score_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE1.Text", "SelectedIndex": None},
|
||
"score_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE2.Text", "SelectedIndex": None},
|
||
"fouls_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": None, "SelectedIndex": None},
|
||
"fouls_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": None, "SelectedIndex": None},
|
||
}
|
||
|
||
def vmix_call(function: str, input_id: str, value: str, selected_name=None, selected_index=None, timeout=1.5):
|
||
params = {"Function": function, "Input": input_id, "Value": value}
|
||
if selected_name:
|
||
params["SelectedName"] = selected_name
|
||
if selected_index:
|
||
params["SelectedIndex"] = selected_index
|
||
url = f"{VMIX_HOST}/api?{urllib.parse.urlencode(params, doseq=True, safe='')}"
|
||
try:
|
||
r = session.get(url, timeout=timeout)
|
||
r.raise_for_status()
|
||
except requests.RequestException as e:
|
||
print(f"[vMix] HTTP ошибка: {e} -> {url}")
|
||
|
||
# 2) В State замените shot_clock (int) на текстовый shot_clock (str), чтобы иногда писать десятые:
|
||
@dataclass
|
||
class State:
|
||
main_time: str | None = None # "M:SS"
|
||
shot_clock: str | None = None # ⬅️ теперь строка: "21" или "4.9"
|
||
quarter: int | None = None
|
||
score_1: int | None = None
|
||
score_2: int | None = None
|
||
fouls_1: int | None = None
|
||
fouls_2: int | None = None
|
||
timeouts_1: int | None = None
|
||
timeouts_2: int | None = None
|
||
|
||
|
||
class SmartBasketParser:
|
||
# строка «наша» если начинается с мусора и цифры 3, и содержит блок 0000
|
||
full_line_guard = re.compile(r'^\D*3\b')
|
||
re_numbers = re.compile(r"\d+")
|
||
|
||
def __init__(self):
|
||
self.state = State()
|
||
|
||
@staticmethod
|
||
def _mmss_to_str(n: int) -> str:
|
||
"""
|
||
Универсальный форматтер:
|
||
- < 100 -> 'ss:0'
|
||
- 3 цифры, s<=59 -> 'M:SS' (MMSS)
|
||
- 3 цифры, s>59 -> 'ss:ms' (SSd, где ms = десятые)
|
||
- 4 цифры -> 'M:SS' (MMSS)
|
||
"""
|
||
if n < 0:
|
||
return "0:00"
|
||
|
||
# только секунды
|
||
if n < 100:
|
||
ss = max(0, min(n, 59))
|
||
return f"{ss}:0"
|
||
|
||
# 3 цифры: либо M:SS, либо SS.d
|
||
if 100 <= n <= 999:
|
||
m = n // 100
|
||
s = n % 100
|
||
if s <= 59:
|
||
return f"{m}:{s:02d}" # 102 -> 1:02
|
||
else:
|
||
ss = n // 10 # 199 -> 19
|
||
d = n % 10 # 199 -> 9
|
||
return f"{ss}.{d}" # 199 -> 19:9
|
||
|
||
# 4 цифры: MMSS
|
||
if 1000 <= n <= 5959:
|
||
m, s = divmod(n, 100)
|
||
if s >= 60:
|
||
m += s // 60
|
||
s = s % 60
|
||
return f"{m}:{s:02d}"
|
||
|
||
# всё прочее не считаем временем
|
||
return "0:00"
|
||
|
||
def _pick_main_time(self, nums: list[int]) -> str | None:
|
||
"""
|
||
В 'полной' строке основное время идёт сразу после ведущей '3'.
|
||
Если следом '0' — берём через один.
|
||
Примеры:
|
||
[3, 0, 58, 35, 35, 2200, 1, ...] -> 58:0
|
||
[3, 199, 2, 13200, 1, ...] -> 19:9
|
||
[3, 102, 35, 35, 0000, 1, ...] -> 1:02
|
||
"""
|
||
if not nums:
|
||
return None
|
||
# nums[0] — это 3
|
||
idx = 1
|
||
if len(nums) > 1 and nums[1] == 0:
|
||
idx = 2
|
||
if idx < len(nums):
|
||
t = nums[idx]
|
||
# считаем временем только разумные токены: секунды/3-цифр. спец./MMSS
|
||
if (0 <= t <= 59) or (100 <= t <= 999) or (1000 <= t <= 5959):
|
||
return self._mmss_to_str(t)
|
||
|
||
# fallback (на всякий): ищем первый разумный токен до первого 4-значного блока
|
||
idx_flags = next((i for i, n in enumerate(nums) if len(str(n)) == 4), None)
|
||
search_upto = idx_flags if idx_flags is not None else len(nums)
|
||
for n in nums[:search_upto]:
|
||
if (0 <= n <= 59) or (100 <= n <= 999) or (1000 <= n <= 5959):
|
||
return self._mmss_to_str(n)
|
||
|
||
return None
|
||
|
||
@staticmethod
|
||
def _format_main_time_compact(n: int) -> str:
|
||
"""
|
||
Компактный формат 'MMSSd' (например, 13200 -> 13:20.0).
|
||
m = n // 1000
|
||
s = (n // 10) % 100
|
||
d = n % 10 (десятые)
|
||
Вывод: если m>0 -> 'M:SS', если m==0 -> 'SS:d' (как вы просили '< 1 мин — ss:ms').
|
||
"""
|
||
if n < 0:
|
||
return "0:00"
|
||
m = n // 1000
|
||
s = (n // 10) % 100
|
||
d = n % 10
|
||
if s >= 60: # подстраховка
|
||
m += s // 60
|
||
s = s % 60
|
||
if m > 0:
|
||
return f"{m}:{s:02d}"
|
||
else:
|
||
return f"{s}:{d}"
|
||
|
||
def _format_shot_clock_from_tail_number(self, n: int) -> str | None:
|
||
"""
|
||
n — последнее число в строке (например, 21031).
|
||
Правило валидности: если последние две цифры == 31 — обновляем таймер,
|
||
если == 30 (или не 31) — не обновляем (возвращаем None).
|
||
Расчёт: отбрасываем две последние цифры -> n // 100 (секунды * 10).
|
||
>= 50 -> целые секунды ('21', '20', ...), иначе -> десятые ('4.9', ...).
|
||
"""
|
||
if n < 0:
|
||
return None
|
||
|
||
tail = n % 100
|
||
if tail != 31:
|
||
# 30 — специально игнорируем; всё, что не 31 — тоже игнорим
|
||
return None
|
||
|
||
sec_tenths = n // 100 # «секунды × 10»
|
||
if sec_tenths >= 50:
|
||
return str(sec_tenths // 10) # «21», «20», ...
|
||
else:
|
||
return f"{sec_tenths / 10:.1f}" # «4.9», «4.8», ..., «0.0»
|
||
|
||
|
||
def _pick_scores_flags_and_quarter(self, nums: list[int]):
|
||
"""
|
||
Ищем последовательность: ... s1 s2 F1F2T1T2 Q ...
|
||
СЧЁТ ОБНОВЛЯЕМ ТОЛЬКО если нашли 4-значный блок флагов (1000..9999).
|
||
Если флагов нет — возвращаем None для счёта/флагов/четверти,
|
||
чтобы не перезатирать корректные прошлые значения.
|
||
"""
|
||
score1 = score2 = fouls1 = fouls2 = to1 = to2 = quarter = None
|
||
|
||
# найдём первый 4-значный блок (флаги)
|
||
idx_flags = next((i for i, n in enumerate(nums) if 1000 <= n <= 9999), None)
|
||
if idx_flags is None:
|
||
# нет флагов -> ничего не обновляем
|
||
return score1, score2, fouls1, fouls2, to1, to2, quarter
|
||
|
||
# перед флагами должны стоять два счёта
|
||
if idx_flags >= 2:
|
||
s1, s2 = nums[idx_flags - 2], nums[idx_flags - 1]
|
||
if 0 <= s1 <= 300 and 0 <= s2 <= 300:
|
||
score1, score2 = s1, s2
|
||
|
||
# разбираем флаги F1F2T1T2
|
||
f = f"{nums[idx_flags]:04d}"
|
||
fouls1, fouls2, to1, to2 = map(int, f)
|
||
|
||
# четверть — число сразу после флагов (1..10)
|
||
if idx_flags + 1 < len(nums):
|
||
q = nums[idx_flags + 1]
|
||
if 1 <= q <= 10:
|
||
quarter = q
|
||
|
||
return score1, score2, fouls1, fouls2, to1, to2, quarter
|
||
|
||
# 4) В parse_line() после nums = [...] добавьте вычисление секундника:
|
||
def parse_line(self, line: str) -> State | None:
|
||
if not (self.full_line_guard.search(line) and "0000" in line):
|
||
return None
|
||
|
||
parts = self.re_numbers.findall(line)
|
||
if not parts:
|
||
return None
|
||
nums = [int(p) for p in parts]
|
||
|
||
main_time_str = self._pick_main_time(nums)
|
||
s1, s2, f1, f2, to1, to2, q = self._pick_scores_flags_and_quarter(nums)
|
||
|
||
# ⬇️ новый блок: секундник из последнего числа
|
||
# shot_clock_text = ""
|
||
# ⬇️ новый безопасный блок секундника
|
||
shot_clock_text = self.state.shot_clock # по умолчанию — предыдущее значение
|
||
if nums: # nums уже есть выше: nums = [int(p) for p in parts]
|
||
sc_new = self._format_shot_clock_from_tail_number(nums[-1])
|
||
if sc_new is not None:
|
||
shot_clock_text = sc_new
|
||
|
||
out = State(
|
||
main_time=main_time_str or self.state.main_time,
|
||
shot_clock=shot_clock_text,
|
||
score_1 = s1 if s1 is not None else self.state.score_1,
|
||
score_2 = s2 if s2 is not None else self.state.score_2,
|
||
fouls_1 = f1 if f1 is not None else self.state.fouls_1,
|
||
fouls_2 = f2 if f2 is not None else self.state.fouls_2,
|
||
timeouts_1 = to1 if to1 is not None else self.state.timeouts_1,
|
||
timeouts_2 = to2 if to2 is not None else self.state.timeouts_2,
|
||
quarter = q if q is not None else self.state.quarter,
|
||
)
|
||
return out
|
||
|
||
def update_and_push(self, new_state: State):
|
||
old = self.state
|
||
for k, v in asdict(new_state).items():
|
||
if getattr(old, k) != v and v is not None:
|
||
self._push_to_vmix(k, v)
|
||
self.state = new_state
|
||
|
||
# 5) В _push_to_vmix оставьте как есть — он уже приводит value к строке.
|
||
# Если хотите реально отправлять в vMix — раскомментируйте vmix_call:
|
||
|
||
def _push_to_vmix(self, key: str, value):
|
||
t = VMIX_TARGETS.get(key)
|
||
if not t:
|
||
return
|
||
# 👉 РАСКОММЕНТИРУЙТЕ для отправки в vMix
|
||
vmix_call(
|
||
function=t["Function"],
|
||
input_id=t["Input"],
|
||
value=str(value),
|
||
selected_name=t.get("SelectedName"),
|
||
selected_index=t.get("SelectedIndex"),
|
||
)
|
||
# print(f"[push] {key} = {value}")
|
||
|
||
_parser = SmartBasketParser()
|
||
|
||
def parse(data: str):
|
||
print(data)
|
||
st = _parser.parse_line(data)
|
||
if st:
|
||
_parser.update_and_push(st)
|
||
else:
|
||
# полезно видеть, что игнорируем — закомментируйте, если мешает
|
||
print("[parse] пропуск строки (не формат <20><><EFBFBD><EFBFBD><EFBFBD>3 + 0000)")
|
||
|
||
# ---------------------------
|
||
# 2) Чтение из COM-порта и вызов parse
|
||
# ---------------------------
|
||
def read_loop(port: str, baud: int, newline: str, strip_whitespace: bool):
|
||
# мапинг для окончания строк в pyserial
|
||
eol = {"CRLF": b"\r\n", "LF": b"\n", "CR": b"\r"}.get(newline.upper(), b"\n")
|
||
|
||
while True:
|
||
try:
|
||
with serial.Serial(port, baudrate=baud) as ser:
|
||
print(f"[serial] открыт {ser.port} @ {baud} бод. Ожидаю данные...")
|
||
buffer = bytearray()
|
||
while True:
|
||
chunk = ser.read(1024)
|
||
if not chunk:
|
||
continue
|
||
buffer.extend(chunk)
|
||
# разбираем по разделителю строк
|
||
while True:
|
||
idx = buffer.find(eol)
|
||
if idx == -1:
|
||
break
|
||
line = buffer[:idx]
|
||
del buffer[:idx + len(eol)]
|
||
try:
|
||
text = line.decode("utf-8", errors="replace")
|
||
except Exception:
|
||
text = line.decode("latin-1", errors="replace")
|
||
if strip_whitespace:
|
||
text = text.strip()
|
||
if text != "":
|
||
parse(text)
|
||
except SerialException as e:
|
||
print(f"[serial] не удалось открыть/читать {port}: {e}. Повтор через 3 сек...")
|
||
time.sleep(3)
|
||
except KeyboardInterrupt:
|
||
print("\n[serial] остановлено пользователем.")
|
||
break
|
||
except Exception as e:
|
||
print(f"[serial] непредвиденная ошибка: {e}. Повтор через 3 сек...")
|
||
time.sleep(3)
|
||
|
||
def main():
|
||
ap = argparse.ArgumentParser(description="Чтение COM и отправка в vMix через parse()")
|
||
ap.add_argument("--port", required=True, help="COM-порт, напр. COM3 (Windows) или /dev/ttyUSB0 (Linux)")
|
||
ap.add_argument("--baud", type=int, default=19200, help="Скорость, по умолчанию 19200")
|
||
ap.add_argument("--newline", choices=["LF", "CRLF", "CR"], default="CR",
|
||
help="Разделитель строк, по умолчанию LF")
|
||
ap.add_argument("--no-strip", action="store_true",
|
||
help="Не обрезать пробелы по краям (по умолчанию обрезаем)")
|
||
args = ap.parse_args()
|
||
|
||
read_loop(args.port, args.baud, args.newline, strip_whitespace=not args.no_strip)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|