"""Password hashing, one-time verification codes and signed access tokens.

Everything here is built on the standard library only:

* passwords are hashed with PBKDF2-HMAC-SHA256 and stored as
  ``pbkdf2_sha256$<iterations>$<salt>$<urlsafe-base64 digest>``;
* verification codes are numeric strings compared through their SHA-256 hex
  digest;
* access tokens have the form ``<payload>.<signature>`` where both parts are
  unpadded URL-safe base64 and the signature is HMAC-SHA256 over the encoded
  payload, keyed with ``AUTH_SECRET_KEY``.
"""

import base64
import hashlib
import hmac
import json
import os
import secrets
import time
from typing import Tuple

# PBKDF2 iteration count for newly created password hashes (env-overridable).
# Existing hashes carry their own iteration count, so changing this value does
# not invalidate them.
DEFAULT_PASSWORD_HASH_ITERATIONS = 120000
PASSWORD_HASH_ITERATIONS = int(
    os.getenv("PASSWORD_HASH_ITERATIONS", str(DEFAULT_PASSWORD_HASH_ITERATIONS))
)

# Key used to sign access tokens. The fallback is only a placeholder: anyone
# who knows it can forge tokens, so AUTH_SECRET_KEY should be set in the
# environment.
AUTH_SECRET_KEY = os.getenv("AUTH_SECRET_KEY", "change-this-secret-in-env")

# Token lifetime in minutes (10080 minutes = 7 days), env-overridable.
DEFAULT_AUTH_TOKEN_EXPIRE_MINUTES = 10080
AUTH_TOKEN_EXPIRE_MINUTES = int(
    os.getenv("AUTH_TOKEN_EXPIRE_MINUTES", str(DEFAULT_AUTH_TOKEN_EXPIRE_MINUTES))
)


def hash_password(password: str) -> str:
    """Hash *password* with PBKDF2-HMAC-SHA256 and a fresh random salt.

    Returns:
        A string ``pbkdf2_sha256$<iterations>$<salt>$<digest>`` where the salt
        is 32 hex characters and the digest is URL-safe base64.
    """
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        # The hex text itself (not the decoded bytes) is the salt input;
        # verify_password must do the same.
        salt.encode("utf-8"),
        PASSWORD_HASH_ITERATIONS,
    )
    return (
        f"pbkdf2_sha256${PASSWORD_HASH_ITERATIONS}${salt}$"
        f"{base64.urlsafe_b64encode(digest).decode('utf-8')}"
    )


def verify_password(plain_password: str, password_hash: str) -> bool:
    """Check *plain_password* against a hash produced by ``hash_password``.

    The iteration count and salt are read from *password_hash* itself.

    Returns:
        ``True`` only when the recomputed digest matches. Any malformed,
        unsupported or non-string hash yields ``False`` instead of raising.
    """
    try:
        algorithm, iterations, salt, expected = password_hash.split("$", 3)
        if algorithm != "pbkdf2_sha256":
            return False
        digest = hashlib.pbkdf2_hmac(
            "sha256",
            plain_password.encode("utf-8"),
            salt.encode("utf-8"),
            int(iterations),
        )
        calculated = base64.urlsafe_b64encode(digest).decode("utf-8")
        return hmac.compare_digest(calculated, expected)
    except Exception:
        # Deliberately broad: a corrupt stored hash (wrong field count,
        # non-numeric iterations, non-ASCII digest, None, ...) must read as
        # "password does not match", never as a crash in the login path.
        return False


def generate_verification_code(length: int = 6) -> str:
    """Return a random string of *length* decimal digits.

    Digits are drawn with ``secrets.choice``. A *length* of zero or less
    yields an empty string.
    """
    return "".join(secrets.choice("0123456789") for _ in range(length))


def hash_verification_code(code: str) -> str:
    """Return the SHA-256 hex digest of *code* (UTF-8 encoded).

    NOTE(review): the digest is unsalted, so a short numeric code can be
    recovered from its hash by enumeration; confirm that stored hashes are
    short-lived and attempt-limited by the caller.
    """
    return hashlib.sha256(code.encode("utf-8")).hexdigest()


def verify_verification_code(code: str, expected_hash: str) -> bool:
    """Compare *code* against *expected_hash* in constant time.

    Returns:
        ``True`` when the SHA-256 hex digest of *code* equals *expected_hash*.
        A missing, non-string or non-ASCII *expected_hash* returns ``False``
        rather than raising.
    """
    if not isinstance(expected_hash, str):
        return False
    try:
        # hmac.compare_digest rejects non-ASCII str arguments with TypeError;
        # a hex digest is pure ASCII, so anything else cannot match anyway.
        expected = expected_hash.encode("ascii")
    except UnicodeEncodeError:
        return False
    return hmac.compare_digest(hash_verification_code(code).encode("ascii"), expected)


def _urlsafe_b64encode(raw: bytes) -> str:
    """Encode *raw* as URL-safe base64 text with the ``=`` padding removed."""
    return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")


def _urlsafe_b64decode(raw: str) -> bytes:
    """Decode URL-safe base64 text, restoring any stripped ``=`` padding."""
    padding = "=" * (-len(raw) % 4)
    return base64.urlsafe_b64decode(raw + padding)


def _sign_payload(encoded_payload: str) -> bytes:
    """Return the HMAC-SHA256 of *encoded_payload* keyed with AUTH_SECRET_KEY."""
    return hmac.new(
        AUTH_SECRET_KEY.encode("utf-8"),
        encoded_payload.encode("utf-8"),
        hashlib.sha256,
    ).digest()


def create_access_token(user_id: int, email: str) -> Tuple[str, int]:
    """Create a signed access token for a user.

    The payload is compact JSON with ``sub`` (the user id as a string),
    ``email`` and ``exp`` (expiry as a Unix timestamp in seconds).

    Returns:
        ``(token, expires_in)`` where *expires_in* is the lifetime in seconds.
    """
    expires_in = AUTH_TOKEN_EXPIRE_MINUTES * 60
    payload = {
        "sub": str(user_id),
        "email": email,
        "exp": int(time.time()) + expires_in,
    }
    payload_bytes = json.dumps(
        payload, separators=(",", ":"), ensure_ascii=True
    ).encode("utf-8")
    encoded_payload = _urlsafe_b64encode(payload_bytes)
    signature = _sign_payload(encoded_payload)
    token = f"{encoded_payload}.{_urlsafe_b64encode(signature)}"
    return token, expires_in


def decode_access_token(token: str) -> Tuple[int, str]:
    """Decode and validate an access token, returning ``(user_id, email)``.

    Checks, in order: structure, signature, payload encoding, presence of the
    required fields, field types, and expiry.

    Raises:
        ValueError: for every kind of invalid token, including a non-string
            token, a bad signature, a payload that is not a JSON object,
            missing or mistyped fields, and an expired token.
    """
    if not isinstance(token, str):
        raise ValueError("Invalid token format")
    try:
        encoded_payload, encoded_signature = token.split(".", 1)
    except ValueError as exc:
        raise ValueError("Invalid token format") from exc

    try:
        provided_signature = _urlsafe_b64decode(encoded_signature)
    except Exception as exc:
        raise ValueError("Invalid token signature encoding") from exc

    # Authenticate before parsing so unauthenticated bytes never reach the
    # JSON decoder.
    expected_signature = _sign_payload(encoded_payload)
    if not hmac.compare_digest(provided_signature, expected_signature):
        raise ValueError("Invalid token signature")

    try:
        payload_bytes = _urlsafe_b64decode(encoded_payload)
        payload = json.loads(payload_bytes.decode("utf-8"))
    except Exception as exc:
        raise ValueError("Invalid token payload") from exc
    # A correctly signed payload can still be a JSON list, string or number;
    # without this check ``payload.get`` would raise AttributeError.
    if not isinstance(payload, dict):
        raise ValueError("Invalid token payload")

    sub = payload.get("sub")
    email = payload.get("email")
    exp = payload.get("exp")
    if not sub or not email or exp is None:
        raise ValueError("Token payload missing required fields")

    try:
        user_id = int(sub)
        exp_ts = int(exp)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError: JSON such as 1e400 parses to float infinity, which
        # int() cannot convert.
        raise ValueError("Invalid token payload types") from exc

    if time.time() >= exp_ts:
        raise ValueError("Token expired")

    return user_id, str(email)