diff --git a/mqtt-test.py b/mqtt-test.py new file mode 100644 index 0000000..33ab849 --- /dev/null +++ b/mqtt-test.py @@ -0,0 +1,214 @@ +import websocket +import struct +import random + +# --- Подключение --- +URL = "wss://insert-me.ru/mqtt" +USER = "esp8266-insert-me" +PASS = "insert-me" +CLIENT_ID = f"pyclient_{random.randint(0, 10000)}" +TOPIC = "#" + +def encode_length(length: int) -> bytes: + enc = bytearray() + while True: + digit = length & 0x7F + length >>= 7 + if length > 0: + digit |= 0x80 + enc.append(digit) + if length == 0: + break + return bytes(enc) + +def decode_length(data: bytes, start: int = 0) -> tuple: + """Декодирует остающуюся длину из data, возвращает (значение, количество_прочитанных_байт).""" + length = 0 + multiplier = 1 + pos = start + while True: + if pos >= len(data): + raise ValueError("Incomplete length encoding") + digit = data[pos] + length += (digit & 0x7F) * multiplier + multiplier <<= 7 + pos += 1 + if not (digit & 0x80): + break + return length, pos - start + +def build_connect(client_id: str, username: str = None, password: str = None, + keepalive: int = 60, clean_start: bool = True) -> bytes: + protocol_name = b"MQTT" + protocol_level = 5 + connect_flags = 0 + if clean_start: + connect_flags |= 0x02 + if username is not None: + connect_flags |= 0x80 + if password is not None: + connect_flags |= 0x40 + + properties = b"\x00" # без свойств + + client_id_bytes = client_id.encode('utf-8') + payload = struct.pack("!H", len(client_id_bytes)) + client_id_bytes + + if username is not None: + user_bytes = username.encode('utf-8') + payload += struct.pack("!H", len(user_bytes)) + user_bytes + if password is not None: + pass_bytes = password.encode('utf-8') + payload += struct.pack("!H", len(pass_bytes)) + pass_bytes + + variable_header = ( + struct.pack("!H", len(protocol_name)) + + protocol_name + + bytes([protocol_level]) + + bytes([connect_flags]) + + struct.pack("!H", keepalive) + + properties + ) + + remaining_length = len(variable_header) + len(payload) + fixed_header = b"\x10" + encode_length(remaining_length) + return fixed_header + variable_header + payload + +def build_subscribe(topic: str, packet_id: int = 1, qos: int = 0) -> bytes: + fixed_header = b"\x82" + topic_bytes = topic.encode('utf-8') + properties = b"\x00" + payload = struct.pack("!H", packet_id) + properties + payload += struct.pack("!H", len(topic_bytes)) + topic_bytes + payload += bytes([qos]) + remaining_length = len(payload) + return fixed_header + encode_length(remaining_length) + payload + +def parse_publish(data: bytes) -> tuple: + """ + Распарсить PUBLISH пакет MQTT 5. + Возвращает (topic, payload, qos, packet_id) или (None, None, None, None) если ошибка. + """ + if not data or data[0] & 0xF0 != 0x30: + return None, None, None, None + + # Пропускаем фиксированный заголовок (1 байт) и декодируем длину + pos = 1 + remaining_length, consumed = decode_length(data, pos) + pos += consumed + + # Читаем топик + if pos + 2 > len(data): + return None, None, None, None + topic_len = struct.unpack("!H", data[pos:pos+2])[0] + pos += 2 + if pos + topic_len > len(data): + return None, None, None, None + topic = data[pos:pos+topic_len].decode('utf-8', errors='replace') + pos += topic_len + + # QoS и Packet ID + qos = (data[0] >> 1) & 0x03 + packet_id = None + if qos > 0: + if pos + 2 > len(data): + return None, None, None, None + packet_id = struct.unpack("!H", data[pos:pos+2])[0] + pos += 2 + + # Свойства: сначала длина свойств (1 байт, т.к. у нас всегда 0) + if pos >= len(data): + return None, None, None, None + prop_len = data[pos] + pos += 1 + if prop_len > 0: + # Если есть свойства, пропускаем их (упрощённо) + pos += prop_len + + # Всё что осталось – payload + payload = data[pos:].decode('utf-8', errors='replace') + return topic, payload, qos, packet_id + +def main(): + # Создаём WebSocket + ws = websocket.create_connection(URL, subprotocols=["mqtt"], timeout=10) + + # 1. CONNECT + connect_pkt = build_connect(CLIENT_ID, username=USER, password=PASS, keepalive=60) + print("Sending CONNECT:", connect_pkt.hex()) + ws.send_binary(connect_pkt) + + # 2. Читаем CONNACK + try: + data = ws.recv() + print("Received:", data.hex() if data else "(empty)") + if data and data[0] == 0x20: + if len(data) >= 4: + reason_code = data[3] + if reason_code == 0: + print("✅ Connected successfully") + else: + print(f"❌ Connection refused, reason code: {reason_code}") + ws.close() + return + else: + print("Malformed CONNACK") + ws.close() + return + else: + print("Not a CONNACK received") + ws.close() + return + except Exception as e: + print(f"Error receiving CONNACK: {e}") + ws.close() + return + + # 3. Подписка + sub_pkt = build_subscribe(TOPIC, packet_id=1, qos=0) + print("Sending SUBSCRIBE:", sub_pkt.hex()) + ws.send_binary(sub_pkt) + + suback = ws.recv() + print("SUBACK:", suback.hex()) + if suback and suback[0] == 0x90: + reason = suback[-1] + if reason == 0: + print(f"✅ Subscribed to '{TOPIC}'") + else: + print(f"❌ Subscribe failed, reason code: {reason}") + ws.close() + return + + # 4. Цикл приёма + print("Waiting for messages... (Ctrl+C to stop)") + try: + ws.settimeout(5) + while True: + try: + msg = ws.recv() + except websocket.WebSocketTimeoutException: + ws.send_binary(b"\xC0\x00") # PINGREQ + continue + if not msg: + break + + # Если PINGRESP – игнорируем + if msg[0] == 0xD0 and len(msg) == 2 and msg[1] == 0x00: + continue + + # Парсим PUBLISH + topic, payload, qos, pid = parse_publish(msg) + if topic is not None: + print(f"📨 Topic: {topic}, Payload: {payload}") + if qos > 0: + print(f" (QoS={qos}, PacketID={pid})") + else: + print(f"Other MQTT packet: {msg.hex()}") + except KeyboardInterrupt: + pass + finally: + ws.close() + +if __name__ == "__main__": + main() diff --git a/src/interfaces.py b/src/interfaces.py index 7350ef6..887f859 100644 --- a/src/interfaces.py +++ b/src/interfaces.py @@ -1,6 +1,6 @@ import machine import time -from machine import Pin, PWM +# from machine import Pin, PWM _U16_MAX = 0xFFFF @@ -92,11 +92,12 @@ class ServoInterface(PwmInterface): _interfaces = [ - PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n0", Pin(13)), - PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n1", Pin(12)), - PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n2", Pin(14)), - PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n3", Pin(27)), - ServoInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm1n0", Pin(32)), + # PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n0", Pin(13)), + # PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n1", Pin(12)), + # PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n2", Pin(14)), + # PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n3", Pin(27)), + # ServoInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm1n0", Pin(32)), + BaseInterface(f"/home/38/esp8266/pwm0n0"), ] diff --git a/src/main.py b/src/main.py index 773a20e..9013b13 100644 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,5 @@ import errno -import network +# import network import interfaces import utime as time from wssmqtt import * @@ -7,9 +7,9 @@ from env import * interfaces.init() -wifi = network.WLAN(network.STA_IF) -wifi.active(1) -wifi.connect(WIFI_SSID, WIFI_PASSWORD) +# wifi = network.WLAN(network.STA_IF) +# wifi.active(1) +# wifi.connect(WIFI_SSID, WIFI_PASSWORD) def do_connection(): diff --git a/src/wssmqtt.py b/src/wssmqtt.py index 13ad493..1c3450d 100644 --- a/src/wssmqtt.py +++ b/src/wssmqtt.py @@ -8,59 +8,50 @@ import machine from websocket import websocket -class WebSocketClient(websocket): - def __init__(self, sock): - super().__init__(sock) - self.__sock = sock +def create_ssl_socket(hostname): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + addr = socket.getaddrinfo(hostname, 443)[0][-1] + s.connect(addr) + return ssl.wrap_socket(s) - def setblocking(self, value): - self.__sock.setblocking(value) - @staticmethod - def create_ssl_socket(hostname): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - addr = socket.getaddrinfo(hostname, 443)[0][-1] - s.connect(addr) - return ssl.wrap_socket(s) +def create_websocket(hostname, path, protocol=None): + sock = create_ssl_socket(hostname) - @staticmethod - def create_websocket(hostname, path, protocol=None): - sock = WebSocketClient.create_ssl_socket(hostname) + query = f"GET {path} HTTP/1.1\r\n" + query += f"Host: {hostname}\r\n" + query += f"Connection: Upgrade\r\n" + query += f"Upgrade: websocket\r\n" + query += f"Sec-WebSocket-Key: {binascii.b2a_base64(bytes(random.getrandbits(8) + for _ in range(16)))[:-1].decode('utf-8')}\r\n" + query += f"Sec-WebSocket-Version: 13\r\n" + if protocol is not None: + query += f"Sec-WebSocket-Protocol: {protocol}\r\n" + query += f"\r\n" - query = f"GET {path} HTTP/1.1\r\n" - query += f"Host: {hostname}\r\n" - query += f"Connection: Upgrade\r\n" - query += f"Upgrade: websocket\r\n" - query += f"Sec-WebSocket-Key: {binascii.b2a_base64(bytes(random.getrandbits(8) - for _ in range(16)))[:-1].decode('utf-8')}\r\n" - query += f"Sec-WebSocket-Version: 13\r\n" - if protocol is not None: - query += f"Sec-WebSocket-Protocol: {protocol}\r\n" - query += f"\r\n" + sock.write(query.encode('utf-8')) - sock.write(query.encode('utf-8')) + header = sock.readline()[:-2] + if not header: + sock.close() + print("ERROR: WebSocket not created (no response)") + return None - header = sock.readline()[:-2] - if not header: - sock.close() - print("ERROR: WebSocket not created (no response)") - return None + if not header.startswith(b'HTTP/1.1 101 '): + sock.close() + print(f"ERROR: WebSocket not created (server returns '{header.decode('utf-8')}')") + return None - if not header.startswith(b'HTTP/1.1 101 '): - sock.close() - print(f"ERROR: WebSocket not created (server returns '{header.decode('utf-8')}')") - return None + print(f"INFO: WebSocket created! (server response: {header.decode('utf-8')})") - print(f"INFO: WebSocket created! (server response: {header.decode('utf-8')})") + while header: + if header == b'\r\n': + break + else: + print(header) + header = sock.readline() - while header: - if header == b'\r\n': - break - else: - print(header) - header = sock.readline() - - return WebSocketClient(sock) + return websocket(sock) class WSSMQTTException(Exception): @@ -68,15 +59,7 @@ class WSSMQTTException(Exception): class WSSMQTTClient: - def __init__( - self, - host, - path='/', - client_id=f"client-{machine.unique_id().hex()}", - user=None, - password=None, - keepalive=0, - ): + def __init__(self, host, path='/', client_id=f"micropython-unix", user=None, password=None, keepalive=15): self.client_id = client_id self.sock = None self.host = host @@ -90,80 +73,132 @@ class WSSMQTTClient: self.lw_msg = None self.lw_qos = 0 self.lw_retain = False + self._packet_id_counter = 1 - def _send_str(self, s): - self.sock.write(struct.pack("!H", len(s))) - self.sock.write(s) + def _encode_length(self, length): + enc = bytearray() + while True: + digit = length & 0x7F + length >>= 7 + if length > 0: + digit |= 0x80 + enc.append(digit) + if length == 0: + break + return bytes(enc) - def _recv_len(self): - n = 0 - sh = 0 - while 1: - b = self.sock.read(1)[0] - n |= (b & 0x7F) << sh - if not b & 0x80: - return n - sh += 7 + def _decode_length(self, data, start = 0): + """Декодирует остающуюся длину из data, возвращает (значение, количество_прочитанных_байт).""" + length = 0 + multiplier = 1 + pos = start + while True: + if pos >= len(data): + raise WSSMQTTException("Value error: Incomplete length encoding") + digit = data[pos] + length += (digit & 0x7F) * multiplier + multiplier <<= 7 + pos += 1 + if not (digit & 0x80): + break + return length, pos - start + + def _build_connect(self): + protocol_name = b"MQTT" + protocol_level = 5 + connect_flags = 0 + connect_flags |= 0x02 # clean start + if self.user is not None: + connect_flags |= 0x80 + if self.pswd is not None: + connect_flags |= 0x40 + + properties = b"\x00" # без свойств + + client_id_bytes = self.client_id.encode('utf-8') + payload = struct.pack("!H", len(client_id_bytes)) + client_id_bytes + + if self.user is not None: + user_bytes = self.user.encode('utf-8') + payload += struct.pack("!H", len(user_bytes)) + user_bytes + if self.pswd is not None: + pass_bytes = self.pswd.encode('utf-8') + payload += struct.pack("!H", len(pass_bytes)) + pass_bytes + + variable_header = struct.pack("!H", len(protocol_name)) + protocol_name + bytes([protocol_level]) + bytes([connect_flags]) + struct.pack("!H", self.keepalive) + properties + + remaining_length = len(variable_header) + len(payload) + fixed_header = b"\x10" + self._encode_length(remaining_length) + return fixed_header + variable_header + payload + + def _parse_publish(self, data): + """ + Распарсить PUBLISH пакет MQTT 5. + Возвращает (topic, payload, qos, packet_id) или (None, None, None, None) если ошибка. + """ + if not data or data[0] & 0xF0 != 0x30: + return None, None, None, None + + # Пропускаем фиксированный заголовок (1 байт) и декодируем длину + pos = 1 + remaining_length, consumed = decode_length(data, pos) + pos += consumed + + # Читаем топик + if pos + 2 > len(data): + return None, None, None, None + topic_len = struct.unpack("!H", data[pos:pos+2])[0] + pos += 2 + if pos + topic_len > len(data): + return None, None, None, None + topic = data[pos:pos+topic_len].decode('utf-8', errors='replace') + pos += topic_len + + # QoS и Packet ID + qos = (data[0] >> 1) & 0x03 + packet_id = None + if qos > 0: + if pos + 2 > len(data): + return None, None, None, None + packet_id = struct.unpack("!H", data[pos:pos+2])[0] + pos += 2 + + # Свойства: сначала длина свойств (1 байт, т.к. у нас всегда 0) + if pos >= len(data): + return None, None, None, None + prop_len = data[pos] + pos += 1 + if prop_len > 0: + # Если есть свойства, пропускаем их (упрощённо) + pos += prop_len + + # Всё что осталось – payload + payload = data[pos:].decode('utf-8', errors='replace') + return topic, payload, qos, packet_id def set_callback(self, f): self.cb = f - def set_last_will(self, topic, msg, retain=False, qos=0): - assert 0 <= qos <= 2 - assert topic - self.lw_topic = topic - self.lw_msg = msg - self.lw_qos = qos - self.lw_retain = retain - - def connect(self, clean_session=True): + def connect(self): if self.sock is not None: raise WSSMQTTException('Already connected') - self.sock = WebSocketClient.create_websocket(self.host, self.path, protocol='mqtt') + self.sock = create_websocket(self.host, self.path, protocol='mqtt') if self.sock is None: raise WSSMQTTException('Failed to create websocket') try: - premsg = bytearray(b"\x10\0\0\0\0\0") - msg = bytearray(b"\x04MQTT\x04\x02\0\0") - - sz = 10 + 2 + len(self.client_id) - msg[6] = clean_session << 1 - if self.user: - sz += 2 + len(self.user) + 2 + len(self.pswd) - msg[6] |= 0xC0 - if self.keepalive: - assert self.keepalive < 65536 - msg[7] |= self.keepalive >> 8 - msg[8] |= self.keepalive & 0x00FF - if self.lw_topic: - sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) - msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 - msg[6] |= self.lw_retain << 5 - - i = 1 - while sz > 0x7F: - premsg[i] = (sz & 0x7F) | 0x80 - sz >>= 7 - i += 1 - premsg[i] = sz - - self.sock.write(premsg, i + 2) - self.sock.write(msg) - # print(hex(len(msg)), hexlify(msg, ":")) - self._send_str(self.client_id) - if self.lw_topic: - self._send_str(self.lw_topic) - self._send_str(self.lw_msg) - if self.user: - self._send_str(self.user) - self._send_str(self.pswd) - resp = self.sock.read(4) - assert resp[0] == 0x20 and resp[1] == 0x02 - if resp[3] != 0: - raise WSSMQTTException(resp[3]) - return resp[2] & 1 + self.sock.write(self._build_connect()) + data = self.sock.read() + if data and data[0] == 0x20: + if len(data) >= 4: + reason_code = data[3] + if reason_code != 0: + raise WSSMQTTException(f'Connection refused, reason code: {reason_code}') + else: + raise WSSMQTTException(f'Malformed CONNACK {data}') + else: + raise WSSMQTTException(f'Not a CONNACK received, received {data}') except BaseException as e: self.sock.close() self.sock = None @@ -183,53 +218,39 @@ class WSSMQTTClient: self.sock.write(b"\xc0\0") def publish(self, topic, msg, retain=False, qos=0): - pkt = bytearray(b"\x30\0\0\0") - pkt[0] |= qos << 1 | retain - sz = 2 + len(topic) + len(msg) - if qos > 0: - sz += 2 - assert sz < 2097152 - i = 1 - while sz > 0x7F: - pkt[i] = (sz & 0x7F) | 0x80 - sz >>= 7 - i += 1 - pkt[i] = sz - # print(hex(len(pkt)), hexlify(pkt, ":")) - self.sock.write(pkt, i + 1) - self._send_str(topic) - if qos > 0: - self.pid += 1 - pid = self.pid - struct.pack_into("!H", pkt, 0, pid) - self.sock.write(pkt, 2) - self.sock.write(msg) - if qos == 1: - while 1: - op = self.wait_msg() - if op == 0x40: - sz = self.sock.read(1) - assert sz == b"\x02" - rcv_pid = self.sock.read(2) - rcv_pid = rcv_pid[0] << 8 | rcv_pid[1] - if pid == rcv_pid: - return - elif qos == 2: - assert 0 + if self.sock is None: + raise WSSMQTTException("Not connected") + # Для QoS 0 packet_id не нужен + flags = (qos << 1) | (1 if retain else 0) + fixed = bytes([0x30 | flags]) + topic_bytes = topic.encode('utf-8') + msg_bytes = msg.encode('utf-8') + properties = b"\x00" # пустые свойства + # Переменный заголовок: topic length + topic + properties + variable = struct.pack("!H", len(topic_bytes)) + topic_bytes + properties + payload = msg_bytes + remaining = len(variable) + len(payload) + pkt = fixed + self._encode_length(remaining) + variable + payload + self.sock.write(pkt) + # Для QoS 1 и 2 нужно ждать подтверждения, но пока опускаем def subscribe(self, topic, qos=0): assert self.cb is not None, "Subscribe callback is not set" - pkt = bytearray(b"\x82\0\0\0") - self.pid += 1 - struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid) - # print(hex(len(pkt)), hexlify(pkt, ":")) + fixed_header = b"\x82" + topic_bytes = topic.encode('utf-8') + properties = b"\x00" + payload = struct.pack("!H", self._packet_id_counter) + properties + payload += struct.pack("!H", len(topic_bytes)) + topic_bytes + payload += bytes([qos]) + remaining_length = len(payload) + self._packet_id_counter = (self._packet_id_counter + 1) & 0xffff + + pkt = fixed_header + self._encode_length(remaining_length) + payload self.sock.write(pkt) - self._send_str(topic) - self.sock.write(qos.to_bytes(1, "little")) while 1: op = self.wait_msg() if op == 0x90: - resp = self.sock.read(4) + resp = self.sock.read() # print(resp) assert resp[1] == pkt[2] and resp[2] == pkt[3] if resp[3] == 0x80: @@ -241,7 +262,7 @@ class WSSMQTTClient: # set by .set_callback() method. Other (internal) MQTT # messages processed internally. def wait_msg(self): - res = self.sock.read(1) + res = self.sock.read() if res is None: return None if res == b"":