# serial_to_vmix.py # pip install pyserial requests import time import argparse import socket import urllib.parse import requests import serial from serial.serialutil import SerialException # --------------------------- # 1) ВАША функция отправки в vMix # Замените содержимое по своему желанию. # --------------------------- # --- ВСТАВЬТЕ ВАШ serial_to_vmix.py вместо заглушки parse() --- import re from dataclasses import dataclass, asdict session = requests.Session() session.headers.update({"Connection": "keep-alive"}) VMIX_HOST = "http://127.0.0.1:8088" # адрес vMix Web Controller # куда писать поля в vMix — укажите свои Inputs/поля титров # 1) В блоке VMIX_TARGETS добавьте цель для секундника: VMIX_TARGETS = { "main_time": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "TIMER.Text", "SelectedIndex": None}, "shot_clock": {"Function": "SetText", "Input": "SCOREBUG","SelectedName": "24sec.Text", "SelectedIndex": None}, # ⬅️ ДОБАВЛЕНО "quarter": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": None, "SelectedIndex": None}, "score_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE1.Text", "SelectedIndex": None}, "score_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE2.Text", "SelectedIndex": None}, "fouls_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": None, "SelectedIndex": None}, "fouls_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": None, "SelectedIndex": None}, } def vmix_call(function: str, input_id: str, value: str, selected_name=None, selected_index=None, timeout=1.5): params = {"Function": function, "Input": input_id, "Value": value} if selected_name: params["SelectedName"] = selected_name if selected_index: params["SelectedIndex"] = selected_index url = f"{VMIX_HOST}/api?{urllib.parse.urlencode(params, doseq=True, safe='')}" try: r = session.get(url, timeout=timeout) r.raise_for_status() except requests.RequestException as e: print(f"[vMix] HTTP ошибка: {e} -> {url}") # 2) В State замените shot_clock (int) на текстовый shot_clock (str), чтобы иногда писать десятые: @dataclass class State: main_time: str | None = None # "M:SS" shot_clock: str | None = None # ⬅️ теперь строка: "21" или "4.9" quarter: int | None = None score_1: int | None = None score_2: int | None = None fouls_1: int | None = None fouls_2: int | None = None timeouts_1: int | None = None timeouts_2: int | None = None class SmartBasketParser: # строка «наша» если начинается с мусора и цифры 3, и содержит блок 0000 full_line_guard = re.compile(r'^\D*3\b') re_numbers = re.compile(r"\d+") def __init__(self): self.state = State() @staticmethod def _mmss_to_str(n: int) -> str: """ Универсальный форматтер: - < 100 -> 'ss:0' - 3 цифры, s<=59 -> 'M:SS' (MMSS) - 3 цифры, s>59 -> 'ss:ms' (SSd, где ms = десятые) - 4 цифры -> 'M:SS' (MMSS) """ if n < 0: return "0:00" # только секунды if n < 100: ss = max(0, min(n, 59)) return f"{ss}:0" # 3 цифры: либо M:SS, либо SS.d if 100 <= n <= 999: m = n // 100 s = n % 100 if s <= 59: return f"{m}:{s:02d}" # 102 -> 1:02 else: ss = n // 10 # 199 -> 19 d = n % 10 # 199 -> 9 return f"{ss}.{d}" # 199 -> 19:9 # 4 цифры: MMSS if 1000 <= n <= 5959: m, s = divmod(n, 100) if s >= 60: m += s // 60 s = s % 60 return f"{m}:{s:02d}" # всё прочее не считаем временем return "0:00" def _pick_main_time(self, nums: list[int]) -> str | None: """ В 'полной' строке основное время идёт сразу после ведущей '3'. Если следом '0' — берём через один. Примеры: [3, 0, 58, 35, 35, 2200, 1, ...] -> 58:0 [3, 199, 2, 13200, 1, ...] -> 19:9 [3, 102, 35, 35, 0000, 1, ...] -> 1:02 """ if not nums: return None # nums[0] — это 3 idx = 1 if len(nums) > 1 and nums[1] == 0: idx = 2 if idx < len(nums): t = nums[idx] # считаем временем только разумные токены: секунды/3-цифр. спец./MMSS if (0 <= t <= 59) or (100 <= t <= 999) or (1000 <= t <= 5959): return self._mmss_to_str(t) # fallback (на всякий): ищем первый разумный токен до первого 4-значного блока idx_flags = next((i for i, n in enumerate(nums) if len(str(n)) == 4), None) search_upto = idx_flags if idx_flags is not None else len(nums) for n in nums[:search_upto]: if (0 <= n <= 59) or (100 <= n <= 999) or (1000 <= n <= 5959): return self._mmss_to_str(n) return None @staticmethod def _format_main_time_compact(n: int) -> str: """ Компактный формат 'MMSSd' (например, 13200 -> 13:20.0). m = n // 1000 s = (n // 10) % 100 d = n % 10 (десятые) Вывод: если m>0 -> 'M:SS', если m==0 -> 'SS:d' (как вы просили '< 1 мин — ss:ms'). """ if n < 0: return "0:00" m = n // 1000 s = (n // 10) % 100 d = n % 10 if s >= 60: # подстраховка m += s // 60 s = s % 60 if m > 0: return f"{m}:{s:02d}" else: return f"{s}:{d}" def _format_shot_clock_from_tail_number(self, n: int) -> str | None: """ n — последнее число в строке (например, 21031). Правило валидности: если последние две цифры == 31 — обновляем таймер, если == 30 (или не 31) — не обновляем (возвращаем None). Расчёт: отбрасываем две последние цифры -> n // 100 (секунды * 10). >= 50 -> целые секунды ('21', '20', ...), иначе -> десятые ('4.9', ...). """ if n < 0: return None tail = n % 100 if tail != 31: # 30 — специально игнорируем; всё, что не 31 — тоже игнорим return None sec_tenths = n // 100 # «секунды × 10» if sec_tenths >= 50: return str(sec_tenths // 10) # «21», «20», ... else: return f"{sec_tenths / 10:.1f}" # «4.9», «4.8», ..., «0.0» def _pick_scores_flags_and_quarter(self, nums: list[int]): """ Ищем последовательность: ... s1 s2 F1F2T1T2 Q ... СЧЁТ ОБНОВЛЯЕМ ТОЛЬКО если нашли 4-значный блок флагов (1000..9999). Если флагов нет — возвращаем None для счёта/флагов/четверти, чтобы не перезатирать корректные прошлые значения. """ score1 = score2 = fouls1 = fouls2 = to1 = to2 = quarter = None # найдём первый 4-значный блок (флаги) idx_flags = next((i for i, n in enumerate(nums) if 1000 <= n <= 9999), None) if idx_flags is None: # нет флагов -> ничего не обновляем return score1, score2, fouls1, fouls2, to1, to2, quarter # перед флагами должны стоять два счёта if idx_flags >= 2: s1, s2 = nums[idx_flags - 2], nums[idx_flags - 1] if 0 <= s1 <= 300 and 0 <= s2 <= 300: score1, score2 = s1, s2 # разбираем флаги F1F2T1T2 f = f"{nums[idx_flags]:04d}" fouls1, fouls2, to1, to2 = map(int, f) # четверть — число сразу после флагов (1..10) if idx_flags + 1 < len(nums): q = nums[idx_flags + 1] if 1 <= q <= 10: quarter = q return score1, score2, fouls1, fouls2, to1, to2, quarter # 4) В parse_line() после nums = [...] добавьте вычисление секундника: def parse_line(self, line: str) -> State | None: if not (self.full_line_guard.search(line) and "0000" in line): return None parts = self.re_numbers.findall(line) if not parts: return None nums = [int(p) for p in parts] main_time_str = self._pick_main_time(nums) s1, s2, f1, f2, to1, to2, q = self._pick_scores_flags_and_quarter(nums) # ⬇️ новый блок: секундник из последнего числа # shot_clock_text = "" # ⬇️ новый безопасный блок секундника shot_clock_text = self.state.shot_clock # по умолчанию — предыдущее значение if nums: # nums уже есть выше: nums = [int(p) for p in parts] sc_new = self._format_shot_clock_from_tail_number(nums[-1]) if sc_new is not None: shot_clock_text = sc_new out = State( main_time=main_time_str or self.state.main_time, shot_clock=shot_clock_text, score_1 = s1 if s1 is not None else self.state.score_1, score_2 = s2 if s2 is not None else self.state.score_2, fouls_1 = f1 if f1 is not None else self.state.fouls_1, fouls_2 = f2 if f2 is not None else self.state.fouls_2, timeouts_1 = to1 if to1 is not None else self.state.timeouts_1, timeouts_2 = to2 if to2 is not None else self.state.timeouts_2, quarter = q if q is not None else self.state.quarter, ) return out def update_and_push(self, new_state: State): old = self.state for k, v in asdict(new_state).items(): if getattr(old, k) != v and v is not None: self._push_to_vmix(k, v) self.state = new_state # 5) В _push_to_vmix оставьте как есть — он уже приводит value к строке. # Если хотите реально отправлять в vMix — раскомментируйте vmix_call: def _push_to_vmix(self, key: str, value): t = VMIX_TARGETS.get(key) if not t: return # 👉 РАСКОММЕНТИРУЙТЕ для отправки в vMix vmix_call( function=t["Function"], input_id=t["Input"], value=str(value), selected_name=t.get("SelectedName"), selected_index=t.get("SelectedIndex"), ) # print(f"[push] {key} = {value}") _parser = SmartBasketParser() def parse(data: str): print(data) st = _parser.parse_line(data) if st: _parser.update_and_push(st) else: # полезно видеть, что игнорируем — закомментируйте, если мешает print("[parse] пропуск строки (не формат �����3 + 0000)") # --------------------------- # 2) Чтение из COM-порта и вызов parse # --------------------------- def read_loop(port: str, baud: int, newline: str, strip_whitespace: bool): # мапинг для окончания строк в pyserial eol = {"CRLF": b"\r\n", "LF": b"\n", "CR": b"\r"}.get(newline.upper(), b"\n") while True: try: with serial.Serial(port, baudrate=baud) as ser: print(f"[serial] открыт {ser.port} @ {baud} бод. Ожидаю данные...") buffer = bytearray() while True: chunk = ser.read(1024) if not chunk: continue buffer.extend(chunk) # разбираем по разделителю строк while True: idx = buffer.find(eol) if idx == -1: break line = buffer[:idx] del buffer[:idx + len(eol)] try: text = line.decode("utf-8", errors="replace") except Exception: text = line.decode("latin-1", errors="replace") if strip_whitespace: text = text.strip() if text != "": parse(text) except SerialException as e: print(f"[serial] не удалось открыть/читать {port}: {e}. Повтор через 3 сек...") time.sleep(3) except KeyboardInterrupt: print("\n[serial] остановлено пользователем.") break except Exception as e: print(f"[serial] непредвиденная ошибка: {e}. Повтор через 3 сек...") time.sleep(3) def main(): ap = argparse.ArgumentParser(description="Чтение COM и отправка в vMix через parse()") ap.add_argument("--port", required=True, help="COM-порт, напр. COM3 (Windows) или /dev/ttyUSB0 (Linux)") ap.add_argument("--baud", type=int, default=19200, help="Скорость, по умолчанию 19200") ap.add_argument("--newline", choices=["LF", "CRLF", "CR"], default="CR", help="Разделитель строк, по умолчанию LF") ap.add_argument("--no-strip", action="store_true", help="Не обрезать пробелы по краям (по умолчанию обрезаем)") args = ap.parse_args() read_loop(args.port, args.baud, args.newline, strip_whitespace=not args.no_strip) if __name__ == "__main__": main()