"""Scoreboard bridge: receive raw data, decode it and push it to vMix-style HTTP targets.

The module is split into four cooperating classes:

* ``DataReceiver``  - reads raw packets from a TCP socket or replays a log file.
* ``DataProcessor`` - turns a raw packet into a ``{field name: value}`` dict.
* ``DataSender``    - pushes such a dict to every configured HTTP target.
* ``App``           - the Tkinter user interface wiring the three together.
"""

import ast
import binascii
import socket
import threading
import time
import tkinter as tk
from concurrent.futures import ThreadPoolExecutor
from tkinter import filedialog, messagebox, ttk

import requests


def hexspace(data, n):
    """Return *data* cut into chunks of *n* characters joined by single spaces."""
    return " ".join(data[i : i + n] for i in range(0, len(data), n))


class DataReceiver:
    """Source of raw packets: a TCP connection or a recorded log file."""

    # How long the background loops sleep when they have nothing to do.
    _IDLE_SLEEP_SECONDS = 0.05
    # Receive timeout, so that stop() is noticed even when no data arrives.
    _RECV_POLL_SECONDS = 1.0

    def __init__(self):
        self.running = False
        self.file_mode = False
        self.file_path = None
        self.manual_mode = False
        # Read position used by read_next_line(), remembered per file path.
        self._manual_path = None
        self._manual_offset = 0

    @staticmethod
    def _parse_log_line(line):
        """Extract the payload from one log line, or return None if it is malformed.

        The payload is the text between the first and second `` DEBUG ``
        markers, interpreted as a Python literal.
        """
        parts = line.strip().split(" DEBUG ")
        if len(parts) < 2:
            print(f"Skipping log line without a DEBUG payload: {line!r}")
            return None
        try:
            # SECURITY: this used to be eval(), which executes arbitrary code
            # found in the log file. literal_eval only accepts Python literals
            # (bytes, str, numbers, containers), never expressions or calls.
            return ast.literal_eval(parts[1])
        except (ValueError, SyntaxError) as exc:
            print(f"Skipping log line with an unparsable payload: {exc}")
            return None

    def start_tcp(self, host, port, callback):
        """Connect to ``host:port`` in a daemon thread and feed packets to *callback*.

        After connecting, the string ``hello`` is sent once; then every chunk
        received (up to 1024 bytes) is passed to *callback* as ``bytes`` until
        ``stop()`` is called, the peer closes the connection or an error occurs.
        """
        self.running = True

        def run():
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                try:
                    sock.connect((host, port))
                    sock.sendall("hello".encode("utf-8"))
                    # Without a timeout recv() blocks forever and the loop
                    # would never re-check self.running after stop().
                    sock.settimeout(self._RECV_POLL_SECONDS)
                    while self.running:
                        try:
                            data = sock.recv(1024)
                        except socket.timeout:
                            continue
                        except (socket.error, OSError) as e:
                            print(f"Ошибка приёма данных: {e}")
                            break
                        if not data:
                            # An empty read means the peer closed the connection.
                            print("Сервер закрыл соединение.")
                            break
                        try:
                            callback(data)
                        except UnicodeDecodeError:
                            # Fallback kept from the original code: retry the
                            # callback with a latin1-decoded string.
                            print(f"Ошибка декодирования данных: {data}")
                            callback(data.decode("latin1", errors="replace"))
                except Exception as e:
                    print(f"Ошибка подключения: {e}")
                finally:
                    print("Соединение закрыто.")
                    self.running = False

        threading.Thread(target=run, daemon=True).start()

    def start_file(self, callback, timeout):
        """Replay ``self.file_path`` in a daemon thread, one line per *timeout* seconds.

        Each line's payload is passed to *callback*. While ``manual_mode`` is
        set the thread stays idle. After the end of the file the thread keeps
        polling for newly appended lines until ``stop()`` is called.
        """
        self.running = True

        def run():
            if not self.file_path:
                return
            try:
                log_file = open(self.file_path, "r")
            except OSError as exc:
                print(f"Cannot open log file {self.file_path}: {exc}")
                self.running = False
                return
            with log_file:
                while self.running:
                    if self.manual_mode:
                        # Sleep instead of spinning a CPU core at 100 %.
                        time.sleep(self._IDLE_SLEEP_SECONDS)
                        continue
                    line = log_file.readline()
                    if not line:
                        # Nothing to read: never busy-loop, even with timeout 0.
                        time.sleep(max(timeout, self._IDLE_SLEEP_SECONDS))
                        continue
                    payload = self._parse_log_line(line)
                    if payload is not None:
                        callback(payload)
                    time.sleep(timeout)

        threading.Thread(target=run, daemon=True).start()

    def read_next_line(self, callback):
        """Read the next unread line of ``self.file_path`` and pass its payload to *callback*.

        Consecutive calls advance through the file; selecting a different
        file restarts from its beginning. Nothing happens at end of file.
        """
        if not self.file_path:
            return
        if self._manual_path != self.file_path:
            self._manual_path = self.file_path
            self._manual_offset = 0
        with open(self.file_path, "r") as log_file:
            log_file.seek(self._manual_offset)
            line = log_file.readline()
            self._manual_offset = log_file.tell()
        if not line:
            return
        payload = self._parse_log_line(line)
        if payload is not None:
            callback(payload)

    def stop(self):
        """Ask the background reader thread to finish."""
        self.running = False


class DataSender:
    """Pushes processed field dicts to a list of ``(host, port)`` HTTP targets."""

    def __init__(self, vmix_input=51):
        """Create a sender.

        Args:
            vmix_input: value sent as the ``Input`` query parameter of every
                request (the original code hard-coded 51).
        """
        self.targets = []
        self.vmix_input = vmix_input
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def add_target(self, host, port):
        """Register one more ``(host, port)`` destination."""
        self.targets.append((host, port))

    def send(self, data):
        """Send every item of *data* to all targets, one worker thread per target.

        Args:
            data: mapping of ``"<name>.<property>"`` keys to values. A key
                whose second dot-separated part is ``Text`` is sent with
                ``Function=SetText``; every other key with
                ``Function=SetImage``.

        HTTP errors are printed, never raised. The call returns once all
        targets have been served.
        """
        if not self.targets:
            print("No targets available to send data.")
            return
        if not hasattr(data, "items"):
            # Previously this raised AttributeError inside the worker threads
            # and was silently lost, because the map() results were never read.
            print(f"Cannot send data of type {type(data).__name__}: expected a dict.")
            return

        def send_to_target(target):
            host, port = target
            url = f"http://{host}:{port}/API/"
            try:
                for name, value in data.items():
                    parts = str(name).split(".")
                    is_text = len(parts) > 1 and parts[1] == "Text"
                    params = {
                        "Function": "SetText" if is_text else "SetImage",
                        "Input": self.vmix_input,
                        "SelectedName": name,
                        "Value": value,
                    }
                    response = self.session.get(url, params=params, timeout=5)
                    response.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Error sending data to {host}:{port}: {e}")

        with ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(send_to_target, self.targets)

    def check_vmix_connection(self, host, port):
        """Return True if ``http://host:port/API/`` answers with HTTP 200 within 2 s."""
        url = f"http://{host}:{port}/API/"
        try:
            response = self.session.get(url, timeout=2)
            return response.status_code == 200
        except requests.exceptions.RequestException:
            return False


class DataProcessor:
    """Decoders that turn a raw packet into the dict expected by ``DataSender.send``."""

    # Folder holding the indicator images referenced by the Saratov protocol.
    # Raw string: the original literals relied on invalid escape sequences
    # (``\Г``, ``\S`` ...), which newer Python versions warn about.
    INDICATOR_DIR = r"D:\Графика\БАСКЕТБОЛ\ЕДИНАЯ ЛИГА ВТБ 2022-2023\Scorebug Indicators"

    # Byte sequence that separates frames in the Saratov stream.
    _SARATOV_FRAME_MARK = b"\xff\x52"

    def process_protocol_a(self, data):
        """Return *data* upper-cased."""
        return data.upper()

    def process_protocol_b(self, data):
        """Return *data* reversed."""
        return data[::-1]

    def process_chine(self, data):
        """Decode one packet of the "Chine" protocol.

        Byte 5 is read as seconds and the first decimal digit of byte 4 as
        tenths. Above 4 seconds the value is rounded to a whole number, and
        otherwise shown as ``"S.T"``. A first byte of 16 or 24 blanks the
        clock.

        Returns:
            dict with keys ``12sec.Text`` and ``12SecBackground.Fill.Color``,
            or None when the packet is shorter than 6 bytes.
        """
        values = list(bytes(data))
        if len(values) < 6:
            # A short or partial TCP read used to raise IndexError and tear
            # down the whole connection.
            return None

        seconds = values[5]
        tenths = str(values[4])[0]
        if seconds > 4:
            if int(tenths) >= 5:
                seconds += 1
            timer_attack = seconds
            timer_color = "#59358A"
        else:
            timer_attack = f"{seconds}.{tenths}"
            timer_color = "#FF6A13"

        if values[0] in (16, 24):
            timer_attack = ""
            timer_color = "#000000"

        return {
            "12sec.Text": timer_attack,
            "12SecBackground.Fill.Color": timer_color,
        }

    def process_protocol_saratov(self, data):
        """Decode a chunk of the "Saratov" stream.

        The chunk is split on the ``FF 52`` marker; every piece with at
        least 15 bytes is decoded as a frame (offsets 5-13 are used) and the
        last decodable frame wins.

        Returns:
            dict of fields for ``DataSender.send``, or None when the chunk
            contains no complete frame.
        """
        raw = bytes(data)
        if raw == b"\xff":
            return None

        result = None
        for frame in raw.split(self._SARATOV_FRAME_MARK):
            # The original also read offset 14, so shorter frames were skipped.
            if len(frame) < 15:
                continue

            minutes, seconds, seconds_24 = frame[5], frame[6], frame[7]
            score1, score2 = frame[8], frame[9]
            foul1, foul2 = frame[12], frame[13]

            if seconds < 60:
                timer_str = f"{minutes}:{seconds:02d}"
            else:
                timer_str = f"{minutes}.{seconds - 128}"

            timer_attack = ""
            timer_pic = f"{self.INDICATOR_DIR}\\24Sec_Orange.png"
            if seconds_24 != 255:
                if seconds_24 > 127:
                    seconds_24 -= 128
                    timer_attack = f"{seconds_24 // 10}.{seconds_24 % 10}"
                    timer_pic = f"{self.INDICATOR_DIR}\\24Sec_Red.png"
                else:
                    timer_attack = seconds_24

            result = {
                "TIMER.Text": timer_str,
                "ATTACK.Text": timer_attack,
                "Score_Home.Text": score1,
                "Score_Away.Text": score2,
                "fouls1.Source": f"{self.INDICATOR_DIR}\\Home_{foul1}.png",
                "fouls2.Source": f"{self.INDICATOR_DIR}\\Away_{foul2}.png",
                "24SECBACKGROUND.Source": timer_pic,
            }
        return result


class App:
    """Tkinter front end: settings tab, live log tab and connection indicators.

    NOTE(review): ``display_data`` and the connection checker are invoked from
    worker threads and touch Tk widgets directly, as in the original code.
    Tkinter expects widget access from the main thread; a queue polled with
    ``root.after`` would be the robust design.
    """

    def __init__(self, root):
        self.receiver = DataReceiver()
        self.sender = DataSender()
        self.processor = DataProcessor()

        self.protocol_var = tk.StringVar(value="Protocol A")
        self.mode_var = tk.StringVar(value="Log File")
        self.read_mode_var = tk.StringVar(value="Automatic")
        self.timeout_var = tk.DoubleVar(value=0.5)

        self.create_ui(root)
        self.start_vmix_connection_check()

    def display_data(self, data):
        """Process one raw packet, send the result and append both to the log views."""
        handlers = {
            "Protocol A": self.processor.process_protocol_a,
            "Protocol B": self.processor.process_protocol_b,
            "Saratov": self.processor.process_protocol_saratov,
            "Chine": self.processor.process_chine,
        }
        handler = handlers.get(self.protocol_var.get())
        processed_data = handler(data) if handler is not None else data

        if processed_data:
            self.update_send_status(True)
            self.sender.send(processed_data)
            self.update_send_status(False)

        self.log_text.insert(tk.END, f"{data}\n")
        self.log_text.see(tk.END)
        self.processed_data_text.insert(tk.END, f"{processed_data}\n")
        self.processed_data_text.see(tk.END)

    def create_ui(self, root):
        """Build the two-tab notebook ("Settings" and "Game") inside *root*."""
        root.title("Data Processing Application")

        self.notebook = ttk.Notebook(root)
        self.notebook.pack(fill="both", expand=True)

        self.settings_frame = tk.Frame(self.notebook)
        self.game_frame = tk.Frame(self.notebook)
        self.notebook.add(self.settings_frame, text="Settings")
        self.notebook.add(self.game_frame, text="Game")

        self.create_settings_ui(self.settings_frame)
        self.create_game_ui(self.game_frame)

    def create_settings_ui(self, frame):
        """Build the receive / process / send controls of the settings tab."""
        # --- Receive ---------------------------------------------------
        frame_receive = tk.LabelFrame(frame, text="Receive Data")
        frame_receive.pack(fill="x", padx=5, pady=5)

        tk.Label(frame_receive, text="Select Mode:").pack(side="left", padx=5, pady=5)
        tk.OptionMenu(
            frame_receive,
            self.mode_var,
            "Log File",
            "TCP Connection",
            command=self.update_mode,
        ).pack(side="left", padx=5, pady=5)

        # Packed or hidden later by update_mode().
        self.file_button = tk.Button(
            frame_receive, text="Select Log File", command=self.select_file
        )

        self.tcp_frame = tk.Frame(frame_receive)
        tk.Label(self.tcp_frame, text="IP:").pack(side="left")
        self.tcp_ip_entry = tk.Entry(self.tcp_frame)
        self.tcp_ip_entry.pack(side="left", padx=5)
        self.tcp_ip_entry.insert(0, "192.168.127.254")  # default IP address
        tk.Label(self.tcp_frame, text="Port:").pack(side="left")
        self.tcp_port_entry = tk.Entry(self.tcp_frame)
        self.tcp_port_entry.pack(side="left", padx=5)
        self.tcp_port_entry.insert(0, 1993)  # default port

        self.read_mode_frame = tk.Frame(frame_receive)
        tk.Label(self.read_mode_frame, text="Read Mode:").pack(
            side="left", padx=5, pady=5
        )
        tk.OptionMenu(
            self.read_mode_frame,
            self.read_mode_var,
            "Automatic",
            "Manual",
            command=self.update_read_mode,
        ).pack(side="left", padx=5, pady=5)
        tk.Label(self.read_mode_frame, text="Timeout (s):").pack(
            side="left", padx=5, pady=5
        )
        self.timeout_entry = tk.Entry(
            self.read_mode_frame, textvariable=self.timeout_var
        )
        self.timeout_entry.pack(side="left", padx=5)
        # Packed or hidden later by update_read_mode().
        self.manual_read_button = tk.Button(
            self.read_mode_frame, text="Read Next Line", command=self.read_next_line
        )

        self.start_button = tk.Button(
            frame_receive, text="Start", command=self.start_receiving
        )
        self.start_button.pack(side="left", padx=5, pady=5)
        tk.Button(frame_receive, text="Stop", command=self.stop_receiving).pack(
            side="left", padx=5, pady=5
        )

        # --- Process ---------------------------------------------------
        frame_process = tk.LabelFrame(frame, text="Process Data")
        frame_process.pack(fill="x", padx=5, pady=5)
        tk.Label(frame_process, text="Select Protocol:").pack(
            side="left", padx=5, pady=5
        )
        tk.OptionMenu(
            frame_process,
            self.protocol_var,
            "Protocol A",
            "Protocol B",
            "Saratov",
            "Chine",
        ).pack(side="left", padx=5, pady=5)

        # --- Send ------------------------------------------------------
        frame_send = tk.LabelFrame(frame, text="Send Data")
        frame_send.pack(fill="x", padx=5, pady=5)

        self.target_count_var = tk.IntVar(value=1)
        tk.Label(frame_send, text="Number of Targets:").pack(
            side="left", padx=5, pady=5
        )
        tk.Spinbox(
            frame_send,
            from_=1,
            to=10,
            textvariable=self.target_count_var,
            command=self.update_targets,
        ).pack(side="left", padx=5, pady=5)

        self.targets_frame = tk.Frame(frame_send)
        self.targets_frame.pack(fill="x", padx=5, pady=5)
        self.target_entries = []
        self.update_targets()

        # NOTE(review): this button is the only place where the sender's
        # target list gets filled from the entry fields.
        tk.Button(frame_send, text="Send Data", command=self.send_data).pack(
            side="left", padx=5, pady=5
        )

        self.update_mode(self.mode_var.get())

    def create_game_ui(self, frame):
        """Build the status indicators and the two log panes of the game tab."""
        frame_status = tk.LabelFrame(frame, text="Status")
        frame_status.pack(fill="x", padx=5, pady=5)

        self.receive_status_canvas = tk.Canvas(frame_status, width=20, height=20)
        self.receive_status_canvas.create_oval(
            2, 2, 18, 18, fill="red", tags="indicator"
        )
        self.receive_status_canvas.pack(side="left", padx=10, pady=5)
        tk.Label(frame_status, text="Receiving Data").pack(side="left")

        self.send_status_canvas = tk.Canvas(frame_status, width=20, height=20)
        self.send_status_canvas.create_oval(2, 2, 18, 18, fill="red", tags="indicator")
        self.send_status_canvas.pack(side="left", padx=10, pady=5)
        tk.Label(frame_status, text="Sending Data").pack(side="left")

        frame_log = tk.LabelFrame(frame, text="Log")
        frame_log.pack(fill="both", expand=True, padx=5, pady=5)
        self.log_text = tk.Text(frame_log, height=10, state="normal")
        self.log_text.pack(fill="both", expand=True, padx=5, pady=5)

        frame_processed = tk.LabelFrame(frame, text="Processed Data")
        frame_processed.pack(fill="both", expand=True, padx=5, pady=5)
        self.processed_data_text = tk.Text(frame_processed, height=10, state="normal")
        self.processed_data_text.pack(fill="both", expand=True, padx=5, pady=5)

    def check_vmix_connections(self):
        """Probe every target row and colour its indicator accordingly."""
        # Iterate over a snapshot: update_targets() may rebuild the list
        # while this runs in the checker thread.
        for index, (host_entry, port_entry) in enumerate(list(self.target_entries)):
            host = host_entry.get()
            port = port_entry.get()
            if not port.isdigit():
                continue  # skip rows with an invalid port
            connected = self.sender.check_vmix_connection(host, int(port))
            self.update_connection_indicator(index, connected)

    def update_connection_indicator(self, index, connected):
        """Paint indicator number *index* green when *connected*, red otherwise."""
        color = "green" if connected else "red"
        self.connection_indicators[index].itemconfig("indicator", fill=color)

    def start_vmix_connection_check(self):
        """Start a daemon thread that re-checks all targets about once per second."""

        def check_loop():
            while True:
                try:
                    self.check_vmix_connections()
                except (tk.TclError, RuntimeError, IndexError, ValueError) as exc:
                    # Widgets can be destroyed or rebuilt mid-check; one
                    # failed cycle must not kill the checker thread.
                    print(f"Connection check skipped: {exc}")
                time.sleep(1)

        threading.Thread(target=check_loop, daemon=True).start()

    def update_mode(self, mode):
        """Show the widgets that belong to the selected receive *mode*."""
        if mode == "Log File":
            self.file_button.pack(side="left", padx=5, pady=5)
            self.tcp_frame.pack_forget()
            self.read_mode_frame.pack(fill="x", padx=5, pady=5)
        else:
            self.file_button.pack_forget()
            self.read_mode_frame.pack_forget()
            self.tcp_frame.pack(side="left", padx=5, pady=5)
        self.start_button.config(state="normal")

    def update_read_mode(self, mode):
        """Switch the file reader between automatic replay and manual stepping."""
        automatic = mode == "Automatic"
        self.timeout_entry.config(state="normal" if automatic else "disabled")
        if automatic:
            self.manual_read_button.pack_forget()
        else:
            self.manual_read_button.pack(side="left", padx=5, pady=5)
        self.receiver.manual_mode = not automatic

    def select_file(self):
        """Let the user choose the log file to replay."""
        self.receiver.file_path = filedialog.askopenfilename()

    def start_receiving(self):
        """Start the selected receiver; re-enable the Start button if that fails."""
        self.start_button.config(state="disabled")
        if self._start_selected_source():
            self.update_receive_status(True)
        else:
            self.start_button.config(state="normal")

    def _start_selected_source(self):
        """Validate the inputs of the current mode and start it; return success."""
        mode = self.mode_var.get()

        if mode == "Log File":
            if not self.receiver.file_path:
                messagebox.showerror("Error", "No file selected.")
                return False
            timeout = 0
            if self.read_mode_var.get() == "Automatic":
                try:
                    timeout = self.timeout_var.get()
                except tk.TclError:
                    # The entry holds text that is not a number.
                    messagebox.showerror("Error", "Invalid timeout.")
                    return False
                if timeout < 0:
                    messagebox.showerror("Error", "Invalid timeout.")
                    return False
            self.receiver.start_file(self.display_data, timeout)
            return True

        if mode == "TCP Connection":
            host = self.tcp_ip_entry.get()
            port = self.tcp_port_entry.get()
            if host and port.isdigit():
                self.receiver.start_tcp(host, int(port), self.display_data)
                return True
            messagebox.showerror("Error", "Invalid IP or port.")
            return False

        return False

    def stop_receiving(self):
        """Stop the receiver and reset the Start button and the indicator."""
        self.receiver.stop()
        self.start_button.config(state="normal")
        self.update_receive_status(False)

    def read_next_line(self):
        """Manually feed the next log line through ``display_data``."""
        self.receiver.read_next_line(self.display_data)

    def update_receive_status(self, active):
        """Paint the "Receiving Data" indicator green (*active*) or red."""
        color = "green" if active else "red"
        self.receive_status_canvas.itemconfig("indicator", fill=color)

    def update_send_status(self, active):
        """Paint the "Sending Data" indicator green (*active*) or red."""
        color = "green" if active else "red"
        self.send_status_canvas.itemconfig("indicator", fill=color)

    def update_targets(self):
        """Rebuild the host/port rows to match the requested number of targets."""
        for widget in self.targets_frame.winfo_children():
            widget.destroy()

        self.target_entries = []
        self.connection_indicators = []  # one status canvas per target row

        for i in range(self.target_count_var.get()):
            target_frame = tk.Frame(self.targets_frame)
            target_frame.pack(fill="x", pady=2)

            tk.Label(target_frame, text=f"Target {i+1} Host:").pack(side="left")
            host_entry = tk.Entry(target_frame)
            host_entry.pack(side="left", padx=5)
            host_entry.insert(0, "127.0.0.1")

            tk.Label(target_frame, text="Port:").pack(side="left")
            port_entry = tk.Entry(target_frame)
            port_entry.pack(side="left", padx=5)
            port_entry.insert(0, 8088)

            indicator = tk.Canvas(target_frame, width=20, height=20)
            indicator.create_oval(2, 2, 18, 18, fill="red", tags="indicator")
            indicator.pack(side="left", padx=5)

            self.target_entries.append((host_entry, port_entry))
            self.connection_indicators.append(indicator)

    def send_data(self):
        """Register the targets from the entry fields and send a sample payload."""
        data = "Sample data to send"
        if not data:
            messagebox.showerror("No Data", "No data to send.")
            return

        self.sender.targets = []
        for host_entry, port_entry in self.target_entries:
            host = host_entry.get()
            port = port_entry.get()
            if host and port.isdigit():
                self.sender.add_target(host, int(port))

        self.sender.send(data)


if __name__ == "__main__":
    root = tk.Tk()
    app = App(root)
    root.mainloop()