mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-05 23:56:36 +08:00
139 lines
4.1 KiB
Python
139 lines
4.1 KiB
Python
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import secrets
|
|
import time
|
|
from typing import Tuple
|
|
|
|
|
|
DEFAULT_PASSWORD_HASH_ITERATIONS = 120000
|
|
PASSWORD_HASH_ITERATIONS = int(
|
|
os.getenv("PASSWORD_HASH_ITERATIONS", str(DEFAULT_PASSWORD_HASH_ITERATIONS))
|
|
)
|
|
AUTH_SECRET_KEY = os.getenv("AUTH_SECRET_KEY", "change-this-secret-in-env")
|
|
DEFAULT_AUTH_TOKEN_EXPIRE_MINUTES = 10080
|
|
AUTH_TOKEN_EXPIRE_MINUTES = int(
|
|
os.getenv("AUTH_TOKEN_EXPIRE_MINUTES", str(DEFAULT_AUTH_TOKEN_EXPIRE_MINUTES))
|
|
)
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
salt = secrets.token_hex(16)
|
|
digest = hashlib.pbkdf2_hmac(
|
|
"sha256",
|
|
password.encode("utf-8"),
|
|
salt.encode("utf-8"),
|
|
PASSWORD_HASH_ITERATIONS,
|
|
)
|
|
return (
|
|
f"pbkdf2_sha256${PASSWORD_HASH_ITERATIONS}${salt}$"
|
|
f"{base64.urlsafe_b64encode(digest).decode('utf-8')}"
|
|
)
|
|
|
|
|
|
def verify_password(plain_password: str, password_hash: str) -> bool:
|
|
try:
|
|
algorithm, iterations, salt, expected = password_hash.split("$", 3)
|
|
if algorithm != "pbkdf2_sha256":
|
|
return False
|
|
|
|
digest = hashlib.pbkdf2_hmac(
|
|
"sha256",
|
|
plain_password.encode("utf-8"),
|
|
salt.encode("utf-8"),
|
|
int(iterations),
|
|
)
|
|
calculated = base64.urlsafe_b64encode(digest).decode("utf-8")
|
|
return hmac.compare_digest(calculated, expected)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def generate_verification_code(length: int = 6) -> str:
|
|
return "".join(secrets.choice("0123456789") for _ in range(length))
|
|
|
|
|
|
def hash_verification_code(code: str) -> str:
|
|
return hashlib.sha256(code.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def verify_verification_code(code: str, expected_hash: str) -> bool:
|
|
return hmac.compare_digest(hash_verification_code(code), expected_hash)
|
|
|
|
|
|
def _urlsafe_b64encode(raw: bytes) -> str:
|
|
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
|
|
|
|
|
|
def _urlsafe_b64decode(raw: str) -> bytes:
|
|
padding = "=" * (-len(raw) % 4)
|
|
return base64.urlsafe_b64decode(raw + padding)
|
|
|
|
|
|
def create_access_token(user_id: int, email: str) -> Tuple[str, int]:
|
|
expires_in = AUTH_TOKEN_EXPIRE_MINUTES * 60
|
|
payload = {
|
|
"sub": str(user_id),
|
|
"email": email,
|
|
"exp": int(time.time()) + expires_in,
|
|
}
|
|
payload_bytes = json.dumps(payload, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
|
|
encoded_payload = _urlsafe_b64encode(payload_bytes)
|
|
signature = hmac.new(
|
|
AUTH_SECRET_KEY.encode("utf-8"),
|
|
encoded_payload.encode("utf-8"),
|
|
hashlib.sha256,
|
|
).digest()
|
|
token = f"{encoded_payload}.{_urlsafe_b64encode(signature)}"
|
|
return token, expires_in
|
|
|
|
|
|
def decode_access_token(token: str) -> Tuple[int, str]:
|
|
"""
|
|
解码并校验访问令牌,返回 (user_id, email)。
|
|
校验项包括:结构、签名、过期时间、字段完整性。
|
|
"""
|
|
try:
|
|
encoded_payload, encoded_signature = token.split(".", 1)
|
|
except ValueError as exc:
|
|
raise ValueError("Invalid token format") from exc
|
|
|
|
try:
|
|
provided_signature = _urlsafe_b64decode(encoded_signature)
|
|
except Exception as exc:
|
|
raise ValueError("Invalid token signature encoding") from exc
|
|
|
|
expected_signature = hmac.new(
|
|
AUTH_SECRET_KEY.encode("utf-8"),
|
|
encoded_payload.encode("utf-8"),
|
|
hashlib.sha256,
|
|
).digest()
|
|
if not hmac.compare_digest(provided_signature, expected_signature):
|
|
raise ValueError("Invalid token signature")
|
|
|
|
try:
|
|
payload_bytes = _urlsafe_b64decode(encoded_payload)
|
|
payload = json.loads(payload_bytes.decode("utf-8"))
|
|
except Exception as exc:
|
|
raise ValueError("Invalid token payload") from exc
|
|
|
|
sub = payload.get("sub")
|
|
email = payload.get("email")
|
|
exp = payload.get("exp")
|
|
|
|
if not sub or not email or exp is None:
|
|
raise ValueError("Token payload missing required fields")
|
|
|
|
try:
|
|
user_id = int(sub)
|
|
exp_ts = int(exp)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError("Invalid token payload types") from exc
|
|
|
|
if time.time() >= exp_ts:
|
|
raise ValueError("Token expired")
|
|
|
|
return user_id, str(email)
|