# serial_to_vmix.py # pip install pyserial requests import time import argparse import socket import urllib.parse import requests import serial from serial.serialutil import SerialException # --------------------------- # 1) ВАША функция отправки в vMix # Замените содержимое по своему желанию. # --------------------------- def parse(data: str): """ Пользовательская обработка/отправка данных в vMix. Здесь просто печатаем; замените на вашу логику. """ print(f"[parse] {data}") # ---- ПРИМЕРЫ реализации parse (раскомментируйте нужное) ---- # Пример A: отправка через vMix HTTP API (SetText в определённый input/title) # def parse(data: str): # vmix_host = "http://127.0.0.1:8088" # адрес vMix Web Controller # function = "SetText" # input_id = "1" # замените на ваш Input (номер/имя/строка GUID) # selected_title = None # если нужно указать конкретный Title layer: "TitleName.xaml" # field = None # если нужно указать конкретное текстовое поле в титре # # params = { # "Function": function, # "Input": input_id, # "Value": data # } # if selected_title: # params["SelectedName"] = selected_title # if field: # params["SelectedIndex"] = field # # # важно: корректно кодируем Value # url = f"{vmix_host}/api?{urllib.parse.urlencode(params, doseq=True, safe='')}" # try: # r = requests.get(url, timeout=2) # r.raise_for_status() # except requests.RequestException as e: # print(f"[parse][HTTP] ошибка запроса к vMix: {e}") # Пример B: отправка одной строки в vMix TCP (порт 8099) — например, выполнить функцию # def parse(data: str): # vmix_tcp_host = "127.0.0.1" # vmix_tcp_port = 8099 # # пример: выполнить SetText # cmd = f"FUNCTION SetText Input=1 Value=\"{data}\"\r\n" # try: # with socket.create_connection((vmix_tcp_host, vmix_tcp_port), timeout=2) as s: # s.sendall(cmd.encode("utf-8")) # except OSError as e: # print(f"[parse][TCP] ошибка сокета к vMix: {e}") # --------------------------- # 2) Чтение из COM-порта и вызов parse # --------------------------- def read_loop(port: str, baud: int, newline: str, strip_whitespace: bool): # мапинг для окончания строк в pyserial eol = {"CRLF": b"\r\n", "LF": b"\n", "CR": b"\r"}.get(newline.upper(), b"\n") while True: try: with serial.Serial(port, baudrate=baud, timeout=1) as ser: print(f"[serial] открыт {ser.port} @ {baud} бод. Ожидаю данные...") buffer = bytearray() while True: chunk = ser.read(1024) if not chunk: continue buffer.extend(chunk) # разбираем по разделителю строк while True: idx = buffer.find(eol) if idx == -1: break line = buffer[:idx] del buffer[:idx + len(eol)] try: text = line.decode("utf-8", errors="replace") except Exception: text = line.decode("latin-1", errors="replace") if strip_whitespace: text = text.strip() if text != "": parse(text) except SerialException as e: print(f"[serial] не удалось открыть/читать {port}: {e}. Повтор через 3 сек...") time.sleep(3) except KeyboardInterrupt: print("\n[serial] остановлено пользователем.") break except Exception as e: print(f"[serial] непредвиденная ошибка: {e}. Повтор через 3 сек...") time.sleep(3) def main(): ap = argparse.ArgumentParser(description="Чтение COM и отправка в vMix через parse()") ap.add_argument("--port", required=True, help="COM-порт, напр. COM3 (Windows) или /dev/ttyUSB0 (Linux)") ap.add_argument("--baud", type=int, default=19200, help="Скорость, по умолчанию 9600") ap.add_argument("--newline", choices=["LF", "CRLF", "CR"], default="CR", help="Разделитель строк, по умолчанию LF") ap.add_argument("--no-strip", action="store_true", help="Не обрезать пробелы по краям (по умолчанию обрезаем)") args = ap.parse_args() read_loop(args.port, args.baud, args.newline, strip_whitespace=not args.no_strip) if __name__ == "__main__": main()