перевел все на mqtt 5, оно вроде и работало на unix-порте, но в составе прошивки не завелось. вечером разберусь почему
This commit is contained in:
+214
@@ -0,0 +1,214 @@
|
||||
import websocket
|
||||
import struct
|
||||
import random
|
||||
|
||||
# --- Подключение ---
|
||||
URL = "wss://insert-me.ru/mqtt"
|
||||
USER = "esp8266-insert-me"
|
||||
PASS = "insert-me"
|
||||
CLIENT_ID = f"pyclient_{random.randint(0, 10000)}"
|
||||
TOPIC = "#"
|
||||
|
||||
def encode_length(length: int) -> bytes:
|
||||
enc = bytearray()
|
||||
while True:
|
||||
digit = length & 0x7F
|
||||
length >>= 7
|
||||
if length > 0:
|
||||
digit |= 0x80
|
||||
enc.append(digit)
|
||||
if length == 0:
|
||||
break
|
||||
return bytes(enc)
|
||||
|
||||
def decode_length(data: bytes, start: int = 0) -> tuple:
|
||||
"""Декодирует остающуюся длину из data, возвращает (значение, количество_прочитанных_байт)."""
|
||||
length = 0
|
||||
multiplier = 1
|
||||
pos = start
|
||||
while True:
|
||||
if pos >= len(data):
|
||||
raise ValueError("Incomplete length encoding")
|
||||
digit = data[pos]
|
||||
length += (digit & 0x7F) * multiplier
|
||||
multiplier <<= 7
|
||||
pos += 1
|
||||
if not (digit & 0x80):
|
||||
break
|
||||
return length, pos - start
|
||||
|
||||
def build_connect(client_id: str, username: str = None, password: str = None,
|
||||
keepalive: int = 60, clean_start: bool = True) -> bytes:
|
||||
protocol_name = b"MQTT"
|
||||
protocol_level = 5
|
||||
connect_flags = 0
|
||||
if clean_start:
|
||||
connect_flags |= 0x02
|
||||
if username is not None:
|
||||
connect_flags |= 0x80
|
||||
if password is not None:
|
||||
connect_flags |= 0x40
|
||||
|
||||
properties = b"\x00" # без свойств
|
||||
|
||||
client_id_bytes = client_id.encode('utf-8')
|
||||
payload = struct.pack("!H", len(client_id_bytes)) + client_id_bytes
|
||||
|
||||
if username is not None:
|
||||
user_bytes = username.encode('utf-8')
|
||||
payload += struct.pack("!H", len(user_bytes)) + user_bytes
|
||||
if password is not None:
|
||||
pass_bytes = password.encode('utf-8')
|
||||
payload += struct.pack("!H", len(pass_bytes)) + pass_bytes
|
||||
|
||||
variable_header = (
|
||||
struct.pack("!H", len(protocol_name)) +
|
||||
protocol_name +
|
||||
bytes([protocol_level]) +
|
||||
bytes([connect_flags]) +
|
||||
struct.pack("!H", keepalive) +
|
||||
properties
|
||||
)
|
||||
|
||||
remaining_length = len(variable_header) + len(payload)
|
||||
fixed_header = b"\x10" + encode_length(remaining_length)
|
||||
return fixed_header + variable_header + payload
|
||||
|
||||
def build_subscribe(topic: str, packet_id: int = 1, qos: int = 0) -> bytes:
|
||||
fixed_header = b"\x82"
|
||||
topic_bytes = topic.encode('utf-8')
|
||||
properties = b"\x00"
|
||||
payload = struct.pack("!H", packet_id) + properties
|
||||
payload += struct.pack("!H", len(topic_bytes)) + topic_bytes
|
||||
payload += bytes([qos])
|
||||
remaining_length = len(payload)
|
||||
return fixed_header + encode_length(remaining_length) + payload
|
||||
|
||||
def parse_publish(data: bytes) -> tuple:
|
||||
"""
|
||||
Распарсить PUBLISH пакет MQTT 5.
|
||||
Возвращает (topic, payload, qos, packet_id) или (None, None, None, None) если ошибка.
|
||||
"""
|
||||
if not data or data[0] & 0xF0 != 0x30:
|
||||
return None, None, None, None
|
||||
|
||||
# Пропускаем фиксированный заголовок (1 байт) и декодируем длину
|
||||
pos = 1
|
||||
remaining_length, consumed = decode_length(data, pos)
|
||||
pos += consumed
|
||||
|
||||
# Читаем топик
|
||||
if pos + 2 > len(data):
|
||||
return None, None, None, None
|
||||
topic_len = struct.unpack("!H", data[pos:pos+2])[0]
|
||||
pos += 2
|
||||
if pos + topic_len > len(data):
|
||||
return None, None, None, None
|
||||
topic = data[pos:pos+topic_len].decode('utf-8', errors='replace')
|
||||
pos += topic_len
|
||||
|
||||
# QoS и Packet ID
|
||||
qos = (data[0] >> 1) & 0x03
|
||||
packet_id = None
|
||||
if qos > 0:
|
||||
if pos + 2 > len(data):
|
||||
return None, None, None, None
|
||||
packet_id = struct.unpack("!H", data[pos:pos+2])[0]
|
||||
pos += 2
|
||||
|
||||
# Свойства: сначала длина свойств (1 байт, т.к. у нас всегда 0)
|
||||
if pos >= len(data):
|
||||
return None, None, None, None
|
||||
prop_len = data[pos]
|
||||
pos += 1
|
||||
if prop_len > 0:
|
||||
# Если есть свойства, пропускаем их (упрощённо)
|
||||
pos += prop_len
|
||||
|
||||
# Всё что осталось – payload
|
||||
payload = data[pos:].decode('utf-8', errors='replace')
|
||||
return topic, payload, qos, packet_id
|
||||
|
||||
def main():
|
||||
# Создаём WebSocket
|
||||
ws = websocket.create_connection(URL, subprotocols=["mqtt"], timeout=10)
|
||||
|
||||
# 1. CONNECT
|
||||
connect_pkt = build_connect(CLIENT_ID, username=USER, password=PASS, keepalive=60)
|
||||
print("Sending CONNECT:", connect_pkt.hex())
|
||||
ws.send_binary(connect_pkt)
|
||||
|
||||
# 2. Читаем CONNACK
|
||||
try:
|
||||
data = ws.recv()
|
||||
print("Received:", data.hex() if data else "(empty)")
|
||||
if data and data[0] == 0x20:
|
||||
if len(data) >= 4:
|
||||
reason_code = data[3]
|
||||
if reason_code == 0:
|
||||
print("✅ Connected successfully")
|
||||
else:
|
||||
print(f"❌ Connection refused, reason code: {reason_code}")
|
||||
ws.close()
|
||||
return
|
||||
else:
|
||||
print("Malformed CONNACK")
|
||||
ws.close()
|
||||
return
|
||||
else:
|
||||
print("Not a CONNACK received")
|
||||
ws.close()
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"Error receiving CONNACK: {e}")
|
||||
ws.close()
|
||||
return
|
||||
|
||||
# 3. Подписка
|
||||
sub_pkt = build_subscribe(TOPIC, packet_id=1, qos=0)
|
||||
print("Sending SUBSCRIBE:", sub_pkt.hex())
|
||||
ws.send_binary(sub_pkt)
|
||||
|
||||
suback = ws.recv()
|
||||
print("SUBACK:", suback.hex())
|
||||
if suback and suback[0] == 0x90:
|
||||
reason = suback[-1]
|
||||
if reason == 0:
|
||||
print(f"✅ Subscribed to '{TOPIC}'")
|
||||
else:
|
||||
print(f"❌ Subscribe failed, reason code: {reason}")
|
||||
ws.close()
|
||||
return
|
||||
|
||||
# 4. Цикл приёма
|
||||
print("Waiting for messages... (Ctrl+C to stop)")
|
||||
try:
|
||||
ws.settimeout(5)
|
||||
while True:
|
||||
try:
|
||||
msg = ws.recv()
|
||||
except websocket.WebSocketTimeoutException:
|
||||
ws.send_binary(b"\xC0\x00") # PINGREQ
|
||||
continue
|
||||
if not msg:
|
||||
break
|
||||
|
||||
# Если PINGRESP – игнорируем
|
||||
if msg[0] == 0xD0 and len(msg) == 2 and msg[1] == 0x00:
|
||||
continue
|
||||
|
||||
# Парсим PUBLISH
|
||||
topic, payload, qos, pid = parse_publish(msg)
|
||||
if topic is not None:
|
||||
print(f"📨 Topic: {topic}, Payload: {payload}")
|
||||
if qos > 0:
|
||||
print(f" (QoS={qos}, PacketID={pid})")
|
||||
else:
|
||||
print(f"Other MQTT packet: {msg.hex()}")
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
ws.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+7
-6
@@ -1,6 +1,6 @@
|
||||
import machine
|
||||
import time
|
||||
from machine import Pin, PWM
|
||||
# from machine import Pin, PWM
|
||||
|
||||
_U16_MAX = 0xFFFF
|
||||
|
||||
@@ -92,11 +92,12 @@ class ServoInterface(PwmInterface):
|
||||
|
||||
|
||||
_interfaces = [
|
||||
PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n0", Pin(13)),
|
||||
PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n1", Pin(12)),
|
||||
PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n2", Pin(14)),
|
||||
PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n3", Pin(27)),
|
||||
ServoInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm1n0", Pin(32)),
|
||||
# PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n0", Pin(13)),
|
||||
# PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n1", Pin(12)),
|
||||
# PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n2", Pin(14)),
|
||||
# PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n3", Pin(27)),
|
||||
# ServoInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm1n0", Pin(32)),
|
||||
BaseInterface(f"/home/38/esp8266/pwm0n0"),
|
||||
]
|
||||
|
||||
|
||||
|
||||
+4
-4
@@ -1,5 +1,5 @@
|
||||
import errno
|
||||
import network
|
||||
# import network
|
||||
import interfaces
|
||||
import utime as time
|
||||
from wssmqtt import *
|
||||
@@ -7,9 +7,9 @@ from env import *
|
||||
|
||||
interfaces.init()
|
||||
|
||||
wifi = network.WLAN(network.STA_IF)
|
||||
wifi.active(1)
|
||||
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
|
||||
# wifi = network.WLAN(network.STA_IF)
|
||||
# wifi.active(1)
|
||||
# wifi.connect(WIFI_SSID, WIFI_PASSWORD)
|
||||
|
||||
|
||||
def do_connection():
|
||||
|
||||
+176
-155
@@ -8,59 +8,50 @@ import machine
|
||||
from websocket import websocket
|
||||
|
||||
|
||||
class WebSocketClient(websocket):
|
||||
def __init__(self, sock):
|
||||
super().__init__(sock)
|
||||
self.__sock = sock
|
||||
def create_ssl_socket(hostname):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
addr = socket.getaddrinfo(hostname, 443)[0][-1]
|
||||
s.connect(addr)
|
||||
return ssl.wrap_socket(s)
|
||||
|
||||
def setblocking(self, value):
|
||||
self.__sock.setblocking(value)
|
||||
|
||||
@staticmethod
|
||||
def create_ssl_socket(hostname):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
addr = socket.getaddrinfo(hostname, 443)[0][-1]
|
||||
s.connect(addr)
|
||||
return ssl.wrap_socket(s)
|
||||
def create_websocket(hostname, path, protocol=None):
|
||||
sock = create_ssl_socket(hostname)
|
||||
|
||||
@staticmethod
|
||||
def create_websocket(hostname, path, protocol=None):
|
||||
sock = WebSocketClient.create_ssl_socket(hostname)
|
||||
query = f"GET {path} HTTP/1.1\r\n"
|
||||
query += f"Host: {hostname}\r\n"
|
||||
query += f"Connection: Upgrade\r\n"
|
||||
query += f"Upgrade: websocket\r\n"
|
||||
query += f"Sec-WebSocket-Key: {binascii.b2a_base64(bytes(random.getrandbits(8)
|
||||
for _ in range(16)))[:-1].decode('utf-8')}\r\n"
|
||||
query += f"Sec-WebSocket-Version: 13\r\n"
|
||||
if protocol is not None:
|
||||
query += f"Sec-WebSocket-Protocol: {protocol}\r\n"
|
||||
query += f"\r\n"
|
||||
|
||||
query = f"GET {path} HTTP/1.1\r\n"
|
||||
query += f"Host: {hostname}\r\n"
|
||||
query += f"Connection: Upgrade\r\n"
|
||||
query += f"Upgrade: websocket\r\n"
|
||||
query += f"Sec-WebSocket-Key: {binascii.b2a_base64(bytes(random.getrandbits(8)
|
||||
for _ in range(16)))[:-1].decode('utf-8')}\r\n"
|
||||
query += f"Sec-WebSocket-Version: 13\r\n"
|
||||
if protocol is not None:
|
||||
query += f"Sec-WebSocket-Protocol: {protocol}\r\n"
|
||||
query += f"\r\n"
|
||||
sock.write(query.encode('utf-8'))
|
||||
|
||||
sock.write(query.encode('utf-8'))
|
||||
header = sock.readline()[:-2]
|
||||
if not header:
|
||||
sock.close()
|
||||
print("ERROR: WebSocket not created (no response)")
|
||||
return None
|
||||
|
||||
header = sock.readline()[:-2]
|
||||
if not header:
|
||||
sock.close()
|
||||
print("ERROR: WebSocket not created (no response)")
|
||||
return None
|
||||
if not header.startswith(b'HTTP/1.1 101 '):
|
||||
sock.close()
|
||||
print(f"ERROR: WebSocket not created (server returns '{header.decode('utf-8')}')")
|
||||
return None
|
||||
|
||||
if not header.startswith(b'HTTP/1.1 101 '):
|
||||
sock.close()
|
||||
print(f"ERROR: WebSocket not created (server returns '{header.decode('utf-8')}')")
|
||||
return None
|
||||
print(f"INFO: WebSocket created! (server response: {header.decode('utf-8')})")
|
||||
|
||||
print(f"INFO: WebSocket created! (server response: {header.decode('utf-8')})")
|
||||
while header:
|
||||
if header == b'\r\n':
|
||||
break
|
||||
else:
|
||||
print(header)
|
||||
header = sock.readline()
|
||||
|
||||
while header:
|
||||
if header == b'\r\n':
|
||||
break
|
||||
else:
|
||||
print(header)
|
||||
header = sock.readline()
|
||||
|
||||
return WebSocketClient(sock)
|
||||
return websocket(sock)
|
||||
|
||||
|
||||
class WSSMQTTException(Exception):
|
||||
@@ -68,15 +59,7 @@ class WSSMQTTException(Exception):
|
||||
|
||||
|
||||
class WSSMQTTClient:
|
||||
def __init__(
|
||||
self,
|
||||
host,
|
||||
path='/',
|
||||
client_id=f"client-{machine.unique_id().hex()}",
|
||||
user=None,
|
||||
password=None,
|
||||
keepalive=0,
|
||||
):
|
||||
def __init__(self, host, path='/', client_id=f"micropython-unix", user=None, password=None, keepalive=15):
|
||||
self.client_id = client_id
|
||||
self.sock = None
|
||||
self.host = host
|
||||
@@ -90,80 +73,132 @@ class WSSMQTTClient:
|
||||
self.lw_msg = None
|
||||
self.lw_qos = 0
|
||||
self.lw_retain = False
|
||||
self._packet_id_counter = 1
|
||||
|
||||
def _send_str(self, s):
|
||||
self.sock.write(struct.pack("!H", len(s)))
|
||||
self.sock.write(s)
|
||||
def _encode_length(self, length):
|
||||
enc = bytearray()
|
||||
while True:
|
||||
digit = length & 0x7F
|
||||
length >>= 7
|
||||
if length > 0:
|
||||
digit |= 0x80
|
||||
enc.append(digit)
|
||||
if length == 0:
|
||||
break
|
||||
return bytes(enc)
|
||||
|
||||
def _recv_len(self):
|
||||
n = 0
|
||||
sh = 0
|
||||
while 1:
|
||||
b = self.sock.read(1)[0]
|
||||
n |= (b & 0x7F) << sh
|
||||
if not b & 0x80:
|
||||
return n
|
||||
sh += 7
|
||||
def _decode_length(self, data, start = 0):
|
||||
"""Декодирует остающуюся длину из data, возвращает (значение, количество_прочитанных_байт)."""
|
||||
length = 0
|
||||
multiplier = 1
|
||||
pos = start
|
||||
while True:
|
||||
if pos >= len(data):
|
||||
raise WSSMQTTException("Value error: Incomplete length encoding")
|
||||
digit = data[pos]
|
||||
length += (digit & 0x7F) * multiplier
|
||||
multiplier <<= 7
|
||||
pos += 1
|
||||
if not (digit & 0x80):
|
||||
break
|
||||
return length, pos - start
|
||||
|
||||
def _build_connect(self):
|
||||
protocol_name = b"MQTT"
|
||||
protocol_level = 5
|
||||
connect_flags = 0
|
||||
connect_flags |= 0x02 # clean start
|
||||
if self.user is not None:
|
||||
connect_flags |= 0x80
|
||||
if self.pswd is not None:
|
||||
connect_flags |= 0x40
|
||||
|
||||
properties = b"\x00" # без свойств
|
||||
|
||||
client_id_bytes = self.client_id.encode('utf-8')
|
||||
payload = struct.pack("!H", len(client_id_bytes)) + client_id_bytes
|
||||
|
||||
if self.user is not None:
|
||||
user_bytes = self.user.encode('utf-8')
|
||||
payload += struct.pack("!H", len(user_bytes)) + user_bytes
|
||||
if self.pswd is not None:
|
||||
pass_bytes = self.pswd.encode('utf-8')
|
||||
payload += struct.pack("!H", len(pass_bytes)) + pass_bytes
|
||||
|
||||
variable_header = struct.pack("!H", len(protocol_name)) + protocol_name + bytes([protocol_level]) + bytes([connect_flags]) + struct.pack("!H", self.keepalive) + properties
|
||||
|
||||
remaining_length = len(variable_header) + len(payload)
|
||||
fixed_header = b"\x10" + self._encode_length(remaining_length)
|
||||
return fixed_header + variable_header + payload
|
||||
|
||||
def _parse_publish(self, data):
|
||||
"""
|
||||
Распарсить PUBLISH пакет MQTT 5.
|
||||
Возвращает (topic, payload, qos, packet_id) или (None, None, None, None) если ошибка.
|
||||
"""
|
||||
if not data or data[0] & 0xF0 != 0x30:
|
||||
return None, None, None, None
|
||||
|
||||
# Пропускаем фиксированный заголовок (1 байт) и декодируем длину
|
||||
pos = 1
|
||||
remaining_length, consumed = decode_length(data, pos)
|
||||
pos += consumed
|
||||
|
||||
# Читаем топик
|
||||
if pos + 2 > len(data):
|
||||
return None, None, None, None
|
||||
topic_len = struct.unpack("!H", data[pos:pos+2])[0]
|
||||
pos += 2
|
||||
if pos + topic_len > len(data):
|
||||
return None, None, None, None
|
||||
topic = data[pos:pos+topic_len].decode('utf-8', errors='replace')
|
||||
pos += topic_len
|
||||
|
||||
# QoS и Packet ID
|
||||
qos = (data[0] >> 1) & 0x03
|
||||
packet_id = None
|
||||
if qos > 0:
|
||||
if pos + 2 > len(data):
|
||||
return None, None, None, None
|
||||
packet_id = struct.unpack("!H", data[pos:pos+2])[0]
|
||||
pos += 2
|
||||
|
||||
# Свойства: сначала длина свойств (1 байт, т.к. у нас всегда 0)
|
||||
if pos >= len(data):
|
||||
return None, None, None, None
|
||||
prop_len = data[pos]
|
||||
pos += 1
|
||||
if prop_len > 0:
|
||||
# Если есть свойства, пропускаем их (упрощённо)
|
||||
pos += prop_len
|
||||
|
||||
# Всё что осталось – payload
|
||||
payload = data[pos:].decode('utf-8', errors='replace')
|
||||
return topic, payload, qos, packet_id
|
||||
|
||||
def set_callback(self, f):
|
||||
self.cb = f
|
||||
|
||||
def set_last_will(self, topic, msg, retain=False, qos=0):
|
||||
assert 0 <= qos <= 2
|
||||
assert topic
|
||||
self.lw_topic = topic
|
||||
self.lw_msg = msg
|
||||
self.lw_qos = qos
|
||||
self.lw_retain = retain
|
||||
|
||||
def connect(self, clean_session=True):
|
||||
def connect(self):
|
||||
if self.sock is not None:
|
||||
raise WSSMQTTException('Already connected')
|
||||
|
||||
self.sock = WebSocketClient.create_websocket(self.host, self.path, protocol='mqtt')
|
||||
self.sock = create_websocket(self.host, self.path, protocol='mqtt')
|
||||
if self.sock is None:
|
||||
raise WSSMQTTException('Failed to create websocket')
|
||||
|
||||
try:
|
||||
premsg = bytearray(b"\x10\0\0\0\0\0")
|
||||
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
|
||||
|
||||
sz = 10 + 2 + len(self.client_id)
|
||||
msg[6] = clean_session << 1
|
||||
if self.user:
|
||||
sz += 2 + len(self.user) + 2 + len(self.pswd)
|
||||
msg[6] |= 0xC0
|
||||
if self.keepalive:
|
||||
assert self.keepalive < 65536
|
||||
msg[7] |= self.keepalive >> 8
|
||||
msg[8] |= self.keepalive & 0x00FF
|
||||
if self.lw_topic:
|
||||
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
|
||||
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
|
||||
msg[6] |= self.lw_retain << 5
|
||||
|
||||
i = 1
|
||||
while sz > 0x7F:
|
||||
premsg[i] = (sz & 0x7F) | 0x80
|
||||
sz >>= 7
|
||||
i += 1
|
||||
premsg[i] = sz
|
||||
|
||||
self.sock.write(premsg, i + 2)
|
||||
self.sock.write(msg)
|
||||
# print(hex(len(msg)), hexlify(msg, ":"))
|
||||
self._send_str(self.client_id)
|
||||
if self.lw_topic:
|
||||
self._send_str(self.lw_topic)
|
||||
self._send_str(self.lw_msg)
|
||||
if self.user:
|
||||
self._send_str(self.user)
|
||||
self._send_str(self.pswd)
|
||||
resp = self.sock.read(4)
|
||||
assert resp[0] == 0x20 and resp[1] == 0x02
|
||||
if resp[3] != 0:
|
||||
raise WSSMQTTException(resp[3])
|
||||
return resp[2] & 1
|
||||
self.sock.write(self._build_connect())
|
||||
data = self.sock.read()
|
||||
if data and data[0] == 0x20:
|
||||
if len(data) >= 4:
|
||||
reason_code = data[3]
|
||||
if reason_code != 0:
|
||||
raise WSSMQTTException(f'Connection refused, reason code: {reason_code}')
|
||||
else:
|
||||
raise WSSMQTTException(f'Malformed CONNACK {data}')
|
||||
else:
|
||||
raise WSSMQTTException(f'Not a CONNACK received, received {data}')
|
||||
except BaseException as e:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
@@ -183,53 +218,39 @@ class WSSMQTTClient:
|
||||
self.sock.write(b"\xc0\0")
|
||||
|
||||
def publish(self, topic, msg, retain=False, qos=0):
|
||||
pkt = bytearray(b"\x30\0\0\0")
|
||||
pkt[0] |= qos << 1 | retain
|
||||
sz = 2 + len(topic) + len(msg)
|
||||
if qos > 0:
|
||||
sz += 2
|
||||
assert sz < 2097152
|
||||
i = 1
|
||||
while sz > 0x7F:
|
||||
pkt[i] = (sz & 0x7F) | 0x80
|
||||
sz >>= 7
|
||||
i += 1
|
||||
pkt[i] = sz
|
||||
# print(hex(len(pkt)), hexlify(pkt, ":"))
|
||||
self.sock.write(pkt, i + 1)
|
||||
self._send_str(topic)
|
||||
if qos > 0:
|
||||
self.pid += 1
|
||||
pid = self.pid
|
||||
struct.pack_into("!H", pkt, 0, pid)
|
||||
self.sock.write(pkt, 2)
|
||||
self.sock.write(msg)
|
||||
if qos == 1:
|
||||
while 1:
|
||||
op = self.wait_msg()
|
||||
if op == 0x40:
|
||||
sz = self.sock.read(1)
|
||||
assert sz == b"\x02"
|
||||
rcv_pid = self.sock.read(2)
|
||||
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
|
||||
if pid == rcv_pid:
|
||||
return
|
||||
elif qos == 2:
|
||||
assert 0
|
||||
if self.sock is None:
|
||||
raise WSSMQTTException("Not connected")
|
||||
# Для QoS 0 packet_id не нужен
|
||||
flags = (qos << 1) | (1 if retain else 0)
|
||||
fixed = bytes([0x30 | flags])
|
||||
topic_bytes = topic.encode('utf-8')
|
||||
msg_bytes = msg.encode('utf-8')
|
||||
properties = b"\x00" # пустые свойства
|
||||
# Переменный заголовок: topic length + topic + properties
|
||||
variable = struct.pack("!H", len(topic_bytes)) + topic_bytes + properties
|
||||
payload = msg_bytes
|
||||
remaining = len(variable) + len(payload)
|
||||
pkt = fixed + self._encode_length(remaining) + variable + payload
|
||||
self.sock.write(pkt)
|
||||
# Для QoS 1 и 2 нужно ждать подтверждения, но пока опускаем
|
||||
|
||||
def subscribe(self, topic, qos=0):
|
||||
assert self.cb is not None, "Subscribe callback is not set"
|
||||
pkt = bytearray(b"\x82\0\0\0")
|
||||
self.pid += 1
|
||||
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
|
||||
# print(hex(len(pkt)), hexlify(pkt, ":"))
|
||||
fixed_header = b"\x82"
|
||||
topic_bytes = topic.encode('utf-8')
|
||||
properties = b"\x00"
|
||||
payload = struct.pack("!H", self._packet_id_counter) + properties
|
||||
payload += struct.pack("!H", len(topic_bytes)) + topic_bytes
|
||||
payload += bytes([qos])
|
||||
remaining_length = len(payload)
|
||||
self._packet_id_counter = (self._packet_id_counter + 1) & 0xffff
|
||||
|
||||
pkt = fixed_header + self._encode_length(remaining_length) + payload
|
||||
self.sock.write(pkt)
|
||||
self._send_str(topic)
|
||||
self.sock.write(qos.to_bytes(1, "little"))
|
||||
while 1:
|
||||
op = self.wait_msg()
|
||||
if op == 0x90:
|
||||
resp = self.sock.read(4)
|
||||
resp = self.sock.read()
|
||||
# print(resp)
|
||||
assert resp[1] == pkt[2] and resp[2] == pkt[3]
|
||||
if resp[3] == 0x80:
|
||||
@@ -241,7 +262,7 @@ class WSSMQTTClient:
|
||||
# set by .set_callback() method. Other (internal) MQTT
|
||||
# messages processed internally.
|
||||
def wait_msg(self):
|
||||
res = self.sock.read(1)
|
||||
res = self.sock.read()
|
||||
if res is None:
|
||||
return None
|
||||
if res == b"":
|
||||
|
||||
Reference in New Issue
Block a user