soc-streamer/client.py
2024-05-02 10:48:04 +03:00

384 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/bin/python
import io
import time
import traceback
from threading import Thread, Event, Lock
import gi
from PIL import Image
from streamer_utils import SocketBlocksWrapper, read_json_config
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, GLib, GdkPixbuf, Gdk
CLASSES = read_json_config('classes.json')
CONFIG = read_json_config('client-config.json')
class ConnectionDaemon:
def __init__(self, address, port, object_receive_callback: callable):
self._address = address
self._port = port
self._object_receive_callback = object_receive_callback
self._sock = None
self._lock = Lock()
self.__login = None
self.__password = None
# объект события, только для ожидания авторизации
self.__auth_event = Event()
self.__auth_callback = None
self.__auth_callback_is_done = False
self.__auth_success = False
self._conn_thread = Thread(target=self.__run, daemon=True)
self._conn_thread.start()
def auth(self, login, password, callback: callable):
with self._lock:
self.__login = login
self.__password = password
self.__auth_callback = callback
self.__auth_event.set()
def __do_session(self, login, password, auth_callback: callable):
try:
with SocketBlocksWrapper.connect(CONFIG["server-address"], CONFIG["server-port"]) as sock:
with self._lock:
self._sock = sock
sock.write_object(
{
'type': 'auth',
'client-type': 'client',
'target': login,
'password': password
})
res = sock.read_object()
print(res)
if 'status' in res:
if res['status'] == 'success':
self.__auth_success = True
if auth_callback is not None:
auth_callback(True, "success")
self.__auth_callback_is_done = True
while True:
res = sock.read_object()
if res is None:
break
with self._lock:
callback = self._object_receive_callback
callback(res)
else:
self.__auth_callback_is_done = True
if auth_callback is not None:
auth_callback(False, res['description'])
else:
raise Exception("'status' is not defined in response")
except Exception:
traceback.print_exc()
finally:
with self._lock:
self._sock = None
def __run(self):
# нужно ждать данных авторизации
while True:
if not self.__auth_success:
self.__auth_event.wait()
with self._lock:
login, password, callback = self.__login, self.__password, self.__auth_callback
self.__auth_callback_is_done = False
self.__auth_event.clear()
if login is None or password is None:
continue
# делаем сессию
self.__do_session(login, password, callback)
if not self.__auth_callback_is_done and callback is not None and callable(callback):
callback(False, "Ошибка ввода-вывода")
if not self.__auth_success:
with self._lock:
if not self.__auth_event.is_set():
self.__login = None
self.__password = None
else:
time.sleep(5)
print("Try to reconnect...")
def send_object(self, obj: dict):
try:
self._sock.write_object(obj)
except Exception as e:
# traceback.print_exc()
print(f"Failed to send command {obj} ({e})")
class MainWindow(Gtk.Window):
def __create_main_widget(self):
# основная "коробка"
root_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
root_box.connect("key-press-event", self.on_key_press)
self.image = Gtk.Image()
self.image.set_vexpand(True)
self.image.set_can_focus(True)
no_connection_label = Gtk.Label(label="Нет подключения к плате")
no_connection_label.set_vexpand(True)
no_connection_label.set_hexpand(True)
no_video_label = Gtk.Label(label="Плата подключена\nОжидание действия пользователя")
no_video_label.set_vexpand(True)
no_video_label.set_hexpand(True)
self._image_stack = Gtk.Stack()
self._image_stack.set_transition_type(Gtk.StackTransitionType.SLIDE_DOWN)
self._image_stack.add_named(no_connection_label, "no-connection")
self._image_stack.add_named(no_video_label, "no-video")
self._image_stack.add_named(self.image, "video")
root_box.pack_start(self._image_stack, expand=True, fill=True, padding=0)
# нижняя сторона окна, панелька с управлением
toolbar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4)
toolbar.set_halign(Gtk.Align.CENTER)
button_start = Gtk.Button(label="Старт")
button_start.connect("clicked", lambda widget: self.__send_action('start'))
button_stop = Gtk.Button(label="Стоп")
button_stop.connect("clicked", lambda widget: self.__send_action('stop'))
button_lazer_start = Gtk.Button(label="Старт лазер")
button_lazer_start.connect("clicked", lambda widget: self.__send_action('lazerOn'))
button_lazer_stop = Gtk.Button(label="Стоп лазер")
button_lazer_stop.connect("clicked", lambda widget: self.__send_action('lazerOff'))
self.select_class = Gtk.ComboBoxText()
self.select_class.connect("changed", self.on_class_select)
sorted_list = [c for c in CLASSES]
self.select_class.append_text("<__ (все) __>")
for c in sorted_list:
self.select_class.append_text(c["label"])
toolbar.pack_start(button_start, expand=False, fill=False, padding=0)
toolbar.pack_start(button_stop, expand=False, fill=False, padding=0)
toolbar.pack_start(self.select_class, expand=False, fill=False, padding=0)
toolbar.pack_start(button_lazer_start, expand=False, fill=False, padding=0)
toolbar.pack_start(button_lazer_stop, expand=False, fill=False, padding=0)
root_box.pack_start(toolbar, expand=False, fill=True, padding=0)
return root_box
def __connection_auth_handler(self, result, message):
def gui_function():
self._spinner.stop()
self.__password_entry.set_editable(True)
self.__login_entry.set_editable(True)
self.__button_auth.set_sensitive(True)
if result:
self.root.set_visible_child_name("main")
else:
dialog = Gtk.MessageDialog(
transient_for=self,
flags=0,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK,
text=message,
title="Ошибка аутентификации"
)
dialog.run()
dialog.destroy()
GLib.idle_add(lambda: gui_function())
def __auth_handler(self, widget):
self._spinner.start()
self.__password_entry.set_editable(False)
self.__login_entry.set_editable(False)
self.__button_auth.set_sensitive(False)
self._daemon.auth(self.__login_entry.get_text(),
self.__password_entry.get_text(),
self.__connection_auth_handler)
def __create_auth_widget(self):
hbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
hbox.set_halign(Gtk.Align.CENTER)
hbox.set_valign(Gtk.Align.CENTER)
logo = Gtk.Image.new_from_file("logo.png")
logo_label = Gtk.Label(label="Ведапроект")
self.__button_auth = Gtk.Button(label="Вход")
self.__button_auth.connect("clicked", self.__auth_handler)
label_login = Gtk.Label(label="Логин")
self.__login_entry = Gtk.Entry(can_default=True)
label_password = Gtk.Label(label="Пароль")
self.__password_entry = Gtk.Entry()
self.__password_entry.set_visibility(False)
self.__login_entry.connect("activate", lambda widget: self.__password_entry.grab_focus())
self.__password_entry.connect("activate", self.__auth_handler)
hbox.pack_start(logo, expand=False, fill=False, padding=0)
hbox.pack_start(logo_label, expand=False, fill=False, padding=10)
hbox.pack_start(label_login, expand=False, fill=False, padding=10)
hbox.pack_start(self.__login_entry, expand=False, fill=False, padding=0)
hbox.pack_start(label_password, expand=False, fill=False, padding=10)
hbox.pack_start(self.__password_entry, expand=False, fill=False, padding=0)
hbox.pack_start(self.__button_auth, expand=False, fill=False, padding=10)
# спинер, хрень которая будет показывать что идет процесс подключения
self._spinner = Gtk.Spinner()
hbox.pack_start(self._spinner, True, True, 0)
return hbox
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._remote_selected_class = False
self._visible_image_child = "no-connection"
self._daemon = ConnectionDaemon(CONFIG["server-address"], CONFIG["server-port"], self.__on_object_received)
self.set_default_size(600, 400)
self.set_title("Ведапроект")
# основная "коробка"
self.root = Gtk.Stack()
self.root.set_transition_type(Gtk.StackTransitionType.SLIDE_LEFT)
self.root.add_named(self.__create_auth_widget(), "auth")
self.root.add_named(self.__create_main_widget(), "main")
self.add(self.root)
# ---------------------- Handlers --------------------------
def _send_command(self, cmd):
self._daemon.send_object({'type': 'command', 'data': cmd})
def __send_action(self, act):
self._send_command({'action': act})
def __get_selected_class(self):
label = self.select_class.get_active_text()
for i in range(0, len(CLASSES)):
if CLASSES[i]["label"] == label:
# теперь метка "не выбрано" имеет индекс 0, поэтому все остальные смещены на +1
return i
return None
def on_class_select(self, widget):
c = self.__get_selected_class()
if self._remote_selected_class:
self._remote_selected_class = False
else:
if c is None:
cl = "__all__"
else:
cl = CLASSES[c]["class"]
print(f'select class: id={c}, {cl}')
self._send_command({'action': 'set-class', 'class': cl})
def on_key_press(self, window, event_key: Gdk.EventKey):
commands = {
"up": [Gdk.KEY_w, Gdk.KEY_W], # Gdk.KEY_Up
"left": [Gdk.KEY_a, Gdk.KEY_A], # Gdk.KEY_Left
"down": [Gdk.KEY_s, Gdk.KEY_S], # Gdk.KEY_Down
"right": [Gdk.KEY_d, Gdk.KEY_D], # Gdk.KEY_Right
"start": Gdk.KEY_z,
"stop": Gdk.KEY_x,
}
keyval = event_key.get_keyval()
action = None
if keyval[0]:
keyval = keyval[1]
# прошерстим команды
for key in commands:
if type(commands[key]) == list:
if keyval in commands[key]:
action = key
break
else:
if commands[key] == keyval:
action = key
break
if action is not None:
self._send_command({'action': action})
def __update_image(self, image):
need_frame = "no-video"
if image is not None:
need_frame = "video"
data = image.tobytes()
w, h = image.size
data = GLib.Bytes.new(data)
pix = GdkPixbuf.Pixbuf.new_from_bytes(data, GdkPixbuf.Colorspace.RGB, False, 8, w, h, w * 3)
self.image.set_from_pixbuf(pix)
if self._visible_image_child is not need_frame:
self._have_image = need_frame
self._image_stack.set_visible_child_name(need_frame)
# print(f"{datetime.datetime.now()} MyWindow: image received")
def __update_selected_class(self, cl):
c = self.__get_selected_class()
if c is None:
c = 0
else:
c += 1
if cl < -1:
cl = 0
else:
cl += 1
if c != cl:
if cl != c:
self._remote_selected_class = True
# то же самое, из-за метки все классы уехали на +1
self.select_class.set_active(cl)
def __on_object_received(self, res):
if res["type"] == "video":
if res["data"] is None:
GLib.idle_add(lambda: self.__update_image(None))
else:
data = io.BytesIO(res["data"])
img = Image.open(data)
GLib.idle_add(lambda: self.__update_image(img))
if 'selected-class' in res:
GLib.idle_add(lambda: self.__update_selected_class(res['selected-class']))
def main():
""" Run the main application"""
win = MainWindow()
win.connect("destroy", Gtk.main_quit)
win.show_all()
Gtk.main()
if __name__ == '__main__':
main()