Coverage for /usr/lib/python3.10/site-packages/hyd/backend/token/api/v1.py: 48%

56 statements  

« prev     ^ index     » next       coverage.py v7.0.3, created at 2023-02-05 02:26 +0000

1import datetime as dt 

2 

3from fastapi import APIRouter, Depends, HTTPException, Security, status 

4from sqlalchemy.orm import Session 

5 

6from hyd.backend.db import get_db 

7from hyd.backend.exc import ( 

8 HTTPException_UNKNOWN_PROJECT, 

9 UnknownProjectError, 

10 UnknownTokenError, 

11) 

12from hyd.backend.project import service as project_services 

13from hyd.backend.security import Scopes, create_jwt 

14from hyd.backend.token.models import ( 

15 API_V1_CREATE__POST, 

16 API_V1_LIST__GET, 

17 API_V1_REVOKE__PATCH, 

18 FullTokenResponseSchema, 

19 TokenEntry, 

20 TokenResponseSchema, 

21 TokenSchema, 

22) 

23from hyd.backend.token.service import create_token, read_token, revoke_token_by_ref 

24from hyd.backend.user.authentication import authenticate_user 

25from hyd.backend.user.models import UserEntry 

26from hyd.backend.util.const import HEADERS 

27from hyd.backend.util.logger import HydLogger 

28from hyd.backend.util.models import PrimaryKey 

29 

30UTC = dt.timezone.utc 

31LOGGER = HydLogger("TokenAPI") 

32 

33v1_router = APIRouter(tags=["token"]) 

34 

35#################################################################################################### 

36#### HTTP Exceptions 

37#################################################################################################### 

38 

39HTTPException_TIMEZONE_AWARE = HTTPException( 

40 status_code=status.HTTP_400_BAD_REQUEST, 

41 detail="Expiration datetime must be timezone aware!", 

42 headers=HEADERS, 

43) 

44 

45HTTPException_UNKNOWN_TOKEN = HTTPException( 

46 status_code=status.HTTP_400_BAD_REQUEST, 

47 detail="Unknown token!", 

48 headers=HEADERS, 

49) 

50 

51HTTPException_TOKEN_ALREADY_REVOKED = HTTPException( 

52 status_code=status.HTTP_400_BAD_REQUEST, 

53 detail="Token has already been revoked!", 

54 headers=HEADERS, 

55) 

56 

57#################################################################################################### 

58#### Scope: TOKEN 

59#################################################################################################### 

60 

61 

62@v1_router.post("/create", responses=API_V1_CREATE__POST) 

63async def _create( 

64 project_id: PrimaryKey | None, 

65 expires_on: dt.datetime | dt.timedelta | None = None, 

66 comment: str = "", 

67 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.TOKEN]), 

68 db: Session = Depends(get_db), 

69) -> FullTokenResponseSchema: 

70 if isinstance(expires_on, dt.datetime): 

71 if expires_on.tzinfo is None: 

72 raise HTTPException_TIMEZONE_AWARE 

73 expires_on = expires_on.astimezone(UTC) 

74 elif isinstance(expires_on, dt.timedelta): 

75 expires_on = dt.datetime.now(tz=UTC) + expires_on 

76 

77 try: 

78 _ = project_services.read_project(project_id=project_id, db=db) 

79 except UnknownProjectError: 

80 raise HTTPException_UNKNOWN_PROJECT 

81 

82 user_id = user_entry.id 

83 scopes = [Scopes.PROJECT, Scopes.VERSION, Scopes.TAG] 

84 token_entry = create_token( 

85 user_id=user_id, 

86 expires_on=expires_on, 

87 scopes=scopes, 

88 is_login_token=False, 

89 project_id=project_id, 

90 comment=comment, 

91 db=db, 

92 ) 

93 

94 access_token: str = create_jwt( 

95 token_id=token_entry.id, user_id=user_id, username=user_entry.username, scopes=scopes 

96 ) 

97 

98 LOGGER.info( 

99 "{token_id: %d, user_id: %d, username: %s, project_id: %d}", 

100 user_entry.session_token_entry.id, 

101 user_entry.id, 

102 user_entry.username, 

103 project_id if project_id else 0, 

104 ) 

105 return FullTokenResponseSchema( 

106 access_token=access_token, 

107 token_id=token_entry.id, 

108 user_id=token_entry.user_id, 

109 created_at=token_entry.created_at, 

110 is_login_token=token_entry.is_login_token, 

111 is_expired=token_entry.is_expired, 

112 revoked_at=token_entry._revoked_at, 

113 comment=comment, 

114 scopes=[entry.scope for entry in token_entry.scope_entries], 

115 project_id=token_entry.project_id, 

116 ) 

117 

118 

119@v1_router.post("/list", responses=API_V1_LIST__GET) 

120async def _list( 

121 include_expired_revoked: bool = False, 

122 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.TOKEN]), 

123 db: Session = Depends(get_db), 

124) -> list[TokenResponseSchema]: 

125 if include_expired_revoked: 

126 return [_token_entry_to_response_schema(entry) for entry in user_entry.token_entries] 

127 else: 

128 return [ 

129 _token_entry_to_response_schema(entry) 

130 for entry in user_entry.token_entries 

131 if (not entry._revoked_at and not entry.check_expiration(db=db)) 

132 ] 

133 

134 

135@v1_router.patch("/revoke", responses=API_V1_REVOKE__PATCH) 

136async def _revoke( 

137 token_id: PrimaryKey, 

138 user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.TOKEN]), 

139 db: Session = Depends(get_db), 

140) -> TokenResponseSchema: 

141 try: 

142 token_entry = read_token(token_id=token_id, db=db) 

143 except UnknownTokenError: 

144 raise HTTPException_UNKNOWN_TOKEN 

145 

146 if token_entry.revoked_at: 

147 raise HTTPException_TOKEN_ALREADY_REVOKED 

148 

149 revoke_token_by_ref(token_entry=token_entry, db=db) 

150 

151 LOGGER.info( 

152 "{token_id: %d, user_id: %d, username: %s, revoked_token_id: %d}", 

153 user_entry.session_token_entry.id, 

154 user_entry.id, 

155 user_entry.username, 

156 token_id, 

157 ) 

158 return _token_entry_to_response_schema(token_entry) 

159 

160 

161#################################################################################################### 

162#### Util 

163#################################################################################################### 

164 

165 

166def _token_entry_to_response_schema(token_entry: TokenEntry) -> TokenResponseSchema: 

167 return TokenResponseSchema( 

168 token_id=token_entry.id, 

169 user_id=token_entry.user_id, 

170 created_at=token_entry.created_at, 

171 is_login_token=token_entry.is_login_token, 

172 is_expired=token_entry.is_expired, 

173 revoked_at=token_entry._revoked_at, 

174 scopes=[entry.scope for entry in token_entry.scope_entries], 

175 project_id=token_entry.project_id, 

176 comment=token_entry.comment, 

177 )