Coverage for /usr/lib/python3.10/site-packages/hyd/backend/user/api/v1.py: 66%
62 statements
« prev ^ index » next coverage.py v7.0.3, created at 2023-01-05 16:38 +0000
« prev ^ index » next coverage.py v7.0.3, created at 2023-01-05 16:38 +0000
1import datetime as dt
3from fastapi import APIRouter, Depends, HTTPException, Security, status
4from fastapi.security import OAuth2PasswordRequestForm
5from sqlalchemy.orm import Session
7import hyd.backend.token.service as token_service
8from hyd.backend.db import get_db
9from hyd.backend.exc import UnknownUserError
10from hyd.backend.security import SCOPES, Scopes, create_jwt, verify_password
11from hyd.backend.token.models import TokenSchema
12from hyd.backend.user.authentication import (
13 HTTPException_USER_DISABLED,
14 authenticate_user,
15)
16from hyd.backend.user.models import (
17 API_V1_CHANGE_PASSWORD__PATCH,
18 API_V1_GREET__GET,
19 API_V1_LOGIN__POST,
20 API_V1_LOGOUT__PATCH,
21 UserEntry,
22 UserResponseSchema,
23)
24from hyd.backend.user.service import read_users_by_username, update_user_pw_by_ref
25from hyd.backend.util.const import HEADERS, REMEMBER_ME_DURATION
26from hyd.backend.util.logger import HydLogger
28UTC = dt.timezone.utc
29LOGGER = HydLogger("UserAPI")
31v1_router = APIRouter(tags=["user"])
33####################################################################################################
34#### HTTP Exceptions
35####################################################################################################
37HTTPException_CREDENTIALS = HTTPException(
38 status_code=status.HTTP_401_UNAUTHORIZED,
39 detail="Could not validate credentials",
40 headers={"WWW-Authenticate": "Basic"},
41)
43HTTPException_PASSWORD_REPETITION = HTTPException(
44 status_code=status.HTTP_401_UNAUTHORIZED,
45 detail="The new password and the repetition must match each other!",
46 headers=HEADERS,
47)
49####################################################################################################
50#### Scope: No authentication required
51####################################################################################################
54@v1_router.post("/login", responses=API_V1_LOGIN__POST)
55async def _login(
56 remember_me: bool = False,
57 form_data: OAuth2PasswordRequestForm = Depends(),
58 db: Session = Depends(get_db),
59):
60 username = form_data.username
61 try:
62 user_entry: UserEntry = read_users_by_username(username=username, db=db)
63 except UnknownUserError:
64 LOGGER.warning(
65 "Unknown username {username: %s}",
66 username,
67 )
68 raise HTTPException_CREDENTIALS
70 if not verify_password(
71 plain_password=form_data.password, hashed_password=user_entry.hashed_password
72 ):
73 LOGGER.warning(
74 "Wrong password {user_id: %d, username: %s}",
75 user_entry.id,
76 username,
77 )
78 raise HTTPException_CREDENTIALS
80 if user_entry.is_disabled:
81 LOGGER.warning(
82 "Disabled {user_id: %d, username: %s}",
83 user_entry.id,
84 username,
85 )
86 raise HTTPException_USER_DISABLED
88 expires_on = dt.datetime.now(tz=UTC)
89 if remember_me:
90 expires_on = expires_on + REMEMBER_ME_DURATION
92 user_id = user_entry.id
93 token_entry = token_service.create_token(
94 user_id=user_id,
95 expires_on=expires_on,
96 scopes=SCOPES,
97 is_login_token=True,
98 project_id=None,
99 comment="HYD_LOGIN_TOKEN",
100 db=db,
101 )
103 access_token: str = create_jwt(
104 token_id=token_entry.id, user_id=user_id, username=username, scopes=SCOPES
105 )
106 LOGGER.info(
107 "{token_id: %d, user_id: %d, username: %s}",
108 token_entry.id,
109 user_entry.id,
110 username,
111 )
112 return TokenSchema(access_token=access_token)
115####################################################################################################
116#### Scope: USER
117####################################################################################################
120@v1_router.patch("/logout", responses=API_V1_LOGOUT__PATCH)
121async def _logout(
122 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.USER]),
123 db: Session = Depends(get_db),
124):
125 token_entry = user_entry.session_token_entry
126 token_service.revoke_token_by_ref(token_entry=token_entry, db=db)
128 LOGGER.info(
129 "{token_id: %d, user_id: %d, username: %s}",
130 token_entry.id,
131 user_entry.id,
132 user_entry.username,
133 )
134 return f"Logout {user_entry.username} :("
137@v1_router.patch("/change_password", responses=API_V1_CHANGE_PASSWORD__PATCH)
138async def _change_password(
139 current_password: str,
140 new_password: str,
141 new_password_repetition: str,
142 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.USER]),
143 db: Session = Depends(get_db),
144):
145 if not verify_password(
146 plain_password=current_password, hashed_password=user_entry.hashed_password
147 ):
148 LOGGER.info(
149 "{token_id: %d, user_id: %d, username: %s}",
150 user_entry.session_token_entry.id,
151 user_entry.id,
152 user_entry.username,
153 )
154 raise HTTPException_CREDENTIALS
156 if new_password != new_password_repetition:
157 raise HTTPException_PASSWORD_REPETITION
159 LOGGER.info(
160 "{token_id: %d, user_id: %d, username: %s}",
161 user_entry.session_token_entry.id,
162 user_entry.id,
163 user_entry.username,
164 )
165 update_user_pw_by_ref(user_entry=user_entry, new_password=new_password, db=db)
167 return f"Password changed!"
170@v1_router.get("/greet", responses=API_V1_GREET__GET)
171async def _greet(user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.USER])):
172 return f"Hello {user_entry.username} :)"
175####################################################################################################
176#### Util
177####################################################################################################
179# NOTE currently unused
180def _user_entry_to_response_schema(user_entry: UserEntry) -> UserResponseSchema:
181 return UserResponseSchema(
182 id=user_entry.id,
183 username=user_entry.username,
184 is_admin=user_entry.is_admin,
185 is_disabled=user_entry.is_disabled,
186 created_at=user_entry.created_at,
187 )