彻底删除数据库记录验证码

This commit is contained in:
2026-03-26 01:48:55 +08:00
parent 2335b62384
commit b18901a2d5
12 changed files with 444 additions and 417 deletions
+88 -370
View File
@@ -4,6 +4,7 @@
import json
import math
import os
import logging
from datetime import timedelta, timezone
from typing import Optional, Tuple
@@ -19,7 +20,7 @@ from app.core.security import (
verify_password,
verify_verification_code,
)
from app.models.models import AppUser, EmailVerificationCode, VerificationPurpose, utcnow
from app.models.models import AppUser, VerificationPurpose, utcnow
from app.schemas.auth_schema import (
AuthTokenResponse,
LoginCodeSendRequest,
@@ -32,9 +33,11 @@ from app.schemas.auth_schema import (
)
from app.utils.email_utils import send_html_email
from app.utils.redis_client import get_redis_client
from app.core.verification.email.verificationService import EmailVerificationService, get_verification_service, TooManyCodeRequestsError, CodeExpiredError, CodeInvalidError
router = APIRouter()
logger = logging.getLogger(__name__)
DEFAULT_REGISTER_CODE_EXPIRE_MINUTES = 10
DEFAULT_LOGIN_CODE_EXPIRE_MINUTES = 10
@@ -131,19 +134,12 @@ def _cache_code_in_redis(
"code_hash": code_hash,
"created_at": utcnow().isoformat(),
}
try:
client.set(
_redis_code_key(email, purpose),
json.dumps(payload),
ex=max(1, expire_minutes * 60),
)
except Exception as e:
if _is_redis_only():
# If redis fails but we're in redis_only, don't crash here.
# We already generated the code hash, but we won't cache it in redis.
# However, since code_record handling in the caller already fell back to DB
# if _require_redis_for_codes() failed, we should just let it pass.
pass
def _set_send_cooldown_in_redis(email: str, purpose: VerificationPurpose) -> None:
@@ -178,166 +174,6 @@ def _clear_code_in_redis(email: str, purpose: VerificationPurpose) -> None:
pass
def _verify_code_with_redis(
email: str,
purpose: VerificationPurpose,
code: str,
*,
strict: bool = False,
) -> Optional[bool]:
"""
Redis 验证码校验。
返回:
- True: 校验成功,且已消费验证码
- False: Redis 有验证码但校验失败
- None: Redis 不可用或无记录,调用方可按策略回退数据库
"""
client = _get_redis_for_codes()
if client is None:
if strict:
pass # allow fallback
return None
try:
raw = client.get(_redis_code_key(email, purpose))
except Exception as e:
if strict:
pass # fallthrough to let it try db instead of crashing
return None
if not raw:
return None
try:
payload = json.loads(raw)
expected_hash = str(payload.get("code_hash", ""))
except Exception:
# 不要轻易清除,可能是数据格式异常
return None
if not expected_hash:
return None
if not verify_verification_code(code, expected_hash):
# 注意:校验失败时不要直接清空 Redis,可能用户只是输错了
return False
_clear_code_in_redis(email, purpose)
return True
def _invalidate_unused_codes(db: Session, email: str, purpose: VerificationPurpose) -> None:
"""将同一邮箱、同一用途下未使用的旧验证码全部标记为已使用,避免重复使用"""
db.query(EmailVerificationCode).filter(
EmailVerificationCode.email == email,
EmailVerificationCode.purpose == purpose,
EmailVerificationCode.is_used.is_(False),
).update({EmailVerificationCode.is_used: True}, synchronize_session=False)
db.commit()
def _create_code_record(
db: Session,
*,
email: str,
purpose: VerificationPurpose,
expire_minutes: int,
) -> Tuple[EmailVerificationCode, str]:
"""在数据库中创建验证码记录,返回 (记录对象, 明文验证码)"""
code = generate_verification_code()
now = utcnow()
code_record = EmailVerificationCode(
email=email,
purpose=purpose,
code_hash=hash_verification_code(code),
expires_at=now + timedelta(minutes=expire_minutes),
)
db.add(code_record)
db.commit()
return code_record, code
def _get_latest_valid_code_record(
db: Session,
*,
email: str,
purpose: VerificationPurpose,
):
"""从数据库获取该邮箱该用途下最新且未过期、未使用的验证码记录"""
now = utcnow()
return (
db.query(EmailVerificationCode)
.filter(
EmailVerificationCode.email == email,
EmailVerificationCode.purpose == purpose,
EmailVerificationCode.is_used.is_(False),
EmailVerificationCode.expires_at >= now,
)
.order_by(EmailVerificationCode.created_at.desc())
.first()
)
def _enforce_code_send_cooldown(db: Session, email: str, purpose: VerificationPurpose) -> None:
"""限制同一邮箱同一用途验证码的发送频率。"""
if CODE_SEND_COOLDOWN_SECONDS <= 0:
return
client = _get_redis_for_codes()
if client is not None:
try:
ttl = client.ttl(_redis_cooldown_key(email, purpose))
if ttl is not None and ttl > 0:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Please wait {ttl}s before requesting another verification code",
headers={"Retry-After": str(ttl)},
)
if _is_redis_only():
return
except HTTPException:
raise
except Exception:
# redis failed during cooldown check, fallback to DB
pass
if _is_redis_only():
# Even if redis_only, we allow it to fallthrough if it's down.
# This aligns with our fallback logic.
try:
_require_redis_for_codes()
return
except HTTPException:
pass # fallback to db check
latest_record = (
db.query(EmailVerificationCode)
.filter(
EmailVerificationCode.email == email,
EmailVerificationCode.purpose == purpose,
)
.order_by(EmailVerificationCode.created_at.desc())
.first()
)
if not latest_record:
return
now = utcnow()
record_time = latest_record.created_at
if record_time.tzinfo is None:
record_time = record_time.replace(tzinfo=timezone.utc)
elapsed_seconds = (now - record_time).total_seconds()
if elapsed_seconds >= CODE_SEND_COOLDOWN_SECONDS:
return
retry_after_seconds = max(1, math.ceil(CODE_SEND_COOLDOWN_SECONDS - elapsed_seconds))
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Please wait {retry_after_seconds}s before requesting another verification code",
headers={"Retry-After": str(retry_after_seconds)},
)
def _build_auth_response(user: AppUser) -> AuthTokenResponse:
token, expires_in = create_access_token(user_id=user.id, email=user.email)
return AuthTokenResponse(
@@ -348,219 +184,115 @@ def _build_auth_response(user: AppUser) -> AuthTokenResponse:
@router.post("/register/send-code", response_model=MessageResponse)
async def send_register_code(payload: RegisterCodeSendRequest, db: Session = Depends(get_db)):
"""发送注册验证码:先校验邮箱未注册、冷却期,再生成并发送"""
async def send_register_code(
payload: RegisterCodeSendRequest,
db: Session = Depends(get_db),
service: EmailVerificationService = Depends(get_verification_service),
):
email = _normalize_email(payload.email)
existing_user = db.query(AppUser).filter(AppUser.email == email).first()
if existing_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is already registered")
_enforce_code_send_cooldown(db, email, VerificationPurpose.REGISTER)
code_record = None
if _is_redis_only():
try:
_require_redis_for_codes()
code = generate_verification_code()
code_hash = hash_verification_code(code)
except HTTPException:
# If redis is down, temporarily fallback to DB even in redis_only mode
_invalidate_unused_codes(db, email, VerificationPurpose.REGISTER)
code_record, code = _create_code_record(
db,
email=email,
purpose=VerificationPurpose.REGISTER,
expire_minutes=REGISTER_CODE_EXPIRE_MINUTES,
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is already registered",
)
code_hash = code_record.code_hash
else:
_invalidate_unused_codes(db, email, VerificationPurpose.REGISTER)
code_record, code = _create_code_record(
db,
email=email,
purpose=VerificationPurpose.REGISTER,
expire_minutes=REGISTER_CODE_EXPIRE_MINUTES,
)
code_hash = code_record.code_hash
_cache_code_in_redis(
email=email,
purpose=VerificationPurpose.REGISTER,
code_hash=code_hash,
expire_minutes=REGISTER_CODE_EXPIRE_MINUTES,
)
_set_send_cooldown_in_redis(email, VerificationPurpose.REGISTER)
try:
email_sent = await send_html_email(
code = service.send_code(email, VerificationPurpose.REGISTER)
await send_html_email(
to_email=email,
subject=f"{code}】InsightRadar 注册验证码",
html_content=_build_verification_email(code, "注册", REGISTER_CODE_EXPIRE_MINUTES),
html_content=_build_verification_email(
code, "注册", REGISTER_CODE_EXPIRE_MINUTES
),
)
except TooManyCodeRequestsError as e:
raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=str(e))
except Exception as e:
_clear_code_in_redis(email, VerificationPurpose.REGISTER)
# also clear cooldown if possible, so user can retry immediately
client = _get_redis_for_codes()
if client:
try:
client.delete(_redis_cooldown_key(email, VerificationPurpose.REGISTER))
except Exception:
pass
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
status_code=500,
detail=f"Failed to send verification code: {e}",
)
if not email_sent:
_clear_code_in_redis(email, VerificationPurpose.REGISTER)
client = _get_redis_for_codes()
if client:
try:
client.delete(_redis_cooldown_key(email, VerificationPurpose.REGISTER))
except Exception:
pass
if code_record is not None:
code_record.is_used = True
db.add(code_record)
db.commit()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to send verification code",
)
return MessageResponse(message="Verification code sent")
@router.post("/login/send-code", response_model=MessageResponse)
async def send_login_code(payload: LoginCodeSendRequest, db: Session = Depends(get_db)):
"""发送登录验证码:仅对已注册用户发送"""
async def send_login_code(
payload: LoginCodeSendRequest,
db: Session = Depends(get_db),
service: EmailVerificationService = Depends(get_verification_service),
):
email = _normalize_email(payload.email)
user = db.query(AppUser).filter(AppUser.email == email).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Email is not registered")
_enforce_code_send_cooldown(db, email, VerificationPurpose.LOGIN)
code_record = None
if _is_redis_only():
try:
_require_redis_for_codes()
code = generate_verification_code()
code_hash = hash_verification_code(code)
except HTTPException:
# If redis is down, temporarily fallback to DB even in redis_only mode
_invalidate_unused_codes(db, email, VerificationPurpose.LOGIN)
code_record, code = _create_code_record(
db,
email=email,
purpose=VerificationPurpose.LOGIN,
expire_minutes=LOGIN_CODE_EXPIRE_MINUTES,
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Email is not registered",
)
code_hash = code_record.code_hash
else:
_invalidate_unused_codes(db, email, VerificationPurpose.LOGIN)
code_record, code = _create_code_record(
db,
email=email,
purpose=VerificationPurpose.LOGIN,
expire_minutes=LOGIN_CODE_EXPIRE_MINUTES,
)
code_hash = code_record.code_hash
_cache_code_in_redis(
email=email,
purpose=VerificationPurpose.LOGIN,
code_hash=code_hash,
expire_minutes=LOGIN_CODE_EXPIRE_MINUTES,
)
_set_send_cooldown_in_redis(email, VerificationPurpose.LOGIN)
try:
email_sent = await send_html_email(
code = service.send_code(email, VerificationPurpose.LOGIN)
await send_html_email(
to_email=email,
subject=f"{code}】InsightRadar 登录验证码",
html_content=_build_verification_email(code, "登录", LOGIN_CODE_EXPIRE_MINUTES),
html_content=_build_verification_email(
code, "登录", LOGIN_CODE_EXPIRE_MINUTES
),
)
except TooManyCodeRequestsError as e:
raise HTTPException(status_code=429, detail=str(e))
except Exception as e:
_clear_code_in_redis(email, VerificationPurpose.LOGIN)
client = _get_redis_for_codes()
if client:
try:
client.delete(_redis_cooldown_key(email, VerificationPurpose.LOGIN))
except Exception:
pass
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
status_code=500,
detail=f"Failed to send verification code: {e}",
)
if not email_sent:
_clear_code_in_redis(email, VerificationPurpose.LOGIN)
client = _get_redis_for_codes()
if client:
try:
client.delete(_redis_cooldown_key(email, VerificationPurpose.LOGIN))
except Exception:
pass
if code_record is not None:
code_record.is_used = True
db.add(code_record)
db.commit()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to send verification code",
)
return MessageResponse(message="Verification code sent")
@router.post(
"/register",
response_model=AuthTokenResponse,
status_code=status.HTTP_201_CREATED,
)
async def register(payload: RegisterRequest, db: Session = Depends(get_db)):
"""用户注册:校验验证码(Redis 优先,失败则回退数据库)后创建用户"""
async def register(
payload: RegisterRequest,
db: Session = Depends(get_db),
service: EmailVerificationService = Depends(get_verification_service),
):
email = _normalize_email(payload.email)
existing_user = db.query(AppUser).filter(AppUser.email == email).first()
if existing_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is already registered")
redis_result = _verify_code_with_redis(
email,
VerificationPurpose.REGISTER,
payload.verification_code,
strict=False, # Never be strict so we can fallback to DB if redis is down
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is already registered",
)
code_record = None
if redis_result is False:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code")
if redis_result is None:
# 即使在 _is_redis_only() 模式下,也去数据库兜底查找
# 这样如果Redis挂了时代码回退到了DB,验证时也能从DB拿出来。
code_record = _get_latest_valid_code_record(
db,
try:
service.verify_code(
email=email,
purpose=VerificationPurpose.REGISTER,
code=payload.verification_code,
)
if not code_record:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code does not exist or expired")
if not verify_verification_code(payload.verification_code, code_record.code_hash):
except CodeExpiredError:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code expired")
except CodeInvalidError:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code")
else:
# Redis 成功时尽量同步消费 DB 里的最新验证码,保持一致性。
# 即使在 _is_redis_only(),如果先前发生了降级,这里也顺手清理掉。
code_record = _get_latest_valid_code_record(
db,
email=email,
purpose=VerificationPurpose.REGISTER,
)
except TooManyCodeRequestsError:
raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many attempts")
now = utcnow()
nickname = payload.nickname or email.split("@")[0]
user = AppUser(
email=email,
password_hash=hash_password(payload.password),
@@ -569,16 +301,11 @@ async def register(payload: RegisterRequest, db: Session = Depends(get_db)):
)
db.add(user)
if code_record is not None:
code_record.is_used = True
db.add(code_record)
db.commit()
db.refresh(user)
return _build_auth_response(user)
@router.post("/login", response_model=AuthTokenResponse)
async def login(payload: LoginRequest, db: Session = Depends(get_db)):
"""密码登录"""
@@ -595,49 +322,40 @@ async def login(payload: LoginRequest, db: Session = Depends(get_db)):
@router.post("/login/code", response_model=AuthTokenResponse)
async def login_with_code(payload: LoginWithCodeRequest, db: Session = Depends(get_db)):
"""验证码登录:Redis 校验优先,失败则从数据库兜底"""
async def login_with_code(
payload: LoginWithCodeRequest,
db: Session = Depends(get_db),
service: EmailVerificationService = Depends(get_verification_service),
):
email = _normalize_email(payload.email)
user = db.query(AppUser).filter(AppUser.email == email).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or verification code")
redis_result = _verify_code_with_redis(
email,
VerificationPurpose.LOGIN,
payload.verification_code,
strict=False, # Never be strict so we can fallback to DB if redis is down
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or verification code",
)
code_record = None
if redis_result is False:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or verification code")
if redis_result is None:
code_record = _get_latest_valid_code_record(
db,
try:
service.verify_code(
email=email,
purpose=VerificationPurpose.LOGIN,
code=payload.verification_code,
)
if not code_record:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code does not exist or expired")
if not verify_verification_code(payload.verification_code, code_record.code_hash):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or verification code")
else:
# Redis 成功时尽量同步消费 DB 里的最新验证码,保持一致性。
# 即使在 _is_redis_only(),如果先前发生了降级,这里也顺手清理掉。
code_record = _get_latest_valid_code_record(
db,
email=email,
purpose=VerificationPurpose.LOGIN,
except CodeExpiredError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or verification code",
)
except CodeInvalidError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or verification code",
)
except TooManyCodeRequestsError:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many attempts",
)
if code_record is not None:
code_record.is_used = True
db.add(code_record)
db.commit()
return _build_auth_response(user)
@@ -0,0 +1,71 @@
# app/verification/backends/memory.py
from functools import lru_cache
import time
import json
import threading
from typing import Optional
from app.models.models import VerificationPurpose
from app.core.verification.email.verificationRepository import VerificationRepository
class MemoryRepository(VerificationRepository):
def __init__(self):
self._store = {}
self._lock = threading.Lock()
def _key(self, email: str, purpose: VerificationPurpose) -> str:
email = email.lower()
return f"verification:code:{purpose.value.lower()}:{email}"
def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None:
key = self._key(email, purpose)
expire_at = time.time() + ttl
payload = {
"code_hash": code_hash
}
with self._lock:
self._store[key] = (json.dumps(payload), expire_at)
def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]:
key = self._key(email, purpose)
with self._lock:
data = self._store.get(key)
if not data:
return None
value, expire_at = data
if time.time() > expire_at:
del self._store[key]
return None
del self._store[key]
try:
payload = json.loads(value)
return payload.get("code_hash")
except Exception:
return None
def incr(self, key: str, ttl: int) -> int:
now = time.time()
with self._lock:
value, expire = self._store.get(key, (0, 0))
if now > expire:
value = 0
value += 1
self._store[key] = (value, now + ttl)
return value
@lru_cache
def get_memory_repo():
return MemoryRepository()
@@ -0,0 +1,88 @@
from functools import lru_cache
import os
import logging
import json
import datetime
import redis
from typing import Optional, TYPE_CHECKING
from app.models.models import VerificationPurpose
from app.core.verification.email.verificationRepository import VerificationRepository
from app.utils.redis_client import get_redis_client
from app.core.security import hash_verification_code
logger = logging.getLogger(__name__)
AUTH_CODE_REDIS_PREFIX = os.getenv("AUTH_CODE_REDIS_PREFIX", "insightradar:auth_code").strip()
class RedisRepository(VerificationRepository):
_consume_lua = """ local val = redis.call("GET", KEYS[1]) if val then redis.call("DEL", KEYS[1]) end return val """
def __init__(self, client: redis.Redis):
self.client = client
self._consume_script = self.client.register_script(self._consume_lua)
def _key(self, email, purpose):
return f"{AUTH_CODE_REDIS_PREFIX}:{purpose.value.lower()}:{email}:code"
def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None:
"""store the code into the redis
Args:
email (str): email of user
purpose (VerificationPurpose): purpose of the code, such as "login", "register"
code_hash: the hash of the code
ttl: duration of the code
"""
key = self._key(email, purpose)
payload = json.dumps({
"code_hash": code_hash,
"exp": datetime.datetime.now().timestamp()
})
self.client.set(key, payload, ex=ttl)
def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]:
"""consume the code of email
Args:
email (str): email of user
purpose (VerificationPurpose): purpose of the code, such as "login", "register"
Returns:
_type_: if email has a code which has not been consumed, return the code, else return None
"""
key = self._key(email, purpose)
data = self._consume_script(keys=[key])
if not data:
return None
try:
payload = json.loads(data) # type: ignore
return payload.get("code_hash")
except Exception:
return None
def incr(self, key: str, ttl: int) -> int:
super().incr(key, ttl)
value = self.client.incr(key)
if value == 1:
self.client.expire(key, ttl)
return value # type: ignore
@lru_cache
def get_redis_repo():
client = get_redis_client()
if client is None:
return None
return RedisRepository(client)
@@ -0,0 +1,54 @@
# app/verification/backends/hybrid.py
from functools import lru_cache
import logging
from typing import Optional
from app.models.models import VerificationPurpose
from app.core.verification.email.verificationRepository import VerificationRepository
from app.core.verification.email.RespositoryImpl.MemoryRepository import get_memory_repo
from app.core.verification.email.RespositoryImpl.RedisRepository import get_redis_repo
logger = logging.getLogger(__name__)
class HybridRepository(VerificationRepository):
def __init__(self, redis_repo: Optional[VerificationRepository], memory_repo: VerificationRepository):
self.redis = redis_repo
self.memory = memory_repo
def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None:
if self.redis:
try:
self.redis.set_code(email, purpose, code_hash, ttl)
return
except Exception as e:
logger.warning("Redis set_code failed, fallback to memory: %s", e)
self.memory.set_code(email, purpose, code_hash, ttl)
def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]:
if self.redis:
try:
return self.redis.consume_code(email, purpose)
except Exception as e:
logger.warning("Redis consume_code failed, fallback to memory: %s", e)
return self.memory.consume_code(email, purpose)
def incr(self, key: str, ttl: int) -> int:
if self.redis:
try:
return self.redis.incr(key, ttl)
except Exception as e:
logger.warning("Redis incr failed, fallback to memory: %s", e)
return self.memory.incr(key, ttl)
@lru_cache
def get_verification_repository():
redis_repo = get_redis_repo()
memory_repo = get_memory_repo()
return HybridRepository(redis_repo, memory_repo)
@@ -0,0 +1,44 @@
from abc import abstractmethod, ABC
from typing import Optional
from app.models.models import VerificationPurpose
class VerificationRepository(ABC):
"""验证码持久层抽象基类"""
@abstractmethod
def set_code(self, email: str, purpose: VerificationPurpose, code_hash: str, ttl: int) -> None:
"""write the code into the storage
Args:
email (str): email of user
purpose (VerificationPurpose): the purpose of the code, such as, "login", "register"
code_hash (str): hash of the code
ttl (int): duration
"""
pass
@abstractmethod
def consume_code(self, email: str, purpose: VerificationPurpose) -> Optional[str]:
"""consume the code atomically
Args:
email (str): email of user
purpose (VerificationPurpose): the purpose of the code, such as, "login", "register"
Returns:
Optional[str]: if success return the code, else return None
"""
pass
@abstractmethod
def incr(self, key: str, ttl: int) -> int:
"""create a counter in the storage, it will be delete after ttl
Args:
key (str): key of the counter
ttl (int): duration
"""
pass
@@ -0,0 +1,79 @@
from functools import lru_cache
import os
import logging
from typing import Optional
from app.models.models import VerificationPurpose
from app.core.verification.email.verificationRepository import VerificationRepository
from app.core.security import generate_verification_code, hash_verification_code
from app.core.verification.email.RespositoryImpl.hybirdRepository import get_verification_repository
logger = logging.getLogger(__name__)
# 注册验证码有效期(分钟)
REGISTER_CODE_EXPIRE_MINUTES = os.getenv("REGISTER_CODE_EXPIRE_MINUTES", 5)
# 登录验证码有效期(分钟)
LOGIN_CODE_EXPIRE_MINUTES = os.getenv("LOGIN_CODE_EXPIRE_MINUTES",5)
# 同一邮箱发送验证码的冷却间隔(秒)
CODE_SEND_COOLDOWN_SECONDS = os.getenv("CODE_SEND_COOLDOWN_SECONDS",60)
class CodeExpiredError(Exception):
"""code has been expired"""
pass
class CodeInvalidError(Exception):
"""code is not right"""
pass
class TooManyCodeRequestsError(Exception):
"""User request too many times when they verificate the email"""
pass
def get_ttl(purpose: VerificationPurpose)->int:
if purpose == VerificationPurpose.LOGIN:
return int(LOGIN_CODE_EXPIRE_MINUTES) * 60
else:
return int(REGISTER_CODE_EXPIRE_MINUTES) * 60
class EmailVerificationService:
def __init__(self, repo: VerificationRepository) -> None:
self.repo = repo
def _cooldown_key(self, email: str, purpose: VerificationPurpose) -> str:
return f"verification:cooldown:{purpose.value}:{email.lower()}"
def send_code(self, email: str, purpose: VerificationPurpose) -> str:
email = email.lower()
count = self.repo.incr(self._cooldown_key(email, purpose), int(CODE_SEND_COOLDOWN_SECONDS))
if count > 1:
raise TooManyCodeRequestsError("Please wait before requesting another code")
code = generate_verification_code()
code_hash = hash_verification_code(code)
self.repo.set_code(email, purpose, code_hash, get_ttl(purpose))
return code
def verify_code(self,email: str, code: str, purpose: VerificationPurpose):
email = email.lower()
stored_hash: Optional[str] = self.repo.consume_code(email, purpose)
if not stored_hash:
raise CodeExpiredError("Code expired or not found")
if stored_hash != hash_verification_code(code):
raise CodeInvalidError("Invalid code")
return True
@lru_cache
def get_verification_service():
repo = get_verification_repository()
return EmailVerificationService(repo)
+1 -1
View File
@@ -8,7 +8,7 @@ from dotenv import load_dotenv
# 统一配置日志格式和级别,确保 delivery_service 等的 INFO 日志可见
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
-16
View File
@@ -321,22 +321,6 @@ class AppUser(Base):
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow)
class EmailVerificationCode(Base):
__tablename__ = "email_verification_codes"
__table_args__ = (
Index("idx_email_code_lookup", "email", "purpose", "is_used", "expires_at"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
email: Mapped[str] = mapped_column(String(150), index=True, nullable=False)
purpose: Mapped[VerificationPurpose] = mapped_column(Enum(VerificationPurpose), nullable=False)
code_hash: Mapped[str] = mapped_column(String(64), nullable=False)
is_used: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow)
class UserPushEndpoint(Base):
"""
多渠道推送端点配置表 (高可用解耦设计)
+6 -17
View File
@@ -1,27 +1,21 @@
from functools import lru_cache
import logging
import os
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from redis import Redis
import redis
logger = logging.getLogger(__name__)
try:
import redis # type: ignore
except ImportError: # pragma: no cover
redis = None # type: ignore
REDIS_URL = os.getenv("REDIS_URL", "").strip()
REDIS_CONNECT_TIMEOUT_SECONDS = float(os.getenv("REDIS_CONNECT_TIMEOUT_SECONDS", "2"))
REDIS_SOCKET_TIMEOUT_SECONDS = float(os.getenv("REDIS_SOCKET_TIMEOUT_SECONDS", "2"))
_redis_client: Optional["Redis"] = None
_redis_client: Optional["redis.Redis"] = None
_initialized = False
def get_redis_client() -> Optional["Redis"]:
@lru_cache
def get_redis_client() -> Optional["redis.Redis"]:
"""Return a singleton Redis client, or None when Redis is unavailable."""
global _redis_client, _initialized
@@ -31,12 +25,7 @@ def get_redis_client() -> Optional["Redis"]:
_initialized = True
if not REDIS_URL:
logger.info("REDIS_URL 未配置,验证码将回退到数据库存储")
_redis_client = None
return _redis_client
if redis is None:
logger.warning("未安装 redis 包,验证码将回退到数据库存储")
logger.info("REDIS_URL 未配置,验证码将回退到内存存储")
_redis_client = None
return _redis_client