# Listing metadata from the file viewer: 154 lines, 4.3 KiB, Python.
import sys
|
|
from socket import *
|
|
from datetime import datetime
|
|
import logging
|
|
import os
|
|
import time
|
|
import requests
|
|
|
|
|
|
# --- Timer device side -------------------------------------------------------
# NOTE(review): looks like the serial-line settings of the timer behind the
# serial-to-TCP bridge; nothing in this file reads it — confirm before removing.
SETTING = "9600,RS-232"
# TCP endpoint that main() connects to in order to receive timer frames.
HOST = "192.168.127.254"
PORT = 1993

# --- Graphics side -----------------------------------------------------------
# Base URL of the local HTTP API that receives the SetText / SetTextColour
# calls (presumably vMix — confirm).
URL = "http://127.0.0.1:8088/API/"
# Names of the inputs that are updated with every value; these strings are
# sent verbatim as the "Input" query parameter.
INPUT = ["ТАЙМЕР МУЖЧИНЫ", "ТАЙМЕР ЖЕНЩИНЫ"]
|
|
|
|
|
|
def send_data(data, name):
    """Write ``data`` into the ``<name>.Text`` field of every input in INPUT.

    One HTTP GET with ``Function=SetText`` is issued against ``URL`` per
    input title.

    Args:
        data: Text to display (sent as the ``Value`` query parameter).
        name: Field name without the ``.Text`` suffix, e.g. ``"Time1"``.
    """
    selected_name = f"{name}.Text"  # identical for every input, build once
    for input_title in INPUT:
        requests.get(
            URL,
            params={
                "Function": "SetText",
                "Input": input_title,
                "SelectedName": selected_name,
                "Value": data,
            },
        )
|
|
|
|
|
|
def win(name):
    """Colour the ``<name>.Text`` field green on every input in INPUT.

    One HTTP GET with ``Function=SetTextColour`` is issued against ``URL``
    per input title, mirroring how ``send_data`` addresses the inputs.
    Previously the whole ``INPUT`` list was passed as the ``Input``
    parameter, which requests encodes as repeated ``Input=`` keys inside a
    single call instead of one call per input.

    Args:
        name: Field name without the ``.Text`` suffix, e.g. ``"Time1"``.
    """
    selected_name = f"{name}.Text"
    for input_title in INPUT:
        params = {
            "Function": "SetTextColour",
            "Input": input_title,
            "SelectedName": selected_name,
            "Value": "Green",
        }
        requests.get(URL, params=params)
|
|
|
|
|
|
def reset(name):
    """Colour the ``<name>.Text`` field white on every input in INPUT.

    One HTTP GET with ``Function=SetTextColour`` is issued against ``URL``
    per input title, mirroring how ``send_data`` addresses the inputs.
    Previously the whole ``INPUT`` list was passed as the ``Input``
    parameter, which requests encodes as repeated ``Input=`` keys inside a
    single call instead of one call per input.

    Args:
        name: Field name without the ``.Text`` suffix, e.g. ``"Time1"``.
    """
    selected_name = f"{name}.Text"
    for input_title in INPUT:
        params = {
            "Function": "SetTextColour",
            "Input": input_title,
            "SelectedName": selected_name,
            "Value": "white",
        }
        requests.get(URL, params=params)
|
|
|
|
|
|
def func_new(i, name, tag):
    """Forward the value carried by one timer frame to the text field ``name``.

    Behaviour by frame content:

    * contains ``"FALSE START"`` -> the literal ``"FS"`` is sent;
    * contains ``"OK NO FS"``    -> nothing is sent;
    * otherwise                  -> the value following ``f"{tag}<"`` is
      cleaned up and sent.

    The value is only extracted in the last case. It used to be extracted up
    front, so a frame without a ``f"{tag}<"`` payload raised ``IndexError``
    even in the branches that never use the value. A frame whose payload is
    missing or empty is now logged and skipped instead of raising.

    Args:
        i: One decoded frame (a fragment between ``<E>`` separators).
        name: Target field name without the ``.Text`` suffix.
        tag: Channel tag as it appears in the frame, e.g. ``"<ID01>"``.
    """
    if "FALSE START" in i:
        send_data("FS", name)
        return
    if "OK NO FS" in i:
        return

    pieces = i.split(f"{tag}<")
    if len(pieces) < 2:
        logging.warning("frame has no %s< payload, skipped: %r", tag, i)
        return

    # Strip the literal substrings the display should not show.
    # NOTE(review): "0'0" / "0'" look like a zero-minutes prefix — confirm
    # against the device protocol. The replacement order is significant.
    cleaned = pieces[1].replace("0'0", "").replace("0'", "").replace("10>", "")
    tokens = cleaned.split()
    if not tokens:
        logging.warning("frame has an empty %s< payload, skipped: %r", tag, i)
        return

    send_data(tokens[0], name)
    # NOTE: winner highlighting (calling win(name) when "WIN" is in the frame)
    # is intentionally not active.
|
|
|
|
|
|
def parse(line):
    """Decode one chunk received from the timer and dispatch its frames.

    The chunk is decoded, echoed to stdout and split on ``"<E>"``. For every
    non-empty fragment:

    * if it contains ``"READY"``, both time fields are set to ``0"000`` and
      both reaction fields are cleared;
    * unless its 8th character is one of ``R G B C Y M A``, the fragment is
      routed by channel tag (``<ID01>``..``<ID04>``) to ``func_new``. For
      ``<ID01>`` and ``<ID02>`` the fragment must also be free of ``$``,
      ``@`` and ``%``.

    Fixes over the previous version:

    * fragments shorter than 8 characters (likely when ``recv`` cuts a frame
      in two) raised ``IndexError`` on ``i[7]``; they are now skipped after
      the READY check;
    * undecodable bytes no longer raise ``UnicodeDecodeError``; they are
      replaced with U+FFFD so one corrupt byte does not stop the caller.

    Args:
        line: Raw bytes as received from the socket (or replayed from a log).
    """
    decode_line = line.decode("utf-8", errors="replace")
    print(decode_line)

    name1, name2, name3, name4 = "Time1", "Time2", "Reaction1", "Reaction2"
    # NOTE(review): presumably colour/attribute codes at a fixed position in
    # the frame header — confirm against the device protocol.
    skip_codes = ("R", "G", "B", "C", "Y", "M", "A")
    blocked_chars = ("$", "@", "%")

    for i in decode_line.split("<E>"):
        if i == "":
            continue

        if "READY" in i:
            send_data('0"000', name1)
            send_data('0"000', name2)
            send_data("", name3)
            send_data("", name4)
            # NOTE: resetting the colours via reset(name1) / reset(name2) is
            # intentionally not active.

        # Too short to carry the 8th-character code, let alone a payload.
        if len(i) < 8:
            continue
        if i[7] in skip_codes:
            continue

        has_blocked_char = any(char in i for char in blocked_chars)
        if "<ID01>" in i and not has_blocked_char:
            func_new(i, name1, "<ID01>")
        elif "<ID02>" in i and not has_blocked_char:
            func_new(i, name2, "<ID02>")
        elif "<ID03>" in i:
            func_new(i, name3, "<ID03>")
        elif "<ID04>" in i:
            func_new(i, name4, "<ID04>")
|
|
|
|
|
|
def read_logs(path=r"C:\Code\LOGS\timer_NPORT_tcp_2024-03-29_13-24-22.log"):
    """Replay a recorded log file through ``parse`` with the original pacing.

    Every line of the form ``"<timestamp> DEBUG <bytes literal>"`` is fed to
    ``parse``. Between two such lines the function sleeps for the gap between
    their timestamps, minus the time the previous ``parse`` call took. Lines
    that do not split into exactly two parts on ``" DEBUG "`` are ignored.

    Changes over the previous version:

    * the path is a raw string (the old literal relied on the invalid escape
      sequences ``\\C`` and ``\\L``); the runtime value is unchanged, and it
      can now be overridden through ``path``;
    * the payload is parsed with ``ast.literal_eval`` instead of ``eval``, so
      a log line can no longer execute arbitrary code;
    * when processing took longer than the recorded gap, the function no
      longer sleeps (``abs`` used to turn the overrun into an extra delay).

    Args:
        path: Log file to replay. Defaults to the previously hard-coded file.

    Raises:
        ValueError: If a timestamp does not match ``%Y-%m-%d %H:%M:%S,%f`` or
            a payload is not a valid Python literal.
        SyntaxError: If a payload is not syntactically valid Python.
    """
    import ast  # local import: keeps this block self-contained

    time_format = "%Y-%m-%d %H:%M:%S,%f"
    previous_stamp = None
    diff_time = 0  # duration of the previous parse() call, in seconds

    with open(path, "r") as file:
        for line in file:
            parts = line.strip().split(" DEBUG ")
            if len(parts) != 2:
                continue

            stamp = datetime.strptime(parts[0], time_format)
            if previous_stamp is not None:
                gap = (stamp - previous_stamp).total_seconds()
                # Already behind schedule -> do not wait at all.
                time.sleep(max(0.0, gap - diff_time))
            previous_stamp = stamp

            data = ast.literal_eval(parts[1])
            start_time = time.time()
            parse(data)
            diff_time = time.time() - start_time
|
|
|
|
|
|
def main():
    """Connect to the timer, log every received chunk and forward it to parse.

    Steps:

    1. Make sure the ``LOGS`` directory exists and configure the root logger
       to write DEBUG records to ``LOGS/timer_NPORT_tcp_<timestamp>.log``.
    2. Open a TCP connection to ``(HOST, PORT)`` and send ``b"hello"``.
    3. Loop: receive up to 1024 bytes, log them at DEBUG level (this is the
       format ``read_logs`` replays) and hand them to ``parse``.

    Fixes over the previous version:

    * ``recv`` returning ``b""`` (peer closed the connection) used to spin
      forever on empty chunks; the loop now logs a warning and returns;
    * the socket is closed on every exit path, not only on Ctrl-C, and an
      early Ctrl-C can no longer hit an unassigned ``tcp_socket``;
    * ``sendall`` replaces ``send`` so the greeting cannot be sent partially;
    * a dead ``bytes.decode`` of the greeting was removed.

    Raises:
        SystemExit: With status 1 when interrupted by Ctrl-C.
        OSError: If the connection cannot be established or breaks.
    """
    current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    os.makedirs("LOGS", exist_ok=True)

    log_file_name = f"LOGS/timer_NPORT_tcp_{current_time}.log"
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s %(message)s",
        filename=log_file_name,
        filemode="w",
    )

    try:
        with socket(AF_INET, SOCK_STREAM) as tcp_socket:
            tcp_socket.connect((HOST, PORT))
            tcp_socket.sendall(b"hello")
            while True:
                data = tcp_socket.recv(1024)
                if not data:
                    # An empty read means the peer closed the connection.
                    logging.warning("connection closed by %s:%s", HOST, PORT)
                    break
                logging.debug(data)
                parse(data)
    except KeyboardInterrupt:
        sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Live mode: read from the timer over TCP.
    # To replay a recorded log instead, call read_logs() in a ``while True``
    # loop here in place of main().
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C that arrives outside main()'s own handler.
        sys.exit(1)
|