mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-06 00:57:51 +08:00
login+ai cluster
This commit is contained in:
@@ -0,0 +1,79 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
import time
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
PASSWORD_HASH_ITERATIONS = int(os.getenv("PASSWORD_HASH_ITERATIONS", "120000"))
|
||||
AUTH_SECRET_KEY = os.getenv("AUTH_SECRET_KEY", "change-this-secret-in-env")
|
||||
AUTH_TOKEN_EXPIRE_MINUTES = int(os.getenv("AUTH_TOKEN_EXPIRE_MINUTES", "10080"))
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
salt = secrets.token_hex(16)
|
||||
digest = hashlib.pbkdf2_hmac(
|
||||
"sha256",
|
||||
password.encode("utf-8"),
|
||||
salt.encode("utf-8"),
|
||||
PASSWORD_HASH_ITERATIONS,
|
||||
)
|
||||
return (
|
||||
f"pbkdf2_sha256${PASSWORD_HASH_ITERATIONS}${salt}$"
|
||||
f"{base64.urlsafe_b64encode(digest).decode('utf-8')}"
|
||||
)
|
||||
|
||||
|
||||
def verify_password(plain_password: str, password_hash: str) -> bool:
|
||||
try:
|
||||
algorithm, iterations, salt, expected = password_hash.split("$", 3)
|
||||
if algorithm != "pbkdf2_sha256":
|
||||
return False
|
||||
|
||||
digest = hashlib.pbkdf2_hmac(
|
||||
"sha256",
|
||||
plain_password.encode("utf-8"),
|
||||
salt.encode("utf-8"),
|
||||
int(iterations),
|
||||
)
|
||||
calculated = base64.urlsafe_b64encode(digest).decode("utf-8")
|
||||
return hmac.compare_digest(calculated, expected)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def generate_verification_code(length: int = 6) -> str:
|
||||
return "".join(secrets.choice("0123456789") for _ in range(length))
|
||||
|
||||
|
||||
def hash_verification_code(code: str) -> str:
|
||||
return hashlib.sha256(code.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def verify_verification_code(code: str, expected_hash: str) -> bool:
|
||||
return hmac.compare_digest(hash_verification_code(code), expected_hash)
|
||||
|
||||
|
||||
def _urlsafe_b64encode(raw: bytes) -> str:
|
||||
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
|
||||
|
||||
|
||||
def create_access_token(user_id: int, email: str) -> Tuple[str, int]:
|
||||
expires_in = AUTH_TOKEN_EXPIRE_MINUTES * 60
|
||||
payload = {
|
||||
"sub": str(user_id),
|
||||
"email": email,
|
||||
"exp": int(time.time()) + expires_in,
|
||||
}
|
||||
payload_bytes = json.dumps(payload, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
|
||||
encoded_payload = _urlsafe_b64encode(payload_bytes)
|
||||
signature = hmac.new(
|
||||
AUTH_SECRET_KEY.encode("utf-8"),
|
||||
encoded_payload.encode("utf-8"),
|
||||
hashlib.sha256,
|
||||
).digest()
|
||||
token = f"{encoded_payload}.{_urlsafe_b64encode(signature)}"
|
||||
return token, expires_in
|
||||
Reference in New Issue
Block a user