# vmix_serial_bridge.py
# pip install pyserial requests
"""Bridge a scoreboard serial feed to vMix title fields.

Lines are read from a serial port, parsed by :func:`parse_line`, and the
resulting values are pushed to vMix through its HTTP API using the per-field
request templates in ``VMIX_TARGETS``. ``--probe`` dumps raw serial traffic
instead of running the bridge.
"""
import argparse
import re
import time
import urllib.parse
from dataclasses import dataclass
from queue import Empty, Full, Queue
from threading import Event, Thread

import requests
import serial
from serial.serialutil import SerialException
from serial.tools import list_ports

VMIX_HOST = "http://127.0.0.1:8088"

# Logical field name -> parameters of the vMix API call that updates it.
VMIX_TARGETS = {
    "main_time": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "TIMER.Text", "SelectedIndex": None},
    "shot_clock": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "24sec.Text", "SelectedIndex": None},
    "quarter": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "QUARTER.Text", "SelectedIndex": None},
    "score_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE1.Text", "SelectedIndex": None},
    "score_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "SCORE2.Text", "SelectedIndex": None},
    "fouls_1": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "FOULS1.Text", "SelectedIndex": None},
    "fouls_2": {"Function": "SetText", "Input": "SCOREBUG", "SelectedName": "FOULS2.Text", "SelectedIndex": None},
}

# Command-line parity letters -> pyserial parity constants.
_PARITIES = {
    "N": serial.PARITY_NONE,
    "E": serial.PARITY_EVEN,
    "O": serial.PARITY_ODD,
    "M": serial.PARITY_MARK,
    "S": serial.PARITY_SPACE,
}

# Command-line stop-bit counts -> pyserial stop-bit constants.
_STOPBITS = {
    1: serial.STOPBITS_ONE,
    1.5: serial.STOPBITS_ONE_POINT_FIVE,
    2: serial.STOPBITS_TWO,
}

# Serial framing options probe_port() forwards to serial.Serial.
_PROBE_SERIAL_OPTIONS = ("bytesize", "parity", "stopbits", "rtscts", "xonxoff", "dsrdtr")

# A single 1-5 digit surrounded by whitespace and followed by six digits.
_QUARTER_RE = re.compile(r"\s([1-5])\s\s\d{6}\s")

# Last two digits a packet tail must have to carry a shot-clock value.
# NOTE(review): the meaning of 31 comes from the scoreboard protocol — confirm.
_SHOT_CLOCK_TAIL_MARKER = 31

# Unterminated data is flushed as-is after this many seconds without input.
_IDLE_FLUSH_SECONDS = 0.5


# ---------------- HTTP client ----------------
class VmixClient:
    """Minimal client for ``GET {host}/api`` with retry and linear backoff."""

    def __init__(self, host: str, timeout: float = 0.01, retries: int = 3, backoff: float = 0.3):
        """Store connection settings and open a reusable HTTP session.

        Args:
            host: Base URL of vMix; a trailing slash is removed.
            timeout: Per-request timeout in seconds, passed to ``requests``.
            retries: Number of retries after the first failed attempt.
            backoff: Base delay in seconds; attempt ``k`` waits ``backoff * k``.
        """
        self.host = host.rstrip("/")
        self.timeout = timeout
        self.retries = retries
        self.backoff = backoff
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def call(self, function: str, input_id: str, value: str, selected_name=None, selected_index=None) -> bool:
        """Issue one API call, retrying on request errors.

        Args:
            function: Value of the ``Function`` query parameter.
            input_id: Value of the ``Input`` query parameter.
            value: Value of the ``Value`` query parameter.
            selected_name: Optional ``SelectedName``; omitted when falsy.
            selected_index: Optional ``SelectedIndex``; omitted when ``None``.

        Returns:
            ``True`` once a request completes with a non-error HTTP status,
            ``False`` after every attempt has failed.
        """
        params = {"Function": function, "Input": input_id, "Value": value}
        if selected_name:
            params["SelectedName"] = selected_name
        if selected_index is not None:
            params["SelectedIndex"] = selected_index
        query = urllib.parse.urlencode(params, doseq=True, safe="")
        url = f"{self.host}/api?{query}"

        # Always try at least once, even if retries was configured negative.
        attempts = max(self.retries, 0) + 1
        for attempt in range(1, attempts + 1):
            try:
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
            except requests.RequestException as exc:
                if attempt >= attempts:
                    print(f"[vMix] ERROR: {exc} -> {url}")
                    return False
                delay = self.backoff * attempt
                print(f"[vMix] WARN attempt {attempt}: {exc}. Retry in {delay:.2f}s")
                time.sleep(delay)
            else:
                return True
        return False


@dataclass
class VmixMessage:
    """One queued update: a ``VMIX_TARGETS`` key and the text to set."""

    field: str
    value: str


class VmixSender:
    """Background worker that drains a bounded queue into a ``VmixClient``."""

    def __init__(self, client: VmixClient, dedupe: bool = False, max_queue: int = 2000):
        """Create the queue and the (not yet started) daemon worker thread.

        Args:
            client: Client used to perform the HTTP calls.
            dedupe: When true, a value equal to the last one accepted for the
                same field is not queued again.
            max_queue: Queue capacity; when full, the oldest message is dropped.
        """
        self.client = client
        self.queue: Queue[VmixMessage] = Queue(maxsize=max_queue)
        self.stop_event = Event()
        self.thread = Thread(target=self._run, name="vmix-sender", daemon=True)
        self.dedupe = dedupe
        self._last_sent: dict[str, str | None] = {}

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    def stop(self):
        """Ask the worker to stop and wait up to two seconds for it."""
        self.stop_event.set()
        self.thread.join(timeout=2)

    def send(self, field: str, value: str | None):
        """Queue ``value`` for ``field``.

        ``None`` values and fields missing from ``VMIX_TARGETS`` are ignored.
        If the queue is full, the oldest pending message is discarded to make
        room for the new one.
        """
        if value is None or field not in VMIX_TARGETS:
            return
        text = str(value)
        if self.dedupe:
            if self._last_sent.get(field) == text:
                return
            self._last_sent[field] = text

        message = VmixMessage(field, text)
        try:
            self.queue.put_nowait(message)
        except Full:
            # Prefer fresh data: evict the oldest message, then enqueue.
            try:
                self.queue.get_nowait()
                self.queue.task_done()
            except Empty:
                pass
            self.queue.put_nowait(message)

    def _run(self):
        """Worker loop: take messages off the queue and send them to vMix."""
        while not self.stop_event.is_set():
            try:
                msg = self.queue.get(timeout=0.2)
            except Empty:
                continue
            try:
                target = VMIX_TARGETS[msg.field]
                ok = self.client.call(
                    target["Function"],
                    target["Input"],
                    msg.value,
                    target.get("SelectedName"),
                    target.get("SelectedIndex"),
                )
            except Exception as exc:
                # Thread boundary: report and keep the worker alive.
                ok = False
                print(f"[vMix] ERROR sending {msg.field}: {exc}")
            finally:
                self.queue.task_done()

            if ok:
                print(f"[vMix] {msg.field} <- {msg.value}")
            elif self.dedupe:
                # Forget the value so an unchanged reading is queued again
                # instead of being suppressed after a failed delivery.
                self._last_sent.pop(msg.field, None)


# ---------------- Parser ----------------
def parse_time_5ch(chunk: str) -> str | None:
    """Format a five-character time field.

    When the field starts with ``"0 "`` the result is ``"<a>:<bb>"`` built from
    characters 1-2 and 3-4 as integers; otherwise it is ``"<a>.<c>"`` built from
    characters 1-2 as an integer and character 3 verbatim.
    NOTE(review): presumably minutes:seconds versus seconds.tenths — confirm
    against the scoreboard protocol.

    Returns:
        The formatted text, or ``None`` when the field is not exactly five
        characters long or a numeric part cannot be parsed.
    """
    if len(chunk) != 5:
        return None
    try:
        if chunk[0:2] == "0 ":
            return f"{int(chunk[1:3])}:{int(chunk[3:5]):02d}"
        return f"{int(chunk[1:3])}.{chunk[3]}"
    except ValueError:
        return None


def format_shot_clock_from_tail_number(n: int) -> str | None:
    """Format the shot clock from the integer value of a packet tail.

    The last two digits must equal 31; the remaining leading digits are treated
    as tenths of a second. Values of 5.0 s and above are shown as whole
    seconds, smaller values with one decimal place.

    Returns:
        The formatted text, or ``""`` when ``n`` is negative or does not end
        in 31.
        NOTE(review): callers forward ``""`` as a value, which blanks the
        field; the original comment here said "do not update" — confirm which
        behaviour is intended.
    """
    if n < 0:
        return ""
    if n % 100 != _SHOT_CLOCK_TAIL_MARKER:
        return ""
    sec_tenths = n // 100
    if sec_tenths >= 50:
        return str(sec_tenths // 10)
    return f"{sec_tenths / 10:.1f}"


@dataclass
class ParsedPacket:
    """Values extracted from one line; ``None`` means "not present"."""

    main_time: str | None = None
    score1: str | None = None
    score2: str | None = None
    quarter: str | None = None
    shot_clock: str | None = None


def parse_line(line: str) -> ParsedPacket | None:
    """Parse one decoded serial line into a :class:`ParsedPacket`.

    The line is stripped and U+FFFD replacement characters are removed. Only
    lines starting with ``"3"`` are parsed; fields are taken from fixed
    character positions (time ``[2:7]``, scores ``[7:10]`` and ``[10:13]``),
    the quarter from a regex match, and the shot clock from the last five
    characters when they are all ASCII digits.

    Returns:
        The parsed packet, or ``None`` for an empty line or one that does not
        start with ``"3"``.
    """
    clean = line.strip().replace("\ufffd", "")
    print(clean)
    if not clean.startswith("3"):
        return None

    main_time = parse_time_5ch(clean[2:7])
    score1 = clean[7:10].strip()
    score2 = clean[10:13].strip()

    match = _QUARTER_RE.search(clean)
    quarter = match.group(1) if match else None

    shot_clock = ""
    tail = clean[-5:]
    # isascii() guards int(): str.isdigit() alone accepts characters such as
    # superscript digits that int() rejects.
    if len(tail) == 5 and tail.isascii() and tail.isdigit():
        shot_clock = format_shot_clock_from_tail_number(int(tail))

    return ParsedPacket(
        main_time=main_time,
        score1=score1 or None,
        score2=score2 or None,
        quarter=quarter,
        shot_clock=shot_clock,
    )


# ---------------- Serial reader ----------------
EOLS = {"CR": b"\r", "LF": b"\n", "CRLF": b"\r\n"}


def sniff_eol(sample: bytes) -> bytes:
    """Guess the line terminator used in ``sample`` (simple heuristic).

    Returns CRLF if that pair occurs, CR if only carriage returns occur, LF if
    any line feed occurs, and CR when the sample contains neither.
    """
    if b"\r\n" in sample:
        return EOLS["CRLF"]
    if b"\r" in sample and b"\n" not in sample:
        return EOLS["CR"]
    if b"\n" in sample:
        return EOLS["LF"]
    return EOLS["CR"]  # default


class SerialReader:
    """Reads lines from a serial port and forwards parsed values to a sender."""

    def __init__(self, port: str, baud: int, newline: str, strip_ws: bool, sender: VmixSender,
                 send_quarter: bool, autoeol: bool, bytesize: int, parity: str, stopbits: float,
                 rtscts: bool, xonxoff: bool, dsrdtr: bool):
        """Store the port configuration.

        Args:
            port: Serial device name.
            baud: Baud rate.
            newline: ``"CR"``, ``"LF"`` or ``"CRLF"`` (case-insensitive);
                anything else falls back to CR.
            strip_ws: Strip surrounding whitespace from each decoded line.
            sender: Destination for parsed values.
            send_quarter: Forward the quarter field as well.
            autoeol: Detect the line terminator from the incoming stream.
            bytesize: Data bits.
            parity: One of ``N``, ``E``, ``O``, ``M``, ``S`` (case-insensitive);
                anything else falls back to no parity.
            stopbits: 1, 1.5 or 2.
            rtscts: Enable RTS/CTS flow control.
            xonxoff: Enable software flow control.
            dsrdtr: Enable DSR/DTR flow control.

        Raises:
            KeyError: If ``stopbits`` is not 1, 1.5 or 2.
        """
        self.port = port
        self.baud = baud
        self.eol = EOLS.get(newline.upper(), b"\r")
        self.strip_ws = strip_ws
        self.sender = sender
        self.send_quarter = send_quarter
        self.autoeol = autoeol
        self.bytesize = bytesize
        self.parity = _PARITIES.get(parity.upper(), serial.PARITY_NONE)
        self.stopbits = _STOPBITS[stopbits]
        self.rtscts = rtscts
        self.xonxoff = xonxoff
        self.dsrdtr = dsrdtr

    def run(self):
        """Read the port until interrupted, reopening it 3 s after any error."""
        try:
            while True:
                try:
                    with serial.Serial(
                        self.port,
                        baudrate=self.baud,
                        timeout=0.01,
                        bytesize=self.bytesize,
                        parity=self.parity,
                        stopbits=self.stopbits,
                        rtscts=self.rtscts,
                        xonxoff=self.xonxoff,
                        dsrdtr=self.dsrdtr,
                    ) as ser:
                        print(
                            f"[serial] OPEN {ser.port} @ {self.baud} "
                            f"({self.bytesize}{self._parity_name()}{self._stopbits_name()})"
                        )
                        self._pump(ser)
                except SerialException as exc:
                    print(f"[serial] ERROR open/read {self.port}: {exc}. Retry in 3s...")
                except Exception as exc:
                    # Top-level boundary: report and reconnect rather than exit.
                    print(f"[serial] UNEXPECTED: {exc}. Retry in 3s...")
                time.sleep(3)
        except KeyboardInterrupt:
            # Also covers Ctrl+C pressed during the reconnect delay.
            print("\n[serial] Stopped by user.")

    def _pump(self, ser):
        """Read from an open port forever, splitting the stream into lines."""
        # Best effort: assert the control lines, then drop stale buffered data.
        try:
            ser.dtr = True
            ser.rts = True
        except Exception as exc:
            print(f"[serial] WARN: could not set DTR/RTS: {exc}")
        ser.reset_input_buffer()
        ser.reset_output_buffer()

        buffer = bytearray()
        last_feed = time.monotonic()
        eol_pending = self.autoeol

        while True:
            chunk = ser.read(1024)
            if chunk:
                buffer.extend(chunk)
                last_feed = time.monotonic()
            idle = (time.monotonic() - last_feed) > _IDLE_FLUSH_SECONDS

            if eol_pending and self._detect_eol(buffer, force=idle):
                eol_pending = False

            progressed = False
            if not eol_pending:
                while (idx := buffer.find(self.eol)) != -1:
                    line = bytes(buffer[:idx])
                    del buffer[: idx + len(self.eol)]
                    text = self._decode(line, self.strip_ws)
                    if text:
                        self._process(text)
                    progressed = True

            # Fallback: no terminator for a while, hand over what we have as-is.
            if not progressed and buffer and idle:
                text = self._decode(buffer, self.strip_ws)
                buffer.clear()
                if text:
                    self._process(text)

    def _detect_eol(self, buffer: bytearray, force: bool) -> bool:
        """Set ``self.eol`` from ``buffer`` once a terminator byte is visible.

        Detection is postponed while the first terminator byte is the last
        byte received, because a CR there may be the first half of a CRLF pair
        split across reads; ``force`` overrides that wait.

        Returns:
            ``True`` if the terminator was determined.
        """
        positions = [i for i in (buffer.find(b"\r"), buffer.find(b"\n")) if i != -1]
        if not positions:
            return False
        if min(positions) == len(buffer) - 1 and not force:
            return False
        self.eol = sniff_eol(bytes(buffer))
        print(f"[serial] auto EOL = {self._eol_name(self.eol)}")
        return True

    def _decode(self, raw: bytes, strip_ws: bool) -> str | None:
        """Decode bytes as UTF-8, replacing invalid sequences.

        Returns:
            The text (stripped if ``strip_ws``), or ``None`` when it is empty.
        """
        text = raw.decode("utf-8", errors="replace")
        if strip_ws:
            text = text.strip()
        return text or None

    def _process(self, text: str):
        """Parse one line and forward every present field to the sender."""
        pkt = parse_line(text)
        if not pkt:
            print(f"[serial] RAW: {text!r}")
            return
        if pkt.main_time is not None:
            self.sender.send("main_time", pkt.main_time)
        if pkt.score1 is not None:
            self.sender.send("score_1", pkt.score1)
        if pkt.score2 is not None:
            self.sender.send("score_2", pkt.score2)
        if self.send_quarter and pkt.quarter is not None:
            self.sender.send("quarter", pkt.quarter)
        if pkt.shot_clock is not None:
            self.sender.send("shot_clock", pkt.shot_clock)

    def _parity_name(self):
        """Return the one-letter name of the configured parity."""
        return {"N": "N", "E": "E", "O": "O", "M": "M", "S": "S"}[self.parity]

    def _stopbits_name(self):
        """Return the configured stop bits as display text."""
        return {
            serial.STOPBITS_ONE: "1",
            serial.STOPBITS_ONE_POINT_FIVE: "1.5",
            serial.STOPBITS_TWO: "2",
        }[self.stopbits]

    def _eol_name(self, e: bytes):
        """Return ``CR``/``LF``/``CRLF`` for a terminator, else its repr."""
        return {b"\r": "CR", b"\n": "LF", b"\r\n": "CRLF"}.get(e, f"{e!r}")


# ---------------- Probe (sniffer) ----------------
def probe_port(port: str, baud: int, seconds: int, **kwargs):
    """List available ports, then dump raw traffic from ``port``.

    Args:
        port: Serial device name.
        baud: Baud rate.
        seconds: How long to sniff.
        **kwargs: Optional serial framing options (``bytesize``, ``parity``,
            ``stopbits``, ``rtscts``, ``xonxoff``, ``dsrdtr``) forwarded to
            ``serial.Serial``; any other keyword is ignored.
    """
    print("[probe] Available ports:")
    for p in list_ports.comports():
        print(f"  - {p.device}: {p.description}")

    options = {key: kwargs[key] for key in _PROBE_SERIAL_OPTIONS if key in kwargs}
    try:
        with serial.Serial(port, baudrate=baud, timeout=0.01, **options) as ser:
            print(f"[probe] OPEN {ser.port} @ {baud}. Sniffing {seconds}s...")
            ser.reset_input_buffer()
            deadline = time.monotonic() + seconds
            total = 0
            while time.monotonic() < deadline:
                b = ser.read(512)
                if not b:
                    continue
                total += len(b)
                print(f"[{len(b):03d} bytes] HEX: {b.hex(' ')}")
                try:
                    txt = b.decode("utf-8")
                except UnicodeDecodeError:
                    txt = b.decode("latin-1", errors="replace")
                print(f"           TXT: {txt!r}")
            print(f"[probe] Done. Total bytes: {total}")
    except (SerialException, OSError, ValueError) as exc:
        print(f"[probe] ERROR: {exc}")


# ---------------- CLI ----------------
def main():
    """Parse command-line options and run the probe or the bridge."""
    ap = argparse.ArgumentParser(description="COM -> vMix bridge with diagnostics")
    ap.add_argument("--port", required=True, help="e.g. COM2 or /dev/ttyUSB0")
    ap.add_argument("--baud", type=int, default=19200)
    ap.add_argument("--newline", choices=["LF", "CRLF", "CR"], default="CR")
    ap.add_argument("--autoeol", action="store_true", help="Auto-detect EOL from stream")
    ap.add_argument("--no-strip", action="store_true")
    ap.add_argument("--vmix-host", default=VMIX_HOST)
    ap.add_argument("--timeout", type=float, default=0.01)
    ap.add_argument("--retries", type=int, default=3)
    ap.add_argument("--backoff", type=float, default=0.3)
    ap.add_argument("--dedupe", action="store_true")
    ap.add_argument("--send-quarter", action="store_true")
    # serial low-level
    ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8)
    ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N")
    ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1)
    ap.add_argument("--rtscts", action="store_true")
    ap.add_argument("--xonxoff", action="store_true")
    ap.add_argument("--dsrdtr", action="store_true")
    # tools
    ap.add_argument("--probe", type=int, metavar="SECONDS",
                    help="Sniff raw bytes/text for N seconds and exit")
    args = ap.parse_args()

    if args.probe is not None:
        # Probe with the same framing the bridge would use.
        probe_port(
            args.port,
            args.baud,
            args.probe,
            bytesize=args.bytesize,
            parity=_PARITIES[args.parity],
            stopbits=_STOPBITS[args.stopbits],
            rtscts=args.rtscts,
            xonxoff=args.xonxoff,
            dsrdtr=args.dsrdtr,
        )
        return

    client = VmixClient(host=args.vmix_host, timeout=args.timeout,
                        retries=args.retries, backoff=args.backoff)
    sender = VmixSender(client=client, dedupe=args.dedupe)
    sender.start()
    reader = SerialReader(
        port=args.port,
        baud=args.baud,
        newline=args.newline,
        strip_ws=not args.no_strip,
        sender=sender,
        send_quarter=args.send_quarter,
        autoeol=args.autoeol,
        bytesize=args.bytesize,
        parity=args.parity,
        stopbits=args.stopbits,
        rtscts=args.rtscts,
        xonxoff=args.xonxoff,
        dsrdtr=args.dsrdtr,
    )
    try:
        reader.run()
    finally:
        sender.stop()


if __name__ == "__main__":
    main()