Files
2026-03-13 23:48:49 +08:00

145 lines
4.3 KiB
Python

import base64
import hashlib
import hmac
import json
import os
import secrets
import time
from typing import Tuple
# --- Password hashing configuration -----------------------------------------
# PBKDF2 round count applied to newly created hashes. The value can be
# overridden through the PASSWORD_HASH_ITERATIONS environment variable.
DEFAULT_PASSWORD_HASH_ITERATIONS = 120000
PASSWORD_HASH_ITERATIONS = int(
    os.environ.get("PASSWORD_HASH_ITERATIONS", str(DEFAULT_PASSWORD_HASH_ITERATIONS))
)

# --- Access-token configuration ---------------------------------------------
# Key used to HMAC-sign access tokens.
# NOTE(review): the fallback is a fixed placeholder string that is visible in
# source; deployments should always set AUTH_SECRET_KEY explicitly.
AUTH_SECRET_KEY = os.environ.get("AUTH_SECRET_KEY", "change-this-secret-in-env")

# Token lifetime in minutes (default 10080 = 7 days), overridable through the
# AUTH_TOKEN_EXPIRE_MINUTES environment variable.
DEFAULT_AUTH_TOKEN_EXPIRE_MINUTES = 10080
AUTH_TOKEN_EXPIRE_MINUTES = int(
    os.environ.get("AUTH_TOKEN_EXPIRE_MINUTES", str(DEFAULT_AUTH_TOKEN_EXPIRE_MINUTES))
)
def hash_password(password: str) -> str:
    """Derive a salted PBKDF2-HMAC-SHA256 hash for ``password``.

    A fresh random salt (``secrets.token_hex(16)``) is generated on every
    call, so hashing the same password twice yields different strings.

    Args:
        password: The plain-text password to hash.

    Returns:
        A string of the form ``pbkdf2_sha256$<iterations>$<salt>$<digest>``,
        where ``<digest>`` is the URL-safe base64 encoding (padding kept) of
        the derived key.
    """
    salt_hex = secrets.token_hex(16)
    # The salt's hex text itself (not the raw bytes it encodes) is what gets
    # fed to PBKDF2; verification must do the same.
    derived_key = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt_hex.encode("utf-8"),
        PASSWORD_HASH_ITERATIONS,
    )
    encoded_digest = base64.urlsafe_b64encode(derived_key).decode("utf-8")
    return "$".join(
        ("pbkdf2_sha256", str(PASSWORD_HASH_ITERATIONS), salt_hex, encoded_digest)
    )
def verify_password(plain_password: str, password_hash: str) -> bool:
    """Check ``plain_password`` against a stored ``pbkdf2_sha256`` hash string.

    Args:
        plain_password: The password supplied by the user.
        password_hash: A stored value of the form
            ``pbkdf2_sha256$<iterations>$<salt>$<digest>``.

    Returns:
        True only when the re-derived digest equals the stored digest.
        Any malformed hash, unsupported algorithm tag, or error raised while
        verifying results in False rather than an exception.
    """
    try:
        parts = password_hash.split("$", 3)
        scheme, rounds, salt_text, stored_digest = parts
        if scheme != "pbkdf2_sha256":
            return False
        # Re-derive with the iteration count recorded in the hash itself, so
        # hashes created under a different setting still verify.
        derived_key = hashlib.pbkdf2_hmac(
            "sha256",
            plain_password.encode("utf-8"),
            salt_text.encode("utf-8"),
            int(rounds),
        )
        candidate = base64.urlsafe_b64encode(derived_key).decode("utf-8")
        # Constant-time comparison of the two encoded digests.
        return hmac.compare_digest(candidate, stored_digest)
    except Exception:
        # Deliberate policy: anything that goes wrong counts as a mismatch.
        return False
def generate_verification_code(length: int = 6) -> str:
    """Return a random numeric code made of ``length`` decimal digits.

    Each digit is picked independently with ``secrets.choice``.
    """
    digit_pool = "0123456789"
    picked = [secrets.choice(digit_pool) for _ in range(length)]
    return "".join(picked)
def hash_verification_code(code: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``code`` (UTF-8 encoded)."""
    hasher = hashlib.sha256()
    hasher.update(code.encode("utf-8"))
    return hasher.hexdigest()
def verify_verification_code(code: str, expected_hash: str) -> bool:
    """Check whether ``code`` hashes to ``expected_hash``.

    Args:
        code: The verification code supplied by the user.
        expected_hash: The stored hash to compare against.

    Returns:
        True when the hash of ``code`` equals ``expected_hash``; False on a
        mismatch or if any error is raised during the comparison.
    """
    try:
        # Coerce both operands to str so hmac.compare_digest always receives
        # matching types.
        actual_hash = str(hash_verification_code(code))
        return hmac.compare_digest(actual_hash, str(expected_hash))
    except Exception:
        return False
def _urlsafe_b64encode(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
def _urlsafe_b64decode(raw: str) -> bytes:
padding = "=" * (-len(raw) % 4)
return base64.urlsafe_b64decode(raw + padding)
def create_access_token(user_id: int, email: str) -> Tuple[str, int]:
    """Build a signed access token for the given user.

    The token has the form ``<payload>.<signature>``: the payload is compact
    JSON with ``sub``, ``email`` and ``exp`` fields, and the signature is an
    HMAC-SHA256 over the encoded payload keyed with ``AUTH_SECRET_KEY``. Both
    parts are unpadded URL-safe base64.

    Args:
        user_id: Identifier stored (as a string) in the ``sub`` field.
        email: Address stored in the ``email`` field.

    Returns:
        A ``(token, expires_in)`` tuple, where ``expires_in`` is the token
        lifetime in seconds.
    """
    lifetime_seconds = AUTH_TOKEN_EXPIRE_MINUTES * 60
    claims = {
        "sub": str(user_id),
        "email": email,
        "exp": int(time.time()) + lifetime_seconds,
    }
    serialized = json.dumps(claims, separators=(",", ":"), ensure_ascii=True)
    body = _urlsafe_b64encode(serialized.encode("utf-8"))
    # The MAC covers the *encoded* payload text, not the raw JSON bytes.
    mac = hmac.new(
        AUTH_SECRET_KEY.encode("utf-8"),
        body.encode("utf-8"),
        hashlib.sha256,
    )
    signature_part = _urlsafe_b64encode(mac.digest())
    return f"{body}.{signature_part}", lifetime_seconds
def decode_access_token(token: str) -> Tuple[int, str]:
    """Decode and validate an access token, returning ``(user_id, email)``.

    The token is expected to have the form ``<payload>.<signature>`` with both
    parts in unpadded URL-safe base64, as produced by ``create_access_token``.
    Checks run in this order: structure, signature, payload decoding,
    required fields, field types, expiry.

    Args:
        token: The token string to validate.

    Returns:
        A ``(user_id, email)`` tuple taken from the ``sub`` and ``email``
        payload fields.

    Raises:
        ValueError: If the token is malformed, its signature does not match,
            its payload cannot be decoded or is not a JSON object, a required
            field is missing or has an unusable type, or the token has
            expired.
    """
    try:
        encoded_payload, encoded_signature = token.split(".", 1)
    except ValueError as exc:
        raise ValueError("Invalid token format") from exc

    try:
        provided_signature = _urlsafe_b64decode(encoded_signature)
    except Exception as exc:
        raise ValueError("Invalid token signature encoding") from exc

    expected_signature = hmac.new(
        AUTH_SECRET_KEY.encode("utf-8"),
        encoded_payload.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    # Constant-time comparison; the payload is only parsed after it passes.
    if not hmac.compare_digest(provided_signature, expected_signature):
        raise ValueError("Invalid token signature")

    try:
        payload_bytes = _urlsafe_b64decode(encoded_payload)
        payload = json.loads(payload_bytes.decode("utf-8"))
    except Exception as exc:
        raise ValueError("Invalid token payload") from exc

    # json.loads can also return a list, string, number or None; only a JSON
    # object supports the field lookups below.
    if not isinstance(payload, dict):
        raise ValueError("Invalid token payload")

    sub = payload.get("sub")
    email = payload.get("email")
    exp = payload.get("exp")
    if not sub or not email or exp is None:
        raise ValueError("Token payload missing required fields")

    try:
        user_id = int(sub)
        exp_ts = int(exp)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError covers a non-finite float such as JSON ``Infinity``.
        raise ValueError("Invalid token payload types") from exc

    if time.time() >= exp_ts:
        raise ValueError("Token expired")

    return user_id, str(email)