Coverage for server/bp/oauth.py: 57%

281 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-01-26 23:58 +0100

1 

2from flask import Blueprint, request, url_for, render_template, redirect, render_template_string, send_file 

3 

4import time 

5from flask import Blueprint, request, session, url_for 

6from flask import render_template, redirect, jsonify 

7from werkzeug.security import gen_salt 

8 

9from authlib.integrations.flask_oauth2 import current_token 

10from authlib.oauth2 import OAuth2Error 

11 

12# VR 11/11/25 copy all from lib.auth_oauth.oauth2 

13#from lib.auth_oauth.models import db, User, OAuth2Client 

14#from lib.auth_oauth.oauth2 import authorization, require_oauth 

15 

16 

17 

18 

19 

20 

21import os 

22from flask import Flask 

23 

24# New library 11/11/25 from https://github.com/authlib/example-oauth2-server/blob/master/website/ 

25#from lib.auth_oauth.models import db 

26 

27 

28# 11/11/25 FROM https://github.com/authlib/example-oauth2-server/blob/master/website/routes.py 

29 

30# Maybe for connexion 

def current_user_loc():
    """Return flask-login's ``current_user`` object.

    flask_login is imported at call time rather than at module import time.
    """
    import flask_login

    return flask_login.current_user

34# VR TODO 11/11/25 Proposed by authlib/example-oauth2-server 

35# if 'id' in session: 

36# uid = session['id'] 

37# return User.query.get(uid) 

38# return None 

39 

40# For client creation : non blocking because we hard-code client in first dev steps 

def split_by_crlf(s):
    """Split a multi-line string into its non-blank, trimmed lines.

    Args:
        s: Text containing one value per line (any line-ending style
            understood by ``str.splitlines``). ``None`` and the empty string
            are accepted and produce an empty list.

    Returns:
        list[str]: The lines of ``s`` with surrounding whitespace removed;
        empty and whitespace-only lines are dropped.
    """
    if not s:
        return []
    # Strip first, then filter, so that lines made only of spaces/tabs are
    # discarded and values never carry stray padding.
    trimmed = (line.strip() for line in s.splitlines())
    return [line for line in trimmed if line]

43 

44 

45 

46def create_oauth_bp(app, db): 

47 from flask_openapi3 import APIBlueprint 

48 bp = APIBlueprint('oauth', __name__, url_prefix='') 

49 from sqlalchemy.orm import relationship 

50 

51 # Use the db instance passed as parameter, not creating new engine/base 

52 

53# dupliquer 

54# class User(Base): 

55# __table__ = Table("flask_user", Base.metadata, autoload_with=engine, schema='mtruser') 

56 

57 # from server.safia import User 

58 

59 

60 

@bp.route("/oauth_index")
def index():
    """Placeholder route: answers with a fixed text."""
    placeholder = "This is just a test that needs to be renamed !"
    return placeholder

64 

65 from flask_sqlalchemy import SQLAlchemy 

66 

67 # Legacy 

68# from flask_oauthlib.provider import OAuth2Provider 

69 

70 

71 # New library 11/11/25 => but this is in sub files in lib.auth_oauth 

72# from authlib.integrations.flask_oauth2 import AuthorizationServer 

73# from authlib.oauth2 import OAuth2Error 

74 

75 

76 # legacy 

77# oauth = OAuth2Provider(app) 

78# oauth = OAuth2ClientMixin(app) 

79 app.debug = True 

80 

81 # COPY VR 11/11/25 Moved from models 

82 import time 

83 from authlib.integrations.sqla_oauth2 import ( 

84 OAuth2ClientMixin, 

85 OAuth2AuthorizationCodeMixin, 

86 OAuth2TokenMixin, 

87 ) 

88 

89 # CA c'est Claude le 11/11/25 : Note: User model is defined in auth.py 

90 # We retrieve it from the db.Model.registry or define a minimal reference 

91 # Since we're using string-based foreign keys, we don't need the actual class 

92 # But for query operations, we need to get the User model 

93 # The safest way is to query it dynamically 

94 

95 # We'll define a helper to get User model at runtime 

96 User = None # Will be set later when needed 

97 

def get_user_model():
    """Resolve the ``User`` model class at call time.

    The lookup is deferred to runtime to avoid a circular import with the
    module that defines the model.

    Resolution order:
      1. ``server.safia.User`` when that module is already imported and
         exposes the attribute (a partially initialised module that does not
         yet define ``User`` is tolerated instead of raising).
      2. The mapped class whose ``__tablename__`` is ``'flask_user'`` in the
         declarative registry attached to ``db.Model``.

    Returns:
        The model class, or ``None`` when it cannot be found.
    """
    import sys

    safia_module = sys.modules.get('server.safia')
    user_cls = getattr(safia_module, 'User', None)
    if user_cls is not None:
        return user_cls

    # Fallback: walk the mappers registered on db.Model's declarative
    # registry. (Scanning dir(db.Model) cannot work: mapped subclasses are
    # not attributes of their base class.)
    registry = getattr(db.Model, 'registry', None)
    mappers = registry.mappers if registry is not None else ()
    for mapper in mappers:
        candidate = mapper.class_
        if getattr(candidate, '__tablename__', None) == 'flask_user':
            return candidate
    return None

110 

class OAuth2Client(db.Model, OAuth2ClientMixin):
    """OAuth2 client record, mapped to table ``oauth2_client`` in schema ``mtruser``."""

    __tablename__ = 'oauth2_client'
    __table_args__ = dict(extend_existing=True, schema="mtruser")

    id = db.Column(db.Integer, primary_key=True)
    # Both the foreign key and the relationship target are given as strings
    # so that the User class does not have to be imported here.
    user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE'))
    user = db.relationship('User', foreign_keys=[user_id])

123 

124 

125# user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE')) 

126# user = db.relationship('User', backref='oauth2_client', foreign_keys=[user_id]) 

127 

128# sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper[OAuth2Client(oauth2_client)]'. Original exception was: Error creating backref 'oauth2_client' on relationship 'OAuth2Client.user': property of that name exists on mapper 'Mapper[User(flask_user)]' 

129 

130# GPT pourquoi un s user = db.relationship('User', backref='oauth2_clients', foreign_keys=[user_id]) 

131 

132 

class OAuth2AuthorizationCode(db.Model, OAuth2AuthorizationCodeMixin):
    """Authorization code record, mapped to table ``oauth2_code`` in schema ``mtruser``."""

    __tablename__ = 'oauth2_code'
    __table_args__ = dict(extend_existing=True, schema="mtruser")

    id = db.Column(db.Integer, primary_key=True)
    # String references keep this model independent from the import of User.
    user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE'))
    user = db.relationship('User', foreign_keys=[user_id])

144 

145 

146# user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE')) 

147# user = db.relationship('User', foreign_keys=[user_id]) 

148 

class OAuth2Token(db.Model, OAuth2TokenMixin):
    """Issued token record, mapped to table ``oauth2_token`` in schema ``mtruser``."""

    __tablename__ = 'oauth2_token'
    __table_args__ = {'extend_existing': True,
                      "schema": "mtruser"}

    id = db.Column(db.Integer, primary_key=True)
    # String references keep this model independent from the import of User.
    user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE'))
    user = db.relationship('User', foreign_keys=[user_id])

    def is_refresh_token_active(self):
        """Tell whether the refresh token of this row may still be used.

        A refresh token is active when it has not been revoked and when it is
        younger than twice the access-token lifetime
        (``issued_at + 2 * expires_in``).

        Returns:
            bool: ``True`` when the refresh token can be exchanged.
        """
        # Revocation markers: `refresh_token_revoked_at` is the field named in
        # the original comment of this method (presumably provided by the
        # Authlib mixin — TODO confirm against the installed version);
        # `revoked` is the legacy flag. getattr keeps the check safe whichever
        # one is present.
        if getattr(self, 'refresh_token_revoked_at', 0) or getattr(self, 'revoked', False):
            return False
        expires_at = self.issued_at + self.expires_in * 2
        return expires_at >= time.time()

170 

171 #from server.safia import app 

172 with app.app_context(): 

173 db.create_all() # extend_existing = True 

174 db.session.commit() 

175 

176 # COPY VR 11/11/25 Moved from oauth2 in lib.auth_oauth 

177 from authlib.integrations.flask_oauth2 import ( 

178 AuthorizationServer, 

179 ResourceProtector, 

180) 

181 from authlib.integrations.sqla_oauth2 import ( 

182 create_query_client_func, 

183 create_save_token_func, 

184 create_revocation_endpoint, 

185 create_bearer_token_validator, 

186 ) 

187 from authlib.oauth2.rfc6749 import grants 

188 from authlib.oauth2.rfc7636 import CodeChallenge 

189 #from .models import db, User 

190 #from .models import OAuth2Client, OAuth2AuthorizationCode, OAuth2Token 

191 

192# 'client_secret_post', 'none', 

class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    """Authorization-code grant persisting its codes in ``OAuth2AuthorizationCode``."""

    TOKEN_ENDPOINT_AUTH_METHODS = ['client_secret_basic', 'client_secret_post', 'none']

    def save_authorization_code(self, code, request):
        """Store a freshly issued code, with its PKCE challenge, and return the row."""
        payload = request.data
        record = OAuth2AuthorizationCode(
            code=code,
            client_id=request.client.client_id,
            redirect_uri=request.redirect_uri,
            scope=request.scope,
            user_id=request.user.id,
            code_challenge=payload.get('code_challenge'),
            code_challenge_method=payload.get('code_challenge_method'),
        )
        db.session.add(record)
        db.session.commit()
        return record

    def query_authorization_code(self, code, client):
        """Return the stored, non-expired code for ``client``, else ``None``."""
        lookup = OAuth2AuthorizationCode.query.filter_by(code=code, client_id=client.client_id)
        match = lookup.first()
        if not match or match.is_expired():
            return None
        return match

    def delete_authorization_code(self, authorization_code):
        """Remove a code row from the database."""
        db.session.delete(authorization_code)
        db.session.commit()

    def authenticate_user(self, authorization_code):
        """Load the user referenced by the code, or ``None`` if the model is unknown."""
        user_cls = get_user_model()
        if user_cls:
            return db.session.get(user_cls, authorization_code.user_id)
        return None

230 

231 

class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant):
    """Resource-owner password grant backed by the application's User model."""

    def authenticate_user(self, username, password):
        """Return the user matching ``username`` when ``password`` is valid, else ``None``."""
        user_cls = get_user_model()
        if not user_cls:
            return None
        statement = db.select(user_cls).filter_by(username=username)
        candidate = db.session.execute(statement).scalar_one_or_none()
        if candidate is None or not candidate.check_password(password):
            return None
        return candidate

244 

245 

class RefreshTokenGrant(grants.RefreshTokenGrant):
    """Refresh-token grant with rotation: each refresh returns a new refresh token."""

    INCLUDE_NEW_REFRESH_TOKEN = True
    TOKEN_ENDPOINT_AUTH_METHODS = [
        'client_secret_basic', 'client_secret_post', 'none'
    ]

    def authenticate_refresh_token(self, refresh_token):
        """Return the token row for ``refresh_token`` when it is still usable.

        Returns ``None`` when the token is unknown, revoked or expired.
        """
        token = OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
        if not token:
            return None
        # Explicit revocation check: a revoked refresh token must never be
        # exchanged, independently of what is_refresh_token_active() verifies.
        if getattr(token, 'refresh_token_revoked_at', 0) or getattr(token, 'revoked', False):
            return None
        if not token.is_refresh_token_active():
            return None
        return token

    def authenticate_user(self, credential):
        """Load the user owning ``credential``, or ``None`` if the model is unknown."""
        UserModel = get_user_model()
        if not UserModel:
            return None
        # Use db.session.get() which works across different SQLAlchemy instances
        return db.session.get(UserModel, credential.user_id)

    def revoke_old_credential(self, credential):
        """Invalidate the token row that has just been exchanged.

        When the row exposes the ``*_revoked_at`` timestamp fields they are
        set to the current time (assumption: these are the persisted
        revocation columns of the installed Authlib mixin — TODO confirm).
        Otherwise the legacy ``revoked`` flag is set, as before.
        """
        if hasattr(credential, 'refresh_token_revoked_at'):
            now = int(time.time())
            credential.refresh_token_revoked_at = now
            # The legacy flag revoked the whole row; keep that semantics by
            # revoking the access token too when the field exists.
            if hasattr(credential, 'access_token_revoked_at'):
                credential.access_token_revoked_at = now
        else:
            credential.revoked = True
        db.session.add(credential)
        db.session.commit()

268 

269 

270 query_client = create_query_client_func(db.session, OAuth2Client) 

271 save_token = create_save_token_func(db.session, OAuth2Token) 

272 

273 

274 # VR TODO bullshit de duck.ai 

275 # Autoriser la génération d'un nouveau refresh token 

class MyAuthorizationServer(AuthorizationServer):
    """Experimental AuthorizationServer subclass; its instantiation is commented out in this file."""

    def create_refresh_token(self, client_id, grant_type):
        """Delegate unchanged to the parent class.

        NOTE(review): confirm that the parent actually defines
        ``create_refresh_token``; nothing in this file shows it.
        """
        parent = super()
        return parent.create_refresh_token(client_id, grant_type)

280# authorization = MyAuthorizationServer( 

281# query_client=query_client, 

282# save_token=save_token, 

283# ) 

284 

285 authorization = AuthorizationServer( 

286 query_client=query_client, 

287 save_token=save_token, 

288 ) 

289 

290 require_oauth = ResourceProtector() 

291 

def config_oauth(app):
    """Bind the authorization server to ``app`` and register grants, revocation and bearer validation."""
    authorization.init_app(app)

    # Grants are registered in a fixed order; only the authorization-code
    # grant receives an extension list (PKCE, mandatory).
    pkce = CodeChallenge(required=True)
    registrations = (
        (grants.ImplicitGrant, ()),
        (grants.ClientCredentialsGrant, ()),
        (AuthorizationCodeGrant, ([pkce],)),
        (PasswordGrant, ()),
        (RefreshTokenGrant, ()),
    )
    for grant_cls, extra_args in registrations:
        authorization.register_grant(grant_cls, *extra_args)

    # Token revocation endpoint.
    authorization.register_endpoint(create_revocation_endpoint(db.session, OAuth2Token))

    # Bearer-token validation for protected resources.
    validator_cls = create_bearer_token_validator(db.session, OAuth2Token)
    require_oauth.register_token_validator(validator_cls())

310 

311 from oauthlib.oauth2 import OAuth2Error 

312 #from oauthlib import authorization 

313 # from authlib.integrations.flask_oauth2 import AuthorizationServer 

314 

315 

316 import logging 

317 loggger = logging.getLogger() 

318 from flask import make_response, render_template 

319 

320 # VR Step 11/11/25 00am04 

321 # - [x] home.home maybe to change login.login 

322 # - [x] render template 'authorize.html' maybe to change  

323 # - [ ] auth/oauth/confirm.html to deprecate since authorize.html copied in templates/auth/oauth in safia 

324 # - [ ] Test all the flows again 

325 # - [ ] Add all the needed routes from https://github.com/authlib/example-oauth2-server/blob/master/website/routes.py 

326 # - [ ] def check_password(self, password): HACK HACK => je ne sais pas pourquoi et peut-etre plus en fait VR 11/11/25 12pm 

327 

328 

329 

@bp.route('/api/client', methods=['GET', 'POST'])
def api_client(*args, **kwargs):
    """Placeholder endpoint: ignores its arguments and answers with a fixed text."""
    reply = "Je vous dis quoi !"
    return reply

334 

@bp.route('/api/user')
@require_oauth('profile')
def user_profile():
    """Return the username and id of the owner of the current bearer token as JSON."""
    # Relies on the token model exposing a `.user` relationship.
    owner = current_token.user
    profile = {"username": owner.username, "id": owner.id}
    return jsonify(profile)

341 

342 

343 from flask_openapi3 import Info, Tag 

344 from pydantic import BaseModel, Field 

345 from typing import Optional 

346 

347 oauth_tag = Tag(name="saxia_oauth", description="Saxia Oauth") 

348 

349 # Modèles pour la documentation OpenAPI 

350 class AuthorizeGetQuery(BaseModel): 

351 """Paramètres de la requête GET pour l'autorisation OAuth""" 

352 response_type: str = Field(..., description="Type de réponse OAuth (code, token)") 

353 client_id: str = Field(..., description="ID du client OAuth") 

354 redirect_uri: Optional[str] = Field(None, description="URI de redirection après autorisation") 

355 scope: Optional[str] = Field(None, description="Portée demandée (profile, email, etc.)") 

356 state: Optional[str] = Field(None, description="État pour prévenir CSRF") 

357 code_challenge: Optional[str] = Field(None, description="Challenge PKCE") 

358 code_challenge_method: Optional[str] = Field(None, description="Méthode PKCE (S256, plain)") 

359 

360 class AuthorizePostForm(BaseModel): 

361 """Paramètres du formulaire POST pour confirmer l'autorisation""" 

362 confirm: str = Field(..., description="Confirmation de l'utilisateur (true/false)") 

363 username: Optional[str] = Field(None, description="Nom d'utilisateur si non connecté") 

364 

365 # Modèles pour l'endpoint /oauth/token 

366 class TokenRequestAuthorizationCode(BaseModel): 

367 """Requête de token avec authorization_code""" 

368 grant_type: str = Field(..., description="Type de grant (authorization_code)", example="authorization_code") 

369 code: str = Field(..., description="Code d'autorisation reçu du serveur", example="abc123def456") 

370 redirect_uri: str = Field(..., description="URI de redirection (doit correspondre à celle utilisée pour l'autorisation)") 

371 client_id: str = Field(..., description="ID du client OAuth") 

372 client_secret: Optional[str] = Field(None, description="Secret du client (si requis)") 

373 code_verifier: Optional[str] = Field(None, description="Vérificateur PKCE") 

374 

375 class TokenRequestRefreshToken(BaseModel): 

376 """Requête de token avec refresh_token""" 

377 grant_type: str = Field(..., description="Type de grant (refresh_token)", example="refresh_token") 

378 refresh_token: str = Field(..., description="Refresh token reçu lors de l'obtention du token initial", example="def456ghi789jkl012") 

379 client_id: str = Field(..., description="ID du client OAuth") 

380 client_secret: Optional[str] = Field(None, description="Secret du client (si requis)") 

381 scope: Optional[str] = Field(None, description="Portée demandée (optionnel, par défaut même portée que le token initial)") 

382 

383 class TokenRequestPassword(BaseModel): 

384 """Requête de token avec password grant""" 

385 grant_type: str = Field(..., description="Type de grant (password)", example="password") 

386 username: str = Field(..., description="Nom d'utilisateur") 

387 password: str = Field(..., description="Mot de passe") 

388 client_id: str = Field(..., description="ID du client OAuth") 

389 client_secret: Optional[str] = Field(None, description="Secret du client (si requis)") 

390 scope: Optional[str] = Field(None, description="Portée demandée") 

391 

392 class TokenResponse(BaseModel): 

393 """Réponse contenant le token OAuth2""" 

394 access_token: str = Field(..., description="Token d'accès OAuth2", example="YMWvsUJE15LGdsDKzQxHaRqEJxiIwZhBhpSvZqsC3o") 

395 token_type: str = Field(..., description="Type de token", example="Bearer") 

396 expires_in: int = Field(..., description="Durée de validité en secondes", example=3600) 

397 refresh_token: Optional[str] = Field(None, description="Token de rafraîchissement", example="def456ghi789jkl012") 

398 scope: Optional[str] = Field(None, description="Portées accordées", example="profile email") 

399 

400 class TokenErrorResponse(BaseModel): 

401 """Réponse d'erreur OAuth2""" 

402 error: str = Field(..., description="Code d'erreur OAuth2", example="invalid_grant") 

403 error_description: Optional[str] = Field(None, description="Description de l'erreur", example="Invalid authorization code") 

404 

405 # Modèles pour l'endpoint /oauth/revoke 

406 class RevokeTokenRequest(BaseModel): 

407 """Requête de révocation de token""" 

408 token: str = Field(..., description="Token à révoquer (access_token ou refresh_token)", example="YMWvsUJE15LGdsDKzQxHaRqEJxiIwZhBhpSvZqsC3o") 

409 token_type_hint: Optional[str] = Field(None, description="Type de token (access_token ou refresh_token)", example="access_token") 

410 client_id: Optional[str] = Field(None, description="ID du client OAuth (requis si le client a un secret)") 

411 client_secret: Optional[str] = Field(None, description="Secret du client OAuth") 

412 

413 class RevokeTokenResponse(BaseModel): 

414 """Réponse de révocation de token (succès)""" 

415 status: str = Field(default="revoked", description="Statut de la révocation", example="revoked") 

416 

417#@app.errorhandler(401) 

418#@app.route('/api/v1/uploader', methods = ['GET', 'POST']) 

419# @app.get(rule = '/static/<path:path>', tags = [oauth_tag]) 

420 

def _fix_request_scheme(public_url='https://app2.ultradocia.fr/oauth/authorize'):
    """Force the current request to present itself as the public HTTPS URL.

    Overrides ``request.scheme`` and ``request.url`` on the current Flask
    request. The query string of the incoming request is preserved, so that
    code reading ``request.url`` afterwards (for instance the ``next``
    parameter of the login redirect) keeps the OAuth parameters.

    Args:
        public_url: Externally visible URL, without query string, to
            substitute for the one seen by the application. Defaults to the
            authorization endpoint.
    """
    loggger.warning(" request.scheme : %s", request.scheme)
    loggger.warning(" request.url : %s", request.url)

    request.scheme = "https"
    # query_string holds the raw bytes after '?'; re-attach it so that the
    # rewritten URL does not lose client_id, state, code_challenge, ...
    query = request.query_string.decode("utf-8", errors="replace")
    request.url = public_url + "?" + query if query else public_url

    loggger.warning(" request.scheme : %s", request.scheme)
    loggger.warning(" request.url : %s", request.url)

433 

@bp.get('/oauth/authorize',
        tags=[oauth_tag],
        summary="Afficher le formulaire de consentement OAuth",
        description="Affiche le formulaire de consentement pour l'autorisation OAuth. L'utilisateur doit être authentifié.")
def authorize_get(query: AuthorizeGetQuery):
    """
    GET endpoint displaying the OAuth consent form.

    Redirects to the login page when the user is not authenticated, and
    renders the consent form otherwise. When the authorization request is
    rejected, the OAuth error code is returned with the error's HTTP status.
    """
    # Imported under an alias: inside this factory the bare name OAuth2Error
    # is bound to oauthlib's class, which the Authlib server never raises.
    from authlib.oauth2 import OAuth2Error as AuthlibOAuth2Error

    _fix_request_scheme()
    user = current_user_loc()
    # An anonymous user must log in on the authorization server first.
    if not user or user.is_anonymous:
        return redirect(url_for('login.login', next=request.url))

    grant = None
    try:
        grant = authorization.get_consent_grant(end_user=user, request=request)
    except (AuthlibOAuth2Error, OAuth2Error) as error:
        loggger.warning("OAuth authorization request rejected: %s", error)
        # Answer with the error's own status instead of an implicit 200.
        return error.error, getattr(error, 'status_code', 400)
    except Exception:
        # Best effort kept from the original: the form is still rendered,
        # but the failure is now recorded with its traceback.
        loggger.exception("Unexpected failure while building the consent grant")
    return render_template('auth/oauth/authorize.html', user=user, grant=grant)

461 

@bp.post('/oauth/authorize',
         tags=[oauth_tag],
         summary="Confirmer ou refuser l'autorisation OAuth",
         description="Traite la confirmation de l'utilisateur pour l'autorisation OAuth et retourne le code d'autorisation.")
def authorize_post(form: AuthorizePostForm):
    """
    POST endpoint processing the user's answer to the consent form.

    The authenticated user grants or refuses access to the OAuth client.
    The ``confirm`` form field is read as a boolean: an empty value or an
    explicit negative ("false", "0", "no", "non", "off", "n") is a refusal,
    any other value grants access.
    """
    _fix_request_scheme()
    user = current_user_loc()
    # An anonymous user must log in on the authorization server first.
    if not user or user.is_anonymous:
        return redirect(url_for('login.login', next=request.url))

    # The former lookup of a user from the posted `username` was unreachable
    # after the guard above, and would have let a caller pick any account;
    # it is removed on purpose.

    # A non-empty string such as "false" is truthy in Python: compare the
    # submitted value explicitly instead of relying on truthiness.
    answer = request.form.get('confirm', '').strip().lower()
    refused = answer in ('', '0', 'false', 'no', 'non', 'off', 'n')
    grant_user = None if refused else user
    return authorization.create_authorization_response(grant_user=grant_user)

493 

494 

495 @bp.post('/oauth/token', 

496 tags=[oauth_tag], 

497 summary="Obtenir un access token OAuth2", 

498 description=""" 

499Endpoint pour obtenir un access token OAuth2. Supporte plusieurs grant types : 

500 

501### 1. Authorization Code Grant 

502Échange un code d'autorisation contre un access token. 

503```bash 

504curl -X POST https://app2.ultradocia.fr/oauth/token \\ 

505 -H "Content-Type: application/x-www-form-urlencoded" \\ 

506 -d "grant_type=authorization_code" \\ 

507 -d "code=abc123def456" \\ 

508 -d "redirect_uri=https://client.example.com/callback" \\ 

509 -d "client_id=YOUR_CLIENT_ID" \\ 

510 -d "client_secret=YOUR_CLIENT_SECRET" \\ 

511 -d "code_verifier=PKCE_CODE_VERIFIER" 

512``` 

513 

514### 2. Refresh Token Grant (Rafraîchissement du token) 

515Utilise un refresh_token pour obtenir un nouveau access token sans redemander l'autorisation. 

516```bash 

517curl -X POST https://app2.ultradocia.fr/oauth/token \\ 

518 -H "Content-Type: application/x-www-form-urlencoded" \\ 

519 -d "grant_type=refresh_token" \\ 

520 -d "refresh_token=def456ghi789jkl012" \\ 

521 -d "client_id=YOUR_CLIENT_ID" \\ 

522 -d "client_secret=YOUR_CLIENT_SECRET" 

523``` 

524 

525### 3. Password Grant 

526Obtient un token directement avec username/password (à utiliser avec précaution). 

527```bash 

528curl -X POST https://app2.ultradocia.fr/oauth/token \\ 

529 -H "Content-Type: application/x-www-form-urlencoded" \\ 

530 -d "grant_type=password" \\ 

531 -d "username=user@example.com" \\ 

532 -d "password=secret" \\ 

533 -d "client_id=YOUR_CLIENT_ID" 

534``` 

535 

536**Note importante sur le refresh_token :** 

537- Le refresh_token permet d'obtenir un nouveau access_token sans redemander l'autorisation à l'utilisateur 

538- Conservez le refresh_token en sécurité (il a une durée de vie plus longue que l'access_token) 

539- Un nouveau refresh_token peut être retourné lors du rafraîchissement (INCLUDE_NEW_REFRESH_TOKEN = True) 

540- Le refresh_token a une durée de validité de 2x la durée de l'access_token 

541 """, 

542 responses={ 

543 "200": TokenResponse, 

544 "400": TokenErrorResponse, 

545 "401": TokenErrorResponse 

546 }) 

def issue_token():
    """
    Issue an OAuth2 access token.

    Grant types handled by this endpoint:
    - authorization_code: exchange an authorization code for a token
    - refresh_token: obtain a new access token from a refresh token
    - password: obtain a token from username/password
    - client_credentials: obtain a token from client credentials only

    The request scheme and URL are overridden with the public HTTPS token
    URL before the response is built; their values are traced before and
    after the override.
    """
    def _trace_request():
        # Emits, for scheme then url, one warning log line and one print line.
        for attr in ("scheme", "url"):
            label = " request." + attr + " : "
            value = str(getattr(request, attr))
            loggger.warning(label + value)
            print(" p" + label + value)

    _trace_request()
    request.scheme = "https"
    request.url = 'https://app2.ultradocia.fr/oauth/token'
    _trace_request()
    return authorization.create_token_response()

568 

569 

570 @bp.post('/oauth/revoke', 

571 tags=[oauth_tag], 

572 summary="Révoquer un token OAuth2", 

573 description=""" 

574Endpoint pour révoquer un access token ou refresh token OAuth2. 

575 

576Conforme à la spécification OAuth 2.0 Token Revocation (RFC 7009). 

577 

578### Cas d'usage : 

579- L'utilisateur se déconnecte de l'application 

580- L'application veut invalider un token compromis 

581- Révocation d'un refresh_token pour empêcher le renouvellement 

582 

583### Comportement : 

584- La révocation d'un token est **définitive** et **irréversible** 

585- Si le token n'existe pas ou est déjà révoqué, l'endpoint retourne quand même un succès (RFC 7009) 

586- La révocation d'un refresh_token peut également invalider les access_tokens associés (selon configuration) 

587 

588**Example curl command (révoquer un access_token):** 

589```bash 

590curl -X POST https://app2.ultradocia.fr/oauth/revoke \\ 

591 -H "Content-Type: application/x-www-form-urlencoded" \\ 

592 -d "token=YMWvsUJE15LGdsDKzQxHaRqEJxiIwZhBhpSvZqsC3o" \\ 

593 -d "token_type_hint=access_token" \\ 

594 -d "client_id=YOUR_CLIENT_ID" \\ 

595 -d "client_secret=YOUR_CLIENT_SECRET" 

596``` 

597 

598**Example curl command (révoquer un refresh_token):** 

599```bash 

600curl -X POST https://app2.ultradocia.fr/oauth/revoke \\ 

601 -H "Content-Type: application/x-www-form-urlencoded" \\ 

602 -d "token=def456ghi789jkl012" \\ 

603 -d "token_type_hint=refresh_token" \\ 

604 -d "client_id=YOUR_CLIENT_ID" \\ 

605 -d "client_secret=YOUR_CLIENT_SECRET" 

606``` 

607 

608**Paramètres:** 

609- `token` (requis) : Le token à révoquer (access_token ou refresh_token) 

610- `token_type_hint` (optionnel) : Type de token pour optimiser la recherche ("access_token" ou "refresh_token") 

611- `client_id` (optionnel/requis) : ID du client (requis si le client a un secret) 

612- `client_secret` (optionnel) : Secret du client pour l'authentification 

613 

614**Important :** 

615- Après révocation d'un access_token, toute tentative d'utilisation retournera une erreur 401 

616- Après révocation d'un refresh_token, il ne sera plus possible de rafraîchir l'access_token 

617- L'endpoint retourne toujours 200 OK même si le token n'existe pas (sécurité par obscurité) 

618 """, 

619 responses={ 

620 "200": RevokeTokenResponse, 

621 "400": TokenErrorResponse, 

622 "401": TokenErrorResponse 

623 }) 

def revoke_token():
    """
    Revoke an OAuth2 token (access_token or refresh_token).

    Delegates to the endpoint registered on the authorization server under
    the name 'revocation'.
    """
    response = authorization.create_endpoint_response('revocation')
    return response

632 

633 

634 # Admin/Debug endpoints for OAuth2 client management 

@bp.route('/oauth/clients', methods=['GET'])
def list_clients():
    """List all OAuth2 clients as JSON (debugging aid).

    NOTE(review): this route carries no authentication decorator; confirm it
    is not reachable in production.
    """
    def _as_list(value):
        # Client metadata may come back as a list, or as a whitespace-
        # separated string; calling .split() on a list would raise.
        if not value:
            return []
        if isinstance(value, str):
            return value.split()
        return list(value)

    return jsonify([
        {
            'client_id': client.client_id,
            'client_name': getattr(client, 'client_name', 'N/A'),
            'redirect_uris': _as_list(getattr(client, 'redirect_uris', None)),
            'grant_types': _as_list(getattr(client, 'grant_types', None)),
            'scope': client.scope,
        }
        for client in OAuth2Client.query.all()
    ])

649 

@bp.route('/oauth/init_test_client', methods=['POST', 'GET'])
def init_test_client():
    """Create the development OAuth2 client ``dev`` if it does not exist yet.

    Returns a JSON document describing the existing or newly created client.
    Answers 500 when the User model cannot be resolved and 400 when the
    database holds no user to own the client.

    NOTE(review): the route is unauthenticated, creates data on GET and
    returns a hard-coded secret; it should not be exposed in production.
    """
    # Nothing to do when the test client is already there.
    client = OAuth2Client.query.filter_by(client_id='dev').first()
    if client:
        # redirect_uris may be a list or a whitespace-separated string.
        uris = getattr(client, 'redirect_uris', None) or []
        if isinstance(uris, str):
            uris = uris.split()
        return jsonify({
            'status': 'exists',
            'message': 'Test client already exists',
            'client_id': client.client_id,
            'redirect_uris': list(uris)
        })

    UserModel = get_user_model()
    if not UserModel:
        return jsonify({'error': 'User model not found'}), 500

    # Take the first user as owner. scalars().first() tolerates any number
    # of rows, whereas scalar_one_or_none() raises as soon as two users exist.
    user = db.session.execute(db.select(UserModel).limit(1)).scalars().first()
    if not user:
        return jsonify({'error': 'No users found. Please create a user first.'}), 400

    client = OAuth2Client(
        client_id='dev',
        client_secret='toto2',  # In production, this should be hashed
        user_id=user.id
    )

    client_metadata = {
        'client_name': 'Development Test Client',
        'client_uri': 'https://safia.rubbia.fr',
        'grant_types': ['authorization_code', 'password', 'client_credentials', 'refresh_token'],
        'redirect_uris': [
            'https://safia.rubbia.fr/authorize',
            'https://app2.ultradocia.fr/authorize',
            'http://localhost:8000/authorize'
        ],
        'response_types': ['code', 'token'],
        'scope': 'profile email openid',
        'token_endpoint_auth_method': 'client_secret_post'
    }
    client.set_client_metadata(client_metadata)

    db.session.add(client)
    db.session.commit()

    return jsonify({
        'status': 'created',
        'message': 'Test client created successfully',
        'client_id': client.client_id,
        'client_secret': 'toto2',
        'redirect_uris': client_metadata['redirect_uris']
    })

708 

709 

710# db.init_app(app) 

711 

712 

713 with app.app_context(): 

714 db.create_all() # extend_existing = True 

715 db.session.commit() 

716 

717 

718 config_oauth(app) 

719 return bp, require_oauth 

720 

721import os 

722# os.environ['AUTHLIB_INSECURE_TRANSPORT'] = 'true' 

723import os 

724# os.environ['DEBUG'] = 'true'