import machine import time from machine import Pin, PWM _U16_MAX = 0xFFFF def _int_to_servo_duty(val): val = max(0, min(1000, val)) val += 1000 # все в микросекундах, получается диапазон [1000...2000] # переводим в нужные числа val = int(float(val) / 0.305180438) return val def make_int_interpolate(left_min, left_max, right_min, right_max): # Figure out how 'wide' each range is leftSpan = left_max - left_min rightSpan = right_max - right_min # Compute the scale factor between left and right values scaleFactor = float(rightSpan) / float(leftSpan) # create interpolation function using pre-calculated scaleFactor def interp_fn(value): val = int(right_min + (value - left_min) * scaleFactor) if val < right_min: val = right_min elif val > right_max: val = right_max return val return interp_fn class BaseInterface: def __init__(self, name): self._name = name def get_topics(self): return (self._name, ) def process_message(self, topic, value): print(f"Process topic {topic} with '{value}' for {self}") def __str__(self): return f"<{self.__class__.__name__} {self._name}>" class PwmInterface(BaseInterface): def __init__(self, name, pin, freq=2000, power_min=0, power_max=255, pwm_min=0, pwm_max=_U16_MAX): super().__init__(name) self._pin = pin self._pwm = PWM(self._pin, freq=freq, duty_u16=0) self._pwm_min = pwm_min self._pwm_max = pwm_max self._interpolator = make_int_interpolate(power_min, power_max, pwm_min, pwm_max) self._power = pwm_min self._powered = False self._pwm.duty_u16(self._power) self._topic_cmd = f"{self._name}/cmd" self._topic_power = f"{self._name}/power" def get_topics(self): return self._topic_power, self._topic_cmd def process_message(self, topic, value): print(f"{self}: process topic {topic} with {value}") if topic == self._topic_cmd: if value == 'ON': self._pwm.duty_u16(self._interpolator(self._power)) self._powered = True else: self._pwm.duty_u16(self._pwm_min) self._powered = False else: self._power = int(value) if self._powered: self._pwm.duty_u16(self._interpolator(self._power)) class ServoInterface(PwmInterface): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs, freq=50, pwm_min=3276, pwm_max=6553) # инициализация ESC self._pwm.duty_u16(self._pwm_min) time.sleep(0.2) self._pwm.duty_u16(self._pwm_max) time.sleep(0.2) self._pwm.duty_u16(self._power) _interfaces = [ PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n0", Pin(13)), PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n1", Pin(12)), PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n2", Pin(14)), PwmInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm0n3", Pin(27)), ServoInterface(f"/dorm/66/esp32-{machine.unique_id().hex()}/pwm1n0", Pin(32)), ] def list_topics(): out = [] for i in _interfaces: for t in i.get_topics(): out.append(t) return out def init(): print(f"Interface names: {[i for i in list_topics()]}") def process_message(topic, message): topic = topic.decode('utf-8') message = message.decode('utf-8') print(f'Message received: topic={topic}, message={message}') for i in _interfaces: if topic in i.get_topics(): i.process_message(topic, message)