Coverage for server/bp/oauth.py: 57%
281 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
« prev ^ index » next coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
2from flask import Blueprint, request, url_for, render_template, redirect, render_template_string, send_file
4import time
5from flask import Blueprint, request, session, url_for
6from flask import render_template, redirect, jsonify
7from werkzeug.security import gen_salt
9from authlib.integrations.flask_oauth2 import current_token
10from authlib.oauth2 import OAuth2Error
12# VR 11/11/25 copy all from lib.auth_oauth.oauth2
13#from lib.auth_oauth.models import db, User, OAuth2Client
14#from lib.auth_oauth.oauth2 import authorization, require_oauth
21import os
22from flask import Flask
24# New library 11/11/25 from https://github.com/authlib/example-oauth2-server/blob/master/website/
25#from lib.auth_oauth.models import db
28# 11/11/25 FROM https://github.com/authlib/example-oauth2-server/blob/master/website/routes.py
30# Maybe for connexion
31def current_user_loc():
32 from flask_login import current_user
33 return current_user
34# VR TODO 11/11/25 Proposed by authlib/example-oauth2-server
35# if 'id' in session:
36# uid = session['id']
37# return User.query.get(uid)
38# return None
40# For client creation : non blocking because we hard-code client in first dev steps
41def split_by_crlf(s):
42 return [v for v in s.splitlines() if v]
46def create_oauth_bp(app, db):
47 from flask_openapi3 import APIBlueprint
48 bp = APIBlueprint('oauth', __name__, url_prefix='')
49 from sqlalchemy.orm import relationship
51 # Use the db instance passed as parameter, not creating new engine/base
53# dupliquer
54# class User(Base):
55# __table__ = Table("flask_user", Base.metadata, autoload_with=engine, schema='mtruser')
57 # from server.safia import User
61 @bp.route("/oauth_index")
62 def index():
63 return "This is just a test that needs to be renamed !"
65 from flask_sqlalchemy import SQLAlchemy
67 # Legacy
68# from flask_oauthlib.provider import OAuth2Provider
71 # New library 11/11/25 => but this is in sub files in lib.auth_oauth
72# from authlib.integrations.flask_oauth2 import AuthorizationServer
73# from authlib.oauth2 import OAuth2Error
76 # legacy
77# oauth = OAuth2Provider(app)
78# oauth = OAuth2ClientMixin(app)
79 app.debug = True
81 # COPY VR 11/11/25 Moved from models
82 import time
83 from authlib.integrations.sqla_oauth2 import (
84 OAuth2ClientMixin,
85 OAuth2AuthorizationCodeMixin,
86 OAuth2TokenMixin,
87 )
89 # CA c'est Claude le 11/11/25 : Note: User model is defined in auth.py
90 # We retrieve it from the db.Model.registry or define a minimal reference
91 # Since we're using string-based foreign keys, we don't need the actual class
92 # But for query operations, we need to get the User model
93 # The safest way is to query it dynamically
95 # We'll define a helper to get User model at runtime
96 User = None # Will be set later when needed
98 def get_user_model():
99 """Get User model from the registry at runtime"""
100 # Import here to avoid circular dependency
101 import sys
102 if 'server.safia' in sys.modules:
103 return sys.modules['server.safia'].User
104 # Fallback: search in registry
105 for attr_name in dir(db.Model):
106 attr = getattr(db.Model, attr_name, None)
107 if hasattr(attr, '__tablename__') and attr.__tablename__ == 'flask_user':
108 return attr
109 return None
111 class OAuth2Client(db.Model, OAuth2ClientMixin):
112 __tablename__ = 'oauth2_client'
113 __table_args__ = {'extend_existing': True,
114 "schema": "mtruser"}
116 id = db.Column(db.Integer, primary_key=True)
117 # Use string reference for ForeignKey to avoid circular import issues
118 user_id = db.Column(
119 db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE')
120 )
121 # Use string reference in relationship as well
122 user = db.relationship('User', foreign_keys=[user_id])
125# user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE'))
126# user = db.relationship('User', backref='oauth2_client', foreign_keys=[user_id])
128# sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper[OAuth2Client(oauth2_client)]'. Original exception was: Error creating backref 'oauth2_client' on relationship 'OAuth2Client.user': property of that name exists on mapper 'Mapper[User(flask_user)]'
130# GPT pourquoi un s user = db.relationship('User', backref='oauth2_clients', foreign_keys=[user_id])
133 class OAuth2AuthorizationCode(db.Model, OAuth2AuthorizationCodeMixin):
134 __tablename__ = 'oauth2_code'
135 __table_args__ = {'extend_existing': True,
136 "schema": "mtruser"}
138 id = db.Column(db.Integer, primary_key=True)
139 # Use string reference for ForeignKey to avoid circular import issues
140 user_id = db.Column(
141 db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE')
142 )
143 user = db.relationship('User', foreign_keys=[user_id])
146# user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE'))
147# user = db.relationship('User', foreign_keys=[user_id])
149 class OAuth2Token(db.Model, OAuth2TokenMixin):
150 __tablename__ = 'oauth2_token'
151 __table_args__ = {'extend_existing': True,
152 "schema": "mtruser"}
154 id = db.Column(db.Integer, primary_key=True)
155 # user_id = db.Column(
156 # db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
157 # user = db.relationship('User')
160 user_id = db.Column(db.Integer, db.ForeignKey('mtruser.flask_user.id', ondelete='CASCADE'))
161 user = db.relationship('User', foreign_keys=[user_id])
163 def is_refresh_token_active(self):
164 # access_token_revoked_at | refresh_token_revoked_at
166# if self.revoked:
167# return False
168 expires_at = self.issued_at + self.expires_in * 2
169 return expires_at >= time.time()
171 #from server.safia import app
172 with app.app_context():
173 db.create_all() # extend_existing = True
174 db.session.commit()
176 # COPY VR 11/11/25 Moved from oauth2 in lib.auth_oauth
177 from authlib.integrations.flask_oauth2 import (
178 AuthorizationServer,
179 ResourceProtector,
180)
181 from authlib.integrations.sqla_oauth2 import (
182 create_query_client_func,
183 create_save_token_func,
184 create_revocation_endpoint,
185 create_bearer_token_validator,
186 )
187 from authlib.oauth2.rfc6749 import grants
188 from authlib.oauth2.rfc7636 import CodeChallenge
189 #from .models import db, User
190 #from .models import OAuth2Client, OAuth2AuthorizationCode, OAuth2Token
192# 'client_secret_post', 'none',
193 class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
194 TOKEN_ENDPOINT_AUTH_METHODS = [
195 'client_secret_basic', 'client_secret_post', 'none'
196 ]
198 def save_authorization_code(self, code, request):
199 code_challenge = request.data.get('code_challenge')
200 code_challenge_method = request.data.get('code_challenge_method')
201 auth_code = OAuth2AuthorizationCode(
202 code=code,
203 client_id=request.client.client_id,
204 redirect_uri=request.redirect_uri,
205 scope=request.scope,
206 user_id=request.user.id,
207 code_challenge=code_challenge,
208 code_challenge_method=code_challenge_method,
209 )
210 db.session.add(auth_code)
211 db.session.commit()
212 return auth_code
214 def query_authorization_code(self, code, client):
215 auth_code = OAuth2AuthorizationCode.query.filter_by(
216 code=code, client_id=client.client_id).first()
217 if auth_code and not auth_code.is_expired():
218 return auth_code
220 def delete_authorization_code(self, authorization_code):
221 db.session.delete(authorization_code)
222 db.session.commit()
224 def authenticate_user(self, authorization_code):
225 UserModel = get_user_model()
226 if not UserModel:
227 return None
228 # Use db.session.get() which works across different SQLAlchemy instances
229 return db.session.get(UserModel, authorization_code.user_id)
232 class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant):
233 def authenticate_user(self, username, password):
234 UserModel = get_user_model()
235 if not UserModel:
236 return None
237 # Use db.session.execute() with select() for SQLAlchemy 2.0 compatibility
238 user = db.session.execute(
239 db.select(UserModel).filter_by(username=username)
240 ).scalar_one_or_none()
241 if user is not None and user.check_password(password):
242 return user
243 return None
246 class RefreshTokenGrant(grants.RefreshTokenGrant):
247 INCLUDE_NEW_REFRESH_TOKEN = True
248 TOKEN_ENDPOINT_AUTH_METHODS = [
249 'client_secret_basic', 'client_secret_post', 'none'
250 ]
252 def authenticate_refresh_token(self, refresh_token):
253 token = OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
254 if token and token.is_refresh_token_active():
255 return token
257 def authenticate_user(self, credential):
258 UserModel = get_user_model()
259 if not UserModel:
260 return None
261 # Use db.session.get() which works across different SQLAlchemy instances
262 return db.session.get(UserModel, credential.user_id)
264 def revoke_old_credential(self, credential):
265 credential.revoked = True
266 db.session.add(credential)
267 db.session.commit()
270 query_client = create_query_client_func(db.session, OAuth2Client)
271 save_token = create_save_token_func(db.session, OAuth2Token)
274 # VR TODO bullshit de duck.ai
275 # Autoriser la génération d'un nouveau refresh token
276 class MyAuthorizationServer(AuthorizationServer):
277 def create_refresh_token(self, client_id, grant_type):
278 # Logique pour créer un nouveau refresh token
279 return super().create_refresh_token(client_id, grant_type)
280# authorization = MyAuthorizationServer(
281# query_client=query_client,
282# save_token=save_token,
283# )
285 authorization = AuthorizationServer(
286 query_client=query_client,
287 save_token=save_token,
288 )
290 require_oauth = ResourceProtector()
292 def config_oauth(app):
293 authorization.init_app(app)
295 # support all grants
296 authorization.register_grant(grants.ImplicitGrant)
297 authorization.register_grant(grants.ClientCredentialsGrant)
298 authorization.register_grant(AuthorizationCodeGrant, [CodeChallenge(required=True)])
299 authorization.register_grant(PasswordGrant)
300 authorization.register_grant(RefreshTokenGrant)
301# authorization.register_token_generator("")
303 # support revocation
304 revocation_cls = create_revocation_endpoint(db.session, OAuth2Token)
305 authorization.register_endpoint(revocation_cls)
307 # protect resource
308 bearer_cls = create_bearer_token_validator(db.session, OAuth2Token)
309 require_oauth.register_token_validator(bearer_cls())
311 from oauthlib.oauth2 import OAuth2Error
312 #from oauthlib import authorization
313 # from authlib.integrations.flask_oauth2 import AuthorizationServer
316 import logging
317 loggger = logging.getLogger()
318 from flask import make_response, render_template
320 # VR Step 11/11/25 00am04
321 # - [x] home.home maybe to change login.login
322 # - [x] render template 'authorize.html' maybe to change
323 # - [ ] auth/oauth/confirm.html to deprecate since authorize.html copied in templates/auth/oauth in safia
324 # - [ ] Test all the flows again
325 # - [ ] Add all the needed routes from https://github.com/authlib/example-oauth2-server/blob/master/website/routes.py
326 # - [ ] def check_password(self, password): HACK HACK => je ne sais pas pourquoi et peut-etre plus en fait VR 11/11/25 12pm
330 @bp.route('/api/client', methods=['GET', 'POST'])
331# @oauth.authorize_handler
332 def api_client(*args, **kwargs):
333 return "Je vous dis quoi !"
335 @bp.route('/api/user')
336 @require_oauth('profile')
337 def user_profile():
338 # if Token model has `.user` foreign key
339 user = current_token.user
340 return jsonify({"username": user.username, "id": user.id})
343 from flask_openapi3 import Info, Tag
344 from pydantic import BaseModel, Field
345 from typing import Optional
347 oauth_tag = Tag(name="saxia_oauth", description="Saxia Oauth")
349 # Modèles pour la documentation OpenAPI
350 class AuthorizeGetQuery(BaseModel):
351 """Paramètres de la requête GET pour l'autorisation OAuth"""
352 response_type: str = Field(..., description="Type de réponse OAuth (code, token)")
353 client_id: str = Field(..., description="ID du client OAuth")
354 redirect_uri: Optional[str] = Field(None, description="URI de redirection après autorisation")
355 scope: Optional[str] = Field(None, description="Portée demandée (profile, email, etc.)")
356 state: Optional[str] = Field(None, description="État pour prévenir CSRF")
357 code_challenge: Optional[str] = Field(None, description="Challenge PKCE")
358 code_challenge_method: Optional[str] = Field(None, description="Méthode PKCE (S256, plain)")
360 class AuthorizePostForm(BaseModel):
361 """Paramètres du formulaire POST pour confirmer l'autorisation"""
362 confirm: str = Field(..., description="Confirmation de l'utilisateur (true/false)")
363 username: Optional[str] = Field(None, description="Nom d'utilisateur si non connecté")
365 # Modèles pour l'endpoint /oauth/token
366 class TokenRequestAuthorizationCode(BaseModel):
367 """Requête de token avec authorization_code"""
368 grant_type: str = Field(..., description="Type de grant (authorization_code)", example="authorization_code")
369 code: str = Field(..., description="Code d'autorisation reçu du serveur", example="abc123def456")
370 redirect_uri: str = Field(..., description="URI de redirection (doit correspondre à celle utilisée pour l'autorisation)")
371 client_id: str = Field(..., description="ID du client OAuth")
372 client_secret: Optional[str] = Field(None, description="Secret du client (si requis)")
373 code_verifier: Optional[str] = Field(None, description="Vérificateur PKCE")
375 class TokenRequestRefreshToken(BaseModel):
376 """Requête de token avec refresh_token"""
377 grant_type: str = Field(..., description="Type de grant (refresh_token)", example="refresh_token")
378 refresh_token: str = Field(..., description="Refresh token reçu lors de l'obtention du token initial", example="def456ghi789jkl012")
379 client_id: str = Field(..., description="ID du client OAuth")
380 client_secret: Optional[str] = Field(None, description="Secret du client (si requis)")
381 scope: Optional[str] = Field(None, description="Portée demandée (optionnel, par défaut même portée que le token initial)")
383 class TokenRequestPassword(BaseModel):
384 """Requête de token avec password grant"""
385 grant_type: str = Field(..., description="Type de grant (password)", example="password")
386 username: str = Field(..., description="Nom d'utilisateur")
387 password: str = Field(..., description="Mot de passe")
388 client_id: str = Field(..., description="ID du client OAuth")
389 client_secret: Optional[str] = Field(None, description="Secret du client (si requis)")
390 scope: Optional[str] = Field(None, description="Portée demandée")
392 class TokenResponse(BaseModel):
393 """Réponse contenant le token OAuth2"""
394 access_token: str = Field(..., description="Token d'accès OAuth2", example="YMWvsUJE15LGdsDKzQxHaRqEJxiIwZhBhpSvZqsC3o")
395 token_type: str = Field(..., description="Type de token", example="Bearer")
396 expires_in: int = Field(..., description="Durée de validité en secondes", example=3600)
397 refresh_token: Optional[str] = Field(None, description="Token de rafraîchissement", example="def456ghi789jkl012")
398 scope: Optional[str] = Field(None, description="Portées accordées", example="profile email")
400 class TokenErrorResponse(BaseModel):
401 """Réponse d'erreur OAuth2"""
402 error: str = Field(..., description="Code d'erreur OAuth2", example="invalid_grant")
403 error_description: Optional[str] = Field(None, description="Description de l'erreur", example="Invalid authorization code")
405 # Modèles pour l'endpoint /oauth/revoke
406 class RevokeTokenRequest(BaseModel):
407 """Requête de révocation de token"""
408 token: str = Field(..., description="Token à révoquer (access_token ou refresh_token)", example="YMWvsUJE15LGdsDKzQxHaRqEJxiIwZhBhpSvZqsC3o")
409 token_type_hint: Optional[str] = Field(None, description="Type de token (access_token ou refresh_token)", example="access_token")
410 client_id: Optional[str] = Field(None, description="ID du client OAuth (requis si le client a un secret)")
411 client_secret: Optional[str] = Field(None, description="Secret du client OAuth")
413 class RevokeTokenResponse(BaseModel):
414 """Réponse de révocation de token (succès)"""
415 status: str = Field(default="revoked", description="Statut de la révocation", example="revoked")
417#@app.errorhandler(401)
418#@app.route('/api/v1/uploader', methods = ['GET', 'POST'])
419# @app.get(rule = '/static/<path:path>', tags = [oauth_tag])
421 def _fix_request_scheme():
422 """Fonction helper pour forcer le schéma HTTPS"""
423 loggger.warning(" request.scheme : " + str(request.scheme))
424 print(" p request.scheme : " + str(request.scheme))
425 loggger.warning(" request.url : " + str(request.url))
426 print(" p request.url : " + str(request.url))
427 request.scheme = "https"
428 request.url = 'https://app2.ultradocia.fr/oauth/authorize'
429 loggger.warning(" request.scheme : " + str(request.scheme))
430 print(" p request.scheme : " + str(request.scheme))
431 loggger.warning(" request.url : " + str(request.url))
432 print(" p request.url : " + str(request.url))
434 @bp.get('/oauth/authorize',
435 tags=[oauth_tag],
436 summary="Afficher le formulaire de consentement OAuth",
437 description="Affiche le formulaire de consentement pour l'autorisation OAuth. L'utilisateur doit être authentifié.")
438 def authorize_get(query: AuthorizeGetQuery):
439 """
440 Endpoint GET pour afficher le formulaire d'autorisation OAuth.
442 Redirige vers la page de login si l'utilisateur n'est pas authentifié.
443 Affiche le formulaire de consentement si l'utilisateur est authentifié.
444 """
445 _fix_request_scheme()
446 user = current_user_loc()
447 # if user log status is not true (Auth server), then to log it in
448 if not user or user.is_anonymous:
449 return redirect(url_for('login.login', next=request.url))
451 grant = None
452 try:
453 request.scheme = "https"
454 grant = authorization.get_consent_grant(end_user=user, request=request)
455 except OAuth2Error as error:
456 return error.error
457 except Exception as e:
458 print(str(e))
459 print("Grant is maybe none : " + str(grant))
460 return render_template('auth/oauth/authorize.html', user=user, grant=grant)
462 @bp.post('/oauth/authorize',
463 tags=[oauth_tag],
464 summary="Confirmer ou refuser l'autorisation OAuth",
465 description="Traite la confirmation de l'utilisateur pour l'autorisation OAuth et retourne le code d'autorisation.")
466 def authorize_post(form: AuthorizePostForm):
467 """
468 Endpoint POST pour traiter la confirmation de l'autorisation OAuth.
470 L'utilisateur confirme ou refuse l'accès au client OAuth.
471 Retourne un code d'autorisation si accepté, ou une erreur si refusé.
472 """
473 _fix_request_scheme()
474 user = current_user_loc()
475 # if user log status is not true (Auth server), then to log it in
476 if not user or user.is_anonymous:
477 return redirect(url_for('login.login', next=request.url))
479 if not user and 'username' in request.form:
480 username = request.form.get('username')
481 UserModel = get_user_model()
482 if UserModel:
483 user = db.session.execute(
484 db.select(UserModel).filter_by(username=username)
485 ).scalar_one_or_none()
486 else:
487 user = None
488 if request.form['confirm']:
489 grant_user = user
490 else:
491 grant_user = None
492 return authorization.create_authorization_response(grant_user=grant_user)
495 @bp.post('/oauth/token',
496 tags=[oauth_tag],
497 summary="Obtenir un access token OAuth2",
498 description="""
499Endpoint pour obtenir un access token OAuth2. Supporte plusieurs grant types :
501### 1. Authorization Code Grant
502Échange un code d'autorisation contre un access token.
503```bash
504curl -X POST https://app2.ultradocia.fr/oauth/token \\
505 -H "Content-Type: application/x-www-form-urlencoded" \\
506 -d "grant_type=authorization_code" \\
507 -d "code=abc123def456" \\
508 -d "redirect_uri=https://client.example.com/callback" \\
509 -d "client_id=YOUR_CLIENT_ID" \\
510 -d "client_secret=YOUR_CLIENT_SECRET" \\
511 -d "code_verifier=PKCE_CODE_VERIFIER"
512```
514### 2. Refresh Token Grant (Rafraîchissement du token)
515Utilise un refresh_token pour obtenir un nouveau access token sans redemander l'autorisation.
516```bash
517curl -X POST https://app2.ultradocia.fr/oauth/token \\
518 -H "Content-Type: application/x-www-form-urlencoded" \\
519 -d "grant_type=refresh_token" \\
520 -d "refresh_token=def456ghi789jkl012" \\
521 -d "client_id=YOUR_CLIENT_ID" \\
522 -d "client_secret=YOUR_CLIENT_SECRET"
523```
525### 3. Password Grant
526Obtient un token directement avec username/password (à utiliser avec précaution).
527```bash
528curl -X POST https://app2.ultradocia.fr/oauth/token \\
529 -H "Content-Type: application/x-www-form-urlencoded" \\
530 -d "grant_type=password" \\
531 -d "username=user@example.com" \\
532 -d "password=secret" \\
533 -d "client_id=YOUR_CLIENT_ID"
534```
536**Note importante sur le refresh_token :**
537- Le refresh_token permet d'obtenir un nouveau access_token sans redemander l'autorisation à l'utilisateur
538- Conservez le refresh_token en sécurité (il a une durée de vie plus longue que l'access_token)
539- Un nouveau refresh_token peut être retourné lors du rafraîchissement (INCLUDE_NEW_REFRESH_TOKEN = True)
540- Le refresh_token a une durée de validité de 2x la durée de l'access_token
541 """,
542 responses={
543 "200": TokenResponse,
544 "400": TokenErrorResponse,
545 "401": TokenErrorResponse
546 })
547 def issue_token():
548 """
549 Issue an OAuth2 access token.
551 This endpoint supports multiple grant types:
552 - authorization_code: Exchange authorization code for token
553 - refresh_token: Get new access token using refresh token
554 - password: Get token using username/password (Resource Owner Password Credentials)
555 - client_credentials: Get token using client credentials only
556 """
557 loggger.warning(" request.scheme : " + str(request.scheme))
558 print(" p request.scheme : " + str(request.scheme))
559 loggger.warning(" request.url : " + str(request.url))
560 print(" p request.url : " + str(request.url))
561 request.scheme = "https"
562 request.url = 'https://app2.ultradocia.fr/oauth/token'
563 loggger.warning(" request.scheme : " + str(request.scheme))
564 print(" p request.scheme : " + str(request.scheme))
565 loggger.warning(" request.url : " + str(request.url))
566 print(" p request.url : " + str(request.url))
567 return authorization.create_token_response()
570 @bp.post('/oauth/revoke',
571 tags=[oauth_tag],
572 summary="Révoquer un token OAuth2",
573 description="""
574Endpoint pour révoquer un access token ou refresh token OAuth2.
576Conforme à la spécification OAuth 2.0 Token Revocation (RFC 7009).
578### Cas d'usage :
579- L'utilisateur se déconnecte de l'application
580- L'application veut invalider un token compromis
581- Révocation d'un refresh_token pour empêcher le renouvellement
583### Comportement :
584- La révocation d'un token est **définitive** et **irréversible**
585- Si le token n'existe pas ou est déjà révoqué, l'endpoint retourne quand même un succès (RFC 7009)
586- La révocation d'un refresh_token peut également invalider les access_tokens associés (selon configuration)
588**Example curl command (révoquer un access_token):**
589```bash
590curl -X POST https://app2.ultradocia.fr/oauth/revoke \\
591 -H "Content-Type: application/x-www-form-urlencoded" \\
592 -d "token=YMWvsUJE15LGdsDKzQxHaRqEJxiIwZhBhpSvZqsC3o" \\
593 -d "token_type_hint=access_token" \\
594 -d "client_id=YOUR_CLIENT_ID" \\
595 -d "client_secret=YOUR_CLIENT_SECRET"
596```
598**Example curl command (révoquer un refresh_token):**
599```bash
600curl -X POST https://app2.ultradocia.fr/oauth/revoke \\
601 -H "Content-Type: application/x-www-form-urlencoded" \\
602 -d "token=def456ghi789jkl012" \\
603 -d "token_type_hint=refresh_token" \\
604 -d "client_id=YOUR_CLIENT_ID" \\
605 -d "client_secret=YOUR_CLIENT_SECRET"
606```
608**Paramètres:**
609- `token` (requis) : Le token à révoquer (access_token ou refresh_token)
610- `token_type_hint` (optionnel) : Type de token pour optimiser la recherche ("access_token" ou "refresh_token")
611- `client_id` (optionnel/requis) : ID du client (requis si le client a un secret)
612- `client_secret` (optionnel) : Secret du client pour l'authentification
614**Important :**
615- Après révocation d'un access_token, toute tentative d'utilisation retournera une erreur 401
616- Après révocation d'un refresh_token, il ne sera plus possible de rafraîchir l'access_token
617- L'endpoint retourne toujours 200 OK même si le token n'existe pas (sécurité par obscurité)
618 """,
619 responses={
620 "200": RevokeTokenResponse,
621 "400": TokenErrorResponse,
622 "401": TokenErrorResponse
623 })
624 def revoke_token():
625 """
626 Revoke an OAuth2 token (access_token or refresh_token).
628 Implements RFC 7009 - OAuth 2.0 Token Revocation.
629 Returns 200 OK regardless of whether the token exists (security by obscurity).
630 """
631 return authorization.create_endpoint_response('revocation')
634 # Admin/Debug endpoints for OAuth2 client management
635 @bp.route('/oauth/clients', methods=['GET'])
636 def list_clients():
637 """List all OAuth2 clients (for debugging)"""
638 clients = OAuth2Client.query.all()
639 result = []
640 for client in clients:
641 result.append({
642 'client_id': client.client_id,
643 'client_name': getattr(client, 'client_name', 'N/A'),
644 'redirect_uris': client.redirect_uris.split() if hasattr(client, 'redirect_uris') else [],
645 'grant_types': client.grant_types.split() if hasattr(client, 'grant_types') else [],
646 'scope': client.scope
647 })
648 return jsonify(result)
650 @bp.route('/oauth/init_test_client', methods=['POST', 'GET'])
651 def init_test_client():
652 """Initialize a test OAuth2 client for development"""
653 from werkzeug.security import gen_salt
655 # Check if test client already exists
656 client = OAuth2Client.query.filter_by(client_id='dev').first()
658 if client:
659 return jsonify({
660 'status': 'exists',
661 'message': 'Test client already exists',
662 'client_id': client.client_id,
663 'redirect_uris': client.redirect_uris.split() if hasattr(client, 'redirect_uris') else []
664 })
666 # Get a test user (first user or create one)
667 UserModel = get_user_model()
668 if not UserModel:
669 return jsonify({'error': 'User model not found'}), 500
671 user = db.session.execute(db.select(UserModel)).scalar_one_or_none()
672 if not user:
673 return jsonify({'error': 'No users found. Please create a user first.'}), 400
675 # Create test client
676 client = OAuth2Client(
677 client_id='dev',
678 client_secret='toto2', # In production, this should be hashed
679 user_id=user.id
680 )
682 # Set client metadata
683 client_metadata = {
684 'client_name': 'Development Test Client',
685 'client_uri': 'https://safia.rubbia.fr',
686 'grant_types': ['authorization_code', 'password', 'client_credentials', 'refresh_token'],
687 'redirect_uris': [
688 'https://safia.rubbia.fr/authorize',
689 'https://app2.ultradocia.fr/authorize',
690 'http://localhost:8000/authorize'
691 ],
692 'response_types': ['code', 'token'],
693 'scope': 'profile email openid',
694 'token_endpoint_auth_method': 'client_secret_post'
695 }
696 client.set_client_metadata(client_metadata)
698 db.session.add(client)
699 db.session.commit()
701 return jsonify({
702 'status': 'created',
703 'message': 'Test client created successfully',
704 'client_id': client.client_id,
705 'client_secret': 'toto2',
706 'redirect_uris': client_metadata['redirect_uris']
707 })
710# db.init_app(app)
713 with app.app_context():
714 db.create_all() # extend_existing = True
715 db.session.commit()
718 config_oauth(app)
719 return bp, require_oauth
721import os
722# os.environ['AUTHLIB_INSECURE_TRANSPORT'] = 'true'
723import os
724# os.environ['DEBUG'] = 'true'