mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-06 01:57:51 +08:00
90 lines
3.0 KiB
Python
90 lines
3.0 KiB
Python
from functools import lru_cache
|
|
import os
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from app.models.models import VerificationPurpose
|
|
from app.core.verification.email.verificationRepository import VerificationRepository
|
|
from app.core.security import generate_verification_code, hash_verification_code
|
|
from app.core.verification.email.RespositoryImpl.hybirdRepository import get_verification_repository
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# 注册验证码有效期(分钟)
|
|
REGISTER_CODE_EXPIRE_MINUTES = os.getenv("REGISTER_CODE_EXPIRE_MINUTES", 5)
|
|
|
|
# 登录验证码有效期(分钟)
|
|
LOGIN_CODE_EXPIRE_MINUTES = os.getenv("LOGIN_CODE_EXPIRE_MINUTES",5)
|
|
|
|
# 同一邮箱发送验证码的冷却间隔(秒)
|
|
CODE_SEND_COOLDOWN_SECONDS = os.getenv("CODE_SEND_COOLDOWN_SECONDS",60)
|
|
|
|
CODE_VERIFICATE_ATTEMP_SECONDS = os.getenv("CODE_VERIFICATE_ATTEMP_SECONDS", 60)
|
|
|
|
CODE_VERIFICATE_ATTEMP_COUNT = os.getenv("CODE_VERIFICATE_ATTEMP_COUNT", 10)
|
|
|
|
class CodeExpiredError(Exception):
|
|
"""code has been expired"""
|
|
pass
|
|
|
|
class CodeInvalidError(Exception):
|
|
"""code is not right"""
|
|
pass
|
|
|
|
class TooManyCodeRequestsError(Exception):
|
|
"""User request too many times when they verificate the email"""
|
|
pass
|
|
|
|
def get_ttl(purpose: VerificationPurpose)->int:
|
|
if purpose == VerificationPurpose.LOGIN:
|
|
return int(LOGIN_CODE_EXPIRE_MINUTES) * 60
|
|
else:
|
|
return int(REGISTER_CODE_EXPIRE_MINUTES) * 60
|
|
|
|
class EmailVerificationService:
|
|
|
|
def __init__(self, repo: VerificationRepository) -> None:
|
|
self.repo = repo
|
|
|
|
def _cooldown_key(self, email: str, purpose: VerificationPurpose) -> str:
|
|
return f"verification:cooldown:{purpose.value}:{email.lower()}"
|
|
|
|
def send_code(self, email: str, purpose: VerificationPurpose) -> str:
|
|
email = email.lower()
|
|
|
|
count = self.repo.incr(self._cooldown_key(email, purpose), int(CODE_SEND_COOLDOWN_SECONDS))
|
|
|
|
if count > 1:
|
|
raise TooManyCodeRequestsError("Please wait before requesting another code")
|
|
|
|
code = generate_verification_code()
|
|
code_hash = hash_verification_code(code)
|
|
|
|
self.repo.set_code(email, purpose, code_hash, get_ttl(purpose))
|
|
|
|
return code
|
|
|
|
def verify_code(self,email: str, code: str, purpose: VerificationPurpose):
|
|
email = email.lower()
|
|
key = f"verification:attempts:{purpose.value.lower()}:{email}"
|
|
code_hash = hash_verification_code(code)
|
|
|
|
attempts = self.repo.incr(key, int(CODE_VERIFICATE_ATTEMP_SECONDS))
|
|
if attempts > int(CODE_VERIFICATE_ATTEMP_COUNT):
|
|
raise TooManyCodeRequestsError("Too many attempts")
|
|
|
|
stored = self.repo.compare_and_consume(email, purpose, code_hash)
|
|
|
|
if stored == False:
|
|
raise CodeInvalidError("Invalid code")
|
|
|
|
if not stored:
|
|
raise CodeExpiredError("Code expired or not found")
|
|
|
|
return True
|
|
|
|
@lru_cache
|
|
def get_verification_service():
|
|
repo = get_verification_repository()
|
|
return EmailVerificationService(repo)
|