# vmix_serial_bridge_optimized.py # pip install pyserial requests """ Оптимизированная версия COM -> vMix моста для максимально быстрой доставки данных в vMix. Ключевые изменения: - Немедленная обработка входящих байт: неблокирующее чтение с малой задержкой (timeout) и мгновенная отправка строк при появлении EOL или коротком «простоя». - Параметризуемые интервалы: --read-timeout, --idle-flush для тонкой настройки задержки между получением и отправкой. - Убраны лишние print'ы, добавлен флаг --verbose для отладки без штрафа к скорости. - Дедупликация значений к vMix включаемая флагом (по умолчанию включена). - Предкомпиляция регулярных выражений и мелкие микроптимизации парсера. - Очередь отправки без блокировок и аккуратное «сбросить-старое-сообщение» поведение на переполнении. Совместима с прежней CLI, но добавлены новые опции: --read-timeout (сек, по умолчанию 0.03) --idle-flush (сек, по умолчанию 0.06) --max-chunk (байт, по умолчанию 512) --verbose (подробные логи) Пример запуска: python vmix_serial_bridge_optimized.py --port COM2 --baud 19200 --autoeol \ --send-quarter --dedupe --read-timeout 0.02 --idle-flush 0.05 """ import time import argparse import urllib.parse import requests import serial from serial.serialutil import SerialException from serial.tools import list_ports import re from dataclasses import dataclass from threading import Thread, Event from queue import Queue, Empty from typing import Optional, Dict VMIX_HOST = "http://127.0.0.1:8088" VMIX_TARGETS = { "main_time": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "TIMER.Text", "SelectedIndex": None}, "shot_clock": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "24sec.Text", "SelectedIndex": None}, "quarter": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "QUARTER.Text", "SelectedIndex": None}, "score_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE1.Text", "SelectedIndex": None}, "score_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE2.Text", "SelectedIndex": None}, "fouls_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "FOULS1.Text", "SelectedIndex": None}, "fouls_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "FOULS2.Text", "SelectedIndex": None}, } # ---------------- HTTP client ---------------- class VmixClient: def __init__(self, host: str, timeout: float = 0.01, retries: int = 2, backoff: float = 0.2): self.host = host.rstrip("/") self.timeout = timeout self.retries = retries self.backoff = backoff self.session = requests.Session() self.session.headers.update({"Connection": "keep-alive"}) def call(self, function: str, input_id: str, value: str, selected_name=None, selected_index=None) -> bool: params = {"Function": function, "Input": input_id, "Value": value} if selected_name: params["SelectedName"] = selected_name if selected_index is not None: params["SelectedIndex"] = selected_index url = f"{self.host}/api?{urllib.parse.urlencode(params, doseq=True, safe='')}" for attempt in range(1, self.retries + 2): try: r = self.session.get(url, timeout=self.timeout) r.raise_for_status() return True except requests.RequestException as e: if attempt > self.retries: return False time.sleep(self.backoff * attempt) return False @dataclass class VmixMessage: field: str value: str class VmixSender: def __init__(self, client: VmixClient, dedupe: bool = True, max_queue: int = 4096, verbose: bool = False): self.client = client self.queue: Queue[VmixMessage] = Queue(maxsize=max_queue) self.stop_event = Event() self.thread = Thread(target=self._run, name="vmix-sender", daemon=True) self.dedupe = dedupe self._last_sent: Dict[str, Optional[str]] = {} self.verbose = verbose def start(self): self.thread.start() def stop(self): self.stop_event.set() self.thread.join(timeout=2) def send(self, field: str, value: Optional[str]): if value is None: return if field not in VMIX_TARGETS: return if self.dedupe and self._last_sent.get(field) == value: return if self.dedupe: self._last_sent[field] = value msg = VmixMessage(field, str(value)) try: self.queue.put_nowait(msg) except Exception: # Сбрасываем самое старое сообщение, чтобы не накапливать задержку try: _ = self.queue.get_nowait() self.queue.task_done() except Empty: pass self.queue.put_nowait(msg) def _run(self): while not self.stop_event.is_set(): try: msg = self.queue.get(timeout=0.01) except Empty: continue t = VMIX_TARGETS[msg.field] ok = self.client.call(t["Function"], t["Input"], msg.value, t.get("SelectedName"), t.get("SelectedIndex")) if self.verbose: if ok: print(f"[vMix] {msg.field} <- {msg.value}") else: print(f"[vMix] ERROR send {msg.field} <- {msg.value}") self.queue.task_done() # ---------------- Parser ---------------- _QUARTER_RE = re.compile(r"\s([1-5])\s\s\d{6}\s") def parse_time_5ch(chunk: str) -> Optional[str]: if len(chunk) != 5: return None try: if chunk[0:2] == "0 " or chunk[0:2] == " ": print(f"{int(chunk[1:3])}:{int(chunk[3:5]):02d}") return f"{int(chunk[1:3])}:{int(chunk[3:5]):02d}" else: return f"{int(chunk[1:3])}.{chunk[3]}" except Exception: return None def format_shot_clock_from_tail_number(n: str) -> str: tail = n[-2:] if tail == "30": return "" # не обновлять num = n[:2] word_to_int = "ABCDEFGHI@" if n[1] in word_to_int: return f"{num[0]}.{word_to_int.index(num[1])+1 if num[1] != '@' else '0'}" else: return num @dataclass class ParsedPacket: main_time: Optional[str] = None score1: Optional[str] = None score2: Optional[str] = None quarter: Optional[str] = None shot_clock: Optional[str] = None def parse_line(line: str, verbose: bool = False) -> Optional[ParsedPacket]: clean = line.strip().replace("�", "") if len(clean) != 52: clean = clean[-52:] print(clean) if not clean or clean[0] not in ["3", "7", "8"]: return None # фиксированные позиции — всегда main_time_raw = clean[2:7] main_time = parse_time_5ch(main_time_raw) # score1/score2 есть только если строка начинается с "3" score1 = None score2 = None if clean and clean[0] == "3": s1 = clean[7:10].strip() s2 = clean[10:13].strip() if s1: score1 = s1 if s2: score2 = s2 # четверть — всегда, если найдётся quarter = None m = _QUARTER_RE.search(clean) if m: quarter = m.group(1) # шот-клок — всегда, если хвост — число shot_clock = None tail = clean[-5:] shot_clock = format_shot_clock_from_tail_number(tail) return ParsedPacket( main_time=main_time, score1=score1, score2=score2, quarter=quarter, shot_clock=shot_clock, )# ---------------- Serial reader ---------------- EOLS = {"CR": b"\r", "LF": b"\n", "CRLF": b"\r\n"} def sniff_eol(sample: bytes) -> bytes: if b"\r\n" in sample: return EOLS["CRLF"] if b"\r" in sample and b"\n" not in sample: return EOLS["CR"] if b"\n" in sample: return EOLS["LF"] return EOLS["CR"] class SerialReader: def __init__( self, port: str, baud: int, newline: str, strip_ws: bool, sender: VmixSender, send_quarter: bool, autoeol: bool, bytesize: int, parity: str, stopbits: float, rtscts: bool, xonxoff: bool, dsrdtr: bool, read_timeout: float = 0.03, idle_flush: float = 0.06, max_chunk: int = 512, verbose: bool = False, ): self.port = port self.baud = baud self.eol = EOLS.get(newline.upper(), b"\r") self.strip_ws = strip_ws self.sender = sender self.send_quarter = send_quarter self.autoeol = autoeol self.bytesize = bytesize self.parity = getattr(serial, f"PARITY_{parity.upper()}", serial.PARITY_NONE) self.stopbits = {1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO}[stopbits] self.rtscts = rtscts self.xonxoff = xonxoff self.dsrdtr = dsrdtr self.read_timeout = max(0.0, float(read_timeout)) self.idle_flush = max(0.0, float(idle_flush)) self.max_chunk = max(1, int(max_chunk)) self.verbose = verbose def run(self): while True: try: with serial.Serial( self.port, baudrate=self.baud, timeout=self.read_timeout, # КЛЮЧЕВОЕ: неблокирующее чтение write_timeout=0.01, bytesize=self.bytesize, parity=self.parity, stopbits=self.stopbits, rtscts=self.rtscts, xonxoff=self.xonxoff, dsrdtr=self.dsrdtr, ) as ser: # if self.verbose: # print( # f"[serial] OPEN {ser.port} @ {self.baud} ({self.bytesize}{self._parity_name()}{self._stopbits_name()})" # ) try: ser.setDTR(True) ser.setRTS(True) except Exception: pass ser.reset_input_buffer() ser.reset_output_buffer() buffer = bytearray() last_feed = time.time() # Авто-детект EOL: быстрый сэмпл if self.autoeol: sample_t0 = time.time() while time.time() - sample_t0 < 0.15: # ~150 мс n = ser.in_waiting if n: chunk = ser.read(min(n, self.max_chunk)) if chunk: buffer.extend(chunk) if buffer: self.eol = sniff_eol(buffer) if self.verbose: print(f"[serial] auto EOL = {self._eol_name(self.eol)}") break while True: n = ser.in_waiting if n: chunk = ser.read(min(n, self.max_chunk)) else: # читаем «минимум 1 байт», чтобы не спиниться chunk = ser.read(1) if chunk: buffer.extend(chunk) last_feed = time.time() progressed = False while True: idx = buffer.find(self.eol) if idx == -1: break line = buffer[:idx] del buffer[: idx + len(self.eol)] text = self._decode(line, self.strip_ws) if text: self._process(text) progressed = True # Если нет EOL и давно не приходили байты — отдаём накопленное if not progressed and buffer and (time.time() - last_feed) >= self.idle_flush: text = self._decode(buffer, self.strip_ws) buffer.clear() if text: self._process(text) except SerialException as e: if self.verbose: print(f"[serial] ERROR open/read {self.port}: {e}. Retry in 0.5s...") time.sleep(0.5) except KeyboardInterrupt: if self.verbose: print("\n[serial] Stopped by user.") return except Exception as e: if self.verbose: print(f"[serial] UNEXPECTED: {e}. Retry in 0.5s...") time.sleep(0.5) def _decode(self, raw: bytes, strip_ws: bool) -> Optional[str]: try: text = raw.decode("utf-8", errors="replace") except Exception: text = raw.decode("latin-1", errors="replace") if strip_ws: text = text.strip() return text or None def _process(self, text: str): pkt = parse_line(text, verbose=self.verbose) if not pkt: if self.verbose: print(f"[serial] RAW: {text!r}") return if pkt.main_time is not None: self.sender.send("main_time", pkt.main_time) if pkt.score1 is not None: self.sender.send("score_1", pkt.score1) if pkt.score2 is not None: self.sender.send("score_2", pkt.score2) if self.send_quarter and pkt.quarter is not None: self.sender.send("quarter", pkt.quarter) if pkt.shot_clock is not None: self.sender.send("shot_clock", pkt.shot_clock) def _parity_name(self): return {"N": "N", "E": "E", "O": "O", "M": "M", "S": "S"}[self.parity] def _stopbits_name(self): return { serial.STOPBITS_ONE: "1", serial.STOPBITS_ONE_POINT_FIVE: "1.5", serial.STOPBITS_TWO: "2", }[self.stopbits] def _eol_name(self, e: bytes): return {b"\r": "CR", b"\n": "LF", b"\r\n": "CRLF"}.get(e, f"{e!r}") # ---------------- Probe (sniffer) ---------------- def probe_port(port: str, baud: int, seconds: int, **kwargs): print("[probe] Available ports:") for p in list_ports.comports(): print(f" - {p.device}: {p.description}") try: with serial.Serial(port, baudrate=baud) as ser: print(f"[probe] OPEN {ser.port} @ {baud}. Sniffing {seconds}s...") ser.reset_input_buffer() t0 = time.time() total = 0 while time.time() - t0 < seconds: b = ser.read(512) if not b: continue total += len(b) print(f"[{len(b):03d} bytes] HEX: {b.hex(' ')}") try: txt = b.decode("utf-8") except Exception: txt = b.decode("latin-1", errors="replace") print(f" TXT: {txt!r}") print(f"[probe] Done. Total bytes: {total}") except Exception as e: print(f"[probe] ERROR: {e}") # ---------------- CLI ---------------- def main(): ap = argparse.ArgumentParser(description="COM -> vMix bridge (optimized)") ap.add_argument("--port", required=True, help="e.g. COM2 or /dev/ttyUSB0") ap.add_argument("--baud", type=int, default=19200) ap.add_argument("--newline", choices=["LF", "CRLF", "CR"], default="CR") ap.add_argument("--autoeol", action="store_true", help="Auto-detect EOL from stream") ap.add_argument("--no-strip", action="store_true") ap.add_argument("--vmix-host", default=VMIX_HOST) ap.add_argument("--timeout", type=float, default=0.01) ap.add_argument("--retries", type=int, default=2) ap.add_argument("--backoff", type=float, default=0.2) ap.add_argument("--dedupe", action="store_true", help="Deduplicate values before sending to vMix") ap.add_argument("--send-quarter", action="store_true") ap.add_argument("--read-timeout", type=float, default=0.03, help="Serial read timeout (s)") ap.add_argument("--idle-flush", type=float, default=0.06, help="Flush partial line after idle (s)") ap.add_argument("--max-chunk", type=int, default=512, help="Max bytes per read()") ap.add_argument("--verbose", action="store_true") # serial low-level ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8) ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N") ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1) ap.add_argument("--rtscts", action="store_true") ap.add_argument("--xonxoff", action="store_true") ap.add_argument("--dsrdtr", action="store_true") # tools ap.add_argument("--probe", type=int, metavar="SECONDS", help="Sniff raw bytes/text for N seconds and exit") args = ap.parse_args() if args.probe: probe_port(args.port, args.baud, args.probe) return client = VmixClient(host=args.vmix_host, timeout=args.timeout, retries=args.retries, backoff=args.backoff) sender = VmixSender(client=client, dedupe=args.dedupe or True, verbose=args.verbose) sender.start() reader = SerialReader( port=args.port, baud=args.baud, newline=args.newline, strip_ws=not args.no_strip, sender=sender, send_quarter=args.send_quarter, autoeol=args.autoeol, bytesize=args.bytesize, parity=args.parity, stopbits=args.stopbits, rtscts=args.rtscts, xonxoff=args.xonxoff, dsrdtr=args.dsrdtr, read_timeout=args.read_timeout, idle_flush=args.idle_flush, max_chunk=args.max_chunk, verbose=args.verbose, ) try: reader.run() finally: sender.stop() if __name__ == "__main__": main()