Source code for hyd.backend.user.authentication

import datetime as dt

from fastapi import Depends, HTTPException, status
from fastapi.security import SecurityScopes
from sqlalchemy.orm import Session

import hyd.backend.token.service as token_service
from hyd.backend.db import get_db
from hyd.backend.exc import (
    HTTPException_NO_PERMISSION,
    HTTPException_USER_DISABLED,
    VerificationError,
)
from hyd.backend.security import JWT, OAUTH2_SCHEME, verify_jwt
from hyd.backend.token.models import TokenEntry
from hyd.backend.user.models import UserEntry
from hyd.backend.util.const import HEADERS
from hyd.backend.util.logger import HydLogger

UTC = dt.timezone.utc
LOGGER = HydLogger("Authentication")

####################################################################################################
#### HTTP Exceptions
####################################################################################################

HTTPException_VALIDATION = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Token unknown, expired, revoked or corrupted!",
    headers=HEADERS,
)

####################################################################################################
#### Authentication logic
####################################################################################################


[docs]async def authenticate_user( security_scopes: SecurityScopes, token: str = Depends(OAUTH2_SCHEME), db: Session = Depends(get_db), ) -> UserEntry: user_entry, _token_entry = _authenticate(security_scopes, token, db) return user_entry
def _authenticate( security_scopes: SecurityScopes, token: str, db: Session ) -> tuple[UserEntry, TokenEntry]: try: jwt: JWT = verify_jwt(token=token) except VerificationError as err: LOGGER.warning( "Token verification failed! {token: %s, error: %s}", token, err, ) raise HTTPException_VALIDATION token_entry: TokenEntry = token_service.read_token(token_id=jwt.id, db=db) # TODO raise if None if token_entry is None: LOGGER.warning( "Verified token not found in database! {token: %s, error: %s}", token, err, ) raise HTTPException_VALIDATION permitted_scopes: list[str] = [entry.scope for entry in token_entry.scope_entries] user_entry: UserEntry = token_entry.user_entry # check if an user is disabled, because one login tokens expire while disabling an user if user_entry.is_disabled: LOGGER.warning( "Token belongs to deactivated user! {token_id: %d, user_id: %d, username: %s}", token_entry.id, user_entry.id, user_entry.username, ) raise HTTPException_USER_DISABLED # check if a token is expired, login and api tokens if token_entry.revoked_at or token_entry.check_expiration(db=db): LOGGER.warning( "Revoked or expired token used! {token_id: %d, user_id: %d, username: %s}", token_entry.id, user_entry.id, user_entry.username, ) raise HTTPException_VALIDATION # check scopes for permission handling for scope in security_scopes.scopes: if scope not in permitted_scopes: LOGGER.warning( "Token lacks scopes! {token_id: %d, user_id: %d, username: %s}", token_entry.id, user_entry.id, user_entry.username, ) raise HTTPException_NO_PERMISSION user_entry._session_token_entry = token_entry user_entry._session_permitted_scopes = permitted_scopes token_entry._last_request = dt.datetime.now(tz=UTC) LOGGER.info( "{token_id: %d, user_id: %d, username: %s, scopes: %s}", jwt.id, user_entry.id, user_entry.username, permitted_scopes, ) return user_entry, token_entry