Coverage for /usr/lib/python3.10/site-packages/hyd/backend/token/models.py: 86%

95 statements  

« prev     ^ index     » next       coverage.py v7.0.3, created at 2023-02-05 17:14 +0000

1import datetime as dt 

2 

3from fastapi import status 

4from pydantic import BaseModel 

5from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String 

6from sqlalchemy.orm import Mapped, Session, relationship 

7 

8from hyd.backend.db import EXTEND_EXISTING, DeclarativeMeta 

9from hyd.backend.util.const import ( 

10 LOGIN_DURATION_AFTER_LAST_REQUEST, 

11 MAX_LENGTH_STR_COMMENT, 

12 MAX_LENGTH_TOKEN_SCOPE, 

13) 

14from hyd.backend.util.models import ( 

15 BASE_API_RESPONSE_SCHEMA, 

16 DETAIL_STR, 

17 CommentStr, 

18 PrimaryKey, 

19 TimeStampMixin, 

20) 

21 

22UTC = dt.timezone.utc 

23 

24 

25#################################################################################################### 

26#### SQLAlchemy table definitions 

27#################################################################################################### 

28 

29 

30class TokenEntry(DeclarativeMeta, TimeStampMixin): 

31 __tablename__ = "token_table" 

32 __table_args__ = {"extend_existing": EXTEND_EXISTING} 

33 id: Mapped[PrimaryKey] = Column(Integer, primary_key=True) 

34 user_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("user_table.id")) 

35 is_login_token: Mapped[bool] = Column(Boolean) 

36 is_expired: Mapped[bool] = Column(Boolean, default=False) 

37 project_id: Mapped[PrimaryKey] = Column(Integer, ForeignKey("project_table.id"), nullable=True) 

38 _expires_on: Mapped[dt.datetime] = Column(DateTime, nullable=True) 

39 _last_request: Mapped[dt.datetime] = Column(DateTime, default=dt.datetime.utcnow) 

40 _revoked_at: Mapped[dt.datetime] = Column(DateTime, nullable=True, default=None) 

41 comment: Mapped[CommentStr] = Column(String(length=MAX_LENGTH_STR_COMMENT)) 

42 

43 scope_entries: Mapped[list["TokenScopeEntry"]] = relationship( 

44 "TokenScopeEntry", back_populates="token_entry", cascade="all,delete" 

45 ) 

46 user_entry: Mapped["UserEntry"] = relationship("UserEntry", back_populates="token_entries") 

47 

48 @property 

49 def expires_on(self) -> dt.datetime | None: 

50 return None if self._expires_on is None else self._expires_on.replace(tzinfo=UTC) 

51 

52 @expires_on.setter 

53 def expires_on(self, val: dt.datetime) -> None: 

54 self._expires_on = val 

55 

56 @property 

57 def last_request(self) -> dt.datetime: 

58 return self._last_request.replace(tzinfo=UTC) 

59 

60 @last_request.setter 

61 def last_request(self, val: dt.datetime) -> None: 

62 self._last_request = val 

63 

64 @property 

65 def revoked_at(self) -> dt.datetime | None: 

66 return None if self._revoked_at is None else self._revoked_at.replace(tzinfo=UTC) 

67 

68 @revoked_at.setter 

69 def revoked_at(self, val: dt.datetime) -> None: 

70 self._revoked_at = val 

71 

72 def check_expiration(self, *, db: Session) -> bool: 

73 if self.is_expired: 

74 return True 

75 

76 if self.is_login_token: 

77 # graciously expire token after expiration datetime is reached 

78 # and the last request is older than a given threshold 

79 

80 if dt.datetime.now(tz=UTC) < self.expires_on: 

81 return False 

82 if LOGIN_DURATION_AFTER_LAST_REQUEST < (dt.datetime.now(tz=UTC) - self.last_request): 

83 self.is_expired = True 

84 db.commit() 

85 return True 

86 else: 

87 # expire token after the given datetime is reached 

88 if self.expires_on is None: 

89 return False 

90 

91 if self.expires_on <= dt.datetime.now(tz=UTC): 

92 self.is_expired = True 

93 db.commit() 

94 return True 

95 

96 return False 

97 

98 

99class TokenScopeEntry(DeclarativeMeta): 

100 __tablename__ = "scope_table" 

101 __table_args__ = {"extend_existing": EXTEND_EXISTING} 

102 id = Column(Integer, primary_key=True) 

103 token_id = Column(Integer, ForeignKey("token_table.id")) 

104 scope = Column(String(length=MAX_LENGTH_TOKEN_SCOPE)) 

105 

106 token_entry = relationship("TokenEntry", back_populates="scope_entries") 

107 

108 

109#################################################################################################### 

110#### Response schema 

111#################################################################################################### 

112 

113 

114class TokenSchema(BaseModel): 

115 access_token: str 

116 token_type: str = "bearer" 

117 

118 

119class TokenResponseSchema(BaseModel): 

120 token_id: PrimaryKey 

121 user_id: PrimaryKey 

122 created_at: dt.datetime 

123 revoked_at: dt.datetime | None 

124 expires_on: dt.datetime | None 

125 is_expired: bool 

126 is_login_token: bool 

127 scopes: list[str] 

128 project_id: PrimaryKey | None 

129 comment: str 

130 

131 

132# TODO: Use the shorter version if it stops bugging sphinx! 

133# class FullTokenResponseSchemaBaseModel(TokenSchema, TokenResponseSchema): 

134# ... 

135class FullTokenResponseSchema(BaseModel): 

136 access_token: str 

137 token_type: str = "bearer" 

138 token_id: PrimaryKey 

139 user_id: PrimaryKey 

140 created_at: dt.datetime 

141 revoked_at: dt.datetime | None 

142 expires_on: dt.datetime | None 

143 is_expired: bool 

144 is_login_token: bool 

145 scopes: list[str] 

146 project_id: PrimaryKey | None 

147 comment: str 

148 

149 

150#################################################################################################### 

151#### OpenAPI definitions 

152#################################################################################################### 

153 

154 

155API_V1_CREATE__POST = { 

156 **BASE_API_RESPONSE_SCHEMA, 

157 status.HTTP_200_OK: {"model": FullTokenResponseSchema}, 

158 status.HTTP_400_BAD_REQUEST: DETAIL_STR, 

159} 

160 

161 

162API_V1_LIST__GET = { 

163 **BASE_API_RESPONSE_SCHEMA, 

164 status.HTTP_200_OK: {"model": list[TokenResponseSchema]}, 

165} 

166 

167 

168API_V1_REVOKE__PATCH = { 

169 **BASE_API_RESPONSE_SCHEMA, 

170 status.HTTP_200_OK: {"model": TokenResponseSchema}, 

171 status.HTTP_400_BAD_REQUEST: DETAIL_STR, 

172}