import datetime as dt
from fastapi import status
from pydantic import BaseModel
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, Session, relationship
from hyd.backend.db import EXTEND_EXISTING, DeclarativeMeta
from hyd.backend.util.const import (
LOGIN_DURATION_AFTER_LAST_REQUEST,
MAX_LENGTH_STR_COMMENT,
MAX_LENGTH_TOKEN_SCOPE,
)
from hyd.backend.util.models import (
BASE_API_RESPONSE_SCHEMA,
DETAIL_STR,
CommentStr,
PrimaryKey,
TimeStampMixin,
)
UTC = dt.timezone.utc
####################################################################################################
#### SQLAlchemy table definitions
####################################################################################################
[docs]class TokenEntry(DeclarativeMeta, TimeStampMixin):
__tablename__ = "token_table"
__table_args__ = {"extend_existing": EXTEND_EXISTING}
id: Mapped[PrimaryKey] = Column(Integer, primary_key=True)
user_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("user_table.id"))
is_login_token: Mapped[bool] = Column(Boolean)
is_expired: Mapped[bool] = Column(Boolean, default=False)
project_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("project_table.id"), nullable=True)
_expires_on: Mapped[dt.datetime] = Column(DateTime, nullable=True)
_last_request: Mapped[dt.datetime] = Column(DateTime, default=dt.datetime.utcnow)
_revoked_at: Mapped[dt.datetime] = Column(DateTime, nullable=True, default=None)
comment: Mapped[CommentStr] = Column(String(length=MAX_LENGTH_STR_COMMENT))
scope_entries: Mapped[list["TokenScopeEntry"]] = relationship(
"TokenScopeEntry", back_populates="token_entry", cascade="all,delete"
)
user_entry: Mapped["UserEntry"] = relationship("UserEntry", back_populates="token_entries")
@property
def expires_on(self) -> dt.datetime | None:
return None if self._expires_on is None else self._expires_on.replace(tzinfo=UTC)
@expires_on.setter
def expires_on(self, val: dt.datetime) -> None:
self._expires_on = val
@property
def last_request(self) -> dt.datetime:
return self._last_request.replace(tzinfo=UTC)
@last_request.setter
def last_request(self, val: dt.datetime) -> None:
self._last_request = val
@property
def revoked_at(self) -> dt.datetime | None:
return None if self._revoked_at is None else self._revoked_at.replace(tzinfo=UTC)
@revoked_at.setter
def revoked_at(self, val: dt.datetime) -> None:
self._revoked_at = val
def check_expiration(self, *, db: Session) -> bool:
if self.is_expired:
return True
if self.is_login_token:
# graciously expire token after expiration datetime is reached
# and the last request is older than a given threshold
if dt.datetime.now(tz=UTC) < self.expires_on:
return False
if LOGIN_DURATION_AFTER_LAST_REQUEST < (dt.datetime.now(tz=UTC) - self.last_request):
self.is_expired = True
db.commit()
return True
else:
# expire token after the given datetime is reached
if self.expires_on is None:
return False
if self.expires_on <= dt.datetime.now(tz=UTC):
self.is_expired = True
db.commit()
return True
return False
[docs]class TokenScopeEntry(DeclarativeMeta):
__tablename__ = "scope_table"
__table_args__ = {"extend_existing": EXTEND_EXISTING}
id = Column(Integer, primary_key=True)
token_id = Column(Integer, ForeignKey("token_table.id"))
scope = Column(String(length=MAX_LENGTH_TOKEN_SCOPE))
token_entry = relationship("TokenEntry", back_populates="scope_entries")
####################################################################################################
#### Response schema
####################################################################################################
[docs]class TokenSchema(BaseModel):
access_token: str
token_type: str = "bearer"
[docs]class TokenResponseSchema(BaseModel):
token_id: PrimaryKey
user_id: PrimaryKey
created_at: dt.datetime
revoked_at: dt.datetime | None
expires_on: dt.datetime | None
is_expired: bool
is_login_token: bool
scopes: list[str]
project_id: PrimaryKey | None
comment: str
# TODO: Use the shorter version if it stops bugging sphinx!
# class FullTokenResponseSchemaBaseModel(TokenSchema, TokenResponseSchema):
# ...
[docs]class FullTokenResponseSchema(BaseModel):
access_token: str
token_type: str = "bearer"
token_id: PrimaryKey
user_id: PrimaryKey
created_at: dt.datetime
revoked_at: dt.datetime | None
expires_on: dt.datetime | None
is_expired: bool
is_login_token: bool
scopes: list[str]
project_id: PrimaryKey | None
comment: str
####################################################################################################
#### OpenAPI definitions
####################################################################################################
API_V1_CREATE__POST = {
**BASE_API_RESPONSE_SCHEMA,
status.HTTP_200_OK: {"model": FullTokenResponseSchema},
status.HTTP_400_BAD_REQUEST: DETAIL_STR,
}
API_V1_LIST__GET = {
**BASE_API_RESPONSE_SCHEMA,
status.HTTP_200_OK: {"model": list[TokenResponseSchema]},
}
API_V1_REVOKE__PATCH = {
**BASE_API_RESPONSE_SCHEMA,
status.HTTP_200_OK: {"model": TokenResponseSchema},
status.HTTP_400_BAD_REQUEST: DETAIL_STR,
}