Coverage for server/bp/auth.py: 25%
602 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
« prev ^ index » next coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
def get_ipv4_from_request(request):
    """Return the caller's IP address for a Flask/Werkzeug request.

    Starts from ``request.environ['REMOTE_ADDR']`` and, when
    ``request.access_route`` is a non-empty list, prefers its last entry.

    The previous check compared ``str(type(...))`` with the dotted path of
    Werkzeug's ``ImmutableList``. That path is an implementation detail, so
    the forwarded address was silently ignored whenever it differed.
    ``isinstance`` accepts ``ImmutableList`` (a ``list`` subclass) wherever
    it is defined.

    Args:
        request: object exposing ``environ`` (mapping) and ``access_route``.

    Returns:
        The selected address, as found in the request.
    """
    ipv4 = request.environ['REMOTE_ADDR']
    print("ipv4 l1 get_ipv4_from_request : " + str(ipv4))
    ipv4_list = request.access_route
    print(" list ipv4 : " + str(ipv4_list))
    if isinstance(ipv4_list, (list, tuple)) and len(ipv4_list) > 0:
        ipv4 = ipv4_list[-1]

    print("ipv4 in get_ipv4_from_request : " + str(ipv4))
    return ipv4
14db_session = None
16import bcrypt
17from flask_bcrypt import Bcrypt
19from flask import redirect, url_for, render_template, request, Blueprint
def create_login_bp(app, db = None):
    """Create the ``login`` blueprint and wire authentication into ``app``.

    Args:
        app: Flask application. It receives the Flask-Login manager, the
            Bcrypt extension and, when ``db`` is omitted, the SQLAlchemy
            settings.
        db: optional Flask-SQLAlchemy handle to reuse. When ``None`` a new
            one is created from ``LibConfSystem().get_pg_conf()``.

    Returns:
        ``(bp2, User)``: the blueprint and the user model class.
    """
    from flask_login import LoginManager
    from flask_sqlalchemy import SQLAlchemy
    from werkzeug.security import generate_password_hash, check_password_hash
    from flask_login import UserMixin, login_user, logout_user, login_required, current_user

    bp2 = Blueprint('login', __name__)

    login_manager = LoginManager()
    login_manager.init_app(app)
    bcrypt_app = Bcrypt(app)
    # VR 15/11/25: this is a bug that Claude left lying around.
    from auth.lib_conf_system import LibConfSystem as LCS
    lcs = LCS()

    if db is None:
        (PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) = lcs.get_pg_conf()

        # Only the last of the former three assignments was ever effective.
        # NOTE(review): credentials are concatenated unescaped; a password
        # containing '@' or '/' would corrupt the URI unless the stored value
        # is already percent-encoded. Confirm against the configuration.
        app.config['SQLALCHEMY_DATABASE_URI'] = (
            'postgresql+psycopg2://' + PG_USER + ':' + PG_PASSWORD
            + '@' + PG_HOST + ':' + str(PG_PORT) + '/' + PG_DB
        )

        # NOTE(review): placeholder secret; sessions signed with it can be
        # forged by anyone who reads this file. Load it from configuration.
        app.config['SECRET_KEY'] = 'your_secret_key_here'

        db = SQLAlchemy()
        db.init_app(app)

    # Common tail for both a freshly created and an injected database handle.
    bp2.db = db
    with app.app_context():
        db.create_all()

    # NOTE(review): without a ``global`` statement this binds a local name;
    # the module-level ``db_session`` stays ``None``.
    db_session = db
77 # Définition du modèle d'utilisateur pour SQLAlchemy
class User(UserMixin, db.Model):
    """Account row in ``mtruser.flask_user`` with password and TOTP helpers.

    The TOTP secret is not a column of this table. It is read from and
    written to ``miscinfo["secret_token"]`` of the record handled by
    ``lpgss_singleton.select_user`` / ``update_user``, keyed by
    ``mtr_user_id``. 2FA counts as enabled exactly when that secret is set.
    """
    __tablename__ = 'flask_user'
    __table_args__ = {"schema": "mtruser"}
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True)
    password_hash = db.Column(db.String())
    mtr_user_id = db.Column(db.Integer)

    def _load_misc_info(self):
        """Return the user's ``miscinfo`` mapping; missing or null becomes ``{}``."""
        from server.safia import lpgss_singleton
        info = lpgss_singleton.select_user(self.mtr_user_id)
        misc_info = info["miscinfo"] if "miscinfo" in info else None
        return {} if misc_info is None else misc_info

    @staticmethod
    def _has_secret(misc_info):
        """Tell whether ``misc_info`` carries a non-null ``secret_token``."""
        return "secret_token" in misc_info and misc_info["secret_token"] is not None

    def get_is_two_factor_authentication_enabled(self):
        """Return ``True`` when a TOTP secret is stored for this user."""
        return self._has_secret(self._load_misc_info())

    def set_is_two_factor_authentication_enabled(self, value = True):
        """Enable or disable TOTP by creating or clearing the stored secret.

        Enabling keeps an existing secret. The previous code generated a new
        one on every call, which would invalidate the authenticator the user
        had just enrolled.
        """
        import pyotp
        from server.safia import lpgss_singleton
        misc_info = self._load_misc_info()
        if value == False:  # noqa: E712 - only an explicit False/0 disables, as before
            misc_info["secret_token"] = None
        elif not self._has_secret(misc_info):
            misc_info["secret_token"] = pyotp.random_base32()
        lpgss_singleton.update_user(self.mtr_user_id, misc_info=misc_info)

    def get_authentication_setup_uri(self):
        """Return the ``otpauth://`` provisioning URI, creating a secret if none exists.

        A null ``miscinfo`` is now handled like a missing one, as the two
        sibling methods already did; before, it raised ``TypeError``.
        """
        import pyotp
        from server.safia import lpgss_singleton
        misc_info = self._load_misc_info()
        if not self._has_secret(misc_info):
            misc_info["secret_token"] = pyotp.random_base32()
            lpgss_singleton.update_user(self.mtr_user_id, misc_info=misc_info)
        token_used = misc_info["secret_token"]
        return pyotp.totp.TOTP(token_used).provisioning_uri(
            name=self.username, issuer_name=app.name)

    def is_otp_valid(self, user_otp):
        """Check ``user_otp`` against the TOTP built from the setup URI.

        NOTE(review): through ``get_authentication_setup_uri`` this creates
        and stores a secret when the user has none.
        """
        import pyotp
        totp = pyotp.parse_uri(self.get_authentication_setup_uri())
        return totp.verify(user_otp)

    def set_password(self, password):
        """Store the Werkzeug hash of ``password``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash.

        A missing hash or password yields ``False`` instead of the
        ``AttributeError`` that ``check_password_hash`` would raise.
        """
        if not self.password_hash or password is None:
            return False
        return check_password_hash(self.password_hash, password)

    # VR 30-5-25: needed for oauth2, but I just want to do basic token.
    def get_user_id(self):
        """Return the linked ``mtr_user_id``."""
        return self.mtr_user_id
156 #def check_previous_password(self, password):
157 #return bcrypt.checkpw(password.encode('utf-8'), self.previous_password_hash.encode('utf-8'))
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login loader: fetch the ``User`` whose primary key is ``user_id``.

    Any failure (non-numeric id, query error) is printed and reported as
    ``None``.
    """
    try:
        primary_key = int(user_id)
        found = User.query.get(primary_key)
    except Exception as err:
        print("Error in load_user due to refacto in SqlAlchemy not migrated in prod (wtf) : " + str(err))
        found = None
    return found
def public_route(decorated_function):
    """Mark a view as public by setting ``is_public = True`` on it.

    The same function object is returned, so the decorator can be stacked
    with others.
    """
    setattr(decorated_function, "is_public", True)
    return decorated_function
@public_route
@bp2.route('/signup', methods=['GET', 'POST'])
def signup():
    """Render the signup form (GET) or create an account and log it in (POST).

    A POST without a username or password is rejected with HTTP 400. Before,
    it reached ``set_password(None)`` and ended as an internal error.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        if not username or not password:
            return 'Nom d’utilisateur et mot de passe requis.', 400

        # Refuse to create a second account with the same username.
        existing_user = User.query.filter_by(username=username).first()
        if existing_user is not None:
            return 'L’utilisateur existe déjà'

        new_user = User(username=username)
        new_user.set_password(password)
        db.session.add(new_user)
        db.session.commit()

        login_user(new_user)
        return redirect(url_for('login.create_mtr_user'))

    config_app = lcs.get_config_app()
    return render_template('auth/signup.html',
                           config_app=config_app)
@public_route
@bp2.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or run the password step of the login (POST).

    After a correct password:
      * users with an authenticator secret go to ``login.verify_two_factor_auth``;
      * otherwise ``login_or_send_code`` decides between the e-mail code step
        (``step_2fa``) and direct login for a whitelisted IP.

    Changes from the previous version: the caller IP comes from
    ``get_ipv4_from_request`` instead of an inline copy of the same logic,
    the PostgreSQL client is only built once the password has been accepted,
    a missing password is treated as a wrong one, and the unreachable
    statements after the exhaustive step dispatch are gone.
    """
    if request.method != 'POST':
        config_app = lcs.get_config_app()
        print(request.referrer)
        redirecturl = request.args.get('redirecturl', request.referrer)
        return render_template('auth/login.html',
                               data={"redirecturl": redirecturl},
                               config_app=config_app)

    from flask import session
    from auth.lib_stat_usage import record_usage
    from server.safia import lpgss_singleton

    username = request.form.get('username')
    password = request.form.get('password')
    redirecturl = request.form.get('redirecturl')
    user = User.query.filter_by(username=username).first()
    request_ipv4 = get_ipv4_from_request(request)

    if not (user and password is not None and user.check_password(password)):
        if user:
            record_usage(user.mtr_user_id, request_ipv4, type="try_connect", info_message='wrong_password',
                         lpgss=lpgss_singleton)
        return 'Identifiants invalides.'

    session["mtr_user_id"] = user.mtr_user_id
    if user.get_is_two_factor_authentication_enabled():
        # NOTE(review): the user is fully logged in before the OTP is checked
        # (VR 3-2-25 already flagged this). Fixing it requires changing
        # login.verify_two_factor_auth at the same time.
        login_user(user)
        return redirect(url_for('login.verify_two_factor_auth'))

    from auth.lib_2fa import login_or_send_code
    from auth.lib_conf_system import lcs_global_singleton
    from lib.stockage.lib_pg_safia_sys import LibPGSafiaSys
    (PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) = lcs_global_singleton.get_pg_conf()
    one_client = LibPGSafiaSys(PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT)

    ret, step, data = login_or_send_code(user.mtr_user_id, request_ipv4, redirecturl, one_client)

    # As before, only the address echoed back by login_or_send_code is
    # recorded and forwarded; it stays None when absent.
    ipv4 = None
    if 'ipv4' in data:
        session["ipv4"] = data["ipv4"]
        ipv4 = data["ipv4"]

    if step == 'step_2fa':
        record_usage(user.mtr_user_id, ipv4, type="step_2fa", info_message='step_2fa',
                     lpgss=lpgss_singleton)
        if ret != '' and ret is not None:
            return redirect(url_for(ret, ipv4=ipv4))
        print("Internal error login login")
        return redirect(url_for('login.login'))

    if step == 'login_ip_white_listed':
        login_user(user)
        record_usage(user.mtr_user_id, ipv4, type="try_connect", info_message='connexion_ok_with_ip_white_listed',
                     lpgss=lpgss_singleton)
        return redirect(url_for('uld.uld_app'))

    record_usage(user.mtr_user_id, ipv4, type="try_connect", info_message='unexpected_step_login_197',
                 lpgss=lpgss_singleton)
    return redirect(url_for('login.login'))
290 # TODO VR 3-2-25 : useless since we don't want to redirect to this page, or if we put a link in the email
291 # TODO VR 3-2-25 : should use a function in lib_2fa
@public_route
@bp2.route('/form_enter_validate_code', methods=['GET', 'POST'])
def form_enter_validate_code(ipv4 = None, message = None):
    """Show the e-mail code form (GET) or validate the submitted code (POST).

    The pending user comes from ``session["mtr_user_id"]``, set by the login
    view. Without it the visitor is sent back to the login page instead of
    hitting a ``KeyError``.

    On POST the address passed to ``validate_code_and_record_ip`` is derived
    from the request itself. The previous code ended up using the ``ipv4``
    form field, because its ``type(...) == list`` test never matches
    Werkzeug's ``ImmutableList``; a client could thus choose the address.
    """
    from flask import session

    mtr_user_id = session.get("mtr_user_id")
    if mtr_user_id is None:
        return redirect(url_for('login.login'))

    config_app = lcs.get_config_app()

    if request.method == 'GET':
        message = request.args.get("message", "")
        ipv4 = request.args.get("ipv4", None)
        print(" referer : " + str(request.referrer))
        # TODO VR 3-2-25: get the correct redirecturl.
        redirecturl = None
        ipv4_req = request.environ['REMOTE_ADDR']
        if ipv4_req != ipv4:
            print("Unexpected ipv4_req : " + str(ipv4_req) + " != " + str(ipv4) + " TROUBLESHOOT THIS USER : ")
        data = {"user_id": mtr_user_id, "ipv4": ipv4, "redirecturl": redirecturl, "message": message}
        return render_template('auth/form_enter_validate_code.html',
                               config_app=config_app,
                               data=data)

    # TODO VR: deduplicate with validate_code (this is the endpoint in use).
    from auth.lib_stat_usage import record_usage
    from auth.lib_2fa import validate_code_and_record_ip
    from server.safia import lpgss_singleton

    ipv4 = get_ipv4_from_request(request)
    code = request.form.get('code')
    iprecord = request.form.get('iprecord')
    redirecturl = request.form.get('redirecturl')
    print(" referer : " + str(request.referrer))

    ret = validate_code_and_record_ip(mtr_user_id, code, ipv4, iprecord, lpgss_singleton)
    user = None
    if not (ret == False or ret is None):  # noqa: E712 - same rejection test as before
        user = User.query.filter_by(mtr_user_id=mtr_user_id).first()

    if user is None:
        record_usage(mtr_user_id, ipv4, type="try_connect", info_message="Connexion NOK",
                     lpgss=lpgss_singleton)
        return redirect(url_for('login.form_enter_validate_code', message='Wrong Validation Code Or Already Used, Please Retry or contact support'))

    login_user(user)
    record_usage(mtr_user_id, ipv4, type="try_connect", info_message="Connexion OK",
                 lpgss=lpgss_singleton)

    # A missing or empty field is treated like the literal "None" the form
    # sends when it has no target.
    # NOTE(review): redirecturl is client-supplied and not restricted to this
    # site (open redirect).
    if redirecturl and redirecturl != "None":
        return redirect(redirecturl)
    return redirect(url_for('uld.uld_app'))
@bp2.route('/api/v1/auth/validation/code/get', methods=['GET', 'POST'])
def get_validation_code():
    """Re-run ``login_or_send_code`` for the user whose login is pending.

    The user id comes from ``session["mtr_user_id"]``. When no login is
    pending the endpoint answers 401 instead of raising ``KeyError``.
    """
    from flask import session
    from server.safia import lpgss_singleton
    from auth.lib_2fa import login_or_send_code

    mtr_user_id = session.get("mtr_user_id")
    if mtr_user_id is None:
        return "No pending login in this session", 401

    ipv4 = get_ipv4_from_request(request)
    login_or_send_code(mtr_user_id, ipv4=ipv4, redirect=None, lpgss=lpgss_singleton)

    return "Code Renvoyé !"
@bp2.route('/api/v1/auth/remove/ips', methods=['GET', 'POST'])
@login_required
def unvalidate():
    """Call ``deactivate_all_ips_2fa`` for the logged-in user's ``mtr_user_id``."""
    from server.safia import lpgss_singleton

    owner_id = current_user.mtr_user_id
    lpgss_singleton.deactivate_all_ips_2fa(owner_id)

    return "All ips deactivated !"
@bp2.route('/api/v1/auth/authenticator/desactivate', methods=['GET', 'POST'])
@login_required
def api_v1_auth_authenticator_desactivate():
    """Turn off authenticator-based 2FA for the logged-in user."""
    enabled = False
    current_user.set_is_two_factor_authentication_enabled(enabled)

    return "Desactivate authenticator !"
401 # VR TODO a virer
@public_route
@bp2.route('/validate_code', methods=['POST', 'GET'])
def validate_code():
    """Legacy code-validation endpoint (marked for removal by VR).

    Brought in line with ``form_enter_validate_code``:
      * the user is logged in only after the code is accepted (before,
        ``login_user`` ran whatever the result);
      * the pending user comes from ``session["mtr_user_id"]`` and is looked
        up by ``mtr_user_id`` (before, ``current_user.id`` was read on an
        anonymous user and the lookup filtered on the wrong column);
      * ``lpgss_singleton`` is passed to ``validate_code_and_record_ip`` as
        the other caller does;
      * redirects target ``login.form_enter_validate_code``, since the
        blueprint is registered as ``login`` and has no ``auth.*`` endpoint.
    """
    if request.method != 'POST':
        print("Unexpected access to POST endpoint validate_code")
        return redirect(url_for('login.form_enter_validate_code'))

    from flask import session
    from auth.lib_2fa import validate_code_and_record_ip
    from server.safia import lpgss_singleton

    mtr_user_id = session.get("mtr_user_id")
    if mtr_user_id is None:
        return redirect(url_for('login.login'))

    code = request.form.get('code')
    iprecord = request.form.get('iprecord')
    redirecturl = request.form.get('redirecturl')
    print(" referer : " + str(request.referrer))
    # The address is derived from the request, never from the form.
    ipv4 = get_ipv4_from_request(request)

    ret = validate_code_and_record_ip(mtr_user_id, code, ipv4, iprecord, lpgss_singleton)
    if ret == False or ret is None:  # noqa: E712 - same rejection test as the sibling view
        return redirect(url_for('login.form_enter_validate_code'))

    user = User.query.filter_by(mtr_user_id=mtr_user_id).first()
    if user is None:
        return redirect(url_for('login.login'))
    login_user(user)

    if redirecturl and redirecturl != "None":
        return redirect(redirecturl)
    return redirect(url_for('uld.uld_app'))
434 # Inutile remplacer par account
@bp2.route('/profile')
@login_required
def profile():
    """Render ``auth/profile.html`` for the logged-in user."""
    from flask_login import current_user
    print(str(current_user))
    print(" referer : " + str(request.referrer))
    template_context = {
        "username": current_user.username,
        "mtr_user_id": current_user.mtr_user_id,
        "config_app": lcs.get_config_app(),
    }
    return render_template('auth/profile.html', **template_context)
@bp2.route('/logout', methods=['POST', 'GET'])
@login_required
def logout():
    """Log the user out, drop ``mtr_user_id`` from the session and leave the site."""
    from flask import session
    logout_user()
    # Same effect as assigning None and then deleting the key: it never
    # raises, whether or not the key was present.
    session.pop("mtr_user_id", None)
    return redirect("https://ultradocia.fr")
@bp2.route('/create_mtr_user', methods=['GET'])
@login_required
def create_mtr_user():
    """Provision the back-end user for the logged-in account, then go to login.

    ``setup_user`` is called with a default project name derived from the
    username. The returned id is stored on ``current_user.mtr_user_id``. For
    ``type=saxia`` (the default) the new project receives a copy of the
    configuration of project 70.
    """
    from flask import make_response
    from server.safia import lpgss_singleton, lib_external_info_from_apia_at, lib_right_singleton, LibSafiaSystem
    print(str(current_user))
    account_type = request.args.get('type', 'saxia')

    lss = LibSafiaSystem(lib_user_data_internal=lpgss_singleton,
                         lib_user_data_external=lib_external_info_from_apia_at,
                         lib_auth_user_otp=None,
                         lib_right=lib_right_singleton)

    default_project_name = "default_project_user_" + current_user.username
    info_setup = lss.setup_user(current_user.username, default_project_name, "password", "")

    if "setup_user" in info_setup:
        new_mtr_user_id = info_setup["setup_user"]
    else:
        new_mtr_user_id = info_setup["project"]["owner_id"]
    current_user.mtr_user_id = new_mtr_user_id
    db.session.commit()

    if account_type == "saxia":
        if "project" in info_setup and "id" in info_setup["project"]:
            project_id = info_setup["project"]["id"]
        else:
            project_id = -1
        default_project_config_saxia = 70
        configuration = lss.lib_user_data_internal.load_conf_project(default_project_config_saxia)
        lss.update_conf_project(project_id, configuration)

    # Both account types end on the login page: the created project is loaded
    # automatically and the user still has to go through 2FA.
    return make_response(redirect(url_for('login.login')))
@bp2.route('/set_mtr', methods=['GET'])
@login_required
def set_mtr():
    """Attach the id from ``register_user_get_data`` to an account that has none.

    Nothing is changed when the account already has an ``mtr_user_id`` or
    when the resolved user is the anonymous placeholder.
    """
    if current_user.mtr_user_id is not None:
        return(" Safia User Id Already Set")

    from server.safia import register_user_get_data
    # VR TODO 3-2-25: mtr_user_id is in fact user_id.
    user, data_needed, lss = register_user_get_data(request, list_data_needed=["user_id", "otp"])
    user_id = lss.get_user_id()
    data_needed["otp"]  # kept: fails loudly when the otp entry is missing, as before

    print(str(current_user))
    can_attach = current_user.mtr_user_id is None and user != "anonymous@opio.fr"
    if not can_attach:
        return 'MTR user id not set, please find a valid otp or contact us at vr@opio.fr with the information displayed on this page. You may have already set it : ' + str(current_user.mtr_user_id) + " " + str(user_id)

    current_user.mtr_user_id = user_id
    db.session.commit()
    return 'MTR user id set, you can make a project now.'
@app.before_request
def check_route_access():
    """Global request hook; currently lets every request through.

    NOTE(review): the guard logic lives in ``plop`` below and is not called,
    so only per-view ``@login_required`` decorators protect routes.
    """
    return


def plop():
    """Parked access guard: allow the request or redirect to the login page.

    Access is granted for static files, authenticated users, views marked
    with ``public_route`` and users recognised by ``register_user_get_data``.

    Fixes over the parked version, so it is safe to re-enable:
      * ``request.path`` starts with ``/`` and Flask names the endpoint
        ``static``, so the former ``'static/'`` prefixes never matched;
      * a request without endpoint (404) no longer raises ``KeyError``;
      * checks short-circuit, so the back-end lookup runs only when needed;
      * the original URL is percent-encoded before being appended.
    """
    print(str(current_user))

    endpoint = request.endpoint
    is_static = request.path.startswith('/static/') or (
        endpoint is not None and (endpoint == 'static' or endpoint.endswith('.static')))
    if is_static or current_user.is_authenticated:
        return  # Access granted

    view = app.view_functions.get(endpoint)
    if getattr(view, 'is_public', False):
        return  # Access granted

    from server.safia import register_user_get_data
    user, data_needed, lss = register_user_get_data(request, list_data_needed=["user_id", "otp"])
    if user != "anonymous@opio.fr":
        return  # Access granted

    from urllib.parse import quote
    return redirect(url_for('login.login') + "?redirecturl=" + quote(request.url, safe=''))
@bp2.route('/checkb', methods=['GET'])
def checkb_valid_login():
    """Liveness probe that always answers with a fixed string."""
    reply = 'checkb is ok'
    return reply
@public_route
@bp2.route('/check', methods=['GET'])
def check_valid_login():
    """Report, as plain text, whether the current session is authenticated."""
    login_valid = current_user.is_authenticated
    print(str(current_user))

    prefix = 'Login valid: ' if login_valid else 'Login invalid: '
    return prefix + str(login_valid)
557 # https://stackoverflow.com/questions/32815451/are-global-variables-thread-safe-in-flask-how-do-i-share-data-between-requests
558 # on va obliger à utiliser la meme session ? Ou pas le dire ?
559 # ou memcached
# Process-local fallback store for reset tokens: token -> {"username", "valid_until"}.
# It is not shared between worker processes; the session copy is the primary store.
map_token_user = {}


def _reset_deadline_passed(valid_until):
    """Return ``True`` when ``valid_until`` lies in the past.

    New tokens store epoch seconds. Datetimes are still accepted for tokens
    issued by the previous code: Flask's session serializer round-trips
    datetimes through an HTTP date string, so they can come back
    timezone-aware and cannot be compared with a naive ``now()``.
    """
    import datetime
    import time
    if isinstance(valid_until, datetime.datetime):
        if valid_until.tzinfo is None:
            return valid_until <= datetime.datetime.now()
        return valid_until <= datetime.datetime.now(datetime.timezone.utc)
    return float(valid_until) <= time.time()


@public_route
@bp2.route('/password-reset', methods=['GET', 'POST'])
def password_reset():
    """Password reset flow.

    * GET renders the new-password form for the ``token`` query parameter.
    * POST coming from the login page issues a token valid for 10 minutes
      and e-mails the reset link.
    * Any other POST consumes a token and sets the new password.

    Changes from the previous version:
      * a missing ``Referer`` or a login URL carrying a query string no
        longer breaks the "came from the login page" test;
      * expiry is stored as epoch seconds, see ``_reset_deadline_passed``;
      * the fallback map is filled only once the account is resolved, holds
        the account's username and an expiry, and tokens are single-use;
      * the reset link is no longer printed or stored in usage records;
      * the internal HTTP call has a timeout and cannot block the e-mail;
      * a missing new password is rejected instead of crashing.
    """
    if request.method != 'POST':
        token = request.args.get('token')
        return render_template('auth/password_reset.html', token=token)

    import time
    from urllib.parse import quote, urlsplit
    from flask import session
    from auth.lib_stat_usage import record_usage
    from server.safia import lpgss_singleton

    referrer_path = urlsplit(request.referrer or "").path
    came_from_login = referrer_path.rstrip("/").rsplit("/", 1)[-1] == "login"

    if came_from_login:
        username_or_email = request.form.get('username')

        info_user_from_mail = lpgss_singleton.get_user_from_mail(username_or_email)
        user = User.query.filter_by(username=username_or_email).first()

        email = None
        if user is None and info_user_from_mail is None:
            return 'Aucun user trouvé avec ce username ou email'
        if info_user_from_mail is not None:
            user = User.query.filter_by(mtr_user_id=info_user_from_mail["id"]).first()
            if user is None:
                return "Your safia user (deprecated from otp) is not associated with a saxia user, we will soon set it automaticaly, in the mean time contact assistant@fotonower.com or follow instruction on https://app.ultradocia.fr/help ! "
            email = username_or_email
        # Reset requested by username: fetch the address from the back-end record.
        if email is None:
            info_user = lpgss_singleton.select_user(user.mtr_user_id)
            email = info_user["mail"]

        import secrets
        token = secrets.token_urlsafe(32)
        issued_at = time.time()
        valid_until = issued_at + 10 * 60
        session[token] = {"username": user.username, "email": email,
                          "created_at": issued_at, "valid_until": valid_until}
        map_token_user[token] = {"username": user.username, "valid_until": valid_until}

        url_reset_password = url_for('login.password_reset', token=token, _external=True)
        # Behind a TLS-terminating proxy Flask may build an http:// URL.
        url_reset_password = url_reset_password.replace("http://", "https://")

        # NOTE(review): legacy side channel kept as-is, named "really not good"
        # by its author. It passes the reset link in a GET query string to the
        # "query" endpoint; confirm whether it is still required.
        message_encoded = quote(url_reset_password)
        vraiment_pas_bien = url_for("query", _external=True) + "?datou-50=true&object=simple_text_query&input_csv=from_mail_to_send%3Dvr%40opio.fr%2Cuseless%3Ddummy%2Cresult%3D" + message_encoded + "%2Crequest%3Duseless%2Cfile%3Duseless"
        import requests
        try:
            requests.get(vraiment_pas_bien, timeout=10)
        except requests.RequestException as err:
            print(" password_reset internal query failed : " + str(err))

        ipv4 = get_ipv4_from_request(request)
        record_usage(user.mtr_user_id, ipv4, type="link_reset_password",
                     info_message="reset link sent", lpgss=lpgss_singleton)

        from lib.lib_www.lib_www_util import send_message
        message = "Click on the link to reset your Saxia password:\n" + url_reset_password + "\n<br>\nIt will be valid for 10 minutes only from the same browser and computer or mobile device !"
        ret = send_message(to=email,
                           message=message,
                           object_message="Reset Password",
                           verbose=False)
        print(str(ret))

        return("Email sent if your mail is correct !")

    token = request.form.get('token')
    new_password = request.form.get('new_password')
    if not new_password:
        return "A new password is required", 400

    if token and token in session:
        token_info = session.pop(token)
        map_token_user.pop(token, None)
    elif token and token in map_token_user:
        # Fallback for a link opened outside the session that requested it.
        print("Token not found in session, using deprecated process-local map_token_user")
        token_info = map_token_user.pop(token)
    else:
        return "Token not found, please retry or send a mail to support@fotonower.com with object SAXIA reset PASSWORD"

    if _reset_deadline_passed(token_info["valid_until"]):
        return "Token not found, please retry or send a mail to support@fotonower.com with object SAXIA RESET PASSWORD"

    user = User.query.filter_by(username=token_info["username"]).first()
    if user is None:
        return 'Aucun user trouvé avec ce username, please signup !'

    user.set_password(new_password)
    db.session.commit()

    ipv4 = get_ipv4_from_request(request)
    record_usage(user.mtr_user_id, ipv4, type="link_reset_password",
                 info_message="Password Rest", lpgss=lpgss_singleton)

    return 'Password reset successful'
@bp2.route('/api/v1/admin/usage', methods=['GET'])
def api_v1_admin_usage():
    """Return usage records to callers that ``is_user_admin_config`` accepts.

    Optional query filters ``type``, ``nb_day``, ``mtr_user_id`` and ``ipv4``
    are forwarded to ``get_usage`` untouched.

    NOTE(review): a refused caller receives the refusal text with the
    default 200 status.
    """
    print("Inside saxia admin/all_users endpoint")
    from server.safia import register_user_get_data
    user, data_needed, lss = register_user_get_data(request, list_data_needed=[])

    caller_id = lss.get_user_id()
    if not lss.lib_right.is_user_admin_config(caller_id):
        return "You are not allowed to access this endpoint"

    usage_type = request.args.get('type')
    nb_day = request.args.get('nb_day')
    mtr_user_id = request.args.get('mtr_user_id')
    ipv4 = request.args.get('ipv4')

    from server.safia import lpgss_singleton
    from auth.lib_stat_usage import get_usage

    return get_usage(lpgss_singleton, mtr_user_id, ipv4, nb_day=nb_day, type=usage_type)
723 from lib.lib_util import get_b64encoded_qr_image
@bp2.route("/setup-2fa")
@login_required
def setup_two_factor_auth():
    """Render the authenticator enrolment page with the secret and its QR code.

    The provisioning URI is requested first, which creates the secret when
    the user has none; the secret is then read back from the user record.
    """
    from server.safia import lpgss_singleton
    provisioning_uri = current_user.get_authentication_setup_uri()
    user_record = lpgss_singleton.get_user_info_from_id(current_user.mtr_user_id)
    return render_template(
        "auth/setup-2fa.html",
        secret=user_record["miscinfo"]["secret_token"],
        qr_image=get_b64encoded_qr_image(provisioning_uri),
    )
738# from auth.lib_stat_usage import get_usage, record_usage
739# def record_usage(mtr_user_id, ipv4, type, info_message, lpgss):
740 from flask_wtf import FlaskForm
741 from wtforms import EmailField, PasswordField
742 from wtforms.validators import DataRequired, Email, EqualTo, Length, InputRequired
743 from wtforms.fields import StringField, SubmitField
class TwoFactorForm(FlaskForm):
    """Single-field form for the one-time password; exactly 6 characters, required."""
    otp = StringField(
        'Enter OTP',
        validators=[InputRequired(), Length(min=6, max=6)],
    )
class LoginForm(FlaskForm):
    """Username/password form; both fields are required."""
    username = StringField("Username", validators=[DataRequired()])
    password = PasswordField("Password", validators=[DataRequired()])
753 from flask import flash
# Endpoint names handed to url_for() by the 2FA views.
HOME_URL = "uld.uld_app"                            # landing page after a successful login
SETUP_2FA_URL = "login.setup_two_factor_auth"       # authenticator enrolment page
VERIFY_2FA_URL = "login.verify_two_factor_auth"     # OTP entry page
@bp2.route("/verify-2fa", methods=["GET", "POST"])
@login_required
def verify_two_factor_auth():
    """Show the OTP form and check a submitted one-time password.

    A valid OTP sends the user to the home page, enabling 2FA first when it
    was not enabled yet. An invalid one redirects back to this page.
    """
    form = TwoFactorForm(request.form)

    if not form.validate_on_submit():
        # Plain display, or a submission rejected by the form validators.
        if not current_user.get_is_two_factor_authentication_enabled():
            flash(
                "You have not enabled 2-Factor Authentication. Please enable it first.", "info")
        return render_template("auth/verify-2fa.html", form=form)

    if not current_user.is_otp_valid(form.otp.data):
        flash("Invalid OTP. Please try again.", "danger")
        return redirect(url_for(VERIFY_2FA_URL))

    if current_user.get_is_two_factor_authentication_enabled():
        flash("2FA verification successful. You are logged in!", "success")
        return redirect(url_for(HOME_URL))

    try:
        current_user.set_is_two_factor_authentication_enabled(True)
        flash("2FA setup successful. You are logged in!", "success")
        return redirect(url_for(HOME_URL))
    except Exception:
        db.session.rollback()
        flash("2FA setup failed. Please try again.", "danger")
        return redirect(url_for(VERIFY_2FA_URL))
@bp2.route("/login_2fa", methods=["GET", "POST"])
def login_two_factor_auth():
    """Username/password login that continues with authenticator 2FA.

    The password is checked with ``User.check_password``. The previous code
    read ``user.password``, an attribute the model does not have, and passed
    it to Flask-Bcrypt although hashes are produced by Werkzeug's
    ``generate_password_hash``. Every attempt with a known username failed.
    """
    if current_user.is_authenticated:
        if current_user.get_is_two_factor_authentication_enabled():
            flash("You are already logged in.", "info")
            return redirect(url_for(HOME_URL))
        flash("You have not enabled 2-Factor Authentication. Please enable first to login.", "info")
        return redirect(url_for(SETUP_2FA_URL))

    form = LoginForm(request.form)
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        if user and user.check_password(form.password.data):
            # NOTE(review): the session is authenticated before the OTP step.
            login_user(user)
            if not current_user.get_is_two_factor_authentication_enabled():
                flash(
                    "You have not enabled 2-Factor Authentication. Please enable first to login.", "info")
                return redirect(url_for(SETUP_2FA_URL))
            return redirect(url_for(VERIFY_2FA_URL))
        elif not user:
            flash("You are not registered. Please register.", "danger")
        else:
            flash("Invalid username and/or password.", "danger")
    return render_template("auth/login-2fa.html", form=form)
@bp2.route("/api/v1/token/create", methods=["POST"])
@login_required
def api_v1_token_create():
    """Create an API token for the logged-in user.

    The JSON body may carry ``token_name``, stored as the description.
    Answers 201 with the token, or 500 when ``create_token`` returns ``None``.
    """
    from flask import jsonify
    from flask_login import current_user
    from server.safia import register_user_get_data
    from auth.lib_auth import create_id

    user, data_needed, lss = register_user_get_data(request, list_data_needed=[])
    if current_user.is_authenticated:
        print("On y croit")

    payload = request.json
    description = payload.get("token_name", "")
    token = "safia_" + create_id()
    created = lss.lib_user_data_internal.create_token(
        current_user.mtr_user_id, description=description, token=token)

    if created is None:
        return jsonify({"status": "error", "message": "Token creation failed"}), 500
    return jsonify({"status": "success", "token": token}), 201
@bp2.route("/api/v1/token/revoke", methods=["POST"])
@login_required
def api_v1_token_revoke():
    """Placeholder: revokes nothing yet and only answers with a fixed message."""
    authenticated = current_user.is_authenticated
    if authenticated:
        print("On y croit")

    return "Token could be revoked !"
@bp2.route("/api/v1/token/update", methods=["POST"])
@login_required
def api_v1_token_update():
    """Placeholder: updates nothing yet and only answers with a fixed message."""
    authenticated = current_user.is_authenticated
    if authenticated:
        print("On y croit")
    return "Token Update !"
@bp2.route("/api/v1/token/list", methods=["POST", "GET"])
@login_required
def api_v1_token_index():
    """List API tokens of the logged-in user, or of ``user_id`` for admins.

    ``user_id`` is parsed as an integer. Before, the raw query string was
    compared with the integer ``mtr_user_id``, so a non-admin asking for
    their own id was refused with 403. A non-numeric ``user_id`` is ignored.
    """
    from flask import jsonify
    from flask_login import current_user
    from server.safia import register_user_get_data
    from lib.lib_util import humanize_modified_time

    user, data_needed, lss = register_user_get_data(request, list_data_needed=[])
    if current_user.is_authenticated:
        print("On y croit")

    mtr_user_id_used = current_user.mtr_user_id
    user_id_input = request.args.get('user_id', default=None, type=int)
    if user_id_input is not None and user_id_input != current_user.mtr_user_id:
        if not lss.lib_right.is_user_admin_config(current_user.mtr_user_id):
            return jsonify({"status": "error", "message": "User is not admin"}), 403
        mtr_user_id_used = user_id_input

    # A None result is reported as an empty list, as before.
    list_tokens = lss.lib_user_data_internal.get_token(mtr_user_id_used) or []
    for token_row in list_tokens:
        valid_until = token_row.get("valid_until", None)
        if valid_until is not None:
            token_row["valid_until_h"] = humanize_modified_time(valid_until)

    return jsonify({"status": "success", "tokens": list_tokens}), 200
891 # VR 8-6-25 : n'y a t'il pas un doublon qqpart ?
@bp2.route("/api/v1/auth/authenticator/deactivate", methods=["POST", "GET"])
@login_required
def api_v1_auth_authenticator_deactivate():
    """Disable authenticator 2FA for the caller, or for ``user_id`` when admin.

    ``user_id`` is read from the JSON body and, failing that, from the query
    string. The previous code read the query argument and then discarded it,
    and accessed ``request.json`` even on body-less GET requests.

    Responses: 200 on success (also when already disabled), 400 for a
    non-numeric ``user_id``, 403 when a non-admin targets another user,
    404 when the target has no account here, 401 when not authenticated.
    """
    from flask import jsonify
    from flask_login import current_user
    from server.safia import register_user_get_data

    user, data_needed, lss = register_user_get_data(request, list_data_needed=[])

    if not current_user.is_authenticated:
        return jsonify({"status": "error", "message": "User not authenticated"}), 401
    print("On y croit")

    payload = request.get_json(silent=True)
    user_id_input = payload.get('user_id') if isinstance(payload, dict) else None
    if user_id_input is None:
        user_id_input = request.args.get('user_id', None)
    if user_id_input is not None:
        try:
            user_id_input = int(user_id_input)
        except (TypeError, ValueError):
            return jsonify({"status": "error", "message": "Invalid user_id"}), 400

    user_used = current_user
    if user_id_input is not None and user_id_input != current_user.mtr_user_id:
        if not lss.lib_right.is_user_admin_config(current_user.mtr_user_id):
            return jsonify({"status": "error", "message": "User is not admin"}), 403
        user_used = User.query.filter_by(mtr_user_id=user_id_input).first()
        if user_used is None:
            return jsonify({"status": "error", "message": "User not found"}), 404

    print(str(user_used))
    if user_used.get_is_two_factor_authentication_enabled():
        user_used.set_is_two_factor_authentication_enabled(False)
        return jsonify({"status": "success", "message": "Authenticator deactivated !"}), 200

    print("Already deactivated")
    return jsonify({"status": "success", "message": "Authenticator already deactivated !"}), 200
941 return bp2, User