Files
TIMERS/timer COM3.py

487 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# vmix_serial_bridge_optimized.py
# pip install pyserial requests
"""
Оптимизированная версия COM -> vMix моста для максимально быстрой доставки
данных в vMix. Ключевые изменения:
- Немедленная обработка входящих байт: неблокирующее чтение с малой задержкой
(timeout) и мгновенная отправка строк при появлении EOL или коротком «простоя».
- Параметризуемые интервалы: --read-timeout, --idle-flush для тонкой настройки
задержки между получением и отправкой.
- Убраны лишние print'ы, добавлен флаг --verbose для отладки без штрафа к скорости.
- Дедупликация значений к vMix включаемая флагом (по умолчанию включена).
- Предкомпиляция регулярных выражений и мелкие микроптимизации парсера.
- Очередь отправки без блокировок и аккуратное «сбросить-старое-сообщение» поведение
на переполнении.
Совместима с прежней CLI, но добавлены новые опции:
--read-timeout (сек, по умолчанию 0.03)
--idle-flush (сек, по умолчанию 0.06)
--max-chunk (байт, по умолчанию 512)
--verbose (подробные логи)
Пример запуска:
python vmix_serial_bridge_optimized.py --port COM2 --baud 19200 --autoeol \
--send-quarter --dedupe --read-timeout 0.02 --idle-flush 0.05
"""
import argparse
import re
import time
import urllib.parse
from dataclasses import dataclass
from queue import Empty, Full, Queue
from threading import Event, Thread
from typing import Dict, Optional

import requests
import serial
from serial.serialutil import SerialException
from serial.tools import list_ports
# Default vMix base URL; the HTTP client appends "/api?..." to it.
VMIX_HOST = "http://127.0.0.1:8088"

# (logical field name, title text element that displays it), in send-table order.
_VMIX_TEXT_FIELDS = (
    ("main_time", "TIMER.Text"),
    ("shot_clock", "24sec.Text"),
    ("quarter", "QUARTER.Text"),
    ("score_1", "SCORE1.Text"),
    ("score_2", "SCORE2.Text"),
    ("fouls_1", "FOULS1.Text"),
    ("fouls_2", "FOULS2.Text"),
)

# Per-field vMix API parameters: every field is a SetText call on the SCOREBUG
# input, addressed by element name (no SelectedIndex).
VMIX_TARGETS = {
    field_name: {
        "Function": "SetText",
        "Input": "SCOREBUG",
        "SelectedName": element_name,
        "SelectedIndex": None,
    }
    for field_name, element_name in _VMIX_TEXT_FIELDS
}
# ---------------- HTTP client ----------------
class VmixClient:
    """HTTP client that issues vMix API function calls over one shared session."""

    def __init__(self, host: str, timeout: float = 0.01, retries: int = 2, backoff: float = 0.2):
        """Store the connection settings and create a reusable session.

        Args:
            host: Base URL of vMix; trailing slashes are removed.
            timeout: Timeout in seconds passed to every request.
            retries: Number of extra attempts made after the first failure.
            backoff: Base delay in seconds; after failed attempt N the client
                sleeps ``backoff * N`` before trying again.
        """
        self.host = host.rstrip("/")
        self.timeout = timeout
        self.retries = retries
        self.backoff = backoff
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def call(self, function: str, input_id: str, value: str, selected_name=None, selected_index=None) -> bool:
        """Send one API call and report whether any attempt succeeded.

        Args:
            function: Value of the ``Function`` query parameter.
            input_id: Value of the ``Input`` query parameter.
            value: Value of the ``Value`` query parameter.
            selected_name: Optional ``SelectedName``; omitted when falsy.
            selected_index: Optional ``SelectedIndex``; omitted when ``None``.

        Returns:
            ``True`` on the first response that passes ``raise_for_status()``,
            ``False`` once all attempts have failed.
        """
        query = {"Function": function, "Input": input_id, "Value": value}
        if selected_name:
            query["SelectedName"] = selected_name
        if selected_index is not None:
            query["SelectedIndex"] = selected_index
        encoded_query = urllib.parse.urlencode(query, doseq=True, safe='')
        url = f"{self.host}/api?{encoded_query}"

        for attempt in range(1, self.retries + 2):
            try:
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
            except requests.RequestException:
                # Out of retries: give up without sleeping once more.
                if attempt > self.retries:
                    return False
                time.sleep(self.backoff * attempt)
            else:
                return True
        # Only reached when the attempt range is empty (negative ``retries``).
        return False
@dataclass
class VmixMessage:
    """One queued update: which logical field to set and the text to set it to."""

    # Logical field name; the sender uses it as a key into VMIX_TARGETS.
    field: str
    # Text passed to the vMix call as its value.
    value: str
class VmixSender:
    """Background worker that forwards field updates to vMix via a bounded queue.

    ``send()`` enqueues updates and a daemon thread running ``_run()`` delivers
    them with the supplied client. With ``dedupe`` enabled a value equal to the
    last one recorded for the same field is skipped.

    The dedupe record is written when a value is enqueued. To keep vMix from
    getting stuck on a stale value, the record is cleared again whenever that
    value is dropped on queue overflow or its HTTP call fails, so the next
    identical value is sent again.

    NOTE(review): the dedupe table is accessed from both the calling thread and
    the worker thread without a lock; the worst outcome of that race appears to
    be one redundant send.
    """

    def __init__(self, client: VmixClient, dedupe: bool = True, max_queue: int = 4096, verbose: bool = False):
        """Create the queue and the (not yet started) worker thread.

        Args:
            client: Client used by the worker to perform the vMix calls.
            dedupe: Skip values identical to the last one recorded per field.
            max_queue: Maximum number of pending messages.
            verbose: Print one line per delivered or failed message.
        """
        self.client = client
        self.queue: Queue[VmixMessage] = Queue(maxsize=max_queue)
        self.stop_event = Event()
        self.thread = Thread(target=self._run, name="vmix-sender", daemon=True)
        self.dedupe = dedupe
        self._last_sent: Dict[str, Optional[str]] = {}
        self.verbose = verbose

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    def stop(self):
        """Ask the worker to stop and wait up to two seconds for it to exit."""
        self.stop_event.set()
        self.thread.join(timeout=2)

    def send(self, field: str, value: Optional[str]):
        """Queue ``value`` for ``field`` without blocking.

        ``None`` values and fields missing from ``VMIX_TARGETS`` are ignored.
        When the queue is full the oldest pending message is discarded so that
        latency does not accumulate.
        """
        if value is None or field not in VMIX_TARGETS:
            return
        # Compare and store the same string that is actually transmitted.
        text = str(value)
        if self.dedupe:
            if self._last_sent.get(field) == text:
                return
            self._last_sent[field] = text
        msg = VmixMessage(field, text)
        try:
            self.queue.put_nowait(msg)
        except Full:
            self._drop_oldest()
            try:
                self.queue.put_nowait(msg)
            except Full:
                # Still no room: make sure the value is not treated as sent.
                self._forget(msg)

    def _drop_oldest(self):
        """Discard the oldest pending message and un-record it for dedupe."""
        try:
            dropped = self.queue.get_nowait()
        except Empty:
            return
        self.queue.task_done()
        self._forget(dropped)

    def _forget(self, msg: VmixMessage):
        """Clear the dedupe record for ``msg`` if it is still the latest value."""
        if self._last_sent.get(msg.field) == msg.value:
            self._last_sent.pop(msg.field, None)

    def _run(self):
        """Worker loop: deliver queued messages until the stop event is set."""
        while not self.stop_event.is_set():
            try:
                msg = self.queue.get(timeout=0.01)
            except Empty:
                continue
            target = VMIX_TARGETS[msg.field]
            ok = self.client.call(
                target["Function"],
                target["Input"],
                msg.value,
                target.get("SelectedName"),
                target.get("SelectedIndex"),
            )
            if not ok:
                # Delivery failed: allow the same value to be queued again.
                self._forget(msg)
            if self.verbose:
                if ok:
                    print(f"[vMix] {msg.field} <- {msg.value}")
                else:
                    print(f"[vMix] ERROR send {msg.field} <- {msg.value}")
            self.queue.task_done()
# ---------------- Parser ----------------
# Matches a single digit 1-5 that is preceded by one whitespace character and
# followed by two whitespace characters, six digits and one more whitespace
# character. Group 1 (the digit) is what parse_line() reports as the quarter.
_QUARTER_RE = re.compile(r"\s([1-5])\s\s\d{6}\s")
def parse_time_5ch(chunk: str) -> Optional[str]:
    """Convert the 5-character clock field of a packet into display text.

    Two layouts are recognised by the first two characters:

    * ``"0 "`` or two spaces: characters 1-2 and 3-4 are rendered as
      ``<int>:<two-digit int>`` (presumably minutes:seconds).
    * anything else: characters 1-2 and character 3 are rendered as
      ``<int>.<char>`` (presumably seconds.tenths).

    NOTE(review): the previous code compared the two-character prefix with a
    single space, which can never match. A two-space (blank-padded) prefix is
    assumed to be what was meant; confirm against the scoreboard protocol.

    Args:
        chunk: The raw clock field; must be exactly 5 characters long.

    Returns:
        The formatted time, or ``None`` when the length is wrong or a numeric
        part cannot be parsed.
    """
    if len(chunk) != 5:
        return None
    try:
        if chunk[:2] in ("0 ", "  "):
            return f"{int(chunk[1:3])}:{int(chunk[3:5]):02d}"
        return f"{int(chunk[1:3])}.{chunk[3]}"
    except ValueError:
        # A non-numeric field is treated as "no time available".
        return None
def format_shot_clock_from_tail_number(n: str) -> str:
    """Format the shot-clock value carried in the tail of a packet.

    The first two characters of ``n`` hold the value. When the second one is
    one of ``@ABCDEFGHI`` it encodes a digit 0-9 (``@`` is 0, ``A`` is 1, ...,
    ``I`` is 9) shown after a decimal point, e.g. ``"4C"`` gives ``"4.3"``.
    Otherwise the two characters are returned unchanged.

    Args:
        n: Tail of the packet (the caller passes its last five characters).

    Returns:
        The display text, or ``""`` when ``n`` ends in ``"30"`` or is shorter
        than two characters (the latter used to raise ``IndexError``).

    NOTE(review): the original comment on the ``"30"`` branch said "do not
    update", but an empty string is still a non-None value; confirm that
    callers handle it as intended.
    """
    if len(n) < 2:
        return ""
    if n[-2:] == "30":
        return ""
    lead, marker = n[0], n[1]
    # Position in this string is the digit the letter stands for.
    digit_codes = "@ABCDEFGHI"
    if marker in digit_codes:
        return f"{lead}.{digit_codes.index(marker)}"
    return n[:2]
@dataclass
class ParsedPacket:
    """Fields extracted from one scoreboard line; ``None`` means "not present"."""

    # Formatted game clock text.
    main_time: Optional[str] = None
    # Score texts for the two sides.
    score1: Optional[str] = None
    score2: Optional[str] = None
    # Quarter digit as text.
    quarter: Optional[str] = None
    # Formatted shot-clock text.
    shot_clock: Optional[str] = None
def parse_line(line: str, verbose: bool = False) -> Optional[ParsedPacket]:
    """Parse one decoded scoreboard line into a ``ParsedPacket``.

    The line is stripped, U+FFFD replacement characters are removed and only
    the last 52 characters are kept. Lines that do not start with ``3``, ``7``
    or ``8`` are rejected.

    Args:
        line: Decoded text of one line.
        verbose: When true, print the normalised line for debugging. It used to
            be printed unconditionally.

    Returns:
        The parsed packet, or ``None`` if the line is not a recognised packet.
    """
    # Undecodable bytes arrive as U+FFFD (the decoder uses errors="replace").
    # The previous code removed the literal text "<EFBFBD>" instead, which
    # never occurs in decoded data.
    clean = line.strip().replace("\ufffd", "")
    # Keep at most the trailing 52 characters; shorter lines are left as they are.
    clean = clean[-52:]
    if verbose:
        print(clean)
    if not clean or clean[0] not in ("3", "7", "8"):
        return None

    # Fixed position: the clock field is always read.
    main_time = parse_time_5ch(clean[2:7])

    # Scores are only read from lines that start with "3".
    score1 = None
    score2 = None
    if clean[0] == "3":
        score1 = clean[7:10].strip() or None
        score2 = clean[10:13].strip() or None

    # Quarter: reported whenever the pattern is found anywhere in the line.
    match = _QUARTER_RE.search(clean)
    quarter = match.group(1) if match else None

    # Shot clock comes from the last five characters. A tail shorter than two
    # characters cannot hold a value and is skipped, so a very short line does
    # not reach the formatter's index lookups.
    tail = clean[-5:]
    shot_clock = format_shot_clock_from_tail_number(tail) if len(tail) >= 2 else None

    return ParsedPacket(
        main_time=main_time,
        score1=score1,
        score2=score2,
        quarter=quarter,
        shot_clock=shot_clock,
    )
# ---------------- Serial reader ----------------
# Supported line terminators, keyed by the name used on the command line.
EOLS = {"CR": b"\r", "LF": b"\n", "CRLF": b"\r\n"}


def sniff_eol(sample: bytes) -> bytes:
    """Guess which line terminator ``sample`` uses.

    An adjacent CR LF pair wins; otherwise any LF selects LF; everything else
    (a lone CR or no terminator at all) selects CR.
    """
    if b"\r\n" in sample:
        choice = "CRLF"
    elif b"\n" in sample:
        choice = "LF"
    else:
        choice = "CR"
    return EOLS[choice]
class SerialReader:
    """Read lines from a serial port, parse them and hand the fields to the sender.

    ``run()`` opens the port, optionally auto-detects the line terminator, then
    loops forever: read available bytes, split complete lines on the terminator
    and, if bytes have been sitting in the buffer without a terminator for
    ``idle_flush`` seconds, process them as a line anyway. Any error closes the
    port and the whole cycle restarts after 0.5 s.
    """

    def __init__(
        self,
        port: str,
        baud: int,
        newline: str,
        strip_ws: bool,
        sender: "VmixSender",
        send_quarter: bool,
        autoeol: bool,
        bytesize: int,
        parity: str,
        stopbits: float,
        rtscts: bool,
        xonxoff: bool,
        dsrdtr: bool,
        read_timeout: float = 0.03,
        idle_flush: float = 0.06,
        max_chunk: int = 512,
        verbose: bool = False,
    ):
        """Store the port settings.

        Args:
            port: Serial device name.
            baud: Baud rate.
            newline: ``"CR"``, ``"LF"`` or ``"CRLF"`` (case-insensitive);
                unknown names fall back to CR.
            strip_ws: Strip surrounding whitespace from each decoded line.
            sender: Object whose ``send(field, value)`` receives parsed fields.
            send_quarter: Forward the quarter field as well.
            autoeol: Detect the line terminator from the first received bytes.
            bytesize: Number of data bits.
            parity: One of ``N``, ``E``, ``O``, ``M``, ``S`` (case-insensitive);
                unknown letters fall back to no parity.
            stopbits: ``1``, ``1.5`` or ``2``; any other value raises ``KeyError``.
            rtscts: Enable RTS/CTS flow control.
            xonxoff: Enable software flow control.
            dsrdtr: Enable DSR/DTR flow control.
            read_timeout: Serial read timeout in seconds (clamped to >= 0).
            idle_flush: Idle time in seconds after which a partial line is
                processed (clamped to >= 0).
            max_chunk: Maximum number of bytes per read (clamped to >= 1).
            verbose: Print diagnostic messages.
        """
        self.port = port
        self.baud = baud
        self.eol = EOLS.get(newline.upper(), b"\r")
        self.strip_ws = strip_ws
        self.sender = sender
        self.send_quarter = send_quarter
        self.autoeol = autoeol
        self.bytesize = bytesize
        # pySerial names its constants PARITY_NONE/EVEN/ODD/MARK/SPACE, so the
        # former getattr(serial, "PARITY_<letter>") lookup never matched and
        # every setting silently became "no parity".
        parity_by_letter = {
            "N": serial.PARITY_NONE,
            "E": serial.PARITY_EVEN,
            "O": serial.PARITY_ODD,
            "M": serial.PARITY_MARK,
            "S": serial.PARITY_SPACE,
        }
        self.parity = parity_by_letter.get(parity.upper(), serial.PARITY_NONE)
        self.stopbits = {
            1: serial.STOPBITS_ONE,
            1.5: serial.STOPBITS_ONE_POINT_FIVE,
            2: serial.STOPBITS_TWO,
        }[stopbits]
        self.rtscts = rtscts
        self.xonxoff = xonxoff
        self.dsrdtr = dsrdtr
        self.read_timeout = max(0.0, float(read_timeout))
        self.idle_flush = max(0.0, float(idle_flush))
        self.max_chunk = max(1, int(max_chunk))
        self.verbose = verbose

    def run(self):
        """Read and process serial data until interrupted with Ctrl+C."""
        while True:
            try:
                with serial.Serial(
                    self.port,
                    baudrate=self.baud,
                    timeout=self.read_timeout,  # short timeout keeps reads from stalling the loop
                    write_timeout=0.01,
                    bytesize=self.bytesize,
                    parity=self.parity,
                    stopbits=self.stopbits,
                    rtscts=self.rtscts,
                    xonxoff=self.xonxoff,
                    dsrdtr=self.dsrdtr,
                ) as ser:
                    # Best effort: not every adapter supports the control lines.
                    try:
                        ser.setDTR(True)
                        ser.setRTS(True)
                    except Exception:
                        pass
                    ser.reset_input_buffer()
                    ser.reset_output_buffer()
                    buffer = bytearray()
                    last_feed = time.time()
                    if self.autoeol:
                        self._autodetect_eol(ser, buffer)
                    while True:
                        pending = ser.in_waiting
                        if pending:
                            chunk = ser.read(min(pending, self.max_chunk))
                        else:
                            # Wait (up to the read timeout) for at least one
                            # byte instead of spinning on in_waiting.
                            chunk = ser.read(1)
                        if chunk:
                            buffer.extend(chunk)
                            last_feed = time.time()
                        progressed = self._drain_lines(buffer)
                        # No terminator and nothing new for a while: process
                        # what has accumulated as if it were a full line.
                        if not progressed and buffer and (time.time() - last_feed) >= self.idle_flush:
                            text = self._decode(buffer, self.strip_ws)
                            buffer.clear()
                            if text:
                                self._process(text)
            except SerialException as e:
                if self.verbose:
                    print(f"[serial] ERROR open/read {self.port}: {e}. Retry in 0.5s...")
                time.sleep(0.5)
            except KeyboardInterrupt:
                if self.verbose:
                    print("\n[serial] Stopped by user.")
                return
            except Exception as e:
                if self.verbose:
                    print(f"[serial] UNEXPECTED: {e}. Retry in 0.5s...")
                time.sleep(0.5)

    def _autodetect_eol(self, ser, buffer: bytearray):
        """Sample the stream for up to ~150 ms and set ``self.eol`` from it.

        Sampled bytes are appended to ``buffer`` so they are not lost. The
        terminator is only changed when the sample actually contains a CR or
        LF; previously the first bytes received, even without any terminator,
        forced CR and overrode the configured newline.
        """
        deadline = time.time() + 0.15
        while time.time() < deadline:
            pending = ser.in_waiting
            if pending:
                chunk = ser.read(min(pending, self.max_chunk))
                if chunk:
                    buffer.extend(chunk)
            else:
                time.sleep(0.001)  # avoid a hot spin while nothing is pending
            seen = b"\r" in buffer or b"\n" in buffer
            # A trailing CR may be the first half of CR LF: keep sampling.
            if seen and not buffer.endswith(b"\r"):
                break
        if b"\r" in buffer or b"\n" in buffer:
            self.eol = sniff_eol(buffer)
            if self.verbose:
                print(f"[serial] auto EOL = {self._eol_name(self.eol)}")

    def _drain_lines(self, buffer: bytearray) -> bool:
        """Process every complete line in ``buffer``; return True if any was removed."""
        progressed = False
        while True:
            idx = buffer.find(self.eol)
            if idx == -1:
                return progressed
            line = buffer[:idx]
            del buffer[: idx + len(self.eol)]
            text = self._decode(line, self.strip_ws)
            if text:
                self._process(text)
            progressed = True

    def _decode(self, raw: bytes, strip_ws: bool) -> Optional[str]:
        """Decode ``raw`` as UTF-8; return ``None`` for an empty result.

        Undecodable bytes become U+FFFD, so decoding itself cannot fail.
        """
        text = raw.decode("utf-8", errors="replace")
        if strip_ws:
            text = text.strip()
        return text or None

    def _process(self, text: str):
        """Parse one line and forward every field that is present to the sender."""
        pkt = parse_line(text, verbose=self.verbose)
        if not pkt:
            if self.verbose:
                print(f"[serial] RAW: {text!r}")
            return
        if pkt.main_time is not None:
            self.sender.send("main_time", pkt.main_time)
        if pkt.score1 is not None:
            self.sender.send("score_1", pkt.score1)
        if pkt.score2 is not None:
            self.sender.send("score_2", pkt.score2)
        if self.send_quarter and pkt.quarter is not None:
            self.sender.send("quarter", pkt.quarter)
        if pkt.shot_clock is not None:
            self.sender.send("shot_clock", pkt.shot_clock)

    def _parity_name(self):
        """Return the one-letter name of the configured parity."""
        return {"N": "N", "E": "E", "O": "O", "M": "M", "S": "S"}[self.parity]

    def _stopbits_name(self):
        """Return the configured stop bits as display text."""
        return {
            serial.STOPBITS_ONE: "1",
            serial.STOPBITS_ONE_POINT_FIVE: "1.5",
            serial.STOPBITS_TWO: "2",
        }[self.stopbits]

    def _eol_name(self, e: bytes):
        """Return ``CR``/``LF``/``CRLF`` for a known terminator, else its repr."""
        return {b"\r": "CR", b"\n": "LF", b"\r\n": "CRLF"}.get(e, f"{e!r}")
# ---------------- Probe (sniffer) ----------------
def probe_port(port: str, baud: int, seconds: int, **kwargs):
    """List the available serial ports, then dump raw traffic from ``port``.

    Every received chunk is printed as hex and as text (UTF-8, falling back to
    latin-1). Errors are reported on stdout instead of being raised.

    Args:
        port: Serial device to open.
        baud: Baud rate.
        seconds: How long to sniff.
        **kwargs: Extra keyword arguments forwarded to ``serial.Serial``
            (previously accepted but ignored). ``timeout`` defaults to 0.1 s.
    """
    print("[probe] Available ports:")
    for p in list_ports.comports():
        print(f" - {p.device}: {p.description}")
    # Without a read timeout, read(512) blocks until 512 bytes have arrived, so
    # a quiet or slow port would keep the probe running past ``seconds``.
    kwargs.setdefault("timeout", 0.1)
    try:
        with serial.Serial(port, baudrate=baud, **kwargs) as ser:
            print(f"[probe] OPEN {ser.port} @ {baud}. Sniffing {seconds}s...")
            ser.reset_input_buffer()
            t0 = time.time()
            total = 0
            while time.time() - t0 < seconds:
                b = ser.read(512)
                if not b:
                    continue
                total += len(b)
                print(f"[{len(b):03d} bytes] HEX: {b.hex(' ')}")
                try:
                    txt = b.decode("utf-8")
                except UnicodeDecodeError:
                    txt = b.decode("latin-1", errors="replace")
                print(f" TXT: {txt!r}")
            print(f"[probe] Done. Total bytes: {total}")
    except Exception as e:
        print(f"[probe] ERROR: {e}")
# ---------------- CLI ----------------
def main():
    """Parse the command line and run the probe tool or the serial -> vMix bridge.

    With ``--probe N`` the port is sniffed for N seconds and the program exits.
    Otherwise the sender thread is started and the serial reader runs until it
    returns; the sender is stopped on the way out.
    """
    ap = argparse.ArgumentParser(description="COM -> vMix bridge (optimized)")
    ap.add_argument("--port", required=True, help="e.g. COM2 or /dev/ttyUSB0")
    ap.add_argument("--baud", type=int, default=19200)
    ap.add_argument("--newline", choices=["LF", "CRLF", "CR"], default="CR")
    ap.add_argument("--autoeol", action="store_true", help="Auto-detect EOL from stream")
    ap.add_argument("--no-strip", action="store_true")
    ap.add_argument("--vmix-host", default=VMIX_HOST)
    ap.add_argument("--timeout", type=float, default=0.01)
    ap.add_argument("--retries", type=int, default=2)
    ap.add_argument("--backoff", type=float, default=0.2)
    # Dedupe is on by default; --dedupe is kept so existing command lines still work.
    ap.add_argument(
        "--dedupe",
        action="store_true",
        help="Deduplicate values before sending to vMix (enabled by default)",
    )
    ap.add_argument(
        "--no-dedupe",
        action="store_true",
        help="Send every parsed value to vMix, even if it did not change",
    )
    ap.add_argument("--send-quarter", action="store_true")
    ap.add_argument("--read-timeout", type=float, default=0.03, help="Serial read timeout (s)")
    ap.add_argument("--idle-flush", type=float, default=0.06, help="Flush partial line after idle (s)")
    ap.add_argument("--max-chunk", type=int, default=512, help="Max bytes per read()")
    ap.add_argument("--verbose", action="store_true")
    # serial low-level
    ap.add_argument("--bytesize", type=int, choices=[5, 6, 7, 8], default=8)
    ap.add_argument("--parity", choices=["N", "E", "O", "M", "S"], default="N")
    ap.add_argument("--stopbits", type=float, choices=[1, 1.5, 2], default=1)
    ap.add_argument("--rtscts", action="store_true")
    ap.add_argument("--xonxoff", action="store_true")
    ap.add_argument("--dsrdtr", action="store_true")
    # tools
    ap.add_argument("--probe", type=int, metavar="SECONDS", help="Sniff raw bytes/text for N seconds and exit")
    args = ap.parse_args()

    if args.probe:
        probe_port(args.port, args.baud, args.probe)
        return

    client = VmixClient(host=args.vmix_host, timeout=args.timeout, retries=args.retries, backoff=args.backoff)
    # The previous "args.dedupe or True" was always True, so dedupe could never
    # be switched off from the command line.
    sender = VmixSender(client=client, dedupe=not args.no_dedupe, verbose=args.verbose)
    sender.start()
    reader = SerialReader(
        port=args.port,
        baud=args.baud,
        newline=args.newline,
        strip_ws=not args.no_strip,
        sender=sender,
        send_quarter=args.send_quarter,
        autoeol=args.autoeol,
        bytesize=args.bytesize,
        parity=args.parity,
        stopbits=args.stopbits,
        rtscts=args.rtscts,
        xonxoff=args.xonxoff,
        dsrdtr=args.dsrdtr,
        read_timeout=args.read_timeout,
        idle_flush=args.idle_flush,
        max_chunk=args.max_chunk,
        verbose=args.verbose,
    )
    try:
        reader.run()
    finally:
        sender.stop()


if __name__ == "__main__":
    main()