253 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gi
gi.require_version('Gtk', '4.0')
from gi.repository import Gtk, GLib
import serial
import serial.tools.list_ports
import threading
import time
import datetime
import os
class MyWindow(Gtk.ApplicationWindow):
def __init__(self, app):
super().__init__(application=app)
self.set_title("My program")
self.set_default_size(800, 400) # Adjust size as needed
self.port = None # Serial port object
self.running = False # Flag to control the reading thread
self.log_file_path = ""
self.log_file = None
# Channel statistics (for demonstration)
self.ch1_recv = 0
self.ch1_ovr = 0
self.ch2_recv = 0
self.ch2_ovr = 0
# Create widgets
self.port_state_label = Gtk.Label(label="Disconnected")
# Use Gtk.DropDown with Gtk.StringList and Gtk.SingleSelection
self.ports_list = Gtk.StringList()
self.ports_combo = Gtk.DropDown.new(model=self.ports_list, expression=None)
self.ports_combo.set_hexpand(True) # expand
self.ports_selection = Gtk.SingleSelection(model=self.ports_list)
self.ports_combo.set_model(self.ports_selection)
self.populate_ports_combo() # Populate the combo box initially
self.refresh_ports_button = Gtk.Button(label="Обновить") # New refresh button
self.refresh_ports_button.connect("clicked", self.on_refresh_ports_clicked)
self.connect_button = Gtk.Button(label="Подключиться")
self.connect_button.connect("clicked", self.on_connect_button_clicked)
self.log_file_label = Gtk.Label(label="Файл логов:")
self.log_file_entry = Gtk.Entry()
self.log_file_entry.set_hexpand(True) # expand
self.log_file_entry.set_text(self.generate_default_log_filename())
# Grid labels
self.ch1_recv_label = Gtk.Label(label="0")
self.ch1_ovr_label = Gtk.Label(label="0")
self.ch1_ovr_percent_label = Gtk.Label(label="0%")
self.ch2_recv_label = Gtk.Label(label="0")
self.ch2_ovr_label = Gtk.Label(label="0")
self.ch2_ovr_percent_label = Gtk.Label(label="0%")
# Create layout
hbox1 = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
hbox1.append(self.port_state_label)
hbox1.append(self.ports_combo)
hbox1.append(self.refresh_ports_button) # Add the refresh button
hbox1.append(self.connect_button)
hbox2 = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
hbox2.append(self.log_file_label)
hbox2.append(self.log_file_entry)
grid = Gtk.Grid()
grid.set_column_spacing(5)
grid.set_row_spacing(5)
# Headers
grid.attach(Gtk.Label(label="CH#"), 0, 0, 1, 1)
grid.attach(Gtk.Label(label="recv"), 1, 0, 1, 1)
grid.attach(Gtk.Label(label="ovr"), 2, 0, 1, 1)
grid.attach(Gtk.Label(label="ovr%"), 3, 0, 1, 1)
# Channel 1
grid.attach(Gtk.Label(label="CH1"), 0, 1, 1, 1)
grid.attach(self.ch1_recv_label, 1, 1, 1, 1)
grid.attach(self.ch1_ovr_label, 2, 1, 1, 1)
grid.attach(self.ch1_ovr_percent_label, 3, 1, 1, 1)
# Channel 2
grid.attach(Gtk.Label(label="CH2"), 0, 2, 1, 1)
grid.attach(self.ch2_recv_label, 1, 2, 1, 1)
grid.attach(self.ch2_ovr_label, 2, 2, 1, 1)
grid.attach(self.ch2_ovr_percent_label, 3, 2, 1, 1)
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
vbox.set_margin_top(10)
vbox.set_margin_bottom(10)
vbox.set_margin_start(10)
vbox.set_margin_end(10)
vbox.append(hbox1)
vbox.append(Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)) # Spacer
vbox.append(hbox2)
vbox.append(grid)
self.set_child(vbox)
def populate_ports_combo(self):
"""Populates the combo box with available COM ports."""
ports = serial.tools.list_ports.comports()
# Clear the existing items
while self.ports_list.get_n_items() > 0:
self.ports_list.remove(0) # remove each element by index
for port, desc, hwid in sorted(ports):
self.ports_list.append(port) # append the port string directly
def on_refresh_ports_clicked(self, button):
"""Handles the refresh ports button click."""
self.populate_ports_combo()
def generate_default_log_filename(self):
"""Generates a default log filename with timestamp."""
now = datetime.datetime.now()
return f"log{now.strftime('%Y%m%d_%H%M%S')}.log"
def on_connect_button_clicked(self, button):
"""Handles the connect/disconnect button click."""
if not self.running: # Connect
selected_index = self.ports_selection.get_selected()
if selected_index is not None:
selected_port = self.ports_list.get_item(selected_index).get_string() # This gets a Gtk.StringObject
if isinstance(selected_port, str): # Check if the retrieved value is a string
try:
self.port = serial.Serial(selected_port, baudrate=115200) # Adjust baudrate as needed
self.log_file_path = self.log_file_entry.get_text()
self.log_file = open(self.log_file_path, "a")
self.running = True
self.connect_button.set_label("Отключиться")
self.port_state_label.set_label("Подключено")
self.read_thread = threading.Thread(target=self.read_serial_data)
self.read_thread.daemon = True # Thread exits when main program exits
self.read_thread.start()
except serial.SerialException as e:
print(f"Error opening serial port: {e}")
self.show_error_dialog(f"Ошибка открытия порта: {e}")
else:
self.show_error_dialog("Ошибка: Не удалось получить название порта.")
else: # Disconnect
self.running = False # Stop the reading thread
if self.port:
self.port.close()
self.port = None
if self.log_file:
self.log_file.close()
self.log_file = None
self.connect_button.set_label("Подключиться")
self.port_state_label.set_label("Disconnected")
def read_serial_data(self):
"""Reads data from the serial port in a separate thread."""
while self.running and self.port and self.port.is_open:
try:
if self.port.in_waiting > 0:
while self.port.in_waiting > 0:
line = self.port.readline().decode('utf-8').rstrip()
self.process_data(line)
GLib.idle_add(self.update_gui) # Update GUI from main thread
except serial.SerialException as e:
print(f"Serial port error: {e}")
GLib.idle_add(self.show_error_dialog, f"Ошибка последовательного порта: {e}") # Show from main thread
GLib.idle_add(self.disconnect_from_serial)
break # Exit the loop
except Exception as e:
print(f"Error reading {e}")
GLib.idle_add(self.show_error_dialog, f"Ошибка чтения данных: {e}")
GLib.idle_add(self.disconnect_from_serial)
break # Exit the loop
time.sleep(0.01) # Avoid busy-waiting
def process_data(self, data):
"""Processes the received data (stub implementation)."""
if data.startswith("D1;"):
self.ch1_recv += 1
elif data.startswith("D2;"):
self.ch2_recv += 1
elif data.startswith("D1OVR;"):
self.ch1_ovr += int(data.lstrip()[6:], 16)
elif data.startswith("D2OVR;"):
self.ch2_ovr += int(data.lstrip()[6:], 16)
if self.log_file:
try:
self.log_file.write(data + "\n")
self.log_file.flush() # Ensure data is written immediately
except Exception as e:
print(f"Error writing to log file: {e}")
GLib.idle_add(self.show_error_dialog, f"Ошибка записи в лог-файл: {e}")
def update_gui(self):
"""Updates the GUI elements with the latest data."""
def calculate_percentage(recv, ovr):
if recv == 0:
return "0%"
return f"{round((ovr / recv) * 100, 2)}%"
self.ch1_recv_label.set_label(str(self.ch1_recv))
self.ch1_ovr_label.set_label(str(self.ch1_ovr))
self.ch1_ovr_percent_label.set_label(calculate_percentage(self.ch1_recv, self.ch1_ovr))
self.ch2_recv_label.set_label(str(self.ch2_recv))
self.ch2_ovr_label.set_label(str(self.ch2_ovr))
self.ch2_ovr_percent_label.set_label(calculate_percentage(self.ch2_recv, self.ch2_ovr))
def disconnect_from_serial(self):
"""Disconnects from the serial port and updates the GUI."""
self.running = False
if self.port:
self.port.close()
self.port = None
if self.log_file:
self.log_file.close()
self.log_file = None
self.connect_button.set_label("Подключиться")
self.port_state_label.set_label("Disconnected")
def show_error_dialog(self, message):
"""Shows an error dialog with the given message."""
dialog = Gtk.MessageDialog(
transient_for=self,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK,
text="Error",
)
dialog.format_secondary_text(message)
dialog.run()
dialog.destroy()
class MyApplication(Gtk.Application):
def __init__(self):
super().__init__(application_id="com.example.myprogram")
def do_activate(self):
win = MyWindow(self)
win.present()
if __name__ == '__main__':
app = MyApplication()
app.run(None)