# import packages from openal import * from imutils.video import VideoStream import itertools import imutils import time import cv2 import json import numpy as np import spidev # Client import io import json import time import traceback from threading import Thread from streamer_utils import SocketBlocksWrapper, read_json_config from PIL import Image spi = spidev.SpiDev() spi.open(1, 0) spi.bits_per_word = 8 spi.max_speed_hz = 500000 X = bool(0) # иниц-я глоб. переменной X_New = bool(0) X_pred = bool(0) # иниц-я глоб. переменной startTime = float(time.time() - 10) # иниц-я глоб. переменной CONFIG = read_json_config('board-config.json') CLASSES = read_json_config('classes.json') class ConnectionDaemon(Thread): def __init__(self): super().__init__(daemon=True) self._sock = None self._message_handler = None def set_message_handler(self, handler: callable): self._message_handler = handler def __do_call_message_handler(self, res): if self._message_handler is not None: try: self._message_handler(res) except Exception: traceback.print_exc() def __do_session(self): try: with SocketBlocksWrapper.connect(CONFIG['server-address'], CONFIG['server-port']) as sock: print("ConnectionDaemon: open connection") self._sock = sock self._sock.write_object({'type': 'auth', 'client-type': 'board', 'name': CONFIG['name']}) res = self._sock.read_object() if res is None: return print(res) if 'status' in res: if res['status'] == 'success': while True: res = self._sock.read_object() if res is None: break self.__do_call_message_handler(res) except Exception: traceback.print_exc() finally: self.socket = None def run(self): while True: print("ConnectionDaemon: start session...") self.__do_session() time.sleep(5) def send_frame(self, fr): if self._sock is not None: try: to_send = { 'type': 'video', 'data': None, "selected-class": selected_class_id } if fr is not None: fr = imutils.resize(fr, width=640, height=360) buffer = cv2.imencode('.jpg', fr, [int(cv2.IMWRITE_JPEG_QUALITY), 60])[1] data_encode = np.array(buffer) to_send["data"] = data_encode.tobytes() self._sock.write_object(to_send) except Exception: traceback.print_exc() def send_image(self, img: Image): if self._sock is not None: try: out = io.BytesIO() img.save(out, format="JPEG") self._sock.write_object({ 'type': 'video', 'data': out.getvalue(), "selected-class": selected_class_id }) except Exception: traceback.print_exc() # камера не движется Left = bool(0) Right = bool(0) Up = bool(0) Down = bool(0) lazer = bool(0) # -2 = нейронка отключена, -1 = включены все классы, остальное - id класса из списка CLASSES selected_class_id = -2 # функция, которая вызывается при получении команды def message_handler(msg): global selected_class_id global Left global Right global Up global Down global lazer print(msg) if msg["type"] == "command": # отлично, наше сообщение act = msg["data"]["action"] if act == "lazerOn": lazer = 1 if act == "lazerOff": lazer = 0 if act == "left": Left = 1 if act == "right": Right = 1 if act == "up": Up = 1 if act == "down": Down = 1 if act == "start": selected_class_id = -1 elif act == "stop": selected_class_id = -2 elif act == "set-class": if selected_class_id < -1: print("message_handler: WARMING: set class-id while board is stop") else: cl = msg["data"]["class"] selected_class_id = -1 # если не найдем, будут выбраны все классы for i in range(0, len(CLASSES)): if CLASSES[i]["class"] == cl: selected_class_id = i break print("============ Initialize connection daemon ============") connection_daemon = ConnectionDaemon() connection_daemon.set_message_handler(message_handler) connection_daemon.start() def notify(): global startTime endTime = time.time() if endTime - startTime > 1.5: # прошло 1.5 секунды # if 1>0: #режим прерывания сообщений global X global X_New global X_pred if X == 0 and X_pred == 1: # поменялось на 0 source = oalOpen("Pot.wav") # Потерян source.play() # воспр. 1 раз startTime = time.time() # отсчёт времени if X==1 and X_pred==1 and X_New==0 and (endTime - startTime > 6): source = oalOpen("Nab.wav") #Потерян source.play() #воспр. 1 раз startTime = time.time() #отсчёт времени if X==1 and X_pred==1 and X_New==1: source = oalOpen("New.wav") #new object source.play() #воспр. 1 раз startTime = time.time() #отсчёт времени elif X == 1 and X_pred == 0: # поменялось на 1 source = oalOpen("Zah.wav") # Захвачен source.play() # воспр. 1 раз startTime = time.time() # отсчёт времени X_pred = X # обновляем предыдущее значение print("[INFO] loading model...") net = cv2.dnn_DetectionModel('AI.cfg', 'AI.weights') #net.setPreferableBackend(cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE) #net.setPreferableTarget(cv2.dnn.DNN_TARGET_MYRIAD) picSize_X = 640 picSize_Y = 480 net.setInputSize(128, 128) net.setInputScale(1.0 / 255) net.setInputSwapRB(True) print("[INFO] starting video stream...") vs = VideoStream(src=0).start() # warm up the camera for a couple of seconds time.sleep(2.0) MAX_sX = 0 MAX_sY = 0 MAX_eX = 0 MAX_eY = 0 centr_X = 0 centr_Y = 0 pred_centr_X = 0 pred_centr_Y = 0 while True: if lazer == 1: txData = [0b11000000] #Вкл spi.xfer(txData) elif lazer == 0: txData = [0b10000000] #Выкл spi.xfer(txData) if selected_class_id >= -1: t0 = time.time() frame = vs.read() #frame = imutils.resize(frame, width=1280, height=720) #(h, w) = frame.shape[:2] S_MAX = 0 X = 0 X_New = 0 # находим объекты и возвращаем их параметры classes, confidences, boxes = net.detect(frame, confThreshold=0.18, nmsThreshold=0.5) # создаём рамки и надписи for classId, confidence, box in zip(list(itertools.chain(classes)), list(itertools.chain(confidences)), boxes): # if classId == 39: # вот так делать не стоит, работать такое точно не будет if selected_class_id == -1 or classId == selected_class_id: X = 1 label = f"{CLASSES[classId]['class']}" label = '%s: %.2f' % (label, confidence) color = CLASSES[classId]["color"] labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) left, top, width, heigth = box S = width * heigth print ('S =', S, 'pics') if S>S_MAX: S_MAX = S MAX_sX = left MAX_sY = top MAX_eX = left + width MAX_eY = top + heigth MAX_label = label print("Object detected: ", label) # TODO этот кусок кода перенести чуть выше и сделать так, чтобы он корректно отрисовывал все найденые объекты, а не только один как сейчас if (X == 1): # Draw a rectangle across the boundary of the object cv2.rectangle(frame, (MAX_sX, MAX_sY), (MAX_eX, MAX_eY), color, 2) y = MAX_sY - 15 if MAX_sY - 15 > 15 else MAX_sY + 15 # Put a text outside the rectangular detection # Choose the font of your choice: FONT_HERSHEY_SIMPLEX, FONT_HERSHEY_PL> cv2.putText(frame, MAX_label, (MAX_sX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) centr_X = (MAX_sX+MAX_eX)/2 centr_Y = (MAX_sY+MAX_eY)/2 if (abs(centr_X-pred_centr_X) > picSize_X/4 or abs(centr_Y-pred_centr_Y) > picSize_Y/4): X_New = 1 if (X == 1 and Left == 0 and Right == 0 and Up == 0 and Down == 0): if (centr_X > (picSize_X/2+picSize_X/10) and centr_Y < (picSize_Y/2+picSize_Y/10) and centr_Y > (picSize_Y/2-picSize_Y/10)): txData = [0b00000111] #Вправо spi.xfer(txData) elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y < (picSize_Y/2+picSize_Y/10) and centr_Y > (picSize_Y/2-picSize_Y/10)): txData = [0b00000110] #Влево spi.xfer(txData) elif (centr_Y > (picSize_Y/2+picSize_Y/10) and centr_X < (picSize_X/2+picSize_X/10) and centr_X > (picSize_X/2-picSize_X/10)): txData = [0b00001101] #Вверх spi.xfer(txData) elif (centr_Y < (picSize_Y/2-picSize_Y/10) and centr_X < (picSize_X/2+picSize_X/10) and centr_X > (picSize_X/2-picSize_X/10)): txData = [0b00001001] #Вниз spi.xfer(txData) elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y < (picSize_Y/2-picSize_Y/10)): txData = [0b00001010] #Влево/вниз spi.xfer(txData) elif (centr_X > (picSize_X/2+picSize_X/10) and centr_Y < (picSize_Y/2-picSize_Y/10)): txData = [0b00001011] #Вправо/вниз spi.xfer(txData) elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y > (picSize_Y/2+picSize_Y/10)): txData = [0b00001110] #Влево/вверх spi.xfer(txData) elif (centr_X > (picSize_X/2+picSize_X/10) and centr_Y > (picSize_Y/2+picSize_Y/10)): txData = [0b00001111] #Вправо/вверх spi.xfer(txData) else: txData = [0b00000101] #Центр spi.xfer(txData) elif (Left == 0 and Right == 1 and Up == 0 and Down == 0): txData = [0b00000111] #Вправо spi.xfer(txData) elif (Left == 1 and Right == 0 and Up == 0 and Down == 0): txData = [0b00000110] #Влево spi.xfer(txData) elif (Left == 0 and Right == 0 and Up == 1 and Down == 0): txData = [0b00001001] #Вверх spi.xfer(txData) elif (Left == 0 and Right == 0 and Up == 0 and Down == 1): txData = [0b00001101] #Вниз spi.xfer(txData) pred_centr_X = centr_X pred_centr_Y = centr_Y # обнуление Left = 0 Right = 0 Up = 0 Down = 0 My_FPS = 1 / (time.time() - t0) FPS_label = 'FPS=%2.f' % My_FPS labelSize, baseLine = cv2.getTextSize(FPS_label, cv2.FONT_HERSHEY_SIMPLEX, 1.5, 1) cv2.rectangle(frame, (4, 4), (4 + labelSize[0], 4 + labelSize[1] + baseLine), (255, 0, 155), cv2.FILLED) cv2.putText(frame, FPS_label, (4, 4 + labelSize[1]), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 0)) notify() # отправка фрейма на сервер connection_daemon.send_frame(frame) else: # отправка раз в секунду пустого фрейма, типа нет видео connection_daemon.send_frame(None) time.sleep(1) spi.close() # Destroy windows and cleanup cv2.destroyAllWindows() # Stop the video stream vs.stop()