Coverage for server/bp/auth.py: 25%

602 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-01-26 23:58 +0100

1 

2def get_ipv4_from_request(request): 

3 ipv4 = request.environ['REMOTE_ADDR'] 

4 print("ipv4 l1 get_ipv4_from_request : " + str(ipv4)) 

5 ipv4_list = request.access_route 

6 print(" list ipv4 : " + str(ipv4_list)) 

7 if str(type(ipv4_list)) == "<class 'werkzeug.datastructures.structures.ImmutableList'>" and len(ipv4_list) > 0: 

8 ipv4 = ipv4_list[-1] 

9 

10 print("ipv4 in get_ipv4_from_request : " + str(ipv4)) 

11 

12 return ipv4 

13 

14db_session = None 

15 

16import bcrypt 

17from flask_bcrypt import Bcrypt 

18 

19from flask import redirect, url_for, render_template, request, Blueprint 

20def create_login_bp(app, db = None): 

21 from flask_login import LoginManager 

22 from flask_sqlalchemy import SQLAlchemy 

23 from werkzeug.security import generate_password_hash, check_password_hash 

24 from flask_login import UserMixin, login_user, logout_user, login_required, current_user 

25 

26 bp2 = Blueprint('login', __name__) 

27 

28 login_manager = LoginManager() 

29 login_manager.init_app(app) 

30 bcrypt_app = Bcrypt(app) 

31 # VR 15/11/25 : ca c'est un bug que claude a laissé trainer 

32 from auth.lib_conf_system import LibConfSystem as LCS 

33 lcs = LCS() 

34 

35 if db is None: 

36 

37 (PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) = lcs.get_pg_conf() 

38 

39 app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql+psycopg2://user:password@localhost/dbname' 

40 app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql+psycopg2://' + PG_USER + ':' + PG_PASSWORD + '@localhost/' + PG_DB 

41 app.config[ 

42 'SQLALCHEMY_DATABASE_URI'] = 'postgresql+psycopg2://' + PG_USER + ':' + PG_PASSWORD + '@' + PG_HOST + ':' + str( 

43 PG_PORT) + '/' + PG_DB 

44 

45 # Line to decomment VR 22/4/24 

46 # app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://victor_r:VR2001%40FTN01@192.168.1.48:3306/MTRBack' #?useUnicode=true&characterSetResults=utf-8&zeroDateTimeBehavior=CONVERT_TO_NULL&character-set-client-handshake=FALSE&serverTimezone=Europe/Paris' 

47 # , pool_recycle)=3600, pool_size=50, max_overflow=50) 

48 

49 

50 

51 

52 

53 # dbConnection = sqlEngine.connect() 

54 

55 # from sqlalchemy import create_engine 

56 # engine = create_engine("postgresql+pg8000://dbuser:kx%40jj5%2Fg@pghost10/appdb") 

57 

58 #app.config['SQLALCHEMY_DATABASE_URI'] = f'postgresql+psycopg2://{PG_USER}:{PG_PASSWORD}@localhost/{PG_DB}' 

59 app.config['SECRET_KEY'] = 'your_secret_key_here' 

60 

61# db = SQLAlchemy(app)cr 

62 

63 db = SQLAlchemy() 

64 db.init_app(app) 

65 bp2.db = db 

66 with app.app_context(): 

67 db.create_all() 

68 

69 db_session = db 

70 else: 

71 bp2.db = db 

72 with app.app_context(): 

73 db.create_all() 

74 

75 db_session = db 

76 

77 # Définition du modèle d'utilisateur pour SQLAlchemy 

78 class User(UserMixin, db.Model): 

79 __tablename__ = 'flask_user' 

80 __table_args__ = {"schema": "mtruser"} # , ForeignKeyConstraint(["mtr_user_id"], ["user.id"])} 

81 id = db.Column(db.Integer, primary_key=True) 

82 username = db.Column(db.String(100), unique=True) 

83 password_hash = db.Column(db.String()) 

84 mtr_user_id = db.Column(db.Integer) 

85# miscinfo = db.Column(db.JSON) 

86# secret_token = db.Column(db.String, unique=True) 

87 

88 # A rajouter pour les users ! 

89 # self.secret_token = pyotp.random_base32() 

90 def get_is_two_factor_authentication_enabled(self): 

91 from server.safia import lpgss_singleton 

92 info = lpgss_singleton.select_user(self.mtr_user_id) 

93 if "miscinfo" not in info or info["miscinfo"] == None : 

94 info["miscinfo"] = {} 

95 if "secret_token" not in info["miscinfo"] or info["miscinfo"]["secret_token"] == None: 

96 return False 

97 else: 

98 return True 

99 

100 def set_is_two_factor_authentication_enabled(self, value = True): 

101 import pyotp 

102 from server.safia import lpgss_singleton 

103 info = lpgss_singleton.select_user(self.mtr_user_id) 

104 if "miscinfo" not in info or info["miscinfo"] == None: 

105 info["miscinfo"] = {} 

106 if value == False: 

107 info["miscinfo"]["secret_token"] = None 

108 

109 # db.session.commit() 

110 else: 

111 info["miscinfo"]["secret_token"] = pyotp.random_base32() 

112 

113 lpgss_singleton.update_user(self.mtr_user_id, misc_info=info["miscinfo"]) 

114 

115 def get_authentication_setup_uri(self): 

116 import pyotp 

117 from server.safia import lpgss_singleton 

118 info = lpgss_singleton.select_user(self.mtr_user_id) 

119 if "miscinfo" not in info : 

120 info["miscinfo"] = {} 

121 if "secret_token" not in info["miscinfo"] or info["miscinfo"]["secret_token"] == None: 

122 info["miscinfo"]["secret_token"] = pyotp.random_base32() 

123 lpgss_singleton.update_user(self.mtr_user_id, misc_info = info["miscinfo"]) 

124 # db.session.commit() 

125 token_used = info["miscinfo"]["secret_token"] 

126 return pyotp.totp.TOTP(token_used).provisioning_uri( 

127 name=self.username, issuer_name=app.name) 

128 

129 def is_otp_valid(self, user_otp): 

130 import pyotp 

131 totp = pyotp.parse_uri(self.get_authentication_setup_uri()) 

132 return totp.verify(user_otp) 

133 

134 def set_password(self, password): 

135 self.password_hash = generate_password_hash(password) 

136 

137 #self.password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) 

138 

139 #salt = bcrypt.gensalt(10) 

140 #print("Salt:", salt) 

141 #self.previous_password_hash = self.password_hash 

142 #self.password_hash = bcrypt.hashpw(password.encode('utf-8'), salt) 

143 

144 def check_password(self, password): 

145 #return bcrypt.checkpw(self.password_hash.encode('utf-8'), password.encode('utf-8')) 

146 

147 #return check_password_hash(self.password_hash, password) 

148 pass_encode = password 

149 password_hash_encode = self.password_hash 

150 return check_password_hash(password_hash_encode, pass_encode) 

151 

152# VR 30-5-25 : needed for oauth2, but I just want to do basic token 

153 def get_user_id(self): 

154 return self.mtr_user_id 

155 

156 #def check_previous_password(self, password): 

157 #return bcrypt.checkpw(password.encode('utf-8'), self.previous_password_hash.encode('utf-8')) 

158 

159 @login_manager.user_loader 

160 def load_user(user_id): 

161 try : 

162 return User.query.get(int(user_id)) 

163 except Exception as e: 

164 print("Error in load_user due to refacto in SqlAlchemy not migrated in prod (wtf) : " + str(e)) 

165 return None 

166 

167 def public_route(decorated_function): 

168 decorated_function.is_public = True 

169 return decorated_function 

170 

171 @public_route 

172 @bp2.route('/signup', methods=['GET', 'POST']) 

173 def signup(): 

174 if request.method == 'POST': 

175 username = request.form.get('username') 

176 password = request.form.get('password') 

177 type_account = request.form.get('type_account') 

178 

179 # Vérifiez si l'utilisateur existe déjà 

180 existing_user = User.query.filter_by(username=username).first() 

181 if existing_user is None: 

182 # Création d'un nouveau utilisateur 

183 new_user = User(username=username) 

184 new_user.set_password(password) 

185 db.session.add(new_user) 

186 db.session.commit() 

187 

188 login_user(new_user) 

189 # return redirect(url_for('login.profile')) 

190 return redirect(url_for('login.create_mtr_user')) 

191 

192 else: 

193 return 'L’utilisateur existe déjà' 

194 

195 config_app = lcs.get_config_app() 

196 return render_template('auth/signup.html', 

197 config_app=config_app) 

198 

199 @public_route 

200 @bp2.route('/login', methods=['GET', 'POST']) 

201 def login(): 

202 if request.method == 'POST': 

203 username = request.form.get('username') 

204 password = request.form.get('password') 

205 redirecturl = request.form.get('redirecturl') 

206 user = User.query.filter_by(username=username).first() 

207 

208 ipv4 = get_ipv4_from_request(request) 

209 from auth.lib_stat_usage import record_usage 

210 from server.safia import lpgss_singleton 

211 

212 from auth.lib_conf_system import lcs_global_singleton 

213 (PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) = lcs_global_singleton.get_pg_conf() 

214 from lib.stockage.lib_pg_safia_sys import LibPGSafiaSys 

215 one_client = LibPGSafiaSys(PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) 

216 

217 if user and user.check_password(password): 

218 

219 # check 2FA and no ip request.environ['REMOTE_ADDR'] 

220 # VR 3-2-25 : we should not call login_user at this step, but just store temporary user_id or something like this until we have the 2FA code 

221 from flask import session 

222 session["mtr_user_id"] = user.mtr_user_id 

223 if user.get_is_two_factor_authentication_enabled() == True: 

224 login_user(user) 

225 return redirect(url_for('login.verify_two_factor_auth')) 

226 

227# login_user(user) 

228 from auth.lib_2fa import login_or_send_code 

229 ipv4 = request.environ['REMOTE_ADDR'] 

230 print("ipv4 l1 : " + str(ipv4)) 

231 ipv4 = request.remote_addr 

232 print("ipv4 l2 : " + str(ipv4)) 

233 ipv4_list = request.access_route 

234 print(" list ipv4 : " + str(ipv4_list)) 

235 print(" ipv4_list : " + str(type(ipv4_list))) 

236 if str(type(ipv4_list)) == "<class 'werkzeug.datastructures.structures.ImmutableList'>" and len(ipv4_list) > 0: 

237 ipv4 = ipv4_list[-1] 

238 print("ipv4 l3 : " + str(ipv4)) 

239 

240 ret, step, data = login_or_send_code(user.mtr_user_id, ipv4, redirecturl, one_client) 

241 ipv4 = None 

242 if 'ipv4' in data: 

243 # TODO VR 3-2-25 : IL FAUT CHOISIR SI ON FAIT LE MEME ENDPOINT POUR LE LOGIN ET LA VALIDATION AUQUEL CAS IL FAUT RAJOUTER UN TEST ET aLORS ON PEUT PASSER DE DIFFERENTES MANIERES LES DONN2ES 

244 session["ipv4"] = data["ipv4"] 

245 ipv4 = data["ipv4"] 

246 if step == 'step_2fa': 

247 record_usage(user.mtr_user_id, ipv4, type="step_2fa", info_message='step_2fa', 

248 lpgss=lpgss_singleton) 

249 if ret != '' and ret != None: 

250 return redirect(url_for(ret, ipv4 = ipv4)) #, data=data) 

251 else : 

252 print("Internal error login login") 

253 return redirect(url_for('login.login')) 

254 elif step == 'login_ip_white_listed': 

255 login_user(user) 

256 

257 record_usage(user.mtr_user_id, ipv4, type="try_connect", info_message='connexion_ok_with_ip_white_listed', 

258 lpgss=lpgss_singleton) 

259 

260 return redirect(url_for('uld.uld_app')) # we had saxia_simple 

261 else: 

262 

263 record_usage(user.mtr_user_id, ipv4, type="try_connect", info_message='unexpected_step_login_197', 

264 lpgss=lpgss_singleton) 

265 

266 return redirect(url_for('login.login')) 

267 # VR TODO analyser d'ou vient ce code 

268 return redirect(url_for('saxia_simple')) 

269 if redirecturl != "None": 

270#### return redirect(url_for('login.profile')) 

271 return redirect(redirecturl) 

272 else: 

273 return redirect(url_for('uld.uld_app')) # we had login.profile 

274 else: 

275 if user: 

276 record_usage(user.mtr_user_id, ipv4, type="try_connect", info_message='wrong_password', 

277 lpgss=lpgss_singleton) 

278 

279 return 'Identifiants invalides.' 

280 # LA on gère en GET 

281 

282 config_app = lcs.get_config_app() 

283 

284 print(request.referrer) 

285 redirecturl = request.args.get('redirecturl', request.referrer) 

286 return render_template('auth/login.html', 

287 data = {"redirecturl":redirecturl}, 

288 config_app=config_app) 

289 

290 # TODO VR 3-2-25 : useless since we don't want to redirect to this page, or if we put a link in the email 

291 # TODO VR 3-2-25 : should use a function in lib_2fa 

292 @public_route 

293 @bp2.route('/form_enter_validate_code', methods=['GET', 'POST']) 

294 # @login_required # VR TODO pourquoi ce login required ?!? 

295 def form_enter_validate_code(ipv4 = None, message = None): 

296 if request.method == 'GET': 

297 from flask_login import current_user 

298 print(str(current_user) + " should be none before connexion ") 

299 from flask import session 

300 message = request.args.get("message", "") 

301 ipv4 = request.args.get("ipv4", None) 

302 print(" referer : " + str(request.referrer)) 

303 # TODO VR 3-2-25 : get correct redirecturl 

304 redirecturl = request.referrer 

305 redirecturl = None 

306 config_app = lcs.get_config_app() 

307 ipv4_req = request.environ['REMOTE_ADDR'] 

308 if ipv4_req != ipv4: 

309 print("Unexpected ipv4_req : " + str(ipv4_req) + " != " + str(ipv4) + " TROUBLESHOOT THIS USER : ") 

310 user_id = session["mtr_user_id"] 

311 data = {"user_id": user_id, "ipv4": ipv4, "redirecturl" : redirecturl, "message" : message} 

312 #return 'Bienvenue dans votre profil ' + str(current_user.username) 

313 return render_template('auth/form_enter_validate_code.html', 

314 config_app=config_app, 

315 data=data) 

316 else : # TODO VR : dedupliquer avec validate_code => mais c'est ca qu'on utilise 

317 

318 ipv4 = get_ipv4_from_request(request) 

319 from auth.lib_stat_usage import record_usage 

320 from server.safia import lpgss_singleton 

321 

322 from flask import session 

323 mtr_user_id = session["mtr_user_id"] 

324 # login_user(user) 

325 code = request.form.get('code') 

326 iprecord = request.form.get('iprecord') 

327 ipv4 = request.form.get('ipv4') 

328 ipv4_list = request.access_route 

329 print(" list ipv4 : " + str(ipv4_list)) 

330 if type(ipv4_list) == list and len(ipv4_list) > 0: 

331 ipv4 = ipv4_list[-1] 

332 print("ipv4 l5 : " + str(ipv4)) 

333 redirecturl = request.form.get('redirecturl') 

334 from flask_login import current_user 

335 print(str(current_user)) 

336 print(" referer : " + str(request.referrer)) 

337 config_app = lcs.get_config_app() 

338 

339 from server.safia import lpgss_singleton 

340 

341 from auth.lib_2fa import validate_code_and_record_ip 

342 ret = validate_code_and_record_ip(mtr_user_id, code, ipv4, iprecord, lpgss_singleton) 

343 if ret == False or ret == None: 

344 record_usage(mtr_user_id, ipv4, type="try_connect", info_message="Connexion NOK", 

345 lpgss=lpgss_singleton) 

346 

347 return redirect(url_for('login.form_enter_validate_code', message = 'Wrong Validation Code Or Already Used, Please Retry or contact support')) 

348 else : 

349 

350 user = User.query.filter_by(mtr_user_id=mtr_user_id).first() 

351 login_user(user) 

352 

353 record_usage(mtr_user_id, ipv4, type="try_connect", info_message="Connexion OK", 

354 lpgss=lpgss_singleton) 

355 

356 if redirecturl != "None": 

357 return redirect(redirecturl) 

358 else: 

359 return redirect(url_for('uld.uld_app')) # avan tlogin.profile 

360 

361 

362 

363 @bp2.route('/api/v1/auth/validation/code/get', methods=['GET', 'POST']) 

364 def get_validation_code(): 

365 

366 from flask import session 

367 mtr_user_id = session["mtr_user_id"] 

368 

369 from server.safia import lpgss_singleton 

370 

371 ipv4 = get_ipv4_from_request(request) 

372 

373 from auth.lib_2fa import login_or_send_code 

374 login_or_send_code(mtr_user_id, ipv4 = ipv4, redirect = None, lpgss = lpgss_singleton) 

375 

376 return "Code Renvoyé !" 

377 

378 

379 

380 @bp2.route('/api/v1/auth/remove/ips', methods=['GET', 'POST']) 

381 @login_required 

382 def unvalidate(): 

383 from server.safia import lpgss_singleton 

384 

385 lpgss_singleton.deactivate_all_ips_2fa(current_user.mtr_user_id) 

386 

387 return "All ips deactivated !" 

388 

389 

390 

391 @bp2.route('/api/v1/auth/authenticator/desactivate', methods=['GET', 'POST']) 

392 @login_required 

393 def api_v1_auth_authenticator_desactivate(): 

394 

395 current_user.set_is_two_factor_authentication_enabled(False) 

396 

397 return "Desactivate authenticator !" 

398 

399 

400 

401 # VR TODO a virer 

402 @public_route 

403 @bp2.route('/validate_code', methods=['POST', 'GET']) 

404# @login_required 

405 def validate_code(): 

406 if request.method == 'POST': 

407 from flask import session 

408 user_id = session["mtr_user_id"] 

409 # login_user(user) 

410 code = request.form.get('code') 

411 iprecord = request.form.get('iprecord') 

412 ipv4 = request.form.get('ipv4') 

413 redirecturl = request.form.get('redirecturl') 

414 from flask_login import current_user 

415 print(str(current_user)) 

416 print(" referer : " + str(request.referrer)) 

417 config_app = lcs.get_config_app() 

418 ipv4 = request.environ['REMOTE_ADDR'] 

419 from auth.lib_2fa import validate_code_and_record_ip 

420 ret = validate_code_and_record_ip(current_user.id, code, ipv4, iprecord) 

421 user = User.query.filter_by(id=user_id).first() 

422 login_user(user) 

423 if ret == None: 

424 return redirect(url_for('auth.form_enter_validate_code')) 

425 else : 

426 if redirecturl != "None": 

427 return redirect(redirecturl) 

428 else: 

429 return redirect(url_for('uld.uld_app')) # avant login.profile 

430 else: 

431 print("Unexpected access to POST endpoint validate_code") 

432 return redirect(url_for('auth.form_enter_validate_code')) 

433 

434 # Inutile remplacer par account 

435 @bp2.route('/profile') 

436 @login_required 

437 def profile(): 

438 from flask_login import current_user 

439 print(str(current_user)) 

440 print(" referer : " + str(request.referrer)) 

441 config_app = lcs.get_config_app() 

442 #return 'Bienvenue dans votre profil ' + str(current_user.username) 

443 return render_template('auth/profile.html', 

444 username=current_user.username, 

445 mtr_user_id=current_user.mtr_user_id, 

446 config_app=config_app) 

447 

448 @bp2.route('/logout', methods=['POST', 'GET']) 

449 @login_required 

450 def logout(): 

451 logout_user() 

452 from flask import session 

453 session["mtr_user_id"] = None 

454 del session["mtr_user_id"] 

455 #return redirect(url_for('index')) 

456 return redirect("https://ultradocia.fr") 

457 

458 @bp2.route('/create_mtr_user', methods=['GET']) 

459 @login_required 

460 def create_mtr_user(): 

461 from server.safia import lpgss_singleton, lib_external_info_from_apia_at, lib_right_singleton, LibSafiaSystem 

462 print(str(current_user)) 

463 type = request.args.get('type', 'saxia') 

464 

465 lss = LibSafiaSystem(lib_user_data_internal=lpgss_singleton, 

466 lib_user_data_external=lib_external_info_from_apia_at, 

467 lib_auth_user_otp=None, 

468 lib_right=lib_right_singleton) 

469 

470 info_setup = lss.setup_user(current_user.username, "default_project_user_" + current_user.username, "password", "") 

471 

472 current_user.mtr_user_id = info_setup["setup_user"] if "setup_user" in info_setup else info_setup["project"]["owner_id"] 

473# current_user.mtr_user_id = lpgss_singleton.create_new_user(current_user.username) 

474 db.session.commit() 

475 

476 if type == "saxia": 

477 project_id = info_setup["project"]["id"] if "project" in info_setup and "id" in info_setup["project"] else -1 

478 default_project_config_saxia = 70 

479 configuration = lss.lib_user_data_internal.load_conf_project(default_project_config_saxia) 

480 lss.update_conf_project(project_id, configuration) 

481 

482 from flask import make_response 

483 response = make_response(redirect(url_for('login.login'))) 

484 

485# Ce n'est plus nécessaire car on charge automatiquement le projet crée et en plus on veut faire le 2FA 

486# response = make_response(redirect('/' + "?project_id=" + str(project_id))) 

487 

488 return response #redirect(url_for('/') + "?project_id=" + str(project_id)) 

489 

490 from flask import make_response 

491# response = make_response(redirect('/profile')) 

492 response = make_response(redirect(url_for('login.login'))) 

493 return response 

494 

495 @bp2.route('/set_mtr', methods=['GET']) 

496 @login_required 

497 def set_mtr(): 

498 

499 if current_user.mtr_user_id != None: 

500 return(" Safia User Id Already Set") 

501 

502 from server.safia import register_user_get_data 

503 # VR TODO 3-2-25 : en fait mtr_user_id est user_id 

504 user, data_needed, lss = register_user_get_data(request, list_data_needed=["user_id", "otp"]) 

505 user_id = lss.get_user_id() 

506 otp = data_needed["otp"] 

507 

508 print(str(current_user)) 

509 if (current_user.mtr_user_id == None and user != "anonymous@opio.fr") : #user_id == current_user.id): 

510 current_user.mtr_user_id = user_id 

511 db.session.commit() 

512 return 'MTR user id set, you can make a project now.' 

513 else: 

514 return 'MTR user id not set, please find a valid otp or contact us at vr@opio.fr with the information displayed on this page. You may have already set it : ' + str(current_user.mtr_user_id) + " " + str(user_id) 

515 

516 

517 

518 @app.before_request 

519 def check_route_access(): 

520 return 

521 

522 def plop(): 

523 print(str(current_user)) 

524 

525 from server.safia import register_user_get_data 

526 user, data_needed, lss = register_user_get_data(request, list_data_needed=["user_id", "otp"]) 

527 user_id = lss.get_user_id() 

528 otp = data_needed["otp"] 

529 otp_user_login = user != "anonymous@opio.fr" 

530# otp_user_login = False 

531 

532 if any([otp_user_login, 

533 request.path.startswith('static/'), 

534 request.endpoint != None and request.endpoint.startswith('static/'), 

535 current_user.is_authenticated, # From Flask-Login 

536 getattr(app.view_functions[request.endpoint], 'is_public', False)]): 

537 return # Access granted 

538 else: 

539 return redirect(url_for('login.login') + "?redirecturl=" + request.url) 

540 

541 @bp2.route('/checkb', methods=['GET']) 

542 def checkb_valid_login(): 

543 return 'checkb is ok' 

544 

545 @public_route 

546 @bp2.route('/check', methods=['GET']) 

547# @app.before_request 

548 def check_valid_login(): 

549 login_valid = current_user.is_authenticated 

550 print(str(current_user)) 

551 

552 if (not login_valid): 

553 return 'Login invalid: ' + str(login_valid) 

554 else: 

555 return 'Login valid: ' + str(login_valid) 

556 

557 # https://stackoverflow.com/questions/32815451/are-global-variables-thread-safe-in-flask-how-do-i-share-data-between-requests 

558 # on va obliger à utiliser la meme session ? Ou pas le dire ? 

559 # ou memcached 

560 map_token_user = {} 

561 

562 @public_route 

563 @bp2.route('/password-reset', methods=['GET', 'POST']) 

564 def password_reset(): 

565 if request.method == 'POST': 

566 if request.referrer.split("/")[-1] == "login": 

567 username_or_email = request.form.get('username') 

568 

569 import secrets 

570 token = secrets.token_urlsafe(32) 

571# user.reset_password_token = token 

572# db.session.commit() 

573# lss.lib_user_data_internal.update_user(current_user.mtr_user_id, {"reset_password_token": token}) 

574 map_token_user[token] = username_or_email 

575 import datetime 

576 from flask import session 

577 

578 from server.safia import lpgss_singleton 

579# info_user = lpgss_singleton.select_user(current_user.mtr_user_id) 

580 info_user_from_mail = lpgss_singleton.get_user_from_mail(username_or_email) 

581 user = User.query.filter_by(username=username_or_email).first() 

582 

583 email = None 

584 if user == None and info_user_from_mail == None: 

585 return 'Aucun user trouvé avec ce username ou email' 

586 if info_user_from_mail != None: 

587# PAS BIEN DE FAIRE CELA ! 

588# user.set_is_two_factor_authentication_enabled(False) 

589 id = info_user_from_mail["id"] 

590 user = User.query.filter_by(mtr_user_id=id).first() 

591 if user == None: 

592 return "Your safia user (deprecated from otp) is not associated with a saxia user, we will soon set it automaticaly, in the mean time contact assistant@fotonower.com or follow instruction on https://app.ultradocia.fr/help ! " 

593 email = username_or_email 

594 # while resetting from username 

595 if email == None: 

596 info_user = lpgss_singleton.select_user(user.mtr_user_id) 

597 email = info_user["mail"] 

598 

599 username = user.username 

600 new_token_data = {"username" : username, "email" : email, "created_at" : datetime.datetime.now(), 

601 "valid_until" : datetime.datetime.now() + datetime.timedelta(minutes=10)} 

602 print(" new_token_data : " + str(new_token_data)) 

603 session[token] = new_token_data 

604 

605 print(" username : " + username) 

606 print(f"Token : {token}") 

607 url_reset_password = url_for('login.password_reset', token=token, _external=True) 

608 print(" url_reset_password : " + url_reset_password + " generated, maybe due to apache2 managing ssl, flask don't know it is in https ") 

609 url_reset_password = url_reset_password.replace("http://", "https://") 

610 # "/api/v1/safia # api_v1_safia_query 

611 import urllib 

612 message_encoded = urllib.parse.quote(url_reset_password) # urllib.parse.quote_plus(string, safe='', encoding=None, errors=None) 

613 vraiment_pas_bien = url_for("query", _external=True) + "?datou-50=true&object=simple_text_query&input_csv=from_mail_to_send%3Dvr%40opio.fr%2Cuseless%3Ddummy%2Cresult%3D" + message_encoded + "%2Crequest%3Duseless%2Cfile%3Duseless" 

614 

615 print(" password_reset vraiment_pas_bien : " + str(vraiment_pas_bien)) 

616 

617 import requests 

618 ret = requests.get(vraiment_pas_bien) 

619 

620 print(" after request vraiment_pas_bien :" + vraiment_pas_bien) 

621 

622 ipv4 = get_ipv4_from_request(request) 

623 from auth.lib_stat_usage import record_usage 

624 from server.safia import lpgss_singleton 

625 record_usage(user.mtr_user_id, ipv4, type="link_reset_password", info_message= url_reset_password, lpgss=lpgss_singleton) 

626 

627 from lib.lib_www.lib_www_util import send_message 

628 message = """ 

629 <html> 

630 <head></head> 

631 <body> 

632 <p>Click on the link to reset your Saxia password:</p> 

633 <a href='""" + url_reset_password + """'>Reset Password</a> 

634 

635 It will be valid for 10 minutes ! 

636 

637 </body> 

638 </html> 

639 """ 

640 message = "Click on the link to reset your Saxia password:\n" + url_reset_password + "\n<br>\nIt will be valid for 10 minutes only from the same browser and computer or mobile device !" 

641 ret = send_message(to=email, 

642 message=message, 

643 object_message="Reset Password", 

644 verbose=False) 

645 print(str(ret)) 

646 

647# print(" Ici on envoi un email !") 

648# redirect(vraiment_pas_bien) #, code=307) 

649 from flask import flash 

650# flash("Email sent if your mail is correct !", "success") 

651# return redirect(url_for("uld.uld_app")) 

652 #flash("Email sent if your mail is correct !", "success") 

653 return("Email sent if your mail is correct !") 

654 

655 

656 else : 

657 token = request.form.get('token') 

658 

659 from flask import session 

660 import datetime 

661 if token in session: 

662 token_info = session[token] 

663 if token_info["valid_until"] > datetime.datetime.now(): 

664 username = token_info["username"] 

665 user = User.query.filter_by(username=username).first() 

666 if user == None: 

667 return 'Aucun user trouvé avec ce username, please signup !' 

668 new_password = request.form.get('new_password') 

669 user.set_password(new_password) 

670 db.session.commit() 

671 

672 ipv4 = get_ipv4_from_request(request) 

673 from auth.lib_stat_usage import record_usage 

674 from server.safia import lpgss_singleton 

675 record_usage(user.mtr_user_id, ipv4, type="link_reset_password", 

676 info_message="Password Rest", lpgss=lpgss_singleton) 

677 

678 return 'Password reset successful' 

679 else: 

680 return "Token not found, please retry or send a mail to support@fotonower.com with object SAXIA RESET PASSWORD" 

681 else: 

682 print("Token not found in session it shouldn't be found in map_token_user that is deprecetad and not multi process plop") 

683 if token in map_token_user: 

684 username = map_token_user[token] 

685 #user = User.query.get(user_id) 

686 user = User.query.filter_by(username=username).first() 

687 new_password = request.form.get('new_password') 

688 user.set_password(new_password) 

689 db.session.commit() 

690 print("Internal error but it works ! => ON N=e DOIT JAMAIS ETRE LA") 

691 return 'Password reset successful' 

692 else: 

693 print("Indeed") 

694 return "Token not found, please retry or send a mail to support@fotonower.com with object SAXIA reset PASSWORD" 

695# return render_template('auth/password_reset.html') 

696# return 'Aucun user trouvé avec ce username' 

697 else: 

698 token = request.args.get('token') 

699 return render_template('auth/password_reset.html', token=token) 

700 

701 @bp2.route('/api/v1/admin/usage', methods=['GET']) 

702 def api_v1_admin_usage(): 

703 print("Inside saxia admin/all_users endpoint") 

704 from server.safia import register_user_get_data 

705 user, data_needed, lss = register_user_get_data(request, list_data_needed=[]) 

706 

707 user_id = lss.get_user_id() 

708 check = lss.lib_right.is_user_admin_config(user_id) 

709 if check: 

710 type = request.args.get('type') 

711 nb_day = request.args.get('nb_day') 

712 mtr_user_id = request.args.get('mtr_user_id') 

713 ipv4 = request.args.get('ipv4') 

714 

715 from server.safia import lpgss_singleton 

716 from auth.lib_stat_usage import get_usage 

717 

718 data = get_usage(lpgss_singleton, mtr_user_id, ipv4, nb_day=nb_day, type=type) 

719 return data 

720 else : 

721 return "You are not allowed to access this endpoint" 

722 

723 from lib.lib_util import get_b64encoded_qr_image 

724 

725 @bp2.route("/setup-2fa") 

726 @login_required 

727 def setup_two_factor_auth(): 

728 # secret = current_user.secret_token 

729# 

730 uri = current_user.get_authentication_setup_uri() 

731 from server.safia import lpgss_singleton 

732 info = lpgss_singleton.get_user_info_from_id(current_user.mtr_user_id) 

733 secret = info["miscinfo"]["secret_token"] 

734 base64_qr_image = get_b64encoded_qr_image(uri) 

735 return render_template("auth/setup-2fa.html", secret=secret, qr_image=base64_qr_image) 

736 

737 

738# from auth.lib_stat_usage import get_usage, record_usage 

739# def record_usage(mtr_user_id, ipv4, type, info_message, lpgss): 

740 from flask_wtf import FlaskForm 

741 from wtforms import EmailField, PasswordField 

742 from wtforms.validators import DataRequired, Email, EqualTo, Length, InputRequired 

743 from wtforms.fields import StringField, SubmitField 

744 

745 class TwoFactorForm(FlaskForm): 

746 otp = StringField('Enter OTP', validators=[ 

747 InputRequired(), Length(min=6, max=6)]) 

748 

749 class LoginForm(FlaskForm): 

750 username = StringField("Username", validators=[DataRequired()]) 

751 password = PasswordField("Password", validators=[DataRequired()]) 

752 

753 from flask import flash 

754 

755 HOME_URL = "uld.uld_app" 

756 SETUP_2FA_URL = "login.setup_two_factor_auth" 

757 VERIFY_2FA_URL = "login.verify_two_factor_auth" 

758 

759 @bp2.route("/verify-2fa", methods=["GET", "POST"]) 

760 @login_required 

761 def verify_two_factor_auth(): 

762 form = TwoFactorForm(request.form) 

763 if form.validate_on_submit(): 

764 if current_user.is_otp_valid(form.otp.data): 

765 if current_user.get_is_two_factor_authentication_enabled(): 

766 flash("2FA verification successful. You are logged in!", "success") 

767 return redirect(url_for(HOME_URL)) 

768 else: 

769 try: 

770 current_user.set_is_two_factor_authentication_enabled(True) 

771 

772# db.session.commit() 

773 flash("2FA setup successful. You are logged in!", "success") 

774 return redirect(url_for(HOME_URL)) 

775 except Exception: 

776 db.session.rollback() 

777 flash("2FA setup failed. Please try again.", "danger") 

778 return redirect(url_for(VERIFY_2FA_URL)) 

779 else: 

780 flash("Invalid OTP. Please try again.", "danger") 

781 return redirect(url_for(VERIFY_2FA_URL)) 

782 else: 

783 if not current_user.get_is_two_factor_authentication_enabled(): 

784 flash( 

785 "You have not enabled 2-Factor Authentication. Please enable it first.", "info") 

786 return render_template("auth/verify-2fa.html", form=form) 

787 

788 

789 @bp2.route("/login_2fa", methods=["GET", "POST"]) 

790 def login_two_factor_auth(): 

791 if current_user.is_authenticated: 

792 if current_user.get_is_two_factor_authentication_enabled(): 

793 flash("You are already logged in.", "info") 

794 return redirect(url_for(HOME_URL)) 

795 else: 

796 flash("You have not enabled 2-Factor Authentication. Please enable first to login.", "info") 

797 return redirect(url_for(SETUP_2FA_URL)) 

798 

799 form = LoginForm(request.form) 

800 if form.validate_on_submit(): 

801 user = User.query.filter_by(username=form.username.data).first() 

802 if user and bcrypt_app.check_password_hash(user.password, request.form["password"]): 

803 login_user(user) 

804 if not current_user.get_is_two_factor_authentication_enabled(): 

805 flash( 

806 "You have not enabled 2-Factor Authentication. Please enable first to login.", "info") 

807 return redirect(url_for(SETUP_2FA_URL)) 

808 return redirect(url_for(VERIFY_2FA_URL)) 

809 elif not user: 

810 flash("You are not registered. Please register.", "danger") 

811 else: 

812 flash("Invalid username and/or password.", "danger") 

813 return render_template("auth/login-2fa.html", form=form) 

814 

815 @bp2.route("/api/v1/token/create", methods=["POST"]) 

816 @login_required 

817 def api_v1_token_create(): 

818 from server.safia import register_user_get_data 

819 user, data_needed, lss = register_user_get_data(request, list_data_needed=[]) 

820 from flask_login import current_user 

821 if current_user.is_authenticated: 

822 print("On y croit") 

823 

824 # request = flask.request 

825 data_json = request.json 

826 description = data_json.get("token_name", "") 

827 from auth.lib_auth import create_id 

828 token = "safia_" + create_id() 

829 ret = lss.lib_user_data_internal.create_token( 

830 current_user.mtr_user_id, description=description, token=token) 

831 

832 if ret is not None: 

833 from flask import jsonify 

834 return jsonify({"status": "success", "token": token}), 201 

835 else: 

836 from flask import jsonify 

837 return jsonify({"status": "error", "message": "Token creation failed"}), 500 

838 

839 @bp2.route("/api/v1/token/revoke", methods=["POST"]) 

840 @login_required 

841 def api_v1_token_revoke(): 

842 if current_user.is_authenticated: 

843 print("On y croit") 

844 

845# def update_token(self, token, configuration=None, project_id=None, mtr_user_id=None, set_not_valid=True, 

846# description=None): 

847 

848 return "Token could be revoked !" 

849 

850 @bp2.route("/api/v1/token/update", methods=["POST"]) 

851 @login_required 

852 def api_v1_token_update(): 

853 if current_user.is_authenticated: 

854 print("On y croit") 

855 return "Token Update !" 

856 

857 @bp2.route("/api/v1/token/list", methods=["POST", "GET"]) 

858 @login_required 

859 def api_v1_token_index(): 

860 from server.safia import register_user_get_data 

861 user, data_needed, lss = register_user_get_data(request, list_data_needed=[]) 

862 from flask_login import current_user 

863 if current_user.is_authenticated: 

864 print("On y croit") 

865 

866 mtr_user_id_used = current_user.mtr_user_id 

867 user_id_input = request.args.get('user_id', None) 

868 if user_id_input != None and user_id_input != current_user.mtr_user_id : 

869 is_admin = lss.lib_right.is_user_admin_config(current_user.mtr_user_id) 

870 if not is_admin: 

871 from flask import jsonify 

872 return jsonify({"status": "error", "message": "User is not admin"}), 403 

873 else : 

874 mtr_user_id_used = user_id_input 

875 

876 list_tokens = lss.lib_user_data_internal.get_token(mtr_user_id_used) 

877 if list_tokens is None: 

878 list_tokens = [] 

879 from lib.lib_util import humanize_modified_time 

880 for t in list_tokens: 

881 valid_until = t.get("valid_until", None) 

882 if valid_until is not None: 

883 t["valid_until_h"] = humanize_modified_time(valid_until) 

884 from flask import jsonify 

885 if list_tokens is not None: 

886 return jsonify({"status": "success", "tokens": list_tokens}), 200 

887 else: 

888 return jsonify({"status": "error", "message": "Token retrieval failed"}), 500 

889 

890 

891 # VR 8-6-25 : n'y a t'il pas un doublon qqpart ? 

892 @bp2.route("/api/v1/auth/authenticator/deactivate", methods=["POST", "GET"]) 

893 @login_required 

894 def api_v1_auth_authenticator_deactivate(): 

895 from server.safia import register_user_get_data 

896 user, data_needed, lss = register_user_get_data(request, list_data_needed=[]) 

897 from flask_login import current_user 

898 from flask import jsonify 

899 

900 if current_user.is_authenticated: 

901 print("On y croit") 

902 

903 # check if user is admin 

904 from server.safia import lpgss_singleton 

905 is_admin = lss.lib_right.is_user_admin_config(current_user.mtr_user_id) 

906 user_id_input = request.args.get('user_id', None) 

907 

908 mtr_user_id_used = current_user.mtr_user_id 

909 user_used = current_user 

910 user_id_input = request.json.get('user_id', None) 

911 if user_id_input != None and user_id_input != current_user.mtr_user_id: 

912 is_admin = lss.lib_right.is_user_admin_config(current_user.mtr_user_id) 

913 if not is_admin: 

914 from flask import jsonify 

915 return jsonify({"status": "error", "message": "User is not admin"}), 403 

916 else: 

917 mtr_user_id_used = user_id_input 

918 user_used = User.query.filter_by(mtr_user_id=mtr_user_id_used).first() 

919 

920 

921 

922# login_user(user) 

923 

924# User(mtr_user_id=user_id_input) 

925 print(str(user_used)) 

926 is_activated = user_used.get_is_two_factor_authentication_enabled() 

927 if is_activated: 

928 print("Still to do") 

929 

930 user_used.set_is_two_factor_authentication_enabled(False) 

931 

932# return "Authenticator deactivated !" 

933 return jsonify({"status": "success", "message": "Authenticator deactivated !"}), 200 

934 else : 

935 print("Already deactivated") 

936 #return "Authenticator already deactivated !" 

937 return jsonify({"status": "success", "message": "Authenticator already deactivated !"}), 200 

938 else: 

939 return jsonify({"status": "error", "message": "User not authenticated"}), 401 

940 

941 return bp2, User 

942 

943