From 59d159d6838c52849a428347556f749bef2a7000 Mon Sep 17 00:00:00 2001 From: VladislavOstapov Date: Sun, 16 Jun 2024 20:11:17 +0300 Subject: [PATCH] =?UTF-8?q?=D0=BF=D0=B5=D1=80=D0=B2=D1=8B=D0=B9=20=D0=BA?= =?UTF-8?q?=D0=BE=D0=BC=D0=BC=D0=B8=D1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 + README.md | 23 ++++ example.env.py | 12 ++ src/interfaces.py | 122 ++++++++++++++++++++ src/main.py | 77 +++++++++++++ src/wssmqtt.py | 282 ++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 520 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 example.env.py create mode 100644 src/interfaces.py create mode 100644 src/main.py create mode 100644 src/wssmqtt.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a8acd6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/ +*.iml +src/env.py + diff --git a/README.md b/README.md new file mode 100644 index 0000000..8f05828 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# esp-wssmqtt-board + +Проект создан "на коленке" за пару вечеров. + +Изначально разрабатывался для микроконтроллера ESP32, но можно использовать и ESP8266, и на rp2040(W), и возможно на pyboard. + +Программа связывается с MQTT брокером через вебсокет (с TLS), подписывается на пачку топиков и слушает их. +Значения в топиках следует делать `retain`, чтобы при потере связи они снова отсылались прошивке. + +# Инструментарий + +Для запуска проекта на микроконтроллере нужен установленный micropython (логично). + +Его установку тут описывать не буду. + +# Перед запуском + +Для настройки "переменных окружения" используется один единственный файл - `env.py`. +Перед заливкой прошивки нужно скопировать файл `example.env.py` в `src/env.py` и отредактировать его. + +Дополнительно можно отредактировать файл с интерфейсами `src/intervaces.py`. +Там прописаны физические интерфейсы управления, можно добавлять/изменять/удалять существующие. + diff --git a/example.env.py b/example.env.py new file mode 100644 index 0000000..0ee569a --- /dev/null +++ b/example.env.py @@ -0,0 +1,12 @@ +# настройки Wi-Fi +WIFI_SSID = "My ssid" +WIFI_PASSWORD = "supersecret-password-228" + +# настройки вебсокета +WSS_HOST = "mqtt.mydamain.com" +WSS_PATH = "/mqtt" + +# настройки авторизации +MQTT_USER = "mqtt_user" +MQTT_PASSWORD = "supersecret-mqtt" + diff --git a/src/interfaces.py b/src/interfaces.py new file mode 100644 index 0000000..7350ef6 --- /dev/null +++ b/src/interfaces.py @@ -0,0 +1,122 @@ +import machine +import time +from machine import Pin, PWM + +_U16_MAX = 0xFFFF + + +def _int_to_servo_duty(val): + val = max(0, min(1000, val)) + val += 1000 # все в микросекундах, получается диапазон [1000...2000] + # переводим в нужные числа + val = int(float(val) / 0.305180438) + return val + + +def make_int_interpolate(left_min, left_max, right_min, right_max): + # Figure out how 'wide' each range is + leftSpan = left_max - left_min + rightSpan = right_max - right_min + + # Compute the scale factor between left and right values + scaleFactor = float(rightSpan) / float(leftSpan) + + # create interpolation function using pre-calculated scaleFactor + def interp_fn(value): + val = int(right_min + (value - left_min) * scaleFactor) + if val < right_min: + val = right_min + elif val > right_max: + val = right_max + return val + + return interp_fn + + +class BaseInterface: + def __init__(self, name): + self._name = name + + def get_topics(self): + return (self._name, ) + + def process_message(self, topic, value): + print(f"Process topic {topic} with '{value}' for {self}") + + def __str__(self): + return f"<{self.__class__.__name__} {self._name}>" + + +class PwmInterface(BaseInterface): + def __init__(self, name, pin, freq=2000, power_min=0, power_max=255, pwm_min=0, pwm_max=_U16_MAX): + super().__init__(name) + self._pin = pin + self._pwm = PWM(self._pin, freq=freq, duty_u16=0) + self._pwm_min = pwm_min + self._pwm_max = pwm_max + self._interpolator = make_int_interpolate(power_min, power_max, pwm_min, pwm_max) + self._power = pwm_min + self._powered = False + self._pwm.duty_u16(self._power) + self._topic_cmd = f"{self._name}/cmd" + self._topic_power = f"{self._name}/power" + + def get_topics(self): + return self._topic_power, self._topic_cmd + + def process_message(self, topic, value): + print(f"{self}: process topic {topic} with {value}") + if topic == self._topic_cmd: + if value == 'ON': + self._pwm.duty_u16(self._interpolator(self._power)) + self._powered = True + else: + self._pwm.duty_u16(self._pwm_min) + self._powered = False + else: + self._power = int(value) + if self._powered: + self._pwm.duty_u16(self._interpolator(self._power)) + + +class ServoInterface(PwmInterface): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs, freq=50, pwm_min=3276, pwm_max=6553) + # инициализация ESC + self._pwm.duty_u16(self._pwm_min) + time.sleep(0.2) + self._pwm.duty_u16(self._pwm_max) + time.sleep(0.2) + + self._pwm.duty_u16(self._power) + + +_interfaces = [ + PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n0", Pin(13)), + PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n1", Pin(12)), + PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n2", Pin(14)), + PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n3", Pin(27)), + ServoInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm1n0", Pin(32)), +] + + +def list_topics(): + out = [] + for i in _interfaces: + for t in i.get_topics(): + out.append(t) + return out + + +def init(): + print(f"Interface names: {[i for i in list_topics()]}") + + +def process_message(topic, message): + topic = topic.decode('utf-8') + message = message.decode('utf-8') + print(f'Message received: topic={topic}, message={message}') + for i in _interfaces: + if topic in i.get_topics(): + i.process_message(topic, message) + diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..773a20e --- /dev/null +++ b/src/main.py @@ -0,0 +1,77 @@ +import errno +import network +import interfaces +import utime as time +from wssmqtt import * +from env import * + +interfaces.init() + +wifi = network.WLAN(network.STA_IF) +wifi.active(1) +wifi.connect(WIFI_SSID, WIFI_PASSWORD) + + +def do_connection(): + # Create an MQTT client + keepalive = 5 + client = WSSMQTTClient(host=WSS_HOST, path=WSS_PATH, + keepalive=25, user=MQTT_USER, password=MQTT_PASSWORD) + + client.set_callback(interfaces.process_message) + + # Publish messages to and receive messages from the MQTT broker forever + last_ping = time.ticks_ms() + while True: + if not client.is_connected(): + print('Connecting...') + client.connect() + last_ping = time.ticks_ms() + + for t in interfaces.list_topics(): + print(f'Subscribing to topic {t}...') + client.subscribe(t) + + print('Subscribed') + + else: + # Periodically ping the broker consistently with the "keepalive" + # argument used when connecting. If this isn't done, the broker will + # disconnect when the client has been idle for 1.5x the keepalive + # time. + if (time.ticks_ms() - last_ping) >= keepalive * 1000: + # print('Ping!') + client.ping() + last_ping = time.ticks_ms() + + # Every 5 minutes publish a message for fun. + # client.publish(topic, 'Coming from MicroPython via a websocket @%d' % (time.ticks_ms(),)) + + # Receive the next message if one is available, and handle the case that + # the broker has disconnected. + try: + client.check_msg() + except KeyboardInterrupt as e: + client.disconnect() + raise e + except BaseException as e: + if str(e) == '-1': + client.disconnect() + print('Connection closed') + + +def main(): + while True: + try: + print("Try connection...") + do_connection() + except KeyboardInterrupt as e: + raise e + except OSError as e: + if e.errno == errno.ENOMEM: + machine.reset() + print(f"ERROR {e.__class__.__name__}") + time.sleep(1) + + +main() diff --git a/src/wssmqtt.py b/src/wssmqtt.py new file mode 100644 index 0000000..13ad493 --- /dev/null +++ b/src/wssmqtt.py @@ -0,0 +1,282 @@ +import socket +import struct +import ssl +import usocket as socket +import random +import binascii +import machine +from websocket import websocket + + +class WebSocketClient(websocket): + def __init__(self, sock): + super().__init__(sock) + self.__sock = sock + + def setblocking(self, value): + self.__sock.setblocking(value) + + @staticmethod + def create_ssl_socket(hostname): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + addr = socket.getaddrinfo(hostname, 443)[0][-1] + s.connect(addr) + return ssl.wrap_socket(s) + + @staticmethod + def create_websocket(hostname, path, protocol=None): + sock = WebSocketClient.create_ssl_socket(hostname) + + query = f"GET {path} HTTP/1.1\r\n" + query += f"Host: {hostname}\r\n" + query += f"Connection: Upgrade\r\n" + query += f"Upgrade: websocket\r\n" + query += f"Sec-WebSocket-Key: {binascii.b2a_base64(bytes(random.getrandbits(8) + for _ in range(16)))[:-1].decode('utf-8')}\r\n" + query += f"Sec-WebSocket-Version: 13\r\n" + if protocol is not None: + query += f"Sec-WebSocket-Protocol: {protocol}\r\n" + query += f"\r\n" + + sock.write(query.encode('utf-8')) + + header = sock.readline()[:-2] + if not header: + sock.close() + print("ERROR: WebSocket not created (no response)") + return None + + if not header.startswith(b'HTTP/1.1 101 '): + sock.close() + print(f"ERROR: WebSocket not created (server returns '{header.decode('utf-8')}')") + return None + + print(f"INFO: WebSocket created! (server response: {header.decode('utf-8')})") + + while header: + if header == b'\r\n': + break + else: + print(header) + header = sock.readline() + + return WebSocketClient(sock) + + +class WSSMQTTException(Exception): + pass + + +class WSSMQTTClient: + def __init__( + self, + host, + path='/', + client_id=f"client-{machine.unique_id().hex()}", + user=None, + password=None, + keepalive=0, + ): + self.client_id = client_id + self.sock = None + self.host = host + self.path = path + self.pid = 0 + self.cb = None + self.user = user + self.pswd = password + self.keepalive = keepalive + self.lw_topic = None + self.lw_msg = None + self.lw_qos = 0 + self.lw_retain = False + + def _send_str(self, s): + self.sock.write(struct.pack("!H", len(s))) + self.sock.write(s) + + def _recv_len(self): + n = 0 + sh = 0 + while 1: + b = self.sock.read(1)[0] + n |= (b & 0x7F) << sh + if not b & 0x80: + return n + sh += 7 + + def set_callback(self, f): + self.cb = f + + def set_last_will(self, topic, msg, retain=False, qos=0): + assert 0 <= qos <= 2 + assert topic + self.lw_topic = topic + self.lw_msg = msg + self.lw_qos = qos + self.lw_retain = retain + + def connect(self, clean_session=True): + if self.sock is not None: + raise WSSMQTTException('Already connected') + + self.sock = WebSocketClient.create_websocket(self.host, self.path, protocol='mqtt') + if self.sock is None: + raise WSSMQTTException('Failed to create websocket') + + try: + premsg = bytearray(b"\x10\0\0\0\0\0") + msg = bytearray(b"\x04MQTT\x04\x02\0\0") + + sz = 10 + 2 + len(self.client_id) + msg[6] = clean_session << 1 + if self.user: + sz += 2 + len(self.user) + 2 + len(self.pswd) + msg[6] |= 0xC0 + if self.keepalive: + assert self.keepalive < 65536 + msg[7] |= self.keepalive >> 8 + msg[8] |= self.keepalive & 0x00FF + if self.lw_topic: + sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) + msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 + msg[6] |= self.lw_retain << 5 + + i = 1 + while sz > 0x7F: + premsg[i] = (sz & 0x7F) | 0x80 + sz >>= 7 + i += 1 + premsg[i] = sz + + self.sock.write(premsg, i + 2) + self.sock.write(msg) + # print(hex(len(msg)), hexlify(msg, ":")) + self._send_str(self.client_id) + if self.lw_topic: + self._send_str(self.lw_topic) + self._send_str(self.lw_msg) + if self.user: + self._send_str(self.user) + self._send_str(self.pswd) + resp = self.sock.read(4) + assert resp[0] == 0x20 and resp[1] == 0x02 + if resp[3] != 0: + raise WSSMQTTException(resp[3]) + return resp[2] & 1 + except BaseException as e: + self.sock.close() + self.sock = None + raise e + + def disconnect(self): + if self.sock is None: + return + self.sock.write(b"\xe0\0") + self.sock.close() + self.sock = None + + def is_connected(self): + return self.sock is not None + + def ping(self): + self.sock.write(b"\xc0\0") + + def publish(self, topic, msg, retain=False, qos=0): + pkt = bytearray(b"\x30\0\0\0") + pkt[0] |= qos << 1 | retain + sz = 2 + len(topic) + len(msg) + if qos > 0: + sz += 2 + assert sz < 2097152 + i = 1 + while sz > 0x7F: + pkt[i] = (sz & 0x7F) | 0x80 + sz >>= 7 + i += 1 + pkt[i] = sz + # print(hex(len(pkt)), hexlify(pkt, ":")) + self.sock.write(pkt, i + 1) + self._send_str(topic) + if qos > 0: + self.pid += 1 + pid = self.pid + struct.pack_into("!H", pkt, 0, pid) + self.sock.write(pkt, 2) + self.sock.write(msg) + if qos == 1: + while 1: + op = self.wait_msg() + if op == 0x40: + sz = self.sock.read(1) + assert sz == b"\x02" + rcv_pid = self.sock.read(2) + rcv_pid = rcv_pid[0] << 8 | rcv_pid[1] + if pid == rcv_pid: + return + elif qos == 2: + assert 0 + + def subscribe(self, topic, qos=0): + assert self.cb is not None, "Subscribe callback is not set" + pkt = bytearray(b"\x82\0\0\0") + self.pid += 1 + struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid) + # print(hex(len(pkt)), hexlify(pkt, ":")) + self.sock.write(pkt) + self._send_str(topic) + self.sock.write(qos.to_bytes(1, "little")) + while 1: + op = self.wait_msg() + if op == 0x90: + resp = self.sock.read(4) + # print(resp) + assert resp[1] == pkt[2] and resp[2] == pkt[3] + if resp[3] == 0x80: + raise WSSMQTTException(resp[3]) + return + + # Wait for a single incoming MQTT message and process it. + # Subscribed messages are delivered to a callback previously + # set by .set_callback() method. Other (internal) MQTT + # messages processed internally. + def wait_msg(self): + res = self.sock.read(1) + if res is None: + return None + if res == b"": + raise OSError(-1) + self.sock.setblocking(True) + if res == b"\xd0": # PINGRESP + sz = self.sock.read(1)[0] + assert sz == 0 + return None + op = res[0] + if op & 0xF0 != 0x30: + return op + sz = self._recv_len() + topic_len = self.sock.read(2) + topic_len = (topic_len[0] << 8) | topic_len[1] + topic = self.sock.read(topic_len) + sz -= topic_len + 2 + if op & 6: + pid = self.sock.read(2) + pid = pid[0] << 8 | pid[1] + sz -= 2 + msg = self.sock.read(sz) + self.cb(topic, msg) + if op & 6 == 2: + pkt = bytearray(b"\x40\x02\0\0") + struct.pack_into("!H", pkt, 2, pid) + self.sock.write(pkt) + elif op & 6 == 4: + assert 0 + return op + + # Checks whether a pending message from server is available. + # If not, returns immediately with None. Otherwise, does + # the same processing as wait_msg. + def check_msg(self): + self.sock.setblocking(False) + return self.wait_msg() +