from django.db import models from django.contrib.auth.models import PermissionsMixin, AbstractBaseUser, BaseUserManager from django.core.validators import * from arka.settings import PHONE_VERIFICATION_ENABLE, PHONE_VERIFICATION_ATTEMPTS,\ PHONE_VERIFICATION_APP_ID, PHONE_VERIFICATION_RESEND_TIME_SECS from threading import Thread, Lock, Event import random import traceback from datetime import datetime import requests import re class PhoneVerificationService: __lock = Lock() __event = Event() # номера, для которых отправлен запрос верификации (на внешний ресурс) # имеет структуру: # "{phone}": {"code": None|int|"FAILED", "attempts": int} # None в code означает что ответ еще не пришел, остальное вроде понятно # attempts - сколько попыток верификации осталось (по умолчанию 5) __codes = {} # очередь номеров, которые требуют верификации __to_verify = [] __instance = None @staticmethod def __service_run(): while True: try: PhoneVerificationService.__event.wait() phones = None with PhoneVerificationService.__lock: if PhoneVerificationService.__event.is_set(): PhoneVerificationService.__event.clear() phones = PhoneVerificationService.__to_verify.copy() PhoneVerificationService.__to_verify.clear() if phones is not None: for phone in phones: # тут должна быть проверка, есть ли телефон в списке кодов obj = { "code": None, "attempts": PHONE_VERIFICATION_ATTEMPTS, "time": datetime.now() } if PHONE_VERIFICATION_ENABLE: with PhoneVerificationService.__lock: PhoneVerificationService.__codes[phone] = obj request_success = False try: params = { "phone": lambda: phone[1:] if phone.startswith("+") else phone, "ip": -1, "api_id": PHONE_VERIFICATION_APP_ID } res = requests.get("https://sms.ru/code/call", params=params, timeout=5) res_json = res.json() request_success = True print(res.content) if res_json["status"] == "OK": with PhoneVerificationService.__lock: PhoneVerificationService.__codes[phone]["code"] = res_json["code"] print(f"Verify code for {phone}: {res_json['code']}") else: with PhoneVerificationService.__lock: PhoneVerificationService.__codes[phone]["code"] = "FAILED" except: if not request_success: with PhoneVerificationService.__lock: PhoneVerificationService.__codes[phone]["code"] = "FAILED" traceback.print_exc() else: with PhoneVerificationService.__lock: obj["code"] = random.randint(1000, 9999) PhoneVerificationService.__codes[phone] = obj print(f"Verify code for {phone}: {obj['code']}") except: traceback.print_exc() @staticmethod def create(): if PhoneVerificationService.__instance is None: PhoneVerificationService.__instance = Thread(target=PhoneVerificationService.__service_run, daemon=True) PhoneVerificationService.__instance.start() CHECK_PHONE_NOT_FOUND = "not-found" CHECK_PHONE_NOT_READY = "not-ready" CHECK_PHONE_FAILED = "failed" CHECK_PHONE_INVALID_CODE = "invalid" CHECK_PHONE_MAX_ATTEMPTS = "max-attempts" CHECK_PHONE_RESEND_LIMIT = "resend" @staticmethod def check_code(phone: str, code: int): with PhoneVerificationService.__lock: if phone not in PhoneVerificationService.__codes: return False, PhoneVerificationService.CHECK_PHONE_NOT_FOUND c = PhoneVerificationService.__codes[phone] print(f"verify struct: {c}") if c["code"] is None: print(f"[PhoneVerificationService] for phone {phone} code not received yet") return False, PhoneVerificationService.CHECK_PHONE_NOT_READY if c["code"] == "FAILED": PhoneVerificationService.__codes.pop(phone) return False, PhoneVerificationService.CHECK_PHONE_FAILED if c["attempts"] > 0: if c["code"] == code: PhoneVerificationService.__codes.pop(phone) return True, None else: print(f"invalid code! attempts: {c['attempts']}") PhoneVerificationService.__codes[phone]["attempts"] -= 1 return False, PhoneVerificationService.CHECK_PHONE_INVALID_CODE else: return False, PhoneVerificationService.CHECK_PHONE_MAX_ATTEMPTS @staticmethod def send_verify(phone: str): with PhoneVerificationService.__lock: if phone in PhoneVerificationService.__codes: c = PhoneVerificationService.__codes[phone] if (datetime.now() - c["time"]).total_seconds() <= PHONE_VERIFICATION_RESEND_TIME_SECS: return False, PhoneVerificationService.CHECK_PHONE_RESEND_LIMIT PhoneVerificationService.__to_verify.append(phone) PhoneVerificationService.__event.set() return True, None class SiteAccountManager(BaseUserManager): def create_user(self, email, name, surname, phone, password): user = self.model(email=email, name=name, surname=surname, phone=phone, password=password) user.set_password(password) user.is_staff = False user.is_superuser = False user.is_phone_verified = False user.save(using=self._db) return user def create_superuser(self, email, name, surname, phone, password): user = self.create_user(email=email, name=name, surname=surname, phone=phone, password=password) user.is_active = True user.is_staff = True user.is_superuser = True user.save(using=self._db) return user def get_by_natural_key(self, phone_): return self.model.get_by_natural_key(phone_) class SiteUser(AbstractBaseUser, PermissionsMixin): surname = models.CharField(max_length=60, verbose_name="Фамилия") name = models.CharField(max_length=60, verbose_name="Имя") email = models.EmailField(unique=True, verbose_name="Email") phone = models.CharField(unique=True, max_length=16, verbose_name="Телефон", validators=[ RegexValidator(regex="^\\+7[0-9]{10}$"), ]) is_staff = models.BooleanField(default=False, verbose_name="Разрешение на вход в админку") is_phone_verified = models.BooleanField(default=False, verbose_name="Телефон верифицирован") register_datetime = models.DateTimeField(default=datetime.now, editable=False) REQUIRED_FIELDS = ['name', 'surname', 'email'] USERNAME_FIELD = 'phone' objects = SiteAccountManager() def get_short_name(self): return self.email def natural_key(self): return self.phone @staticmethod def get_by_natural_key(key): # Гоша попросил запилить фичу, чтобы принимались номера: # +79991112233 # 79991112233 # 9991112233 if re.match("^[0-9]{10}$", key) is not None: key = f"+7{key}" elif re.match("^7[0-9]{10}$", key) is not None: key = f"+{key}" return SiteUser.objects.get(phone=key) def __str__(self): return f"{self.name} {self.surname}: {self.phone} ({self.email})"