253 lines
10 KiB
Python
253 lines
10 KiB
Python
import gi
|
||
gi.require_version('Gtk', '4.0')
|
||
from gi.repository import Gtk, GLib
|
||
import serial
|
||
import serial.tools.list_ports
|
||
import threading
|
||
import time
|
||
import datetime
|
||
import os
|
||
|
||
class MyWindow(Gtk.ApplicationWindow):
|
||
|
||
def __init__(self, app):
|
||
super().__init__(application=app)
|
||
self.set_title("My program")
|
||
self.set_default_size(800, 400) # Adjust size as needed
|
||
|
||
self.port = None # Serial port object
|
||
self.running = False # Flag to control the reading thread
|
||
self.log_file_path = ""
|
||
self.log_file = None
|
||
|
||
# Channel statistics (for demonstration)
|
||
self.ch1_recv = 0
|
||
self.ch1_ovr = 0
|
||
self.ch2_recv = 0
|
||
self.ch2_ovr = 0
|
||
|
||
# Create widgets
|
||
self.port_state_label = Gtk.Label(label="Disconnected")
|
||
# Use Gtk.DropDown with Gtk.StringList and Gtk.SingleSelection
|
||
self.ports_list = Gtk.StringList()
|
||
self.ports_combo = Gtk.DropDown.new(model=self.ports_list, expression=None)
|
||
self.ports_combo.set_hexpand(True) # expand
|
||
self.ports_selection = Gtk.SingleSelection(model=self.ports_list)
|
||
self.ports_combo.set_model(self.ports_selection)
|
||
|
||
self.populate_ports_combo() # Populate the combo box initially
|
||
|
||
self.refresh_ports_button = Gtk.Button(label="Обновить") # New refresh button
|
||
self.refresh_ports_button.connect("clicked", self.on_refresh_ports_clicked)
|
||
|
||
self.connect_button = Gtk.Button(label="Подключиться")
|
||
self.connect_button.connect("clicked", self.on_connect_button_clicked)
|
||
|
||
self.log_file_label = Gtk.Label(label="Файл логов:")
|
||
self.log_file_entry = Gtk.Entry()
|
||
self.log_file_entry.set_hexpand(True) # expand
|
||
self.log_file_entry.set_text(self.generate_default_log_filename())
|
||
|
||
# Grid labels
|
||
self.ch1_recv_label = Gtk.Label(label="0")
|
||
self.ch1_ovr_label = Gtk.Label(label="0")
|
||
self.ch1_ovr_percent_label = Gtk.Label(label="0%")
|
||
self.ch2_recv_label = Gtk.Label(label="0")
|
||
self.ch2_ovr_label = Gtk.Label(label="0")
|
||
self.ch2_ovr_percent_label = Gtk.Label(label="0%")
|
||
|
||
# Create layout
|
||
hbox1 = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
|
||
hbox1.append(self.port_state_label)
|
||
hbox1.append(self.ports_combo)
|
||
hbox1.append(self.refresh_ports_button) # Add the refresh button
|
||
hbox1.append(self.connect_button)
|
||
|
||
hbox2 = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
|
||
hbox2.append(self.log_file_label)
|
||
hbox2.append(self.log_file_entry)
|
||
|
||
grid = Gtk.Grid()
|
||
grid.set_column_spacing(5)
|
||
grid.set_row_spacing(5)
|
||
|
||
# Headers
|
||
grid.attach(Gtk.Label(label="CH#"), 0, 0, 1, 1)
|
||
grid.attach(Gtk.Label(label="recv"), 1, 0, 1, 1)
|
||
grid.attach(Gtk.Label(label="ovr"), 2, 0, 1, 1)
|
||
grid.attach(Gtk.Label(label="ovr%"), 3, 0, 1, 1)
|
||
|
||
# Channel 1
|
||
grid.attach(Gtk.Label(label="CH1"), 0, 1, 1, 1)
|
||
grid.attach(self.ch1_recv_label, 1, 1, 1, 1)
|
||
grid.attach(self.ch1_ovr_label, 2, 1, 1, 1)
|
||
grid.attach(self.ch1_ovr_percent_label, 3, 1, 1, 1)
|
||
|
||
# Channel 2
|
||
grid.attach(Gtk.Label(label="CH2"), 0, 2, 1, 1)
|
||
grid.attach(self.ch2_recv_label, 1, 2, 1, 1)
|
||
grid.attach(self.ch2_ovr_label, 2, 2, 1, 1)
|
||
grid.attach(self.ch2_ovr_percent_label, 3, 2, 1, 1)
|
||
|
||
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
|
||
vbox.set_margin_top(10)
|
||
vbox.set_margin_bottom(10)
|
||
vbox.set_margin_start(10)
|
||
vbox.set_margin_end(10)
|
||
vbox.append(hbox1)
|
||
vbox.append(Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)) # Spacer
|
||
vbox.append(hbox2)
|
||
vbox.append(grid)
|
||
|
||
self.set_child(vbox)
|
||
|
||
def populate_ports_combo(self):
|
||
"""Populates the combo box with available COM ports."""
|
||
ports = serial.tools.list_ports.comports()
|
||
# Clear the existing items
|
||
while self.ports_list.get_n_items() > 0:
|
||
self.ports_list.remove(0) # remove each element by index
|
||
|
||
for port, desc, hwid in sorted(ports):
|
||
self.ports_list.append(port) # append the port string directly
|
||
|
||
def on_refresh_ports_clicked(self, button):
|
||
"""Handles the refresh ports button click."""
|
||
self.populate_ports_combo()
|
||
|
||
def generate_default_log_filename(self):
|
||
"""Generates a default log filename with timestamp."""
|
||
now = datetime.datetime.now()
|
||
return f"log{now.strftime('%Y%m%d_%H%M%S')}.log"
|
||
|
||
def on_connect_button_clicked(self, button):
|
||
"""Handles the connect/disconnect button click."""
|
||
if not self.running: # Connect
|
||
selected_index = self.ports_selection.get_selected()
|
||
if selected_index is not None:
|
||
selected_port = self.ports_list.get_item(selected_index).get_string() # This gets a Gtk.StringObject
|
||
if isinstance(selected_port, str): # Check if the retrieved value is a string
|
||
try:
|
||
self.port = serial.Serial(selected_port, baudrate=115200) # Adjust baudrate as needed
|
||
self.log_file_path = self.log_file_entry.get_text()
|
||
self.log_file = open(self.log_file_path, "a")
|
||
|
||
self.running = True
|
||
self.connect_button.set_label("Отключиться")
|
||
self.port_state_label.set_label("Подключено")
|
||
self.read_thread = threading.Thread(target=self.read_serial_data)
|
||
self.read_thread.daemon = True # Thread exits when main program exits
|
||
self.read_thread.start()
|
||
|
||
except serial.SerialException as e:
|
||
print(f"Error opening serial port: {e}")
|
||
self.show_error_dialog(f"Ошибка открытия порта: {e}")
|
||
else:
|
||
self.show_error_dialog("Ошибка: Не удалось получить название порта.")
|
||
else: # Disconnect
|
||
self.running = False # Stop the reading thread
|
||
if self.port:
|
||
self.port.close()
|
||
self.port = None
|
||
if self.log_file:
|
||
self.log_file.close()
|
||
self.log_file = None
|
||
self.connect_button.set_label("Подключиться")
|
||
self.port_state_label.set_label("Disconnected")
|
||
|
||
def read_serial_data(self):
|
||
"""Reads data from the serial port in a separate thread."""
|
||
while self.running and self.port and self.port.is_open:
|
||
try:
|
||
if self.port.in_waiting > 0:
|
||
while self.port.in_waiting > 0:
|
||
line = self.port.readline().decode('utf-8').rstrip()
|
||
self.process_data(line)
|
||
GLib.idle_add(self.update_gui) # Update GUI from main thread
|
||
|
||
except serial.SerialException as e:
|
||
print(f"Serial port error: {e}")
|
||
GLib.idle_add(self.show_error_dialog, f"Ошибка последовательного порта: {e}") # Show from main thread
|
||
GLib.idle_add(self.disconnect_from_serial)
|
||
break # Exit the loop
|
||
|
||
except Exception as e:
|
||
print(f"Error reading {e}")
|
||
GLib.idle_add(self.show_error_dialog, f"Ошибка чтения данных: {e}")
|
||
GLib.idle_add(self.disconnect_from_serial)
|
||
break # Exit the loop
|
||
|
||
time.sleep(0.01) # Avoid busy-waiting
|
||
|
||
def process_data(self, data):
|
||
"""Processes the received data (stub implementation)."""
|
||
if data.startswith("D1;"):
|
||
self.ch1_recv += 1
|
||
elif data.startswith("D2;"):
|
||
self.ch2_recv += 1
|
||
elif data.startswith("D1OVR;"):
|
||
self.ch1_ovr += int(data.lstrip()[6:], 16)
|
||
elif data.startswith("D2OVR;"):
|
||
self.ch2_ovr += int(data.lstrip()[6:], 16)
|
||
|
||
if self.log_file:
|
||
try:
|
||
self.log_file.write(data + "\n")
|
||
self.log_file.flush() # Ensure data is written immediately
|
||
except Exception as e:
|
||
print(f"Error writing to log file: {e}")
|
||
GLib.idle_add(self.show_error_dialog, f"Ошибка записи в лог-файл: {e}")
|
||
|
||
def update_gui(self):
|
||
"""Updates the GUI elements with the latest data."""
|
||
|
||
def calculate_percentage(recv, ovr):
|
||
if recv == 0:
|
||
return "0%"
|
||
return f"{round((ovr / recv) * 100, 2)}%"
|
||
|
||
self.ch1_recv_label.set_label(str(self.ch1_recv))
|
||
self.ch1_ovr_label.set_label(str(self.ch1_ovr))
|
||
self.ch1_ovr_percent_label.set_label(calculate_percentage(self.ch1_recv, self.ch1_ovr))
|
||
|
||
self.ch2_recv_label.set_label(str(self.ch2_recv))
|
||
self.ch2_ovr_label.set_label(str(self.ch2_ovr))
|
||
self.ch2_ovr_percent_label.set_label(calculate_percentage(self.ch2_recv, self.ch2_ovr))
|
||
|
||
def disconnect_from_serial(self):
|
||
"""Disconnects from the serial port and updates the GUI."""
|
||
self.running = False
|
||
if self.port:
|
||
self.port.close()
|
||
self.port = None
|
||
if self.log_file:
|
||
self.log_file.close()
|
||
self.log_file = None
|
||
self.connect_button.set_label("Подключиться")
|
||
self.port_state_label.set_label("Disconnected")
|
||
|
||
def show_error_dialog(self, message):
|
||
"""Shows an error dialog with the given message."""
|
||
dialog = Gtk.MessageDialog(
|
||
transient_for=self,
|
||
message_type=Gtk.MessageType.ERROR,
|
||
buttons=Gtk.ButtonsType.OK,
|
||
text="Error",
|
||
)
|
||
dialog.format_secondary_text(message)
|
||
dialog.run()
|
||
dialog.destroy()
|
||
|
||
|
||
class MyApplication(Gtk.Application):
|
||
def __init__(self):
|
||
super().__init__(application_id="com.example.myprogram")
|
||
|
||
def do_activate(self):
|
||
win = MyWindow(self)
|
||
win.present()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
app = MyApplication()
|
||
app.run(None) |