прога для сбора логов

This commit is contained in:
Vladislav Ostapov 2025-04-01 19:19:43 +03:00
parent 6f930abf27
commit a552e8e048

View File

@ -1,40 +1,253 @@
import gi import gi
gi.require_version('Gtk', '4.0') gi.require_version('Gtk', '4.0')
from gi.repository import Gtk from gi.repository import Gtk, GLib
import serial
# https://pygobject.gnome.org/tutorials/gtk4/layout-widgets.html import serial.tools.list_ports
import threading
import time
import datetime
import os
class MyWindow(Gtk.ApplicationWindow): class MyWindow(Gtk.ApplicationWindow):
def __init__(self, **kargs):
super().__init__(**kargs, title='Hello World')
box = Gtk.Box(spacing=6) def __init__(self, app):
self.set_child(box) super().__init__(application=app)
self.set_title("My program")
self.set_default_size(800, 400) # Adjust size as needed
button1 = Gtk.Button(label='Hello') self.port = None # Serial port object
button1.connect('clicked', self.on_button1_clicked) self.running = False # Flag to control the reading thread
box.append(button1) self.log_file_path = ""
self.log_file = None
button2 = Gtk.Button(label='Goodbye') # Channel statistics (for demonstration)
button2.props.hexpand = True self.ch1_recv = 0
button2.connect('clicked', self.on_button2_clicked) self.ch1_ovr = 0
box.append(button2) self.ch2_recv = 0
self.ch2_ovr = 0
def on_button1_clicked(self, _widget): # Create widgets
print('Hello') self.port_state_label = Gtk.Label(label="Disconnected")
# Use Gtk.DropDown with Gtk.StringList and Gtk.SingleSelection
self.ports_list = Gtk.StringList()
self.ports_combo = Gtk.DropDown.new(model=self.ports_list, expression=None)
self.ports_combo.set_hexpand(True) # expand
self.ports_selection = Gtk.SingleSelection(model=self.ports_list)
self.ports_combo.set_model(self.ports_selection)
def on_button2_clicked(self, _widget): self.populate_ports_combo() # Populate the combo box initially
print('Goodbye')
self.refresh_ports_button = Gtk.Button(label="Обновить") # New refresh button
self.refresh_ports_button.connect("clicked", self.on_refresh_ports_clicked)
self.connect_button = Gtk.Button(label="Подключиться")
self.connect_button.connect("clicked", self.on_connect_button_clicked)
self.log_file_label = Gtk.Label(label="Файл логов:")
self.log_file_entry = Gtk.Entry()
self.log_file_entry.set_hexpand(True) # expand
self.log_file_entry.set_text(self.generate_default_log_filename())
# Grid labels
self.ch1_recv_label = Gtk.Label(label="0")
self.ch1_ovr_label = Gtk.Label(label="0")
self.ch1_ovr_percent_label = Gtk.Label(label="0%")
self.ch2_recv_label = Gtk.Label(label="0")
self.ch2_ovr_label = Gtk.Label(label="0")
self.ch2_ovr_percent_label = Gtk.Label(label="0%")
# Create layout
hbox1 = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
hbox1.append(self.port_state_label)
hbox1.append(self.ports_combo)
hbox1.append(self.refresh_ports_button) # Add the refresh button
hbox1.append(self.connect_button)
hbox2 = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
hbox2.append(self.log_file_label)
hbox2.append(self.log_file_entry)
grid = Gtk.Grid()
grid.set_column_spacing(5)
grid.set_row_spacing(5)
# Headers
grid.attach(Gtk.Label(label="CH#"), 0, 0, 1, 1)
grid.attach(Gtk.Label(label="recv"), 1, 0, 1, 1)
grid.attach(Gtk.Label(label="ovr"), 2, 0, 1, 1)
grid.attach(Gtk.Label(label="ovr%"), 3, 0, 1, 1)
# Channel 1
grid.attach(Gtk.Label(label="CH1"), 0, 1, 1, 1)
grid.attach(self.ch1_recv_label, 1, 1, 1, 1)
grid.attach(self.ch1_ovr_label, 2, 1, 1, 1)
grid.attach(self.ch1_ovr_percent_label, 3, 1, 1, 1)
# Channel 2
grid.attach(Gtk.Label(label="CH2"), 0, 2, 1, 1)
grid.attach(self.ch2_recv_label, 1, 2, 1, 1)
grid.attach(self.ch2_ovr_label, 2, 2, 1, 1)
grid.attach(self.ch2_ovr_percent_label, 3, 2, 1, 1)
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
vbox.set_margin_top(10)
vbox.set_margin_bottom(10)
vbox.set_margin_start(10)
vbox.set_margin_end(10)
vbox.append(hbox1)
vbox.append(Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)) # Spacer
vbox.append(hbox2)
vbox.append(grid)
self.set_child(vbox)
def populate_ports_combo(self):
"""Populates the combo box with available COM ports."""
ports = serial.tools.list_ports.comports()
# Clear the existing items
while self.ports_list.get_n_items() > 0:
self.ports_list.remove(0) # remove each element by index
for port, desc, hwid in sorted(ports):
self.ports_list.append(port) # append the port string directly
def on_refresh_ports_clicked(self, button):
"""Handles the refresh ports button click."""
self.populate_ports_combo()
def generate_default_log_filename(self):
"""Generates a default log filename with timestamp."""
now = datetime.datetime.now()
return f"log{now.strftime('%Y%m%d_%H%M%S')}.log"
def on_connect_button_clicked(self, button):
"""Handles the connect/disconnect button click."""
if not self.running: # Connect
selected_index = self.ports_selection.get_selected()
if selected_index is not None:
selected_port = self.ports_list.get_item(selected_index).get_string() # This gets a Gtk.StringObject
if isinstance(selected_port, str): # Check if the retrieved value is a string
try:
self.port = serial.Serial(selected_port, baudrate=115200) # Adjust baudrate as needed
self.log_file_path = self.log_file_entry.get_text()
self.log_file = open(self.log_file_path, "a")
self.running = True
self.connect_button.set_label("Отключиться")
self.port_state_label.set_label("Подключено")
self.read_thread = threading.Thread(target=self.read_serial_data)
self.read_thread.daemon = True # Thread exits when main program exits
self.read_thread.start()
except serial.SerialException as e:
print(f"Error opening serial port: {e}")
self.show_error_dialog(f"Ошибка открытия порта: {e}")
else:
self.show_error_dialog("Ошибка: Не удалось получить название порта.")
else: # Disconnect
self.running = False # Stop the reading thread
if self.port:
self.port.close()
self.port = None
if self.log_file:
self.log_file.close()
self.log_file = None
self.connect_button.set_label("Подключиться")
self.port_state_label.set_label("Disconnected")
def read_serial_data(self):
"""Reads data from the serial port in a separate thread."""
while self.running and self.port and self.port.is_open:
try:
if self.port.in_waiting > 0:
while self.port.in_waiting > 0:
line = self.port.readline().decode('utf-8').rstrip()
self.process_data(line)
GLib.idle_add(self.update_gui) # Update GUI from main thread
except serial.SerialException as e:
print(f"Serial port error: {e}")
GLib.idle_add(self.show_error_dialog, f"Ошибка последовательного порта: {e}") # Show from main thread
GLib.idle_add(self.disconnect_from_serial)
break # Exit the loop
except Exception as e:
print(f"Error reading {e}")
GLib.idle_add(self.show_error_dialog, f"Ошибка чтения данных: {e}")
GLib.idle_add(self.disconnect_from_serial)
break # Exit the loop
time.sleep(0.01) # Avoid busy-waiting
def process_data(self, data):
"""Processes the received data (stub implementation)."""
if data.startswith("D1;"):
self.ch1_recv += 1
elif data.startswith("D2;"):
self.ch2_recv += 1
elif data.startswith("D1OVR;"):
self.ch1_ovr += int(data.lstrip()[6:], 16)
elif data.startswith("D2OVR;"):
self.ch2_ovr += int(data.lstrip()[6:], 16)
if self.log_file:
try:
self.log_file.write(data + "\n")
self.log_file.flush() # Ensure data is written immediately
except Exception as e:
print(f"Error writing to log file: {e}")
GLib.idle_add(self.show_error_dialog, f"Ошибка записи в лог-файл: {e}")
def update_gui(self):
"""Updates the GUI elements with the latest data."""
def calculate_percentage(recv, ovr):
if recv == 0:
return "0%"
return f"{round((ovr / recv) * 100, 2)}%"
self.ch1_recv_label.set_label(str(self.ch1_recv))
self.ch1_ovr_label.set_label(str(self.ch1_ovr))
self.ch1_ovr_percent_label.set_label(calculate_percentage(self.ch1_recv, self.ch1_ovr))
self.ch2_recv_label.set_label(str(self.ch2_recv))
self.ch2_ovr_label.set_label(str(self.ch2_ovr))
self.ch2_ovr_percent_label.set_label(calculate_percentage(self.ch2_recv, self.ch2_ovr))
def disconnect_from_serial(self):
"""Disconnects from the serial port and updates the GUI."""
self.running = False
if self.port:
self.port.close()
self.port = None
if self.log_file:
self.log_file.close()
self.log_file = None
self.connect_button.set_label("Подключиться")
self.port_state_label.set_label("Disconnected")
def show_error_dialog(self, message):
"""Shows an error dialog with the given message."""
dialog = Gtk.MessageDialog(
transient_for=self,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK,
text="Error",
)
dialog.format_secondary_text(message)
dialog.run()
dialog.destroy()
def on_activate(app): class MyApplication(Gtk.Application):
# Create window def __init__(self):
win = MyWindow(application=app) super().__init__(application_id="com.example.myprogram")
win.present()
def do_activate(self):
win = MyWindow(self)
win.present()
app = Gtk.Application(application_id='com.example.App') if __name__ == '__main__':
app.connect('activate', on_activate) app = MyApplication()
app.run(None)
app.run(None)