Files
InsightRadar/backend/app/core/verification/email/verificationService.py
T
2026-03-26 02:12:29 +08:00

90 lines
3.0 KiB
Python

from functools import lru_cache
import os
import logging
from typing import Optional
from app.models.models import VerificationPurpose
from app.core.verification.email.verificationRepository import VerificationRepository
from app.core.security import generate_verification_code, hash_verification_code
from app.core.verification.email.RespositoryImpl.hybirdRepository import get_verification_repository
logger = logging.getLogger(__name__)


def _int_from_env(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    ``os.getenv`` returns a string when the variable is set and hands back the
    default object unchanged otherwise; parsing here gives every setting below
    one predictable type (``int``) regardless of how it was supplied.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset or is not a valid
            integer.

    Returns:
        The parsed integer, or ``default``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        # A malformed value should not break every request later on; report it
        # and keep the documented default instead.
        logger.warning(
            "Ignoring non-integer value %r for %s; using default %d",
            raw,
            name,
            default,
        )
        return default


# Lifetime of a registration verification code, in minutes.
REGISTER_CODE_EXPIRE_MINUTES = _int_from_env("REGISTER_CODE_EXPIRE_MINUTES", 5)
# Lifetime of a login verification code, in minutes.
LOGIN_CODE_EXPIRE_MINUTES = _int_from_env("LOGIN_CODE_EXPIRE_MINUTES", 5)
# Cooldown between two code requests for the same e-mail address, in seconds.
CODE_SEND_COOLDOWN_SECONDS = _int_from_env("CODE_SEND_COOLDOWN_SECONDS", 60)
# Lifetime of the verification-attempt counter, in seconds.
CODE_VERIFICATE_ATTEMP_SECONDS = _int_from_env("CODE_VERIFICATE_ATTEMP_SECONDS", 60)
# Maximum number of verification attempts allowed while that counter is alive.
CODE_VERIFICATE_ATTEMP_COUNT = _int_from_env("CODE_VERIFICATE_ATTEMP_COUNT", 10)
class CodeExpiredError(Exception):
    """Raised when a verification code has expired or cannot be found."""
class CodeInvalidError(Exception):
    """Raised when the submitted verification code is not the right one."""
class TooManyCodeRequestsError(Exception):
    """Raised when codes are requested or verified too often for one e-mail."""
def get_ttl(purpose: VerificationPurpose) -> int:
    """Return the lifetime, in seconds, of a code issued for ``purpose``.

    Login codes use ``LOGIN_CODE_EXPIRE_MINUTES``; every other purpose uses
    ``REGISTER_CODE_EXPIRE_MINUTES``.
    """
    minutes = (
        LOGIN_CODE_EXPIRE_MINUTES
        if purpose == VerificationPurpose.LOGIN
        else REGISTER_CODE_EXPIRE_MINUTES
    )
    # The settings may be strings when read from the environment, hence int().
    return int(minutes) * 60
class EmailVerificationService:
    """Issue and check e-mail verification codes through a repository.

    The service keeps no state of its own beyond the injected repository;
    stored code hashes, the send cooldown and the attempt counter are all
    read and written through ``self.repo``.
    """

    def __init__(self, repo: VerificationRepository) -> None:
        """Keep a reference to the repository used for all persistence."""
        self.repo = repo

    def _cooldown_key(self, email: str, purpose: VerificationPurpose) -> str:
        """Return the repository key of the send-cooldown counter."""
        return f"verification:cooldown:{purpose.value}:{email.lower()}"

    def _attempts_key(self, email: str, purpose: VerificationPurpose) -> str:
        """Return the repository key of the verification-attempt counter."""
        # NOTE: unlike the cooldown key, the purpose is lower-cased here; the
        # format is kept as-is so counters already stored remain valid.
        return f"verification:attempts:{purpose.value.lower()}:{email.lower()}"

    def send_code(self, email: str, purpose: VerificationPurpose) -> str:
        """Generate a new code for ``email`` and store its hash.

        Despite the name, nothing is delivered here: the plain-text code is
        returned and only its hash is handed to the repository.

        Args:
            email: Recipient address; compared case-insensitively.
            purpose: What the code will be used for; selects the TTL.

        Returns:
            The plain-text verification code.

        Raises:
            TooManyCodeRequestsError: If the cooldown counter for this address
                and purpose is already above 1.
        """
        email = email.lower()
        # Presumably ``incr`` bumps a counter that expires after the given
        # number of seconds and returns the new count (confirm against the
        # repository); a count above 1 then means a request was already made
        # during the cooldown.
        request_count = self.repo.incr(
            self._cooldown_key(email, purpose), int(CODE_SEND_COOLDOWN_SECONDS)
        )
        if request_count > 1:
            raise TooManyCodeRequestsError("Please wait before requesting another code")
        code = generate_verification_code()
        self.repo.set_code(email, purpose, hash_verification_code(code), get_ttl(purpose))
        return code

    def verify_code(self, email: str, code: str, purpose: VerificationPurpose) -> bool:
        """Check ``code`` against the stored hash for ``email`` and ``purpose``.

        Args:
            email: Address the code was issued for; compared case-insensitively.
            code: Plain-text code submitted by the user.
            purpose: Purpose the code was issued for.

        Returns:
            ``True`` when the repository accepts the code.

        Raises:
            TooManyCodeRequestsError: If the attempt counter exceeds
                ``CODE_VERIFICATE_ATTEMP_COUNT``.
            CodeExpiredError: If the repository returns ``None``.
            CodeInvalidError: If the repository returns a falsy non-``None``
                value.
        """
        email = email.lower()
        attempts = self.repo.incr(
            self._attempts_key(email, purpose), int(CODE_VERIFICATE_ATTEMP_SECONDS)
        )
        if attempts > int(CODE_VERIFICATE_ATTEMP_COUNT):
            raise TooManyCodeRequestsError("Too many attempts")
        # Hash only once the attempt limit has been passed, so rejected
        # requests do no hashing work.
        outcome = self.repo.compare_and_consume(
            email, purpose, hash_verification_code(code)
        )
        # Presumably the repository returns None when no code is stored and
        # False on a mismatch (inferred from the error messages; confirm
        # against the repository). None is tested first so the two cases stay
        # distinct without comparing to False with ``==``.
        if outcome is None:
            raise CodeExpiredError("Code expired or not found")
        if not outcome:
            raise CodeInvalidError("Invalid code")
        return True
@lru_cache
def get_verification_service():
    """Return the shared ``EmailVerificationService``.

    The function takes no arguments, so ``lru_cache`` builds the service on
    the first call and returns the same object on later calls.
    """
    return EmailVerificationService(get_verification_repository())