183 lines
8.0 KiB
Python
183 lines
8.0 KiB
Python
from django.db import models
|
||
from django.contrib.auth.models import PermissionsMixin, AbstractBaseUser, BaseUserManager
|
||
from django.core.validators import *
|
||
from arka.settings import PHONE_VERIFICATION_ENABLE, PHONE_VERIFICATION_ATTEMPTS,\
|
||
PHONE_VERIFICATION_APP_ID, PHONE_VERIFICATION_RESEND_TIME_SECS
|
||
|
||
from threading import Thread, Lock, Event
|
||
import random
|
||
import traceback
|
||
from datetime import datetime
|
||
import requests
|
||
|
||
|
||
class PhoneVerificationService:
    """Issue and check phone verification codes using one background worker.

    Callers queue a phone with :meth:`send_verify`. The daemon thread started
    by :meth:`create` then obtains a code for it: it calls
    ``https://sms.ru/code/call`` when ``PHONE_VERIFICATION_ENABLE`` is true and
    generates a random 4-digit code otherwise. :meth:`check_code` compares a
    user-supplied code against the stored one.

    All state lives in class attributes and is guarded by a single lock.
    """

    # Reason codes returned as the second element of the result tuples of
    # ``check_code`` and ``send_verify``.
    CHECK_PHONE_NOT_FOUND = "not-found"
    CHECK_PHONE_NOT_READY = "not-ready"
    CHECK_PHONE_FAILED = "failed"
    CHECK_PHONE_INVALID_CODE = "invalid"
    CHECK_PHONE_MAX_ATTEMPTS = "max-attempts"
    CHECK_PHONE_RESEND_LIMIT = "resend"

    __lock = Lock()
    __event = Event()

    # Phones for which a verification request has been issued (to the
    # external service). Structure:
    #   "{phone}": {"code": None | int | "FAILED", "attempts": int, "time": datetime}
    # "code" is None while the response has not arrived yet.
    # "attempts" is how many verification attempts are left; it starts at
    # PHONE_VERIFICATION_ATTEMPTS.
    # "time" is when the entry was registered; the resend limit is measured
    # from it.
    # NOTE(review): the type of "code" returned by the external service is not
    # visible here; check_code compares it with ``==`` against its argument.
    __codes = {}

    # Queue of phones waiting for a verification request.
    __to_verify = []
    __instance = None

    @staticmethod
    def __resend_blocked(phone):
        """Return True if *phone* has an entry younger than the resend window.

        The caller must hold ``__lock``.
        """
        entry = PhoneVerificationService.__codes.get(phone)
        if entry is None:
            return False
        elapsed = (datetime.now() - entry["time"]).total_seconds()
        return elapsed <= PHONE_VERIFICATION_RESEND_TIME_SECS

    @staticmethod
    def __request_code(phone):
        """Ask the external service for a code; return it or ``"FAILED"``.

        Raises whatever ``requests`` or the JSON decoding / key lookups raise;
        the caller turns any exception into a ``"FAILED"`` entry.
        """
        params = {
            "phone": phone,
            "ip": -1,
            "api_id": PHONE_VERIFICATION_APP_ID,
        }
        res = requests.get("https://sms.ru/code/call", params=params, timeout=5)
        res_json = res.json()
        print(res.content)
        if res_json["status"] != "OK":
            return "FAILED"
        code = res_json["code"]
        if code is None:
            # A None code would look like "response not received yet" forever.
            return "FAILED"
        print(f"Verify code for {phone}: {code}")
        return code

    @staticmethod
    def __process_phone(phone):
        """Register *phone* in the code table and obtain its code."""
        entry = {
            "code": None,
            "attempts": PHONE_VERIFICATION_ATTEMPTS,
            "time": datetime.now(),
        }
        if not PHONE_VERIFICATION_ENABLE:
            # External verification is switched off: make up a local code.
            entry["code"] = random.randint(1000, 9999)

        with PhoneVerificationService.__lock:
            # The same phone may have been queued several times before the
            # worker got to it; only the first request inside the resend
            # window is served.
            if PhoneVerificationService.__resend_blocked(phone):
                return
            PhoneVerificationService.__codes[phone] = entry

        if not PHONE_VERIFICATION_ENABLE:
            print(f"Verify code for {phone}: {entry['code']}")
            return

        try:
            code = PhoneVerificationService.__request_code(phone)
        except Exception:
            # Network error, non-JSON body or unexpected payload: never leave
            # the entry stuck at None, otherwise check_code would report
            # "not-ready" indefinitely.
            traceback.print_exc()
            code = "FAILED"

        with PhoneVerificationService.__lock:
            entry["code"] = code

    @staticmethod
    def __service_run():
        """Worker loop: wait for queued phones and process them one by one."""
        while True:
            try:
                PhoneVerificationService.__event.wait()
                with PhoneVerificationService.__lock:
                    if not PhoneVerificationService.__event.is_set():
                        continue
                    PhoneVerificationService.__event.clear()
                    phones = PhoneVerificationService.__to_verify.copy()
                    PhoneVerificationService.__to_verify.clear()
            except Exception:
                traceback.print_exc()
                continue

            for phone in phones:
                # Isolate failures so one bad phone does not drop the rest of
                # the batch, which has already been removed from the queue.
                try:
                    PhoneVerificationService.__process_phone(phone)
                except Exception:
                    traceback.print_exc()

    @staticmethod
    def create():
        """Start the background worker thread if it has not been started yet."""
        if PhoneVerificationService.__instance is None:
            PhoneVerificationService.__instance = Thread(
                target=PhoneVerificationService.__service_run, daemon=True
            )
            PhoneVerificationService.__instance.start()

    @staticmethod
    def check_code(phone: str, code: int):
        """Check *code* against the code stored for *phone*.

        Returns a ``(ok, reason)`` tuple: ``(True, None)`` on a match, otherwise
        ``(False, <one of the CHECK_PHONE_* constants>)``. The entry is removed
        on a match and when the stored state is ``"FAILED"``. A wrong code
        uses up one attempt.
        """
        with PhoneVerificationService.__lock:
            entry = PhoneVerificationService.__codes.get(phone)
            if entry is None:
                return False, PhoneVerificationService.CHECK_PHONE_NOT_FOUND
            print(f"verify struct: {entry}")

            stored = entry["code"]
            if stored is None:
                print(f"[PhoneVerificationService] for phone {phone} code not received yet")
                return False, PhoneVerificationService.CHECK_PHONE_NOT_READY

            if stored == "FAILED":
                PhoneVerificationService.__codes.pop(phone)
                return False, PhoneVerificationService.CHECK_PHONE_FAILED

            if entry["attempts"] <= 0:
                return False, PhoneVerificationService.CHECK_PHONE_MAX_ATTEMPTS

            if stored == code:
                PhoneVerificationService.__codes.pop(phone)
                return True, None

            print(f"invalid code! attempts: {entry['attempts']}")
            entry["attempts"] -= 1
            return False, PhoneVerificationService.CHECK_PHONE_INVALID_CODE

    @staticmethod
    def send_verify(phone: str):
        """Queue *phone* for verification and wake the worker.

        Returns ``(True, None)`` when queued, or
        ``(False, CHECK_PHONE_RESEND_LIMIT)`` when an entry for the phone was
        registered no more than ``PHONE_VERIFICATION_RESEND_TIME_SECS`` seconds
        ago.
        """
        with PhoneVerificationService.__lock:
            if PhoneVerificationService.__resend_blocked(phone):
                return False, PhoneVerificationService.CHECK_PHONE_RESEND_LIMIT

            PhoneVerificationService.__to_verify.append(phone)
            PhoneVerificationService.__event.set()
            return True, None
|
||
|
||
|
||
class SiteAccountManager(BaseUserManager):
    """Manager that builds and looks up user accounts keyed by e-mail."""

    def create_user(self, email, name, surname, phone, password):
        """Create and save a regular account.

        The account is saved with ``is_staff``, ``is_superuser`` and
        ``is_phone_verified`` all set to False, and the password is stored via
        ``set_password``.
        """
        fields = {
            "email": email,
            "name": name,
            "surname": surname,
            "phone": phone,
            "password": password,
        }
        account = self.model(**fields)
        account.set_password(password)
        for flag in ("is_staff", "is_superuser", "is_phone_verified"):
            setattr(account, flag, False)
        account.save(using=self._db)
        return account

    def create_superuser(self, email, name, surname, phone, password):
        """Create a regular account, then save it again with elevated flags.

        ``is_active``, ``is_staff`` and ``is_superuser`` are set to True before
        the second save.
        """
        account = self.create_user(
            email=email,
            name=name,
            surname=surname,
            phone=phone,
            password=password,
        )
        for flag in ("is_active", "is_staff", "is_superuser"):
            setattr(account, flag, True)
        account.save(using=self._db)
        return account

    def get_by_natural_key(self, email_):
        """Return the account whose ``email`` equals *email_*."""
        lookup = {"email": email_}
        return self.get(**lookup)
|
||
|
||
|
||
class SiteUser(AbstractBaseUser, PermissionsMixin):
    """User model that logs in by e-mail and carries a unique phone number."""

    surname = models.CharField(verbose_name="Фамилия", max_length=60)
    name = models.CharField(verbose_name="Имя", max_length=60)
    email = models.EmailField(verbose_name="Email", unique=True)
    # The validator accepts "+7" followed by exactly ten digits.
    phone = models.CharField(
        verbose_name="Телефон",
        max_length=16,
        unique=True,
        validators=[
            RegexValidator(regex="^\\+7[0-9]{10}$"),
        ],
    )
    is_staff = models.BooleanField(verbose_name="Разрешение на вход в админку", default=False)
    is_phone_verified = models.BooleanField(verbose_name="Телефон верифицирован", default=False)

    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['name', 'surname', 'phone']

    objects = SiteAccountManager()

    def get_short_name(self):
        """Return the e-mail address as the short name."""
        return self.email

    def natural_key(self):
        """Return the e-mail address as the natural key.

        NOTE(review): this returns a bare string, while Django's natural-key
        documentation describes a tuple; confirm whether serialization
        with natural keys is used before relying on it.
        """
        return self.email

    def __str__(self):
        """Return the e-mail address."""
        return self.email
|