Source code for hyd.backend.security

from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from jose.jwt import decode, encode
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from hyd.backend.exc import HydError, VerificationError
from hyd.backend.util.const import ROOT_PATH, SECRET_KEY
from hyd.backend.util.models import NameStr, PrimaryKey

# https://docs.python.org/3/library/secrets.html#how-many-bytes-should-tokens-use
if len(SECRET_KEY) < 64:
    raise HydError("SECRET_KEY is too short, use 32 bytes or more!")

ALGORITHM = "HS256"

_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


[docs]class Scopes: USER = "user" TOKEN = "token" PROJECT = "project" VERSION = "version" TAG = "tag"
SCOPES = [ Scopes.USER, Scopes.TOKEN, Scopes.PROJECT, Scopes.VERSION, Scopes.TAG, ] OAUTH2_SCHEME = OAuth2PasswordBearer( tokenUrl=ROOT_PATH + "/api/v1/user/login", scopes={ Scopes.USER: "Basic user operations.", Scopes.TOKEN: "Manage tokens.", Scopes.PROJECT: "Create, delete and list projects.", Scopes.VERSION: "Manage versions for a project.", Scopes.TAG: "Manage tags for a project.", }, )
[docs]class JWT(BaseModel): id: PrimaryKey user_id: PrimaryKey # key value pair just for debugging username: NameStr # key value pair just for debugging scopes: list[str] # key value pair just for debugging
[docs]def hash_password(*, password: str | bytes) -> bytes: return _pwd_context.hash(password).encode()
[docs]def verify_password(*, plain_password: str | bytes, hashed_password: str | bytes) -> bool: return _pwd_context.verify(plain_password, hashed_password)
[docs]def create_jwt( *, token_id: PrimaryKey, user_id: PrimaryKey, username: NameStr, scopes: list[str] ) -> str: return encode( {"jti": str(token_id), "uid": str(user_id), "sub": username, "scopes": scopes}, SECRET_KEY, algorithm=ALGORITHM, )
[docs]def verify_jwt(*, token: str) -> JWT: try: # check signature payload = decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError as err: raise VerificationError(err) try: # check for missing fields token_id: PrimaryKey = int(payload["jti"]) user_id: NameStr = int(payload["uid"]) username: NameStr = payload["sub"] scopes: list[str] = payload["scopes"] except KeyError as err: raise VerificationError(err) try: # check for data type validity return JWT(id=token_id, user_id=user_id, username=username, scopes=scopes) except ValidationError as err: raise VerificationError(err)