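# NOTE: the lines below were captured from a web file viewer together with the
# source; they are not part of the program.
#   Path: TIMERS/timer COM copy.py
#   Size: 364 lines, 14 KiB (Python)
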
# vmix_serial_bridge.py
# pip install pyserial requests
import argparse
import re
import time
import urllib.parse
from dataclasses import dataclass
from queue import Empty, Full, Queue
from threading import Event, Thread

import requests
import serial
from serial.serialutil import SerialException
from serial.tools import list_ports
# Default base URL of the vMix web API.
VMIX_HOST = "http://127.0.0.1:8088"

# Logical field name -> query parameters used when pushing that field to vMix.
# Every field is written with SetText into the "SCOREBUG" input; only the
# text-layer name (SelectedName) differs. SelectedIndex is left unset.
VMIX_TARGETS = {
    field: {
        "Function": "SetText",
        "Input": "SCOREBUG",
        "SelectedName": layer,
        "SelectedIndex": None,
    }
    for field, layer in (
        ("main_time", "TIMER.Text"),
        ("shot_clock", "24sec.Text"),
        ("quarter", "QUARTER.Text"),
        ("score_1", "SCORE1.Text"),
        ("score_2", "SCORE2.Text"),
        ("fouls_1", "FOULS1.Text"),
        ("fouls_2", "FOULS2.Text"),
    )
}
# ---------------- HTTP client ----------------
class VmixClient:
    """Small HTTP client that issues ``GET <host>/api?...`` requests to vMix.

    A single ``requests.Session`` is reused for all calls so the connection
    can be kept alive between updates.

    Args:
        host: Base URL of the vMix web API; a trailing slash is removed.
        timeout: Per-request timeout in seconds, passed to ``Session.get``.
        retries: Number of additional attempts after the first failure.
        backoff: Base delay in seconds; attempt ``k`` waits ``backoff * k``
            before the next try.
    """

    def __init__(self, host: str, timeout: float = 0.01, retries: int = 3, backoff: float = 0.3):
        self.host = host.rstrip("/")
        self.timeout = timeout
        self.retries = retries
        self.backoff = backoff
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def call(self, function: str, input_id: str, value: str, selected_name=None, selected_index=None) -> bool:
        """Send one API function call and report whether it succeeded.

        Args:
            function: Value of the ``Function`` query parameter.
            input_id: Value of the ``Input`` query parameter.
            value: Value of the ``Value`` query parameter.
            selected_name: Optional ``SelectedName``; omitted when falsy.
            selected_index: Optional ``SelectedIndex``; omitted when ``None``.

        Returns:
            ``True`` as soon as a request completes with a non-error HTTP
            status, ``False`` once every attempt has failed.
        """
        params = {"Function": function, "Input": input_id, "Value": value}
        if selected_name:
            params["SelectedName"] = selected_name
        if selected_index is not None:
            params["SelectedIndex"] = selected_index
        url = f"{self.host}/api?{urllib.parse.urlencode(params, doseq=True, safe='')}"

        total_attempts = self.retries + 1
        for attempt in range(1, total_attempts + 1):
            try:
                response = self.session.get(url, timeout=self.timeout)
                # NOTE: the previous debug line called ``status_code()``;
                # ``status_code`` is an int attribute, so that raised a
                # TypeError on every successful response and escaped this
                # method. raise_for_status() is the actual status check.
                response.raise_for_status()
            except requests.RequestException as e:
                if attempt >= total_attempts:
                    print(f"[vMix] ERROR: {e} -> {url}")
                    return False
                delay = self.backoff * attempt
                print(f"[vMix] WARN attempt {attempt}: {e}. Retry in {delay:.2f}s")
                time.sleep(delay)
            else:
                return True
        # Only reachable when ``retries`` is negative and no attempt was made.
        return False
@dataclass
class VmixMessage:
    """One queued update: a logical field name and the text to show for it."""

    # Logical field key (used to look up the vMix target parameters).
    field: str
    # Text value to send for that field.
    value: str
class VmixSender:
    """Background worker that pushes field updates to vMix through a queue.

    ``send`` is called by the producer and never blocks: when the bounded
    queue is full the oldest pending message is discarded to make room.
    A daemon thread drains the queue and performs the HTTP calls.

    Args:
        client: Object whose ``call(...)`` performs the actual request and
            returns a truthy value on success.
        dedupe: When true, a value equal to the last one accepted for the
            same field is skipped.
        max_queue: Maximum number of pending messages (``Queue(maxsize=...)``).
    """

    def __init__(self, client: VmixClient, dedupe: bool = False, max_queue: int = 2000):
        self.client = client
        self.queue: Queue[VmixMessage] = Queue(maxsize=max_queue)
        self.stop_event = Event()
        self.thread = Thread(target=self._run, name="vmix-sender", daemon=True)
        self.dedupe = dedupe
        # field -> last value accepted for sending (only used when dedupe is on)
        self._last_sent: dict[str, str | None] = {}

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    def stop(self):
        """Ask the worker to stop and wait up to two seconds for it."""
        self.stop_event.set()
        self.thread.join(timeout=2)

    def send(self, field: str, value: str | None):
        """Queue ``value`` for ``field``; ignore ``None`` and unknown fields."""
        if value is None or field not in VMIX_TARGETS:
            return
        text = str(value)
        if self.dedupe:
            if self._last_sent.get(field) == text:
                return
            self._last_sent[field] = text

        message = VmixMessage(field, text)
        try:
            self.queue.put_nowait(message)
            return
        except Full:
            pass

        # Queue is full: discard the oldest pending message to make room.
        try:
            dropped = self.queue.get_nowait()
        except Empty:
            dropped = None
        else:
            self.queue.task_done()
            # The dropped value never reached vMix, so it must not suppress
            # a later identical reading.
            self._forget(dropped)
        try:
            self.queue.put_nowait(message)
        except Full:
            # Still no room (another producer won the slot): give up on this
            # message but let the next identical reading through.
            self._forget(message)

    def _forget(self, msg: VmixMessage):
        """Drop the dedupe entry for ``msg`` if it is still the latest value.

        The check-then-pop is not atomic with respect to ``send``; the worst
        outcome of a race is one redundant re-send of an unchanged value.
        """
        if self._last_sent.get(msg.field) == msg.value:
            self._last_sent.pop(msg.field, None)

    def _run(self):
        """Worker loop: deliver queued messages until ``stop_event`` is set."""
        while not self.stop_event.is_set():
            try:
                msg = self.queue.get(timeout=0.2)
            except Empty:
                continue
            delivered = False
            try:
                t = VMIX_TARGETS[msg.field]
                delivered = self.client.call(
                    t["Function"], t["Input"], msg.value, t.get("SelectedName"), t.get("SelectedIndex")
                )
            except Exception as e:
                # Thread boundary: an unexpected error must not silently end
                # the only sender thread.
                print(f"[vMix] ERROR sending {msg.field}: {e}")
            finally:
                self.queue.task_done()
            if delivered:
                print(f"[vMix] {msg.field} <- {msg.value}")
            else:
                # Failed delivery: allow the same value to be sent again.
                self._forget(msg)
# ---------------- Parser ----------------
def parse_time_5ch(chunk: str) -> str | None:
if len(chunk) != 5: return None
try:
if chunk[0:2] == "0 ":
return f"{int(chunk[1:3])}:{int(chunk[3:5]):02d}"
# elif len(chunk) == 5:
# return f"{int(chunk[1:3])}:{int(chunk[3:5]):02d}"
else:
return f"{int(chunk[1:3])}.{chunk[3]}"
except Exception:
return None
def format_shot_clock_from_tail_number(n: int) -> str | None:
if n < 0: return ""
tail = n % 100
if tail != 31: return "" # не обновлять
sec_tenths = n // 100
return str(sec_tenths // 10) if sec_tenths >= 50 else f"{sec_tenths / 10:.1f}"
@dataclass
class ParsedPacket:
main_time: str | None = None
score1: str | None = None
score2: str | None = None
quarter: str | None = None
shot_clock: str | None = None
def parse_line(line: str) -> ParsedPacket | None:
    """Parse one decoded serial line into a ``ParsedPacket``.

    The line is stripped, decode-replacement markers are removed, and the
    cleaned text is echoed to stdout. Only lines whose first character is
    ``"3"`` are parsed; anything else yields ``None``.

    Field positions in the cleaned line:

    * ``[2:7]``   main clock, formatted by ``parse_time_5ch``
    * ``[7:10]``  first score, ``[10:13]`` second score (stripped; empty -> ``None``)
    * quarter: a digit 1-5 preceded by whitespace and followed by two
      whitespace characters, six digits and whitespace
    * last five characters: when all ASCII digits, formatted by
      ``format_shot_clock_from_tail_number``; otherwise the shot clock is ``""``
    """
    # Undecodable bytes arrive as U+FFFD when the raw data was decoded with
    # errors="replace". The literal "<EFBFBD>" text is still stripped as
    # before, in case the marker really occurs spelled out like that.
    clean = line.strip().replace("\ufffd", "").replace("<EFBFBD>", "")
    print(clean)
    if not clean.startswith("3"):
        return None

    main_time = parse_time_5ch(clean[2:7])
    score1 = clean[7:10].strip()
    score2 = clean[10:13].strip()

    m = re.search(r"\s([1-5])\s\s\d{6}\s", clean)
    quarter = m.group(1) if m else None

    shot_clock = ""
    tail = clean[-5:]
    # isdigit() alone also accepts characters such as "²" that int() rejects;
    # requiring ASCII keeps a corrupted tail from raising out of the parser.
    if tail.isascii() and tail.isdigit():
        shot_clock = format_shot_clock_from_tail_number(int(tail))

    return ParsedPacket(
        main_time=main_time,
        score1=score1 or None,
        score2=score2 or None,
        quarter=quarter,
        shot_clock=shot_clock,
    )
# ---------------- Serial reader ----------------
# Supported line terminators, keyed by their command-line name.
EOLS = dict(CR=b"\r", LF=b"\n", CRLF=b"\r\n")


def sniff_eol(sample: bytes) -> bytes:
    """Guess the line terminator used in ``sample`` (simple heuristic).

    An adjacent CR LF pair wins; otherwise any LF selects LF; a sample with
    only CR, or with no terminator at all, falls back to CR.
    """
    if b"\r\n" in sample:
        return EOLS["CRLF"]
    if b"\n" in sample:
        return EOLS["LF"]
    # CR only, or nothing recognisable: CR is the default.
    return EOLS["CR"]
class SerialReader:
def __init__(self, port: str, baud: int, newline: str, strip_ws: bool, sender: VmixSender,
send_quarter: bool, autoeol: bool, bytesize: int, parity: str, stopbits: float,
rtscts: bool, xonxoff: bool, dsrdtr: bool):
self.port = port
self.baud = baud
self.eol = EOLS.get(newline.upper(), b"\r")
self.strip_ws = strip_ws
self.sender = sender
self.send_quarter = send_quarter
self.autoeol = autoeol
self.bytesize = bytesize
self.parity = getattr(serial, f"PARITY_{parity.upper()}", serial.PARITY_NONE)
self.stopbits = {1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2: serial.STOPBITS_TWO}[stopbits]
self.rtscts = rtscts
self.xonxoff = xonxoff
self.dsrdtr = dsrdtr
def run(self):
while True:
try:
with serial.Serial(
self.port,
baudrate=self.baud,
timeout=0.01,
bytesize=self.bytesize,
parity=self.parity,
stopbits=self.stopbits,
rtscts=self.rtscts,
xonxoff=self.xonxoff,
dsrdtr=self.dsrdtr,
) as ser:
print(f"[serial] OPEN {ser.port} @ {self.baud} ({self.bytesize}{self._parity_name()}{self._stopbits_name()})")
# сбросим линии и буферы
try:
ser.setDTR(True); ser.setRTS(True)
except Exception: pass
ser.reset_input_buffer(); ser.reset_output_buffer()
buffer = bytearray()
last_feed = time.time()
# авто-детект EOL по первому куску
if self.autoeol:
chunk = ser.read(256)
if chunk:
self.eol = sniff_eol(chunk)
print(f"[serial] auto EOL = {self._eol_name(self.eol)}")
buffer.extend(chunk)
while True:
chunk = ser.read(1024)
if chunk:
buffer.extend(chunk)
last_feed = time.time()
# разбор по EOL
progressed = False
while True:
idx = buffer.find(self.eol)
if idx == -1: break
line = buffer[:idx]
del buffer[: idx + len(self.eol)]
text = self._decode(line, self.strip_ws)
if text:
self._process(text)
progressed = True
# фолбэк: если долго не было EOL, попробуем отдать «как есть»
if not progressed and buffer and (time.time() - last_feed) > 0.5:
text = self._decode(buffer, self.strip_ws)
buffer.clear()
if text:
self._process(text)
except SerialException as e:
print(f"[serial] ERROR open/read {self.port}: {e}. Retry in 3s...")
time.sleep(3)
except KeyboardInterrupt:
print("\n[serial] Stopped by user.")
return
except Exception as e:
print(f"[serial] UNEXPECTED: {e}. Retry in 3s...")
time.sleep(3)
def _decode(self, raw: bytes, strip_ws: bool) -> str | None:
try:
text = raw.decode("utf-8", errors="replace")
except Exception:
text = raw.decode("latin-1", errors="replace")
if strip_ws: text = text.strip()
return text or None
def _process(self, text: str):
pkt = parse_line(text)
if not pkt:
print(f"[serial] RAW: {text!r}")
return
if pkt.main_time is not None: self.sender.send("main_time", pkt.main_time)
if pkt.score1 is not None: self.sender.send("score_1", pkt.score1)
if pkt.score2 is not None: self.sender.send("score_2", pkt.score2)
if self.send_quarter and pkt.quarter is not None:
self.sender.send("quarter", pkt.quarter)
if pkt.shot_clock is not None:
self.sender.send("shot_clock", pkt.shot_clock)
def _parity_name(self): return {"N":"N","E":"E","O":"O","M":"M","S":"S"}[self.parity]
def _stopbits_name(self): return {serial.STOPBITS_ONE:"1", serial.STOPBITS_ONE_POINT_FIVE:"1.5", serial.STOPBITS_TWO:"2"}[self.stopbits]
def _eol_name(self, e: bytes): return {b"\r":"CR", b"\n":"LF", b"\r\n":"CRLF"}.get(e, f"{e!r}")
# ---------------- Probe (sniffer) ----------------
def probe_port(port: str, baud: int, seconds: int, **kwargs):
    """List the available serial ports, then dump raw traffic from ``port``.

    For ``seconds`` seconds every non-empty read is printed as hex and as
    text (UTF-8, falling back to latin-1 when that fails). Any error while
    opening or reading is reported instead of raised. Extra keyword
    arguments are accepted and ignored.
    """
    print("[probe] Available ports:")
    for info in list_ports.comports():
        print(f" - {info.device}: {info.description}")
    try:
        with serial.Serial(port, baudrate=baud, timeout=0.01) as ser:
            print(f"[probe] OPEN {ser.port} @ {baud}. Sniffing {seconds}s...")
            ser.reset_input_buffer()
            started = time.time()
            received = 0
            while time.time() - started < seconds:
                data = ser.read(512)
                if data:
                    received += len(data)
                    print(f"[{len(data):03d} bytes] HEX: {data.hex(' ')}")
                    try:
                        shown = data.decode("utf-8")
                    except Exception:
                        shown = data.decode("latin-1", errors="replace")
                    print(f" TXT: {shown!r}")
            print(f"[probe] Done. Total bytes: {received}")
    except Exception as e:
        print(f"[probe] ERROR: {e}")
# ---------------- CLI ----------------
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the bridge."""
    parser = argparse.ArgumentParser(description="COM -> vMix bridge with diagnostics")
    option_table = (
        ("--port", dict(required=True, help="e.g. COM2 or /dev/ttyUSB0")),
        ("--baud", dict(type=int, default=19200)),
        ("--newline", dict(choices=["LF", "CRLF", "CR"], default="CR")),
        ("--autoeol", dict(action="store_true", help="Auto-detect EOL from stream")),
        ("--no-strip", dict(action="store_true")),
        ("--vmix-host", dict(default=VMIX_HOST)),
        ("--timeout", dict(type=float, default=0.01)),
        ("--retries", dict(type=int, default=3)),
        ("--backoff", dict(type=float, default=0.3)),
        ("--dedupe", dict(action="store_true")),
        ("--send-quarter", dict(action="store_true")),
        # serial low-level
        ("--bytesize", dict(type=int, choices=[5, 6, 7, 8], default=8)),
        ("--parity", dict(choices=["N", "E", "O", "M", "S"], default="N")),
        ("--stopbits", dict(type=float, choices=[1, 1.5, 2], default=1)),
        ("--rtscts", dict(action="store_true")),
        ("--xonxoff", dict(action="store_true")),
        ("--dsrdtr", dict(action="store_true")),
        # tools
        ("--probe", dict(type=int, metavar="SECONDS",
                         help="Sniff raw bytes/text for N seconds and exit")),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser


def main():
    """Command-line entry point.

    With ``--probe N`` the port is sniffed for N seconds and the program
    exits. Otherwise the vMix client, the sender thread and the serial
    reader are wired together and the reader runs until it returns; the
    sender is stopped afterwards in every case.
    """
    args = _build_arg_parser().parse_args()

    if args.probe:
        probe_port(args.port, args.baud, args.probe)
        return

    client = VmixClient(
        host=args.vmix_host,
        timeout=args.timeout,
        retries=args.retries,
        backoff=args.backoff,
    )
    sender = VmixSender(client=client, dedupe=args.dedupe)
    sender.start()

    reader = SerialReader(
        port=args.port,
        baud=args.baud,
        newline=args.newline,
        strip_ws=not args.no_strip,
        sender=sender,
        send_quarter=args.send_quarter,
        autoeol=args.autoeol,
        bytesize=args.bytesize,
        parity=args.parity,
        stopbits=args.stopbits,
        rtscts=args.rtscts,
        xonxoff=args.xonxoff,
        dsrdtr=args.dsrdtr,
    )
    try:
        reader.run()
    finally:
        sender.stop()


if __name__ == "__main__":
    main()