Coverage for /usr/lib/python3.10/site-packages/hyd/backend/user/api/v1.py: 66%

62 statements  

« prev     ^ index     » next       coverage.py v7.0.3, created at 2023-01-05 16:38 +0000

1import datetime as dt 

2 

3from fastapi import APIRouter, Depends, HTTPException, Security, status 

4from fastapi.security import OAuth2PasswordRequestForm 

5from sqlalchemy.orm import Session 

6 

7import hyd.backend.token.service as token_service 

8from hyd.backend.db import get_db 

9from hyd.backend.exc import UnknownUserError 

10from hyd.backend.security import SCOPES, Scopes, create_jwt, verify_password 

11from hyd.backend.token.models import TokenSchema 

12from hyd.backend.user.authentication import ( 

13 HTTPException_USER_DISABLED, 

14 authenticate_user, 

15) 

16from hyd.backend.user.models import ( 

17 API_V1_CHANGE_PASSWORD__PATCH, 

18 API_V1_GREET__GET, 

19 API_V1_LOGIN__POST, 

20 API_V1_LOGOUT__PATCH, 

21 UserEntry, 

22 UserResponseSchema, 

23) 

24from hyd.backend.user.service import read_users_by_username, update_user_pw_by_ref 

25from hyd.backend.util.const import HEADERS, REMEMBER_ME_DURATION 

26from hyd.backend.util.logger import HydLogger 

27 

28UTC = dt.timezone.utc 

29LOGGER = HydLogger("UserAPI") 

30 

31v1_router = APIRouter(tags=["user"]) 

32 

33#################################################################################################### 

34#### HTTP Exceptions 

35#################################################################################################### 

36 

37HTTPException_CREDENTIALS = HTTPException( 

38 status_code=status.HTTP_401_UNAUTHORIZED, 

39 detail="Could not validate credentials", 

40 headers={"WWW-Authenticate": "Basic"}, 

41) 

42 

43HTTPException_PASSWORD_REPETITION = HTTPException( 

44 status_code=status.HTTP_401_UNAUTHORIZED, 

45 detail="The new password and the repetition must match each other!", 

46 headers=HEADERS, 

47) 

48 

49#################################################################################################### 

50#### Scope: No authentication required 

51#################################################################################################### 

52 

53 

54@v1_router.post("/login", responses=API_V1_LOGIN__POST) 

55async def _login( 

56 remember_me: bool = False, 

57 form_data: OAuth2PasswordRequestForm = Depends(), 

58 db: Session = Depends(get_db), 

59): 

60 username = form_data.username 

61 try: 

62 user_entry: UserEntry = read_users_by_username(username=username, db=db) 

63 except UnknownUserError: 

64 LOGGER.warning( 

65 "Unknown username {username: %s}", 

66 username, 

67 ) 

68 raise HTTPException_CREDENTIALS 

69 

70 if not verify_password( 

71 plain_password=form_data.password, hashed_password=user_entry.hashed_password 

72 ): 

73 LOGGER.warning( 

74 "Wrong password {user_id: %d, username: %s}", 

75 user_entry.id, 

76 username, 

77 ) 

78 raise HTTPException_CREDENTIALS 

79 

80 if user_entry.is_disabled: 

81 LOGGER.warning( 

82 "Disabled {user_id: %d, username: %s}", 

83 user_entry.id, 

84 username, 

85 ) 

86 raise HTTPException_USER_DISABLED 

87 

88 expires_on = dt.datetime.now(tz=UTC) 

89 if remember_me: 

90 expires_on = expires_on + REMEMBER_ME_DURATION 

91 

92 user_id = user_entry.id 

93 token_entry = token_service.create_token( 

94 user_id=user_id, 

95 expires_on=expires_on, 

96 scopes=SCOPES, 

97 is_login_token=True, 

98 project_id=None, 

99 comment="HYD_LOGIN_TOKEN", 

100 db=db, 

101 ) 

102 

103 access_token: str = create_jwt( 

104 token_id=token_entry.id, user_id=user_id, username=username, scopes=SCOPES 

105 ) 

106 LOGGER.info( 

107 "{token_id: %d, user_id: %d, username: %s}", 

108 token_entry.id, 

109 user_entry.id, 

110 username, 

111 ) 

112 return TokenSchema(access_token=access_token) 

113 

114 

115#################################################################################################### 

116#### Scope: USER 

117#################################################################################################### 

118 

119 

120@v1_router.patch("/logout", responses=API_V1_LOGOUT__PATCH) 

121async def _logout( 

122 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.USER]), 

123 db: Session = Depends(get_db), 

124): 

125 token_entry = user_entry.session_token_entry 

126 token_service.revoke_token_by_ref(token_entry=token_entry, db=db) 

127 

128 LOGGER.info( 

129 "{token_id: %d, user_id: %d, username: %s}", 

130 token_entry.id, 

131 user_entry.id, 

132 user_entry.username, 

133 ) 

134 return f"Logout {user_entry.username} :(" 

135 

136 

137@v1_router.patch("/change_password", responses=API_V1_CHANGE_PASSWORD__PATCH) 

138async def _change_password( 

139 current_password: str, 

140 new_password: str, 

141 new_password_repetition: str, 

142 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.USER]), 

143 db: Session = Depends(get_db), 

144): 

145 if not verify_password( 

146 plain_password=current_password, hashed_password=user_entry.hashed_password 

147 ): 

148 LOGGER.info( 

149 "{token_id: %d, user_id: %d, username: %s}", 

150 user_entry.session_token_entry.id, 

151 user_entry.id, 

152 user_entry.username, 

153 ) 

154 raise HTTPException_CREDENTIALS 

155 

156 if new_password != new_password_repetition: 

157 raise HTTPException_PASSWORD_REPETITION 

158 

159 LOGGER.info( 

160 "{token_id: %d, user_id: %d, username: %s}", 

161 user_entry.session_token_entry.id, 

162 user_entry.id, 

163 user_entry.username, 

164 ) 

165 update_user_pw_by_ref(user_entry=user_entry, new_password=new_password, db=db) 

166 

167 return f"Password changed!" 

168 

169 

170@v1_router.get("/greet", responses=API_V1_GREET__GET) 

171async def _greet(user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.USER])): 

172 return f"Hello {user_entry.username} :)" 

173 

174 

175#################################################################################################### 

176#### Util 

177#################################################################################################### 

178 

179# NOTE currently unused 

180def _user_entry_to_response_schema(user_entry: UserEntry) -> UserResponseSchema: 

181 return UserResponseSchema( 

182 id=user_entry.id, 

183 username=user_entry.username, 

184 is_admin=user_entry.is_admin, 

185 is_disabled=user_entry.is_disabled, 

186 created_at=user_entry.created_at, 

187 )