diff --git a/backend/app/api/endpoints/auth.py b/backend/app/api/endpoints/auth.py index 8895a1c..e09d9a9 100644 --- a/backend/app/api/endpoints/auth.py +++ b/backend/app/api/endpoints/auth.py @@ -4,6 +4,7 @@ import json import math import os +import logging from datetime import timedelta, timezone from typing import Optional, Tuple @@ -19,7 +20,7 @@ from app.core.security import ( verify_password, verify_verification_code, ) -from app.models.models import AppUser, EmailVerificationCode, VerificationPurpose, utcnow +from app.models.models import AppUser, VerificationPurpose, utcnow from app.schemas.auth_schema import ( AuthTokenResponse, LoginCodeSendRequest, @@ -32,9 +33,11 @@ from app.schemas.auth_schema import ( ) from app.utils.email_utils import send_html_email from app.utils.redis_client import get_redis_client +from app.core.verification.email.verificationService import EmailVerificationService, get_verification_service, TooManyCodeRequestsError, CodeExpiredError, CodeInvalidError router = APIRouter() +logger = logging.getLogger(__name__) DEFAULT_REGISTER_CODE_EXPIRE_MINUTES = 10 DEFAULT_LOGIN_CODE_EXPIRE_MINUTES = 10 @@ -131,19 +134,12 @@ def _cache_code_in_redis( "code_hash": code_hash, "created_at": utcnow().isoformat(), } - try: - client.set( - _redis_code_key(email, purpose), - json.dumps(payload), - ex=max(1, expire_minutes * 60), - ) - except Exception as e: - if _is_redis_only(): - # If redis fails but we're in redis_only, don't crash here. - # We already generated the code hash, but we won't cache it in redis. - # However, since code_record handling in the caller already fell back to DB - # if _require_redis_for_codes() failed, we should just let it pass. - pass + + client.set( + _redis_code_key(email, purpose), + json.dumps(payload), + ex=max(1, expire_minutes * 60), + ) def _set_send_cooldown_in_redis(email: str, purpose: VerificationPurpose) -> None: @@ -178,166 +174,6 @@ def _clear_code_in_redis(email: str, purpose: VerificationPurpose) -> None: pass -def _verify_code_with_redis( - email: str, - purpose: VerificationPurpose, - code: str, - *, - strict: bool = False, -) -> Optional[bool]: - """ - Redis 验证码校验。 - 返回: - - True: 校验成功,且已消费验证码 - - False: Redis 有验证码但校验失败 - - None: Redis 不可用或无记录,调用方可按策略回退数据库 - """ - client = _get_redis_for_codes() - if client is None: - if strict: - pass # allow fallback - return None - - try: - raw = client.get(_redis_code_key(email, purpose)) - except Exception as e: - if strict: - pass # fallthrough to let it try db instead of crashing - return None - - if not raw: - return None - - try: - payload = json.loads(raw) - expected_hash = str(payload.get("code_hash", "")) - except Exception: - # 不要轻易清除,可能是数据格式异常 - return None - - if not expected_hash: - return None - - if not verify_verification_code(code, expected_hash): - # 注意:校验失败时不要直接清空 Redis,可能用户只是输错了 - return False - - _clear_code_in_redis(email, purpose) - return True - - -def _invalidate_unused_codes(db: Session, email: str, purpose: VerificationPurpose) -> None: - """将同一邮箱、同一用途下未使用的旧验证码全部标记为已使用,避免重复使用""" - db.query(EmailVerificationCode).filter( - EmailVerificationCode.email == email, - EmailVerificationCode.purpose == purpose, - EmailVerificationCode.is_used.is_(False), - ).update({EmailVerificationCode.is_used: True}, synchronize_session=False) - db.commit() - - -def _create_code_record( - db: Session, - *, - email: str, - purpose: VerificationPurpose, - expire_minutes: int, -) -> Tuple[EmailVerificationCode, str]: - """在数据库中创建验证码记录,返回 (记录对象, 明文验证码)""" - code = generate_verification_code() - now = utcnow() - code_record = EmailVerificationCode( - email=email, - purpose=purpose, - code_hash=hash_verification_code(code), - expires_at=now + timedelta(minutes=expire_minutes), - ) - db.add(code_record) - db.commit() - return code_record, code - - -def _get_latest_valid_code_record( - db: Session, - *, - email: str, - purpose: VerificationPurpose, -): - """从数据库获取该邮箱该用途下最新且未过期、未使用的验证码记录""" - now = utcnow() - return ( - db.query(EmailVerificationCode) - .filter( - EmailVerificationCode.email == email, - EmailVerificationCode.purpose == purpose, - EmailVerificationCode.is_used.is_(False), - EmailVerificationCode.expires_at >= now, - ) - .order_by(EmailVerificationCode.created_at.desc()) - .first() - ) - - -def _enforce_code_send_cooldown(db: Session, email: str, purpose: VerificationPurpose) -> None: - """限制同一邮箱同一用途验证码的发送频率。""" - if CODE_SEND_COOLDOWN_SECONDS <= 0: - return - - client = _get_redis_for_codes() - if client is not None: - try: - ttl = client.ttl(_redis_cooldown_key(email, purpose)) - if ttl is not None and ttl > 0: - raise HTTPException( - status_code=status.HTTP_429_TOO_MANY_REQUESTS, - detail=f"Please wait {ttl}s before requesting another verification code", - headers={"Retry-After": str(ttl)}, - ) - if _is_redis_only(): - return - except HTTPException: - raise - except Exception: - # redis failed during cooldown check, fallback to DB - pass - - if _is_redis_only(): - # Even if redis_only, we allow it to fallthrough if it's down. - # This aligns with our fallback logic. - try: - _require_redis_for_codes() - return - except HTTPException: - pass # fallback to db check - - latest_record = ( - db.query(EmailVerificationCode) - .filter( - EmailVerificationCode.email == email, - EmailVerificationCode.purpose == purpose, - ) - .order_by(EmailVerificationCode.created_at.desc()) - .first() - ) - if not latest_record: - return - - now = utcnow() - record_time = latest_record.created_at - if record_time.tzinfo is None: - record_time = record_time.replace(tzinfo=timezone.utc) - elapsed_seconds = (now - record_time).total_seconds() - if elapsed_seconds >= CODE_SEND_COOLDOWN_SECONDS: - return - - retry_after_seconds = max(1, math.ceil(CODE_SEND_COOLDOWN_SECONDS - elapsed_seconds)) - raise HTTPException( - status_code=status.HTTP_429_TOO_MANY_REQUESTS, - detail=f"Please wait {retry_after_seconds}s before requesting another verification code", - headers={"Retry-After": str(retry_after_seconds)}, - ) - - def _build_auth_response(user: AppUser) -> AuthTokenResponse: token, expires_in = create_access_token(user_id=user.id, email=user.email) return AuthTokenResponse( @@ -348,219 +184,115 @@ def _build_auth_response(user: AppUser) -> AuthTokenResponse: @router.post("/register/send-code", response_model=MessageResponse) -async def send_register_code(payload: RegisterCodeSendRequest, db: Session = Depends(get_db)): - """发送注册验证码:先校验邮箱未注册、冷却期,再生成并发送""" +async def send_register_code( + payload: RegisterCodeSendRequest, + db: Session = Depends(get_db), + service: EmailVerificationService = Depends(get_verification_service), +): email = _normalize_email(payload.email) existing_user = db.query(AppUser).filter(AppUser.email == email).first() if existing_user: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is already registered") - - _enforce_code_send_cooldown(db, email, VerificationPurpose.REGISTER) - - code_record = None - if _is_redis_only(): - try: - _require_redis_for_codes() - code = generate_verification_code() - code_hash = hash_verification_code(code) - except HTTPException: - # If redis is down, temporarily fallback to DB even in redis_only mode - _invalidate_unused_codes(db, email, VerificationPurpose.REGISTER) - code_record, code = _create_code_record( - db, - email=email, - purpose=VerificationPurpose.REGISTER, - expire_minutes=REGISTER_CODE_EXPIRE_MINUTES, - ) - code_hash = code_record.code_hash - else: - _invalidate_unused_codes(db, email, VerificationPurpose.REGISTER) - code_record, code = _create_code_record( - db, - email=email, - purpose=VerificationPurpose.REGISTER, - expire_minutes=REGISTER_CODE_EXPIRE_MINUTES, + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Email is already registered", ) - code_hash = code_record.code_hash - - _cache_code_in_redis( - email=email, - purpose=VerificationPurpose.REGISTER, - code_hash=code_hash, - expire_minutes=REGISTER_CODE_EXPIRE_MINUTES, - ) - _set_send_cooldown_in_redis(email, VerificationPurpose.REGISTER) try: - email_sent = await send_html_email( + code = service.send_code(email, VerificationPurpose.REGISTER) + + await send_html_email( to_email=email, subject=f"【{code}】InsightRadar 注册验证码", - html_content=_build_verification_email(code, "注册", REGISTER_CODE_EXPIRE_MINUTES), + html_content=_build_verification_email( + code, "注册", REGISTER_CODE_EXPIRE_MINUTES + ), ) + + except TooManyCodeRequestsError as e: + raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=str(e)) + except Exception as e: - _clear_code_in_redis(email, VerificationPurpose.REGISTER) - # also clear cooldown if possible, so user can retry immediately - client = _get_redis_for_codes() - if client: - try: - client.delete(_redis_cooldown_key(email, VerificationPurpose.REGISTER)) - except Exception: - pass raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + status_code=500, detail=f"Failed to send verification code: {e}", ) - if not email_sent: - _clear_code_in_redis(email, VerificationPurpose.REGISTER) - client = _get_redis_for_codes() - if client: - try: - client.delete(_redis_cooldown_key(email, VerificationPurpose.REGISTER)) - except Exception: - pass - if code_record is not None: - code_record.is_used = True - db.add(code_record) - db.commit() - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to send verification code", - ) return MessageResponse(message="Verification code sent") @router.post("/login/send-code", response_model=MessageResponse) -async def send_login_code(payload: LoginCodeSendRequest, db: Session = Depends(get_db)): - """发送登录验证码:仅对已注册用户发送""" +async def send_login_code( + payload: LoginCodeSendRequest, + db: Session = Depends(get_db), + service: EmailVerificationService = Depends(get_verification_service), +): email = _normalize_email(payload.email) + user = db.query(AppUser).filter(AppUser.email == email).first() - if not user: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Email is not registered") - - _enforce_code_send_cooldown(db, email, VerificationPurpose.LOGIN) - - code_record = None - if _is_redis_only(): - try: - _require_redis_for_codes() - code = generate_verification_code() - code_hash = hash_verification_code(code) - except HTTPException: - # If redis is down, temporarily fallback to DB even in redis_only mode - _invalidate_unused_codes(db, email, VerificationPurpose.LOGIN) - code_record, code = _create_code_record( - db, - email=email, - purpose=VerificationPurpose.LOGIN, - expire_minutes=LOGIN_CODE_EXPIRE_MINUTES, - ) - code_hash = code_record.code_hash - else: - _invalidate_unused_codes(db, email, VerificationPurpose.LOGIN) - code_record, code = _create_code_record( - db, - email=email, - purpose=VerificationPurpose.LOGIN, - expire_minutes=LOGIN_CODE_EXPIRE_MINUTES, + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Email is not registered", ) - code_hash = code_record.code_hash - - _cache_code_in_redis( - email=email, - purpose=VerificationPurpose.LOGIN, - code_hash=code_hash, - expire_minutes=LOGIN_CODE_EXPIRE_MINUTES, - ) - _set_send_cooldown_in_redis(email, VerificationPurpose.LOGIN) try: - email_sent = await send_html_email( + code = service.send_code(email, VerificationPurpose.LOGIN) + + await send_html_email( to_email=email, subject=f"【{code}】InsightRadar 登录验证码", - html_content=_build_verification_email(code, "登录", LOGIN_CODE_EXPIRE_MINUTES), + html_content=_build_verification_email( + code, "登录", LOGIN_CODE_EXPIRE_MINUTES + ), ) + + except TooManyCodeRequestsError as e: + raise HTTPException(status_code=429, detail=str(e)) + except Exception as e: - _clear_code_in_redis(email, VerificationPurpose.LOGIN) - client = _get_redis_for_codes() - if client: - try: - client.delete(_redis_cooldown_key(email, VerificationPurpose.LOGIN)) - except Exception: - pass raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + status_code=500, detail=f"Failed to send verification code: {e}", ) - if not email_sent: - _clear_code_in_redis(email, VerificationPurpose.LOGIN) - client = _get_redis_for_codes() - if client: - try: - client.delete(_redis_cooldown_key(email, VerificationPurpose.LOGIN)) - except Exception: - pass - if code_record is not None: - code_record.is_used = True - db.add(code_record) - db.commit() - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to send verification code", - ) return MessageResponse(message="Verification code sent") - @router.post( "/register", response_model=AuthTokenResponse, status_code=status.HTTP_201_CREATED, ) -async def register(payload: RegisterRequest, db: Session = Depends(get_db)): - """用户注册:校验验证码(Redis 优先,失败则回退数据库)后创建用户""" +async def register( + payload: RegisterRequest, + db: Session = Depends(get_db), + service: EmailVerificationService = Depends(get_verification_service), +): email = _normalize_email(payload.email) + existing_user = db.query(AppUser).filter(AppUser.email == email).first() if existing_user: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is already registered") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Email is already registered", + ) - redis_result = _verify_code_with_redis( - email, - VerificationPurpose.REGISTER, - payload.verification_code, - strict=False, # Never be strict so we can fallback to DB if redis is down - ) - code_record = None - - if redis_result is False: + try: + service.verify_code( + email=email, + purpose=VerificationPurpose.REGISTER, + code=payload.verification_code, + ) + except CodeExpiredError: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code expired") + except CodeInvalidError: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code") - - if redis_result is None: - # 即使在 _is_redis_only() 模式下,也去数据库兜底查找 - # 这样如果Redis挂了时代码回退到了DB,验证时也能从DB拿出来。 - code_record = _get_latest_valid_code_record( - db, - email=email, - purpose=VerificationPurpose.REGISTER, - ) - if not code_record: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code does not exist or expired") - - if not verify_verification_code(payload.verification_code, code_record.code_hash): - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code") - else: - # Redis 成功时尽量同步消费 DB 里的最新验证码,保持一致性。 - # 即使在 _is_redis_only(),如果先前发生了降级,这里也顺手清理掉。 - code_record = _get_latest_valid_code_record( - db, - email=email, - purpose=VerificationPurpose.REGISTER, - ) + except TooManyCodeRequestsError: + raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many attempts") now = utcnow() nickname = payload.nickname or email.split("@")[0] + user = AppUser( email=email, password_hash=hash_password(payload.password), @@ -569,16 +301,11 @@ async def register(payload: RegisterRequest, db: Session = Depends(get_db)): ) db.add(user) - if code_record is not None: - code_record.is_used = True - db.add(code_record) - db.commit() db.refresh(user) return _build_auth_response(user) - @router.post("/login", response_model=AuthTokenResponse) async def login(payload: LoginRequest, db: Session = Depends(get_db)): """密码登录""" @@ -595,49 +322,40 @@ async def login(payload: LoginRequest, db: Session = Depends(get_db)): @router.post("/login/code", response_model=AuthTokenResponse) -async def login_with_code(payload: LoginWithCodeRequest, db: Session = Depends(get_db)): - """验证码登录:Redis 校验优先,失败则从数据库兜底""" +async def login_with_code( + payload: LoginWithCodeRequest, + db: Session = Depends(get_db), + service: EmailVerificationService = Depends(get_verification_service), +): email = _normalize_email(payload.email) + user = db.query(AppUser).filter(AppUser.email == email).first() - if not user: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or verification code") - - redis_result = _verify_code_with_redis( - email, - VerificationPurpose.LOGIN, - payload.verification_code, - strict=False, # Never be strict so we can fallback to DB if redis is down - ) - code_record = None - - if redis_result is False: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or verification code") - - if redis_result is None: - code_record = _get_latest_valid_code_record( - db, - email=email, - purpose=VerificationPurpose.LOGIN, - ) - if not code_record: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code does not exist or expired") - - if not verify_verification_code(payload.verification_code, code_record.code_hash): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or verification code") - else: - # Redis 成功时尽量同步消费 DB 里的最新验证码,保持一致性。 - # 即使在 _is_redis_only(),如果先前发生了降级,这里也顺手清理掉。 - code_record = _get_latest_valid_code_record( - db, - email=email, - purpose=VerificationPurpose.LOGIN, + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid email or verification code", ) - if code_record is not None: - code_record.is_used = True - db.add(code_record) - - db.commit() + try: + service.verify_code( + email=email, + purpose=VerificationPurpose.LOGIN, + code=payload.verification_code, + ) + except CodeExpiredError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid email or verification code", + ) + except CodeInvalidError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid email or verification code", + ) + except TooManyCodeRequestsError: + raise HTTPException( + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + detail="Too many attempts", + ) return _build_auth_response(user) diff --git a/backend/app/core/verification/__init__.py b/backend/app/core/verification/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/core/verification/email/RespositoryImpl/MemoryRepository.py b/backend/app/core/verification/email/RespositoryImpl/MemoryRepository.py new file mode 100644 index 0000000..211a9fa --- /dev/null +++ b/backend/app/core/verification/email/RespositoryImpl/MemoryRepository.py @@ -0,0 +1,71 @@ +# app/verification/backends/memory.py + +from functools import lru_cache +import time +import json +import threading +from typing import Optional + +from app.models.models import VerificationPurpose +from app.core.verification.email.verificationRepository import VerificationRepository + +class MemoryRepository(VerificationRepository): + def __init__(self): + self._store = {} + self._lock = threading.Lock() + + def _key(self, email: str, purpose: VerificationPurpose) -> str: + email = email.lower() + return f"verification:code:{purpose.value.lower()}:{email}" + + def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None: + key = self._key(email, purpose) + expire_at = time.time() + ttl + + payload = { + "code_hash": code_hash + } + + with self._lock: + self._store[key] = (json.dumps(payload), expire_at) + + def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]: + key = self._key(email, purpose) + + with self._lock: + data = self._store.get(key) + + if not data: + return None + + value, expire_at = data + + if time.time() > expire_at: + del self._store[key] + return None + + del self._store[key] + + try: + payload = json.loads(value) + return payload.get("code_hash") + except Exception: + return None + + def incr(self, key: str, ttl: int) -> int: + now = time.time() + + with self._lock: + value, expire = self._store.get(key, (0, 0)) + + if now > expire: + value = 0 + + value += 1 + self._store[key] = (value, now + ttl) + + return value + +@lru_cache +def get_memory_repo(): + return MemoryRepository() diff --git a/backend/app/core/verification/email/RespositoryImpl/RedisRepository.py b/backend/app/core/verification/email/RespositoryImpl/RedisRepository.py new file mode 100644 index 0000000..c2e15a8 --- /dev/null +++ b/backend/app/core/verification/email/RespositoryImpl/RedisRepository.py @@ -0,0 +1,88 @@ +from functools import lru_cache +import os +import logging +import json +import datetime +import redis +from typing import Optional, TYPE_CHECKING + +from app.models.models import VerificationPurpose +from app.core.verification.email.verificationRepository import VerificationRepository +from app.utils.redis_client import get_redis_client +from app.core.security import hash_verification_code + +logger = logging.getLogger(__name__) + +AUTH_CODE_REDIS_PREFIX = os.getenv("AUTH_CODE_REDIS_PREFIX", "insightradar:auth_code").strip() + + +class RedisRepository(VerificationRepository): + _consume_lua = """ local val = redis.call("GET", KEYS[1]) if val then redis.call("DEL", KEYS[1]) end return val """ + + + def __init__(self, client: redis.Redis): + self.client = client + self._consume_script = self.client.register_script(self._consume_lua) + + + + def _key(self, email, purpose): + return f"{AUTH_CODE_REDIS_PREFIX}:{purpose.value.lower()}:{email}:code" + + def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None: + """store the code into the redis + + Args: + email (str): email of user + purpose (VerificationPurpose): purpose of the code, such as "login", "register" + code_hash: the hash of the code + ttl: duration of the code + + """ + key = self._key(email, purpose) + + payload = json.dumps({ + "code_hash": code_hash, + "exp": datetime.datetime.now().timestamp() + }) + + self.client.set(key, payload, ex=ttl) + + def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]: + """consume the code of email + + Args: + email (str): email of user + purpose (VerificationPurpose): purpose of the code, such as "login", "register" + + Returns: + _type_: if email has a code which has not been consumed, return the code, else return None + """ + + key = self._key(email, purpose) + data = self._consume_script(keys=[key]) + + if not data: + return None + + try: + payload = json.loads(data) # type: ignore + return payload.get("code_hash") + except Exception: + return None + + def incr(self, key: str, ttl: int) -> int: + super().incr(key, ttl) + value = self.client.incr(key) + + if value == 1: + self.client.expire(key, ttl) + + return value # type: ignore + +@lru_cache +def get_redis_repo(): + client = get_redis_client() + if client is None: + return None + return RedisRepository(client) diff --git a/backend/app/core/verification/email/RespositoryImpl/__init__.py b/backend/app/core/verification/email/RespositoryImpl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/core/verification/email/RespositoryImpl/hybirdRepository.py b/backend/app/core/verification/email/RespositoryImpl/hybirdRepository.py new file mode 100644 index 0000000..5d2e67d --- /dev/null +++ b/backend/app/core/verification/email/RespositoryImpl/hybirdRepository.py @@ -0,0 +1,54 @@ +# app/verification/backends/hybrid.py + +from functools import lru_cache +import logging +from typing import Optional + +from app.models.models import VerificationPurpose +from app.core.verification.email.verificationRepository import VerificationRepository +from app.core.verification.email.RespositoryImpl.MemoryRepository import get_memory_repo +from app.core.verification.email.RespositoryImpl.RedisRepository import get_redis_repo + +logger = logging.getLogger(__name__) + + +class HybridRepository(VerificationRepository): + + def __init__(self, redis_repo: Optional[VerificationRepository], memory_repo: VerificationRepository): + self.redis = redis_repo + self.memory = memory_repo + + def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None: + if self.redis: + try: + self.redis.set_code(email, purpose, code_hash, ttl) + return + except Exception as e: + logger.warning("Redis set_code failed, fallback to memory: %s", e) + + self.memory.set_code(email, purpose, code_hash, ttl) + + def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]: + if self.redis: + try: + return self.redis.consume_code(email, purpose) + except Exception as e: + logger.warning("Redis consume_code failed, fallback to memory: %s", e) + + return self.memory.consume_code(email, purpose) + + def incr(self, key: str, ttl: int) -> int: + if self.redis: + try: + return self.redis.incr(key, ttl) + except Exception as e: + logger.warning("Redis incr failed, fallback to memory: %s", e) + + return self.memory.incr(key, ttl) + +@lru_cache +def get_verification_repository(): + redis_repo = get_redis_repo() + memory_repo = get_memory_repo() + + return HybridRepository(redis_repo, memory_repo) \ No newline at end of file diff --git a/backend/app/core/verification/email/__init__.py b/backend/app/core/verification/email/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/core/verification/email/verificationRepository.py b/backend/app/core/verification/email/verificationRepository.py new file mode 100644 index 0000000..9a74524 --- /dev/null +++ b/backend/app/core/verification/email/verificationRepository.py @@ -0,0 +1,44 @@ +from abc import abstractmethod, ABC +from typing import Optional + +from app.models.models import VerificationPurpose + +class VerificationRepository(ABC): + """验证码持久层抽象基类""" + + @abstractmethod + def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None: + """write the code into the storage + + Args: + email (str): email of user + purpose (VerificationPurpose): the purpose of the code, such as, "login", "register" + code_hash (str): hash of the code + ttl (int): duration + """ + pass + + + @abstractmethod + def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]: + """consume the code atomically + + Args: + email (str): email of user + purpose (VerificationPurpose): the purpose of the code, such as, "login", "register" + + Returns: + Optional[str]: if success return the code, else return None + """ + pass + + @abstractmethod + def incr(self, key: str, ttl: int) -> int: + """create a counter in the storage, it will be delete after ttl + + Args: + key (str): key of the counter + ttl (int): duration + """ + pass + \ No newline at end of file diff --git a/backend/app/core/verification/email/verificationService.py b/backend/app/core/verification/email/verificationService.py new file mode 100644 index 0000000..230911e --- /dev/null +++ b/backend/app/core/verification/email/verificationService.py @@ -0,0 +1,79 @@ +from functools import lru_cache +import os +import logging +from typing import Optional + +from app.models.models import VerificationPurpose +from app.core.verification.email.verificationRepository import VerificationRepository +from app.core.security import generate_verification_code, hash_verification_code +from app.core.verification.email.RespositoryImpl.hybirdRepository import get_verification_repository + +logger = logging.getLogger(__name__) + +# 注册验证码有效期(分钟) +REGISTER_CODE_EXPIRE_MINUTES = os.getenv("REGISTER_CODE_EXPIRE_MINUTES", 5) + +# 登录验证码有效期(分钟) +LOGIN_CODE_EXPIRE_MINUTES = os.getenv("LOGIN_CODE_EXPIRE_MINUTES",5) + +# 同一邮箱发送验证码的冷却间隔(秒) +CODE_SEND_COOLDOWN_SECONDS = os.getenv("CODE_SEND_COOLDOWN_SECONDS",60) + +class CodeExpiredError(Exception): + """code has been expired""" + pass + +class CodeInvalidError(Exception): + """code is not right""" + pass + +class TooManyCodeRequestsError(Exception): + """User request too many times when they verificate the email""" + pass + +def get_ttl(purpose: VerificationPurpose)->int: + if purpose == VerificationPurpose.LOGIN: + return int(LOGIN_CODE_EXPIRE_MINUTES) * 60 + else: + return int(REGISTER_CODE_EXPIRE_MINUTES) * 60 + +class EmailVerificationService: + + def __init__(self, repo: VerificationRepository) -> None: + self.repo = repo + + def _cooldown_key(self, email: str, purpose: VerificationPurpose) -> str: + return f"verification:cooldown:{purpose.value}:{email.lower()}" + + def send_code(self, email: str, purpose: VerificationPurpose) -> str: + email = email.lower() + + count = self.repo.incr(self._cooldown_key(email, purpose), int(CODE_SEND_COOLDOWN_SECONDS)) + + if count > 1: + raise TooManyCodeRequestsError("Please wait before requesting another code") + + code = generate_verification_code() + code_hash = hash_verification_code(code) + + self.repo.set_code(email, purpose, code_hash, get_ttl(purpose)) + + return code + + def verify_code(self,email: str, code: str, purpose: VerificationPurpose): + email = email.lower() + + stored_hash: Optional[str] = self.repo.consume_code(email, purpose) + + if not stored_hash: + raise CodeExpiredError("Code expired or not found") + + if stored_hash != hash_verification_code(code): + raise CodeInvalidError("Invalid code") + + return True + +@lru_cache +def get_verification_service(): + repo = get_verification_repository() + return EmailVerificationService(repo) diff --git a/backend/app/main.py b/backend/app/main.py index 1362664..896d8e5 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,7 +8,7 @@ from dotenv import load_dotenv # 统一配置日志格式和级别,确保 delivery_service 等的 INFO 日志可见 logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) diff --git a/backend/app/models/models.py b/backend/app/models/models.py index 30bab54..a70942d 100644 --- a/backend/app/models/models.py +++ b/backend/app/models/models.py @@ -321,22 +321,6 @@ class AppUser(Base): updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow) -class EmailVerificationCode(Base): - __tablename__ = "email_verification_codes" - __table_args__ = ( - Index("idx_email_code_lookup", "email", "purpose", "is_used", "expires_at"), - ) - - id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - email: Mapped[str] = mapped_column(String(150), index=True, nullable=False) - purpose: Mapped[VerificationPurpose] = mapped_column(Enum(VerificationPurpose), nullable=False) - code_hash: Mapped[str] = mapped_column(String(64), nullable=False) - is_used: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) - expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) - updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow) - - class UserPushEndpoint(Base): """ 多渠道推送端点配置表 (高可用解耦设计) diff --git a/backend/app/utils/redis_client.py b/backend/app/utils/redis_client.py index e86df80..ca5c4cd 100644 --- a/backend/app/utils/redis_client.py +++ b/backend/app/utils/redis_client.py @@ -1,27 +1,21 @@ +from functools import lru_cache import logging import os from typing import Optional, TYPE_CHECKING - -if TYPE_CHECKING: - from redis import Redis +import redis logger = logging.getLogger(__name__) -try: - import redis # type: ignore -except ImportError: # pragma: no cover - redis = None # type: ignore - REDIS_URL = os.getenv("REDIS_URL", "").strip() REDIS_CONNECT_TIMEOUT_SECONDS = float(os.getenv("REDIS_CONNECT_TIMEOUT_SECONDS", "2")) REDIS_SOCKET_TIMEOUT_SECONDS = float(os.getenv("REDIS_SOCKET_TIMEOUT_SECONDS", "2")) -_redis_client: Optional["Redis"] = None +_redis_client: Optional["redis.Redis"] = None _initialized = False - -def get_redis_client() -> Optional["Redis"]: +@lru_cache +def get_redis_client() -> Optional["redis.Redis"]: """Return a singleton Redis client, or None when Redis is unavailable.""" global _redis_client, _initialized @@ -31,12 +25,7 @@ def get_redis_client() -> Optional["Redis"]: _initialized = True if not REDIS_URL: - logger.info("REDIS_URL 未配置,验证码将回退到数据库存储") - _redis_client = None - return _redis_client - - if redis is None: - logger.warning("未安装 redis 包,验证码将回退到数据库存储") + logger.info("REDIS_URL 未配置,验证码将回退到内存存储") _redis_client = None return _redis_client