soc-streamer/board5.py
2024-05-02 10:48:04 +03:00

351 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import packages
from openal import *
from imutils.video import VideoStream
import itertools
import imutils
import time
import cv2
import json
import numpy as np
import spidev
# Client
import io
import json
import time
import traceback
from threading import Thread
from streamer_utils import SocketBlocksWrapper, read_json_config
from PIL import Image
spi = spidev.SpiDev()
spi.open(1, 0)
spi.bits_per_word = 8
spi.max_speed_hz = 500000
X = bool(0) # иниц-я глоб. переменной
X_New = bool(0)
X_pred = bool(0) # иниц-я глоб. переменной
startTime = float(time.time() - 10) # иниц-я глоб. переменной
CONFIG = read_json_config('board-config.json')
CLASSES = read_json_config('classes.json')
class ConnectionDaemon(Thread):
def __init__(self):
super().__init__(daemon=True)
self._sock = None
self._message_handler = None
def set_message_handler(self, handler: callable):
self._message_handler = handler
def __do_call_message_handler(self, res):
if self._message_handler is not None:
try:
self._message_handler(res)
except Exception:
traceback.print_exc()
def __do_session(self):
try:
with SocketBlocksWrapper.connect(CONFIG['server-address'], CONFIG['server-port']) as sock:
print("ConnectionDaemon: open connection")
self._sock = sock
self._sock.write_object({'type': 'auth', 'client-type': 'board', 'name': CONFIG['name']})
res = self._sock.read_object()
if res is None:
return
print(res)
if 'status' in res:
if res['status'] == 'success':
while True:
res = self._sock.read_object()
if res is None:
break
self.__do_call_message_handler(res)
except Exception:
traceback.print_exc()
finally:
self.socket = None
def run(self):
while True:
print("ConnectionDaemon: start session...")
self.__do_session()
time.sleep(5)
def send_frame(self, fr):
if self._sock is not None:
try:
to_send = {
'type': 'video',
'data': None,
"selected-class": selected_class_id
}
if fr is not None:
fr = imutils.resize(fr, width=640, height=360)
buffer = cv2.imencode('.jpg', fr, [int(cv2.IMWRITE_JPEG_QUALITY), 60])[1]
data_encode = np.array(buffer)
to_send["data"] = data_encode.tobytes()
self._sock.write_object(to_send)
except Exception:
traceback.print_exc()
def send_image(self, img: Image):
if self._sock is not None:
try:
out = io.BytesIO()
img.save(out, format="JPEG")
self._sock.write_object({
'type': 'video',
'data': out.getvalue(),
"selected-class": selected_class_id
})
except Exception:
traceback.print_exc()
# камера не движется
Left = bool(0)
Right = bool(0)
Up = bool(0)
Down = bool(0)
lazer = bool(0)
# -2 = нейронка отключена, -1 = включены все классы, остальное - id класса из списка CLASSES
selected_class_id = -2
# функция, которая вызывается при получении команды
def message_handler(msg):
global selected_class_id
global Left
global Right
global Up
global Down
global lazer
print(msg)
if msg["type"] == "command":
# отлично, наше сообщение
act = msg["data"]["action"]
if act == "lazerOn":
lazer = 1
if act == "lazerOff":
lazer = 0
if act == "left":
Left = 1
if act == "right":
Right = 1
if act == "up":
Up = 1
if act == "down":
Down = 1
if act == "start":
selected_class_id = -1
elif act == "stop":
selected_class_id = -2
elif act == "set-class":
if selected_class_id < -1:
print("message_handler: WARMING: set class-id while board is stop")
else:
cl = msg["data"]["class"]
selected_class_id = -1 # если не найдем, будут выбраны все классы
for i in range(0, len(CLASSES)):
if CLASSES[i]["class"] == cl:
selected_class_id = i
break
print("============ Initialize connection daemon ============")
connection_daemon = ConnectionDaemon()
connection_daemon.set_message_handler(message_handler)
connection_daemon.start()
def notify():
global startTime
endTime = time.time()
if endTime - startTime > 1.5: # прошло 1.5 секунды
# if 1>0: #режим прерывания сообщений
global X
global X_New
global X_pred
if X == 0 and X_pred == 1: # поменялось на 0
source = oalOpen("Pot.wav") # Потерян
source.play() # воспр. 1 раз
startTime = time.time() # отсчёт времени
if X==1 and X_pred==1 and X_New==0 and (endTime - startTime > 6):
source = oalOpen("Nab.wav") #Потерян
source.play() #воспр. 1 раз
startTime = time.time() #отсчёт времени
if X==1 and X_pred==1 and X_New==1:
source = oalOpen("New.wav") #new object
source.play() #воспр. 1 раз
startTime = time.time() #отсчёт времени
elif X == 1 and X_pred == 0: # поменялось на 1
source = oalOpen("Zah.wav") # Захвачен
source.play() # воспр. 1 раз
startTime = time.time() # отсчёт времени
X_pred = X # обновляем предыдущее значение
print("[INFO] loading model...")
net = cv2.dnn_DetectionModel('AI.cfg', 'AI.weights')
#net.setPreferableBackend(cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE)
#net.setPreferableTarget(cv2.dnn.DNN_TARGET_MYRIAD)
picSize_X = 640
picSize_Y = 480
net.setInputSize(128, 128)
net.setInputScale(1.0 / 255)
net.setInputSwapRB(True)
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()
# warm up the camera for a couple of seconds
time.sleep(2.0)
MAX_sX = 0
MAX_sY = 0
MAX_eX = 0
MAX_eY = 0
centr_X = 0
centr_Y = 0
pred_centr_X = 0
pred_centr_Y = 0
while True:
if lazer == 1:
txData = [0b11000000] #Вкл
spi.xfer(txData)
elif lazer == 0:
txData = [0b10000000] #Выкл
spi.xfer(txData)
if selected_class_id >= -1:
t0 = time.time()
frame = vs.read()
#frame = imutils.resize(frame, width=1280, height=720)
#(h, w) = frame.shape[:2]
S_MAX = 0
X = 0
X_New = 0
# находим объекты и возвращаем их параметры
classes, confidences, boxes = net.detect(frame, confThreshold=0.18, nmsThreshold=0.5)
# создаём рамки и надписи
for classId, confidence, box in zip(list(itertools.chain(classes)), list(itertools.chain(confidences)), boxes):
# if classId == 39: # вот так делать не стоит, работать такое точно не будет
if selected_class_id == -1 or classId == selected_class_id:
X = 1
label = f"{CLASSES[classId]['class']}"
label = '%s: %.2f' % (label, confidence)
color = CLASSES[classId]["color"]
labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
left, top, width, heigth = box
S = width * heigth
print ('S =', S, 'pics')
if S>S_MAX:
S_MAX = S
MAX_sX = left
MAX_sY = top
MAX_eX = left + width
MAX_eY = top + heigth
MAX_label = label
print("Object detected: ", label)
# TODO этот кусок кода перенести чуть выше и сделать так, чтобы он корректно отрисовывал все найденые объекты, а не только один как сейчас
if (X == 1):
# Draw a rectangle across the boundary of the object
cv2.rectangle(frame, (MAX_sX, MAX_sY), (MAX_eX, MAX_eY), color, 2)
y = MAX_sY - 15 if MAX_sY - 15 > 15 else MAX_sY + 15
# Put a text outside the rectangular detection
# Choose the font of your choice: FONT_HERSHEY_SIMPLEX, FONT_HERSHEY_PL>
cv2.putText(frame, MAX_label, (MAX_sX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
centr_X = (MAX_sX+MAX_eX)/2
centr_Y = (MAX_sY+MAX_eY)/2
if (abs(centr_X-pred_centr_X) > picSize_X/4 or abs(centr_Y-pred_centr_Y) > picSize_Y/4):
X_New = 1
if (X == 1 and Left == 0 and Right == 0 and Up == 0 and Down == 0):
if (centr_X > (picSize_X/2+picSize_X/10) and centr_Y < (picSize_Y/2+picSize_Y/10) and centr_Y > (picSize_Y/2-picSize_Y/10)):
txData = [0b00000111] #Вправо
spi.xfer(txData)
elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y < (picSize_Y/2+picSize_Y/10) and centr_Y > (picSize_Y/2-picSize_Y/10)):
txData = [0b00000110] #Влево
spi.xfer(txData)
elif (centr_Y > (picSize_Y/2+picSize_Y/10) and centr_X < (picSize_X/2+picSize_X/10) and centr_X > (picSize_X/2-picSize_X/10)):
txData = [0b00001101] #Вверх
spi.xfer(txData)
elif (centr_Y < (picSize_Y/2-picSize_Y/10) and centr_X < (picSize_X/2+picSize_X/10) and centr_X > (picSize_X/2-picSize_X/10)):
txData = [0b00001001] #Вниз
spi.xfer(txData)
elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y < (picSize_Y/2-picSize_Y/10)):
txData = [0b00001010] #Влево/вниз
spi.xfer(txData)
elif (centr_X > (picSize_X/2+picSize_X/10) and centr_Y < (picSize_Y/2-picSize_Y/10)):
txData = [0b00001011] #Вправо/вниз
spi.xfer(txData)
elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y > (picSize_Y/2+picSize_Y/10)):
txData = [0b00001110] #Влево/вверх
spi.xfer(txData)
elif (centr_X > (picSize_X/2+picSize_X/10) and centr_Y > (picSize_Y/2+picSize_Y/10)):
txData = [0b00001111] #Вправо/вверх
spi.xfer(txData)
else:
txData = [0b00000101] #Центр
spi.xfer(txData)
elif (Left == 0 and Right == 1 and Up == 0 and Down == 0):
txData = [0b00000111] #Вправо
spi.xfer(txData)
elif (Left == 1 and Right == 0 and Up == 0 and Down == 0):
txData = [0b00000110] #Влево
spi.xfer(txData)
elif (Left == 0 and Right == 0 and Up == 1 and Down == 0):
txData = [0b00001001] #Вверх
spi.xfer(txData)
elif (Left == 0 and Right == 0 and Up == 0 and Down == 1):
txData = [0b00001101] #Вниз
spi.xfer(txData)
pred_centr_X = centr_X
pred_centr_Y = centr_Y
# обнуление
Left = 0
Right = 0
Up = 0
Down = 0
My_FPS = 1 / (time.time() - t0)
FPS_label = 'FPS=%2.f' % My_FPS
labelSize, baseLine = cv2.getTextSize(FPS_label, cv2.FONT_HERSHEY_SIMPLEX, 1.5, 1)
cv2.rectangle(frame, (4, 4), (4 + labelSize[0], 4 + labelSize[1] + baseLine), (255, 0, 155), cv2.FILLED)
cv2.putText(frame, FPS_label, (4, 4 + labelSize[1]), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 0))
notify()
# отправка фрейма на сервер
connection_daemon.send_frame(frame)
else:
# отправка раз в секунду пустого фрейма, типа нет видео
connection_daemon.send_frame(None)
time.sleep(1)
spi.close()
# Destroy windows and cleanup
cv2.destroyAllWindows()
# Stop the video stream
vs.stop()