Files
TIMERS/timer_app4.py

621 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ast
import binascii
import socket
import threading
import time
import tkinter as tk
from concurrent.futures import ThreadPoolExecutor
from tkinter import filedialog, messagebox, ttk

import requests
def hexspace(data, n):
    """Cut ``data`` into consecutive slices of length ``n`` and join them with single spaces.

    The last slice is shorter when ``len(data)`` is not a multiple of ``n``.
    """
    pieces = []
    for start in range(0, len(data), n):
        pieces.append(data[start : start + n])
    return " ".join(pieces)
class DataReceiver:
    """Delivers raw payloads to a callback, either from a TCP stream or from a log file.

    ``file_path`` and ``manual_mode`` are plain public attributes and may be
    assigned directly by the owner of the receiver.
    """

    # Seconds a blocking recv() may wait before ``running`` is checked again.
    RECV_POLL_SECONDS = 1.0
    # Pause used while file replay has nothing to do (manual mode or end of file).
    IDLE_SECONDS = 0.05
    # Text that separates the log prefix from the payload literal.
    LOG_MARKER = " DEBUG "

    def __init__(self):
        self.running = False
        self.file_mode = False
        self.file_path = None
        self.manual_mode = False
        # (path, offset) of the next unread line for read_next_line().
        self._manual_cursor = (None, 0)

    @classmethod
    def _parse_log_line(cls, line):
        """Return the payload stored after the ``" DEBUG "`` marker, or None.

        None is returned for lines without the marker and for lines whose
        remainder is not a valid Python literal.
        """
        _, marker, literal = line.strip().partition(cls.LOG_MARKER)
        if not marker:
            return None
        try:
            # NOTE: this used to be eval(); literal_eval only accepts plain
            # literals (bytes, str, numbers, containers), so a crafted log
            # line can no longer execute code.
            return ast.literal_eval(literal)
        except (ValueError, SyntaxError):
            return None

    def start_tcp(self, host, port, callback):
        """Connect to ``host:port`` in a daemon thread and pass every received chunk to ``callback``.

        After connecting, the text ``hello`` is sent once. Chunks are passed
        on as raw ``bytes`` (at most 1024 per call). The loop ends when
        ``stop()`` is called, the peer closes the connection, or a socket
        error occurs.
        """
        self.running = True

        def run():
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.connect((host, port))
                    s.sendall("hello".encode("utf-8"))
                    # A receive timeout lets the loop notice stop() even
                    # when the peer is silent.
                    s.settimeout(self.RECV_POLL_SECONDS)
                    while self.running:
                        try:
                            data = s.recv(1024)
                        except socket.timeout:
                            continue
                        except OSError as e:
                            print(f"Ошибка приёма данных: {e}")
                            break
                        if not data:
                            # An empty read means the peer closed the connection.
                            print("Сервер закрыл соединение.")
                            break
                        try:
                            callback(data)
                        except Exception as e:
                            # One bad chunk must not tear down the connection.
                            print(f"Ошибка обработки данных: {e}")
            except Exception as e:
                print(f"Ошибка подключения: {e}")
            finally:
                print("Соединение закрыто.")
                self.running = False

        threading.Thread(target=run, daemon=True).start()

    def start_file(self, callback, timeout):
        """Replay ``file_path`` in a daemon thread, one payload per ``timeout`` seconds.

        While ``manual_mode`` is set the thread idles instead of reading.
        Lines that carry no parsable payload are skipped. At end of file the
        thread keeps polling until ``stop()`` is called.
        """
        self.running = True

        def run():
            if not self.file_path:
                return
            # errors="replace": an undecodable byte in the log prefix must not
            # kill the thread.
            with open(
                self.file_path, "r", encoding="utf-8", errors="replace"
            ) as f:
                while self.running:
                    if self.manual_mode:
                        # Sleep instead of spinning at 100% CPU.
                        time.sleep(self.IDLE_SECONDS)
                        continue
                    line = f.readline()
                    if not line:
                        time.sleep(max(timeout, self.IDLE_SECONDS))
                        continue
                    payload = self._parse_log_line(line)
                    if payload is not None:
                        callback(payload)
                    time.sleep(timeout)

        threading.Thread(target=run, daemon=True).start()

    def read_next_line(self, callback):
        """Pass the next payload of ``file_path`` to ``callback``.

        The read position is remembered between calls, so repeated calls walk
        through the file. It restarts from the beginning when ``file_path``
        changes. Lines without a parsable payload are skipped. Nothing
        happens when no file is selected or the end of the file is reached.
        """
        if not self.file_path:
            return
        path, offset = self._manual_cursor
        if path != self.file_path:
            offset = 0
        payload = None
        with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
            f.seek(offset)
            while payload is None:
                line = f.readline()
                if not line:
                    break
                payload = self._parse_log_line(line)
            self._manual_cursor = (self.file_path, f.tell())
        if payload is not None:
            callback(payload)

    def stop(self):
        """Ask the running reader thread to finish."""
        self.running = False
class DataSender:
    """Sends field values to a list of HTTP targets through their ``/API/`` endpoint.

    Each field becomes one GET request with ``Function``, ``Input``,
    ``SelectedName`` and ``Value`` query parameters.
    """

    # Input number used in every request unless overridden in the constructor.
    DEFAULT_INPUT = 51
    input_number = DEFAULT_INPUT

    def __init__(self, input_number=DEFAULT_INPUT):
        """Create a sender with no targets.

        Args:
            input_number: value of the ``Input`` query parameter.
        """
        self.targets = []
        self.input_number = input_number
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def add_target(self, host, port):
        """Register ``(host, port)`` as a destination for ``send``."""
        self.targets.append((host, port))

    @staticmethod
    def _function_for(field_name):
        """Pick the API function for ``field_name``, or None when the name is unusable.

        The second dot-separated component decides: ``Text`` maps to
        ``SetText``, anything else (``Source``, ``Fill``, ...) to ``SetImage``.
        """
        if not isinstance(field_name, str):
            return None
        parts = field_name.split(".")
        if len(parts) < 2:
            return None
        # NOTE(review): colour fields such as "x.Fill.Color" also end up as
        # SetImage here — confirm that is the function the target expects.
        return "SetText" if parts[1] == "Text" else "SetImage"

    def send(self, data):
        """Send every field of ``data`` to all targets, one worker thread per target.

        Args:
            data: mapping of ``"<name>.<property>"`` field names to values.

        Payloads that are not mappings and field names without a dot are
        reported and skipped. For a given target, the first failed request
        aborts the remaining fields for that target.
        """
        if not self.targets:
            print("No targets available to send data.")
            return
        if not hasattr(data, "items"):
            # Previously this failed inside the worker threads, where the
            # exception was never looked at.
            print(
                f"Cannot send {type(data).__name__} payload: "
                "expected a mapping of field names to values."
            )
            return

        # The query parameters are the same for every target, so build them once.
        requests_params = []
        for k, v in data.items():
            function = self._function_for(k)
            if function is None:
                print(f"Skipping field {k!r}: expected '<name>.<property>'.")
                continue
            requests_params.append(
                {
                    "Function": function,
                    "Input": self.input_number,
                    "SelectedName": k,
                    "Value": v,
                }
            )

        def send_to_target(target):
            host, port = target
            url = f"http://{host}:{port}/API/"
            try:
                for params in requests_params:
                    response = self.session.get(url, params=params, timeout=5)
                    response.raise_for_status()  # turn HTTP error codes into exceptions
            except requests.exceptions.RequestException as e:
                print(f"Error sending data to {host}:{port}: {e}")

        targets = list(self.targets)
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(send_to_target, t) for t in targets]
        # Leaving the ``with`` block waits for all workers; report anything
        # they raised that was not a handled request error.
        for target, future in zip(targets, futures):
            error = future.exception()
            if error is not None:
                print(f"Unexpected error sending data to {target}: {error}")

    def check_vmix_connections(self):
        """Check the connection of every target and update its indicator.

        NOTE(review): this method reads ``self.target_entries`` and
        ``self.sender``, which DataSender never defines, so it cannot work
        on a DataSender instance. ``App`` has a method with the same name and
        body. Kept only so the class interface is unchanged.
        """
        for index, (host_entry, port_entry) in enumerate(self.target_entries):
            host = host_entry.get()
            port = port_entry.get()
            if not port.isdigit():
                continue  # skip invalid ports
            connected = self.sender.check_vmix_connection(host, int(port))
            self.update_connection_indicator(index, connected)

    def update_connection_indicator(self, index, connected):
        """Set the indicator colour for one target.

        NOTE(review): relies on ``self.connection_indicators``, which
        DataSender never defines; see ``check_vmix_connections``.
        """
        color = "green" if connected else "red"
        self.connection_indicators[index].itemconfig("indicator", fill=color)

    def check_vmix_connection(self, host, port):
        """Return True when ``http://host:port/API/`` answers with HTTP 200 within 2 seconds."""
        url = f"http://{host}:{port}/API/"
        try:
            response = self.session.get(url, timeout=2)  # timeout in seconds
            return response.status_code == 200
        except requests.exceptions.RequestException:
            return False
class DataProcessor:
    """Converts raw payloads into ``{field name: value}`` dicts, one method per protocol."""

    # Folder with the indicator images referenced by the Saratov protocol.
    INDICATORS_DIR = (
        r"D:\Графика\БАСКЕТБОЛ\ЕДИНАЯ ЛИГА ВТБ 2022-2023\Scorebug Indicators"
    )
    # Byte pair that starts every Saratov frame.
    SARATOV_FRAME_START = b"\xff\x52"
    # The original parser read bytes 0..14 of a frame, so shorter frames were dropped.
    SARATOV_MIN_FRAME = 15

    def process_protocol_a(self, data):
        """Return ``data`` upper-cased."""
        return data.upper()

    def process_protocol_b(self, data):
        """Return ``data`` reversed."""
        return data[::-1]

    def process_chine(self, data):
        """Build the shot-clock fields from one binary frame.

        Byte 5 is used as the seconds value and the first decimal digit of
        byte 4 as the fraction digit. Above 4 seconds the value is rounded to
        a whole number (up when the fraction digit is 5 or more); otherwise it
        is shown as ``"<seconds>.<digit>"``. When byte 0 is 16 or 24 the value
        is blanked.

        Args:
            data: bytes-like frame.

        Returns:
            Dict with ``12sec.Text`` and ``12SecBackground.Fill.Color``, or
            None when the frame has fewer than 6 bytes.
        """
        values = list(bytes(data))
        if len(values) < 6:
            # TCP chunks can be cut short; ignore them instead of raising.
            return None
        seconds = values[5]
        fraction_digit = str(values[4])[0]
        if seconds > 4:
            timer_attack = seconds + 1 if int(fraction_digit) >= 5 else seconds
            timer_color = "#59358A"
        else:
            timer_attack = f"{seconds}.{fraction_digit}"
            timer_color = "#FF6A13"
        if values[0] in (16, 24):
            timer_attack = ""
            timer_color = "#000000"
        return {
            "12sec.Text": timer_attack,
            "12SecBackground.Fill.Color": timer_color,
        }

    def process_protocol_saratov(self, data):
        """Build the scoreboard fields from a chunk that may hold several frames.

        The chunk is split on the ``FF 52`` byte pair. Every piece with at
        least 15 bytes is decoded, and the fields of the last such piece are
        returned. Bytes used: 5 minutes, 6 seconds, 7 shot clock, 8/9 scores,
        12/13 fouls.

        Args:
            data: bytes-like chunk.

        Returns:
            Dict of field values, or None when the chunk is the lone ``FF``
            byte or contains no complete frame.
        """
        # The str form is kept only because the original compared against it.
        if data in (b"\xff", "\xff"):
            return None
        result = None
        for frame in bytes(data).split(self.SARATOV_FRAME_START):
            if len(frame) < self.SARATOV_MIN_FRAME:
                continue
            minutes = frame[5]
            seconds = frame[6]
            timer_str = (
                f"{minutes}:{seconds:02d}"
                if seconds < 60
                else f"{minutes}.{seconds-128}"
            )
            seconds_24 = frame[7]
            timer_attack = ""
            timer_pic = f"{self.INDICATORS_DIR}\\24Sec_Orange.png"
            if seconds_24 != 255:
                if seconds_24 > 127:
                    # Values above 127 are shown as "<tens>.<ones>" on the red image.
                    seconds_24 -= 128
                    timer_attack = f"{seconds_24//10}.{seconds_24%10}"
                    timer_pic = f"{self.INDICATORS_DIR}\\24Sec_Red.png"
                else:
                    timer_attack = seconds_24
            score1 = frame[8]
            score2 = frame[9]
            foul1 = frame[12]
            foul2 = frame[13]
            result = {
                "TIMER.Text": timer_str,
                "ATTACK.Text": timer_attack,
                "Score_Home.Text": score1,
                "Score_Away.Text": score2,
                "fouls1.Source": f"{self.INDICATORS_DIR}\\Home_{foul1}.png",
                "fouls2.Source": f"{self.INDICATORS_DIR}\\Away_{foul2}.png",
                "24SECBACKGROUND.Source": timer_pic,
            }
        return result
class App:
    """Tkinter front end that connects a DataReceiver, a DataProcessor and a DataSender.

    The "Settings" tab selects the data source, the protocol and the send
    targets. The "Game" tab shows status indicators, the raw log and the
    processed values.
    """

    # Seconds between two rounds of target reachability checks.
    CONNECTION_CHECK_INTERVAL = 1
    # Values placed into a freshly created target row.
    DEFAULT_TARGET = ("127.0.0.1", 8088)

    def __init__(self, root):
        """Build the UI inside ``root`` and start the background connection check."""
        self.receiver = DataReceiver()
        self.sender = DataSender()
        self.processor = DataProcessor()
        self.protocol_var = tk.StringVar(value="Protocol A")
        self.mode_var = tk.StringVar(value="Log File")
        self.read_mode_var = tk.StringVar(value="Automatic")
        self.timeout_var = tk.DoubleVar(value=0.5)
        self.create_ui(root)
        self.start_vmix_connection_check()

    @staticmethod
    def _parse_port(text):
        """Return ``text`` as a port number, or None unless it is an integer in 1..65535."""
        if not text.isdigit():
            return None
        try:
            port = int(text)
        except ValueError:
            # isdigit() accepts characters such as superscripts that int() rejects.
            return None
        return port if 1 <= port <= 65535 else None

    def display_data(self, data):
        """Process one received payload, send the result and append both to the text panes.

        The protocol is taken from the protocol selector; an unknown
        selection passes the payload through unchanged. Nothing is sent when
        the processed result is empty.
        """
        handlers = {
            "Protocol A": self.processor.process_protocol_a,
            "Protocol B": self.processor.process_protocol_b,
            "Saratov": self.processor.process_protocol_saratov,
            "Chine": self.processor.process_chine,
        }
        handler = handlers.get(self.protocol_var.get())
        processed_data = handler(data) if handler is not None else data
        if processed_data:
            self.update_send_status(True)
            self.sender.send(processed_data)
            self.update_send_status(False)
        self.log_text.insert(tk.END, f"{data}\n")
        self.log_text.see(tk.END)
        self.processed_data_text.insert(tk.END, f"{processed_data}\n")
        self.processed_data_text.see(tk.END)

    def create_ui(self, root):
        """Create the notebook with the "Settings" and "Game" tabs."""
        root.title("Data Processing Application")
        self.notebook = ttk.Notebook(root)
        self.notebook.pack(fill="both", expand=True)
        self.settings_frame = tk.Frame(self.notebook)
        self.game_frame = tk.Frame(self.notebook)
        self.notebook.add(self.settings_frame, text="Settings")
        self.notebook.add(self.game_frame, text="Game")
        self.create_settings_ui(self.settings_frame)
        self.create_game_ui(self.game_frame)

    def create_settings_ui(self, frame):
        """Create the receive, process and send controls inside ``frame``."""
        # --- Receive -------------------------------------------------------
        frame_receive = tk.LabelFrame(frame, text="Receive Data")
        frame_receive.pack(fill="x", padx=5, pady=5)
        tk.Label(frame_receive, text="Select Mode:").pack(side="left", padx=5, pady=5)
        tk.OptionMenu(
            frame_receive,
            self.mode_var,
            "Log File",
            "TCP Connection",
            command=self.update_mode,
        ).pack(side="left", padx=5, pady=5)
        # Packed or hidden later by update_mode().
        self.file_button = tk.Button(
            frame_receive, text="Select Log File", command=self.select_file
        )
        self.tcp_frame = tk.Frame(frame_receive)
        tk.Label(self.tcp_frame, text="IP:").pack(side="left")
        self.tcp_ip_entry = tk.Entry(self.tcp_frame)
        self.tcp_ip_entry.pack(side="left", padx=5)
        self.tcp_ip_entry.insert(0, "192.168.127.254")  # default IP address
        tk.Label(self.tcp_frame, text="Port:").pack(side="left")
        self.tcp_port_entry = tk.Entry(self.tcp_frame)
        self.tcp_port_entry.pack(side="left", padx=5)
        self.tcp_port_entry.insert(0, 1993)  # default port
        self.read_mode_frame = tk.Frame(frame_receive)
        tk.Label(self.read_mode_frame, text="Read Mode:").pack(
            side="left", padx=5, pady=5
        )
        tk.OptionMenu(
            self.read_mode_frame,
            self.read_mode_var,
            "Automatic",
            "Manual",
            command=self.update_read_mode,
        ).pack(side="left", padx=5, pady=5)
        tk.Label(self.read_mode_frame, text="Timeout (s):").pack(
            side="left", padx=5, pady=5
        )
        self.timeout_entry = tk.Entry(
            self.read_mode_frame, textvariable=self.timeout_var
        )
        self.timeout_entry.pack(side="left", padx=5)
        # Packed or hidden later by update_read_mode().
        self.manual_read_button = tk.Button(
            self.read_mode_frame, text="Read Next Line", command=self.read_next_line
        )
        self.start_button = tk.Button(
            frame_receive, text="Start", command=self.start_receiving
        )
        self.start_button.pack(side="left", padx=5, pady=5)
        tk.Button(frame_receive, text="Stop", command=self.stop_receiving).pack(
            side="left", padx=5, pady=5
        )

        # --- Process -------------------------------------------------------
        frame_process = tk.LabelFrame(frame, text="Process Data")
        frame_process.pack(fill="x", padx=5, pady=5)
        tk.Label(frame_process, text="Select Protocol:").pack(
            side="left", padx=5, pady=5
        )
        tk.OptionMenu(
            frame_process,
            self.protocol_var,
            "Protocol A",
            "Protocol B",
            "Saratov",
            "Chine",
        ).pack(side="left", padx=5, pady=5)

        # --- Send ----------------------------------------------------------
        frame_send = tk.LabelFrame(frame, text="Send Data")
        frame_send.pack(fill="x", padx=5, pady=5)
        self.target_count_var = tk.IntVar(value=1)
        tk.Label(frame_send, text="Number of Targets:").pack(
            side="left", padx=5, pady=5
        )
        tk.Spinbox(
            frame_send,
            from_=1,
            to=10,
            textvariable=self.target_count_var,
            command=self.update_targets,
        ).pack(side="left", padx=5, pady=5)
        self.targets_frame = tk.Frame(frame_send)
        self.targets_frame.pack(fill="x", padx=5, pady=5)
        self.target_entries = []
        self.update_targets()
        tk.Button(frame_send, text="Send Data", command=self.send_data).pack(
            side="left", padx=5, pady=5
        )
        self.update_mode(self.mode_var.get())

    def create_game_ui(self, frame):
        """Create the status indicators and the two text panes inside ``frame``."""
        frame_status = tk.LabelFrame(frame, text="Status")
        frame_status.pack(fill="x", padx=5, pady=5)
        # Each indicator is a small canvas with one oval tagged "indicator".
        self.receive_status_canvas = tk.Canvas(frame_status, width=20, height=20)
        self.receive_status_canvas.create_oval(
            2, 2, 18, 18, fill="red", tags="indicator"
        )
        self.receive_status_canvas.pack(side="left", padx=10, pady=5)
        tk.Label(frame_status, text="Receiving Data").pack(side="left")
        self.send_status_canvas = tk.Canvas(frame_status, width=20, height=20)
        self.send_status_canvas.create_oval(2, 2, 18, 18, fill="red", tags="indicator")
        self.send_status_canvas.pack(side="left", padx=10, pady=5)
        tk.Label(frame_status, text="Sending Data").pack(side="left")
        frame_log = tk.LabelFrame(frame, text="Log")
        frame_log.pack(fill="both", expand=True, padx=5, pady=5)
        self.log_text = tk.Text(frame_log, height=10, state="normal")
        self.log_text.pack(fill="both", expand=True, padx=5, pady=5)
        frame_processed = tk.LabelFrame(frame, text="Processed Data")
        frame_processed.pack(fill="both", expand=True, padx=5, pady=5)
        self.processed_data_text = tk.Text(frame_processed, height=10, state="normal")
        self.processed_data_text.pack(fill="both", expand=True, padx=5, pady=5)

    def check_vmix_connections(self):
        """Probe every target row once and colour its indicator.

        Rows whose port field is not a valid port number are skipped.
        """
        # Iterate over a snapshot: update_targets() may replace the list
        # while this runs in the background thread.
        for index, (host_entry, port_entry) in enumerate(list(self.target_entries)):
            host = host_entry.get()
            port = self._parse_port(port_entry.get())
            if port is None:
                continue
            connected = self.sender.check_vmix_connection(host, port)
            self.update_connection_indicator(index, connected)

    def update_connection_indicator(self, index, connected):
        """Colour the indicator of target ``index`` green (connected) or red."""
        color = "green" if connected else "red"
        self.connection_indicators[index].itemconfig("indicator", fill=color)

    def start_vmix_connection_check(self):
        """Start a daemon thread that re-checks all targets once per interval."""

        def check_loop():
            while True:
                try:
                    self.check_vmix_connections()
                except (tk.TclError, IndexError, RuntimeError):
                    # Target rows can be destroyed and rebuilt mid-check, and
                    # the Tk main loop may not be running. Skip this round
                    # instead of letting the thread die for good.
                    pass
                time.sleep(self.CONNECTION_CHECK_INTERVAL)

        threading.Thread(target=check_loop, daemon=True).start()

    def update_mode(self, mode):
        """Show the controls that belong to the selected receive mode."""
        if mode == "Log File":
            self.file_button.pack(side="left", padx=5, pady=5)
            self.tcp_frame.pack_forget()
            self.read_mode_frame.pack(fill="x", padx=5, pady=5)
            self.start_button.config(state="normal")
        else:
            self.file_button.pack_forget()
            self.read_mode_frame.pack_forget()
            self.tcp_frame.pack(side="left", padx=5, pady=5)
            self.start_button.config(state="normal")

    def update_read_mode(self, mode):
        """Switch between automatic replay and manual line-by-line reading."""
        if mode == "Automatic":
            self.timeout_entry.config(state="normal")
            self.manual_read_button.pack_forget()
            self.receiver.manual_mode = False
        else:
            self.timeout_entry.config(state="disabled")
            self.manual_read_button.pack(side="left", padx=5, pady=5)
            self.receiver.manual_mode = True

    def select_file(self):
        """Let the user pick the log file; cancelling keeps the previous choice."""
        path = filedialog.askopenfilename()
        if path:
            self.receiver.file_path = path

    def start_receiving(self):
        """Start the receiver for the selected mode.

        The Start button is disabled only when a receiver was actually
        started; after any validation error it stays usable.
        """
        mode = self.mode_var.get()
        started = False
        if mode == "Log File":
            if not self.receiver.file_path:
                messagebox.showerror("Error", "No file selected.")
            else:
                try:
                    timeout = (
                        self.timeout_var.get()
                        if self.read_mode_var.get() == "Automatic"
                        else 0
                    )
                except tk.TclError:
                    # The timeout field does not contain a number.
                    messagebox.showerror("Error", "Invalid timeout.")
                else:
                    self.receiver.start_file(self.display_data, timeout)
                    started = True
        elif mode == "TCP Connection":
            host = self.tcp_ip_entry.get()
            port = self._parse_port(self.tcp_port_entry.get())
            if host and port is not None:
                self.receiver.start_tcp(host, port, self.display_data)
                started = True
            else:
                messagebox.showerror("Error", "Invalid IP or port.")
        if started:
            self.start_button.config(state="disabled")
            self.update_receive_status(True)
        else:
            self.start_button.config(state="normal")

    def stop_receiving(self):
        """Stop the receiver and reset the Start button and the receive indicator."""
        self.receiver.stop()
        self.start_button.config(state="normal")
        self.update_receive_status(False)

    def read_next_line(self):
        """Read and display one more line of the log file (manual mode)."""
        self.receiver.read_next_line(self.display_data)

    def update_receive_status(self, active):
        """Colour the "Receiving Data" indicator green when ``active``, else red."""
        color = "green" if active else "red"
        self.receive_status_canvas.itemconfig("indicator", fill=color)

    def update_send_status(self, active):
        """Colour the "Sending Data" indicator green when ``active``, else red."""
        color = "green" if active else "red"
        self.send_status_canvas.itemconfig("indicator", fill=color)

    def update_targets(self):
        """Rebuild the target rows to match the target counter.

        Hosts and ports already typed into existing rows are carried over;
        new rows start with the defaults.
        """
        # Read the current values before the widgets are destroyed.
        previous = [(h.get(), p.get()) for h, p in self.target_entries]
        for widget in self.targets_frame.winfo_children():
            widget.destroy()
        entries = []
        indicators = []  # one status canvas per target row
        for i in range(self.target_count_var.get()):
            host_value, port_value = (
                previous[i] if i < len(previous) else self.DEFAULT_TARGET
            )
            target_frame = tk.Frame(self.targets_frame)
            target_frame.pack(fill="x", pady=2)
            tk.Label(target_frame, text=f"Target {i+1} Host:").pack(side="left")
            host_entry = tk.Entry(target_frame)
            host_entry.pack(side="left", padx=5)
            host_entry.insert(0, host_value)
            tk.Label(target_frame, text="Port:").pack(side="left")
            port_entry = tk.Entry(target_frame)
            port_entry.pack(side="left", padx=5)
            port_entry.insert(0, port_value)
            indicator = tk.Canvas(target_frame, width=20, height=20)
            indicator.create_oval(2, 2, 18, 18, fill="red", tags="indicator")
            indicator.pack(side="left", padx=5)
            entries.append((host_entry, port_entry))
            indicators.append(indicator)
        # Publish the finished lists so the background check sees complete rows.
        self.target_entries = entries
        self.connection_indicators = indicators

    def send_data(self):
        """Copy the valid target rows into the sender and send a sample payload.

        NOTE(review): the payload is a placeholder string, while
        ``sender.send`` iterates ``data.items()`` — confirm what this button
        is meant to send.
        """
        data = "Sample data to send"
        self.sender.targets = []
        for host_entry, port_entry in self.target_entries:
            host = host_entry.get()
            port = self._parse_port(port_entry.get())
            if host and port is not None:
                self.sender.add_target(host, port)
        self.sender.send(data)
if __name__ == "__main__":
    # Script entry point: create the top-level window, attach the
    # application to it and hand control to the Tk event loop.
    main_window = tk.Tk()
    application = App(main_window)
    main_window.mainloop()