Coverage for /usr/lib/python3.10/site-packages/hyd/backend/token/models.py: 86%
95 statements
« prev ^ index » next coverage.py v7.0.3, created at 2023-02-05 17:14 +0000
« prev ^ index » next coverage.py v7.0.3, created at 2023-02-05 17:14 +0000
1import datetime as dt
3from fastapi import status
4from pydantic import BaseModel
5from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
6from sqlalchemy.orm import Mapped, Session, relationship
8from hyd.backend.db import EXTEND_EXISTING, DeclarativeMeta
9from hyd.backend.util.const import (
10 LOGIN_DURATION_AFTER_LAST_REQUEST,
11 MAX_LENGTH_STR_COMMENT,
12 MAX_LENGTH_TOKEN_SCOPE,
13)
14from hyd.backend.util.models import (
15 BASE_API_RESPONSE_SCHEMA,
16 DETAIL_STR,
17 CommentStr,
18 PrimaryKey,
19 TimeStampMixin,
20)
# Single shared UTC tzinfo, used below to label stored datetimes and to build "now".
UTC: dt.timezone = dt.timezone.utc
25####################################################################################################
26#### SQLAlchemy table definitions
27####################################################################################################
class TokenEntry(DeclarativeMeta, TimeStampMixin):
    """ORM row for one access token belonging to a user.

    The datetime columns are declared as plain ``DateTime`` and are handled as
    naive UTC values. The ``expires_on``, ``last_request`` and ``revoked_at``
    properties attach UTC tzinfo on read and normalise to naive UTC on write,
    so callers can use timezone-aware datetimes throughout.
    """

    __tablename__ = "token_table"
    __table_args__ = {"extend_existing": EXTEND_EXISTING}
    id: Mapped[PrimaryKey] = Column(Integer, primary_key=True)
    user_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("user_table.id"))
    is_login_token: Mapped[bool] = Column(Boolean)
    is_expired: Mapped[bool] = Column(Boolean, default=False)
    project_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("project_table.id"), nullable=True)
    _expires_on: Mapped[dt.datetime] = Column(DateTime, nullable=True)
    _last_request: Mapped[dt.datetime] = Column(DateTime, default=dt.datetime.utcnow)
    _revoked_at: Mapped[dt.datetime] = Column(DateTime, nullable=True, default=None)
    comment: Mapped[CommentStr] = Column(String(length=MAX_LENGTH_STR_COMMENT))

    scope_entries: Mapped[list["TokenScopeEntry"]] = relationship(
        "TokenScopeEntry", back_populates="token_entry", cascade="all,delete"
    )
    user_entry: Mapped["UserEntry"] = relationship("UserEntry", back_populates="token_entries")

    @staticmethod
    def _to_naive_utc(val: dt.datetime | None) -> dt.datetime | None:
        """Return *val* as a naive UTC datetime, passing ``None`` through.

        Naive input is assumed to be UTC already. Aware input is converted to
        UTC before the tzinfo is dropped; otherwise a value carrying another
        offset would be relabelled as UTC by the getters and come back shifted
        by that offset.
        """
        if val is None:
            return None
        if val.utcoffset() is None:
            return val.replace(tzinfo=None)
        return val.astimezone(UTC).replace(tzinfo=None)

    @property
    def expires_on(self) -> dt.datetime | None:
        """Expiration datetime as an aware UTC value, or ``None`` if unset."""
        return None if self._expires_on is None else self._expires_on.replace(tzinfo=UTC)

    @expires_on.setter
    def expires_on(self, val: dt.datetime | None) -> None:
        self._expires_on = self._to_naive_utc(val)

    @property
    def last_request(self) -> dt.datetime:
        """Datetime of the last request as an aware UTC value."""
        return self._last_request.replace(tzinfo=UTC)

    @last_request.setter
    def last_request(self, val: dt.datetime) -> None:
        self._last_request = self._to_naive_utc(val)

    @property
    def revoked_at(self) -> dt.datetime | None:
        """Revocation datetime as an aware UTC value, or ``None`` if unset."""
        return None if self._revoked_at is None else self._revoked_at.replace(tzinfo=UTC)

    @revoked_at.setter
    def revoked_at(self, val: dt.datetime | None) -> None:
        self._revoked_at = self._to_naive_utc(val)

    def check_expiration(self, *, db: Session) -> bool:
        """Report whether the token is expired, persisting a newly detected expiry.

        Args:
            db: Session that is committed when this call sets ``is_expired``.

        Returns:
            ``True`` if the token already was, or has just become, expired;
            ``False`` otherwise.
        """
        if self.is_expired:
            return True

        # Take one timestamp so every comparison below sees the same "now".
        now = dt.datetime.now(tz=UTC)
        deadline = self.expires_on

        if self.is_login_token:
            # Login tokens expire graciously: only once the expiration datetime
            # has passed AND the last request is older than the allowed idle
            # duration. A login token without an expiration datetime is judged
            # on idle time alone instead of failing on a None comparison.
            if deadline is not None and now < deadline:
                return False
            if (now - self.last_request) <= LOGIN_DURATION_AFTER_LAST_REQUEST:
                return False
        elif deadline is None or now < deadline:
            # Other tokens never expire without a deadline, and stay valid
            # until the deadline is reached.
            return False

        self.is_expired = True
        db.commit()
        return True
class TokenScopeEntry(DeclarativeMeta):
    """ORM row holding a single scope string attached to a token.

    Each row references its owning ``TokenEntry`` through ``token_id``; the
    attributes are annotated with ``Mapped[...]`` in the same way as
    ``TokenEntry`` so type checkers see both tables consistently.
    """

    __tablename__ = "scope_table"
    __table_args__ = {"extend_existing": EXTEND_EXISTING}
    id: Mapped[PrimaryKey] = Column(Integer, primary_key=True)
    token_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("token_table.id"))
    scope: Mapped[str] = Column(String(length=MAX_LENGTH_TOKEN_SCOPE))

    # Inverse side of TokenEntry.scope_entries.
    token_entry: Mapped["TokenEntry"] = relationship("TokenEntry", back_populates="scope_entries")
109####################################################################################################
110#### Response schema
111####################################################################################################
class TokenSchema(BaseModel):
    # Minimal token payload: the token string plus its type.
    # (Comments rather than a docstring, so the generated schema is unchanged.)
    access_token: str
    # Defaults to "bearer" when not given explicitly.
    token_type: str = "bearer"
class TokenResponseSchema(BaseModel):
    # Token metadata without the token string itself.
    # (Comments rather than a docstring, so the generated schema is unchanged.)

    # Identity of the token and of its owner.
    token_id: PrimaryKey
    user_id: PrimaryKey

    # Lifecycle timestamps; revoked_at / expires_on may be absent.
    created_at: dt.datetime
    revoked_at: dt.datetime | None
    expires_on: dt.datetime | None

    # State flags.
    is_expired: bool
    is_login_token: bool

    # Scope strings, optional project binding and free-text comment.
    scopes: list[str]
    project_id: PrimaryKey | None
    comment: str
# TODO: Switch to the shorter, inheritance-based definition once it stops breaking Sphinx:
# class FullTokenResponseSchema(TokenSchema, TokenResponseSchema):
#     ...
class FullTokenResponseSchema(BaseModel):
    # Token string and token metadata in one flat model. The fields repeat
    # those of TokenSchema followed by those of TokenResponseSchema.
    # (Comments rather than a docstring, so the generated schema is unchanged.)

    # --- fields matching TokenSchema ---
    access_token: str
    token_type: str = "bearer"

    # --- fields matching TokenResponseSchema ---
    token_id: PrimaryKey
    user_id: PrimaryKey
    created_at: dt.datetime
    revoked_at: dt.datetime | None
    expires_on: dt.datetime | None
    is_expired: bool
    is_login_token: bool
    scopes: list[str]
    project_id: PrimaryKey | None
    comment: str
150####################################################################################################
151#### OpenAPI definitions
152####################################################################################################
# Per-endpoint response maps: each starts from the shared base entries and then
# sets the status-code entries specific to that endpoint.
# NOTE(review): presumably passed as `responses=` to the matching FastAPI route -- confirm.

# Create: 200 returns the full token (including the token string); 400 carries a detail string.
API_V1_CREATE__POST = {
    **BASE_API_RESPONSE_SCHEMA,
    status.HTTP_200_OK: {"model": FullTokenResponseSchema},
    status.HTTP_400_BAD_REQUEST: DETAIL_STR,
}


# List: 200 returns a list of token metadata entries.
API_V1_LIST__GET = {
    **BASE_API_RESPONSE_SCHEMA,
    status.HTTP_200_OK: {"model": list[TokenResponseSchema]},
}


# Revoke: 200 returns the token metadata; 400 carries a detail string.
API_V1_REVOKE__PATCH = {
    **BASE_API_RESPONSE_SCHEMA,
    status.HTTP_200_OK: {"model": TokenResponseSchema},
    status.HTTP_400_BAD_REQUEST: DETAIL_STR,
}