Coverage for /usr/lib/python3.10/site-packages/hyd/backend/token/api/v1.py: 48%
56 statements
« prev ^ index » next coverage.py v7.0.3, created at 2023-02-05 02:26 +0000
« prev ^ index » next coverage.py v7.0.3, created at 2023-02-05 02:26 +0000
1import datetime as dt
3from fastapi import APIRouter, Depends, HTTPException, Security, status
4from sqlalchemy.orm import Session
6from hyd.backend.db import get_db
7from hyd.backend.exc import (
8 HTTPException_UNKNOWN_PROJECT,
9 UnknownProjectError,
10 UnknownTokenError,
11)
12from hyd.backend.project import service as project_services
13from hyd.backend.security import Scopes, create_jwt
14from hyd.backend.token.models import (
15 API_V1_CREATE__POST,
16 API_V1_LIST__GET,
17 API_V1_REVOKE__PATCH,
18 FullTokenResponseSchema,
19 TokenEntry,
20 TokenResponseSchema,
21 TokenSchema,
22)
23from hyd.backend.token.service import create_token, read_token, revoke_token_by_ref
24from hyd.backend.user.authentication import authenticate_user
25from hyd.backend.user.models import UserEntry
26from hyd.backend.util.const import HEADERS
27from hyd.backend.util.logger import HydLogger
28from hyd.backend.util.models import PrimaryKey
# Router that collects every token endpoint of API v1 under the "token" tag.
v1_router = APIRouter(tags=["token"])

# Module-wide logger used by the endpoints below.
LOGGER = HydLogger("TokenAPI")

# Short alias so expiration timestamps are always normalized to UTC.
UTC = dt.timezone.utc
35####################################################################################################
36#### HTTP Exceptions
37####################################################################################################
def _bad_request(detail: str) -> HTTPException:
    """Build a 400 response exception that carries the shared ``HEADERS``."""
    return HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=detail,
        headers=HEADERS,
    )


# Raised when a client supplies an expiration datetime without tzinfo.
HTTPException_TIMEZONE_AWARE = _bad_request("Expiration datetime must be timezone aware!")

# Raised when the requested token id cannot be resolved.
HTTPException_UNKNOWN_TOKEN = _bad_request("Unknown token!")

# Raised when a revoke request targets a token that is already revoked.
HTTPException_TOKEN_ALREADY_REVOKED = _bad_request("Token has already been revoked!")
57####################################################################################################
58#### Scope: TOKEN
59####################################################################################################
@v1_router.post("/create", responses=API_V1_CREATE__POST)
async def _create(
    project_id: PrimaryKey | None,
    expires_on: dt.datetime | dt.timedelta | None = None,
    comment: str = "",
    user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.TOKEN]),
    db: Session = Depends(get_db),
) -> FullTokenResponseSchema:
    """Create a non-login token for the authenticated user and return it with its JWT.

    Args:
        project_id: Project the token is bound to, or ``None`` for a token that is
            not bound to a project.
        expires_on: Absolute, timezone-aware expiration datetime, a timedelta that is
            added to the current UTC time, or ``None``.
        comment: Free-text comment stored with the token.
        user_entry: Authenticated user; requires the ``TOKEN`` scope.
        db: Database session.

    Returns:
        The created token's metadata together with the encoded access token.

    Raises:
        HTTPException: 400 if ``expires_on`` is a naive datetime; the project's
            "unknown project" exception if ``project_id`` does not resolve.
    """
    # Normalize the expiration to an absolute UTC datetime (or leave it as None).
    if isinstance(expires_on, dt.datetime):
        if expires_on.tzinfo is None:
            raise HTTPException_TIMEZONE_AWARE
        expires_on = expires_on.astimezone(UTC)
    elif isinstance(expires_on, dt.timedelta):
        expires_on = dt.datetime.now(tz=UTC) + expires_on

    # Only validate the project when one was given: ``None`` is an accepted value
    # per the signature and must not be reported as an unknown project.
    if project_id is not None:
        try:
            project_services.read_project(project_id=project_id, db=db)
        except UnknownProjectError:
            raise HTTPException_UNKNOWN_PROJECT

    user_id = user_entry.id
    scopes = [Scopes.PROJECT, Scopes.VERSION, Scopes.TAG]
    token_entry = create_token(
        user_id=user_id,
        expires_on=expires_on,
        scopes=scopes,
        is_login_token=False,
        project_id=project_id,
        comment=comment,
        db=db,
    )

    access_token: str = create_jwt(
        token_id=token_entry.id, user_id=user_id, username=user_entry.username, scopes=scopes
    )

    # The logged token_id is the caller's session token, not the newly created one.
    LOGGER.info(
        "{token_id: %d, user_id: %d, username: %s, project_id: %d}",
        user_entry.session_token_entry.id,
        user_entry.id,
        user_entry.username,
        project_id if project_id else 0,
    )
    return FullTokenResponseSchema(
        access_token=access_token,
        token_id=token_entry.id,
        user_id=token_entry.user_id,
        created_at=token_entry.created_at,
        is_login_token=token_entry.is_login_token,
        is_expired=token_entry.is_expired,
        revoked_at=token_entry._revoked_at,
        comment=comment,
        scopes=[entry.scope for entry in token_entry.scope_entries],
        project_id=token_entry.project_id,
    )
# NOTE(review): the route is registered as POST although the responses constant is
# named ``API_V1_LIST__GET`` - confirm which HTTP method is intended.
@v1_router.post("/list", responses=API_V1_LIST__GET)
async def _list(
    include_expired_revoked: bool = False,
    user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.TOKEN]),
    db: Session = Depends(get_db),
) -> list[TokenResponseSchema]:
    """List the authenticated user's tokens.

    Args:
        include_expired_revoked: When true, every token of the user is returned;
            otherwise revoked tokens and tokens for which ``check_expiration``
            reports true are left out.
        user_entry: Authenticated user; requires the ``TOKEN`` scope.
        db: Database session passed to the expiration check.

    Returns:
        One response schema per selected token.
    """
    responses = []
    for token_entry in user_entry.token_entries:
        # The revocation test comes first so the expiration check is skipped for
        # tokens that are already revoked.
        if not include_expired_revoked and (
            token_entry._revoked_at or token_entry.check_expiration(db=db)
        ):
            continue
        responses.append(_token_entry_to_response_schema(token_entry))
    return responses
@v1_router.patch("/revoke", responses=API_V1_REVOKE__PATCH)
async def _revoke(
    token_id: PrimaryKey,
    user_entry: UserEntry = Security(authenticate_user, scopes=[Scopes.TOKEN]),
    db: Session = Depends(get_db),
) -> TokenResponseSchema:
    """Revoke one of the authenticated user's tokens.

    Args:
        token_id: Id of the token to revoke.
        user_entry: Authenticated user; requires the ``TOKEN`` scope.
        db: Database session.

    Returns:
        The response schema of the revoked token.

    Raises:
        HTTPException: 400 "Unknown token!" if the id does not resolve or the token
            belongs to another user; 400 "Token has already been revoked!" if the
            token was revoked before.
    """
    try:
        token_entry = read_token(token_id=token_id, db=db)
    except UnknownTokenError:
        raise HTTPException_UNKNOWN_TOKEN

    # A user may only revoke their own tokens. Foreign tokens get the same answer
    # as non-existent ones so that token ids of other users are not disclosed.
    if token_entry.user_id != user_entry.id:
        raise HTTPException_UNKNOWN_TOKEN

    if token_entry.revoked_at:
        raise HTTPException_TOKEN_ALREADY_REVOKED

    revoke_token_by_ref(token_entry=token_entry, db=db)

    # token_id is the caller's session token; revoked_token_id is the target.
    LOGGER.info(
        "{token_id: %d, user_id: %d, username: %s, revoked_token_id: %d}",
        user_entry.session_token_entry.id,
        user_entry.id,
        user_entry.username,
        token_id,
    )
    return _token_entry_to_response_schema(token_entry)
161####################################################################################################
162#### Util
163####################################################################################################
def _token_entry_to_response_schema(token_entry: TokenEntry) -> TokenResponseSchema:
    """Convert a ``TokenEntry`` into the ``TokenResponseSchema`` sent to clients."""
    fields = {
        "token_id": token_entry.id,
        "user_id": token_entry.user_id,
        "created_at": token_entry.created_at,
        "is_login_token": token_entry.is_login_token,
        "is_expired": token_entry.is_expired,
        "revoked_at": token_entry._revoked_at,
        # Only the scope value of each scope entry is exposed.
        "scopes": [scope_entry.scope for scope_entry in token_entry.scope_entries],
        "project_id": token_entry.project_id,
        "comment": token_entry.comment,
    }
    return TokenResponseSchema(**fields)