soc-streamer/host_test.py
2024-05-02 10:48:04 +03:00

338 lines
12 KiB
Python

# import packages
from openal import *
from imutils.video import VideoStream
import itertools
import imutils
import time
import cv2
import json
import numpy as np
import spidev
# Client
import io
import json
import time
import traceback
from threading import Thread
from streamer_utils import SocketBlocksWrapper, read_json_config
from PIL import Image
#spi = spidev.SpiDev()
#spi.open(1, 0)
#spi.bits_per_word = 8
#spi.max_speed_hz = 500000
X = bool(0) # иниц-я глоб. переменной
X_New = bool(0)
X_pred = bool(0) # иниц-я глоб. переменной
startTime = float(time.time() - 10) # иниц-я глоб. переменной
CONFIG = read_json_config('board-config.json')
CLASSES = read_json_config('classes.json')
class ConnectionDaemon(Thread):
def __init__(self):
super().__init__(daemon=True)
self._sock = None
self._message_handler = None
def set_message_handler(self, handler: callable):
self._message_handler = handler
def __do_call_message_handler(self, res):
if self._message_handler is not None:
try:
self._message_handler(res)
except Exception:
traceback.print_exc()
def __do_session(self):
try:
with SocketBlocksWrapper.connect(CONFIG['server-address'], CONFIG['server-port']) as sock:
print("ConnectionDaemon: open connection")
self._sock = sock
self._sock.write_object({'type': 'auth', 'client-type': 'board', 'name': CONFIG['name']})
res = self._sock.read_object()
if res is None:
return
print(res)
if 'status' in res:
if res['status'] == 'success':
while True:
res = self._sock.read_object()
if res is None:
break
self.__do_call_message_handler(res)
except Exception:
traceback.print_exc()
finally:
self.socket = None
def run(self):
while True:
print("ConnectionDaemon: start session...")
self.__do_session()
time.sleep(5)
def send_frame(self, fr):
if self._sock is not None:
try:
to_send = {
'type': 'video',
'data': None,
"selected-class": selected_class_id
}
if fr is not None:
fr = imutils.resize(fr, width=640, height=360)
buffer = cv2.imencode('.jpg', fr, [int(cv2.IMWRITE_JPEG_QUALITY), 60])[1]
data_encode = np.array(buffer)
to_send["data"] = data_encode.tobytes()
self._sock.write_object(to_send)
except Exception:
traceback.print_exc()
def send_image(self, img: Image):
if self._sock is not None:
try:
out = io.BytesIO()
img.save(out, format="JPEG")
self._sock.write_object({
'type': 'video',
'data': out.getvalue(),
"selected-class": selected_class_id
})
except Exception:
traceback.print_exc()
# камера не движется
Left = bool(0)
Right = bool(0)
Up = bool(0)
Down = bool(0)
# -2 = нейронка отключена, -1 = включены все классы, остальное - id класса из списка CLASSES
selected_class_id = -2
# функция, которая вызывается при получении команды
def message_handler(msg):
global selected_class_id
global Left
global Right
global Up
global Down
print(msg)
if msg["type"] == "command":
# отлично, наше сообщение
act = msg["data"]["action"]
if act == "left":
Left = 1
if act == "right":
Right = 1
if act == "up":
Up = 1
if act == "down":
Down = 1
if act == "start":
selected_class_id = -1
elif act == "stop":
selected_class_id = -2
elif act == "set-class":
if selected_class_id < -1:
print("message_handler: WARMING: set class-id while board is stop")
else:
cl = msg["data"]["class"]
selected_class_id = -1 # если не найдем, будут выбраны все классы
for i in range(0, len(CLASSES)):
if CLASSES[i]["class"] == cl:
selected_class_id = i
break
print("============ Initialize connection daemon ============")
connection_daemon = ConnectionDaemon()
connection_daemon.set_message_handler(message_handler)
connection_daemon.start()
def notify():
global startTime
endTime = time.time()
if endTime - startTime > 1.5: # прошло 1.5 секунды
# if 1>0: #режим прерывания сообщений
global X
global X_New
global X_pred
if X == 0 and X_pred == 1: # поменялось на 0
source = oalOpen("Pot.wav") # Потерян
source.play() # воспр. 1 раз
startTime = time.time() # отсчёт времени
if X==1 and X_pred==1 and X_New==0 and (endTime - startTime > 6):
source = oalOpen("Nab.wav") #Потерян
source.play() #воспр. 1 раз
startTime = time.time() #отсчёт времени
if X==1 and X_pred==1 and X_New==1:
source = oalOpen("New.wav") #new object
source.play() #воспр. 1 раз
startTime = time.time() #отсчёт времени
elif X == 1 and X_pred == 0: # поменялось на 1
source = oalOpen("Zah.wav") # Захвачен
source.play() # воспр. 1 раз
startTime = time.time() # отсчёт времени
X_pred = X # обновляем предыдущее значение
print("[INFO] loading model...")
net = cv2.dnn_DetectionModel('AI.cfg', 'AI.weights')
#net.setPreferableBackend(cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE)
#net.setPreferableTarget(cv2.dnn.DNN_TARGET_MYRIAD)
picSize_X = 640
picSize_Y = 480
net.setInputSize(128, 128)
net.setInputScale(1.0 / 255)
net.setInputSwapRB(True)
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()
# warm up the camera for a couple of seconds
time.sleep(2.0)
MAX_sX = 0
MAX_sY = 0
MAX_eX = 0
MAX_eY = 0
centr_X = 0
centr_Y = 0
pred_centr_X = 0
pred_centr_Y = 0
while True:
if selected_class_id >= 0:
t0 = time.time()
frame = vs.read()
#frame = imutils.resize(frame, width=1280, height=720)
#(h, w) = frame.shape[:2]
S_MAX = 0
X = 0
X_New = 0
# находим объекты и возвращаем их параметры
classes, confidences, boxes = net.detect(frame, confThreshold=0.18, nmsThreshold=0.5)
# создаём рамки и надписи
for classId, confidence, box in zip(list(itertools.chain(classes)), list(itertools.chain(confidences)), boxes):
# if classId == 39: # вот так делать не стоит, работать такое точно не будет
if selected_class_id == -1 or classId == selected_class_id:
X = 1
label = f"{CLASSES[classId]['class']}"
label = '%s: %.2f' % (label, confidence)
color = CLASSES[classId]["color"]
labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
left, top, width, heigth = box
S = width * heigth
print ('S =', S, 'pics')
if S>S_MAX:
S_MAX = S
MAX_sX = left
MAX_sY = top
MAX_eX = left + width
MAX_eY = top + heigth
MAX_label = label
print("Object detected: ", label)
if (X == 1):
# Draw a rectangle across the boundary of the object
cv2.rectangle(frame, (MAX_sX, MAX_sY), (MAX_eX, MAX_eY), color, 2)
y = MAX_sY - 15 if MAX_sY - 15 > 15 else MAX_sY + 15
# Put a text outside the rectangular detection
# Choose the font of your choice: FONT_HERSHEY_SIMPLEX, FONT_HERSHEY_PL>
cv2.putText(frame, MAX_label, (MAX_sX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
centr_X = (MAX_sX+MAX_eX)/2
centr_Y = (MAX_sY+MAX_eY)/2
if (abs(centr_X-pred_centr_X) > picSize_X/4 or abs(centr_Y-pred_centr_Y) > picSize_Y/4):
X_New = 1
# if (X == 1 and Left == 0 and Right == 0 and Up == 0 and Down == 0):
# if (centr_X > (picSize_X/2+picSize_X/10) and centr_Y < (picSize_Y/2+picSize_Y/10) and centr_Y > (picSize_Y/2-picSize_Y/10)):
# txData = [0b00000111] #Вправо
# spi.xfer(txData)
# elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y < (picSize_Y/2+picSize_Y/10) and centr_Y > (picSize_Y/2-picSize_Y/10)):
# txData = [0b00000110] #Влево
# spi.xfer(txData)
# elif (centr_Y > (picSize_Y/2+picSize_Y/10) and centr_X < (picSize_X/2+picSize_X/10) and centr_X > (picSize_X/2-picSize_X/10)):
# txData = [0b00001101] #Вверх
# spi.xfer(txData)
# elif (centr_Y < (picSize_Y/2-picSize_Y/10) and centr_X < (picSize_X/2+picSize_X/10) and centr_X > (picSize_X/2-picSize_X/10)):
# txData = [0b00001001] #Вниз
# spi.xfer(txData)
# elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y < (picSize_Y/2-picSize_Y/10)):
# txData = [0b00001010] #Влево/вниз
# spi.xfer(txData)
# elif (centr_X > (picSize_X/2+picSize_X/10) and centr_Y < (picSize_Y/2-picSize_Y/10)):
# txData = [0b00001011] #Вправо/вниз
# spi.xfer(txData)
# elif (centr_X < (picSize_X/2-picSize_X/10) and centr_Y > (picSize_Y/2+picSize_Y/10)):
# txData = [0b00001110] #Влево/вверх
# spi.xfer(txData)
# elif (centr_X > (picSize_X/2+picSize_X/10) and centr_Y > (picSize_Y/2+picSize_Y/10)):
# txData = [0b00001111] #Вправо/вверх
# spi.xfer(txData)
# else:
# txData = [0b00000101] #Центр
# spi.xfer(txData)
# elif (Left == 0 and Right == 1 and Up == 0 and Down == 0):
# txData = [0b00000111] #Вправо
# spi.xfer(txData)
# elif (Left == 1 and Right == 0 and Up == 0 and Down == 0):
# txData = [0b00000110] #Влево
# spi.xfer(txData)
# elif (Left == 0 and Right == 0 and Up == 1 and Down == 0):
# txData = [0b00001001] #Вверх
# spi.xfer(txData)
# elif (Left == 0 and Right == 0 and Up == 0 and Down == 1):
# txData = [0b00001101] #Вниз
# spi.xfer(txData)
pred_centr_X = centr_X
pred_centr_Y = centr_Y
# обнуление
Left = 0
Right = 0
Up = 0
Down = 0
My_FPS = 1 / (time.time() - t0)
FPS_label = 'FPS=%2.f' % My_FPS
labelSize, baseLine = cv2.getTextSize(FPS_label, cv2.FONT_HERSHEY_SIMPLEX, 1.5, 1)
cv2.rectangle(frame, (4, 4), (4 + labelSize[0], 4 + labelSize[1] + baseLine), (255, 0, 155), cv2.FILLED)
cv2.putText(frame, FPS_label, (4, 4 + labelSize[1]), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 0))
notify()
# отправка фрейма на сервер
connection_daemon.send_frame(frame)
else:
# отправка раз в секунду пустого фрейма
connection_daemon.send_frame(None)
time.sleep(1)
spi.close()
# Destroy windows and cleanup
cv2.destroyAllWindows()
# Stop the video stream
vs.stop()