Coverage for lib/lib_util.py: 45%

1285 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-18 02:40 +0100

1import datetime 

2import os.path 

3import time 

4import uuid 

5 

def parse_key_and_size(sub_key = "", data = None, min_size_iterate = 1000000, min_size_display = 10000):
    """Walk a nested dict and print the keys whose stringified value is large.

    Keys whose value serializes to more than ``min_size_display`` characters are
    printed with their dotted path; values larger than ``min_size_iterate`` are
    recursed into so the offending sub-key can be located.

    :param sub_key: dotted path prefix accumulated during recursion.
    :param data: mapping to inspect (lists and plain strings are skipped).
    :param min_size_iterate: size above which a value is recursed into.
    :param min_size_display: size above which a key/size line is printed.
    """
    # BUG FIX: a mutable default ({}) was shared across calls; use None sentinel.
    if data is None:
        data = {}
    if type(data) == list or type(data) == str:
        return
    for k in data:
        try:
            size = len(str(data[k]))
        except Exception as e:
            print(str(e))
            # BUG FIX: on failure `size` was unbound (first key) or stale from the
            # previous iteration; reset it so the checks below are skipped.
            size = 0
        if size > min_size_display:
            print(" k : " + str(sub_key + "." + str(k)) + " size : " + str(size))
        if size > min_size_iterate:
            parse_key_and_size(sub_key + "." + str(k), data[k], min_size_iterate = min_size_iterate, min_size_display = min_size_display)

18 

19 

def filter_key_deep(sub_key = "", data = None, filter = "input.list_audit_map_reduce"):
    """Recursively copy ``data``, dropping every entry whose dotted path ends
    with ``filter``.

    Lists and scalars are returned as-is; dicts are rebuilt without the
    filtered keys.  Paths ending in ".text.text.text" are treated as a known
    circular structure and replaced by an empty dict.

    :param sub_key: dotted path prefix accumulated during recursion.
    :param data: value to filter (dict, list or scalar).
    :param filter: dotted-path suffix to remove.  (Parameter name kept for
        backward compatibility even though it shadows the builtin.)
    :return: filtered copy of ``data``.
    """
    # BUG FIX: a mutable default ({}) was shared across calls; use None sentinel.
    # Also removed a leftover `sys.stdout.write(">ç")` debug trace.
    if data is None:
        data = {}
    if type(data) == list:
        return data

    if sub_key.endswith(".text.text.text"):
        print(" Here comes the circularity ! gasps, How come does it happen sometime and sometime not !")
        print(" sub_key : " + str(sub_key))
        print("CORRECTING !")
        return {}

    elif type(data) == dict:
        new_data = {}
        for k in data:
            sub_sub_key = sub_key + "." + str(k)
            if sub_sub_key.endswith(filter):
                continue
            else :
                # BUG FIX: the recursive call dropped the caller's `filter` and
                # silently fell back to the default suffix; pass it down.
                new_data[k] = filter_key_deep(sub_sub_key, data[k], filter)
        return new_data
    else:
        return data

43 

def get_unique_id():
    """Return a freshly generated random (version 4) UUID."""
    new_id = uuid.uuid4()
    return new_id
# Former MAC-address-based variant, kept for reference:
# return ':'.join(['{:02x}'.format((uuid.getnode() >> i) & 0xff) for i in range(0,8*6,8)][::-1])

47 

def count_and_display_elapsed_time(begin_time, message = "", verbose = False, min_time = 1) -> tuple:
    """Measure the time elapsed since ``begin_time`` and optionally print it.

    The message is printed when ``verbose`` is set or when the elapsed time
    exceeds ``min_time`` seconds.

    :param begin_time: a ``time.time()`` timestamp taken earlier.
    :param message: prefix for the printed/returned message.
    :param verbose: always print when True.
    :param min_time: threshold in seconds above which the message is printed.
    :return: (end timestamp, message string) — the timestamp can be fed back
        in as ``begin_time`` for the next measurement.
    """
    # BUG FIX: the annotation `-> (float, str)` was an invalid tuple-literal
    # annotation, and the function returned a *second* time.time() call that
    # did not match the timestamp used to compute `elapsed`.
    end_time = time.time()
    elapsed = end_time - begin_time
    message_with_time = str(message) + " elapsed time : " + str(elapsed)
    if verbose or elapsed > min_time:
        print (message_with_time)
    return end_time, message_with_time

55 

def replace_non_alpha_with_underscore(s):
    """Return *s* with every character outside [a-zA-Z0-9] replaced by '_'."""
    import re
    non_alnum = re.compile(r'[^a-zA-Z0-9]')
    return non_alnum.sub('_', s)

59 

def select_datou_step_from_extension_type_upload(file_name, file_extension, file_content_type):
    """Decide which datou pipeline handles an uploaded file, based on its
    extension or, when the extension is empty, on its MIME content type.

    :param file_name: uploaded file name (used only in log messages).
    :param file_extension: extension including the dot (e.g. ".pdf"), possibly "".
    :param file_content_type: MIME type reported by the upload.
    :return: (is_managed, actions, extension) — `actions` is a copy of the
        matching entry of `list_datous`, `extension` is only set when it was
        inferred from the content type.
    """
    is_managed = False
    actions = ""
    extension = ""

    the_zips = [".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz"]

    # NOTE(review): this first mapping is dead code — it is immediately
    # overwritten by the second assignment below.
    map_datou_step_extension = {"image_to_text,request_gpt,send_mail" : [".jpeg", ".jpg", ".png"],
                                "speech_to_text,request_gpt,send_mail" : [".mp3", ".ogg", ".amr", ".m4a", ".wav"],
                                "prepare_json_for_safia,import_safia_from_json" : [".pdf,.txt"],
                                "import_safia_from_json" : [".json"]}

    # Effective mapping: datou key -> extensions it handles.
    map_datou_step_extension = {"jpg" : [".jpeg", ".jpg", ".png"],
                                "amr" : [".mp3", ".ogg", ".amr", ".mp4", ".m4a", ".wav", ".webm"],
                                "pdf" : [".pages", ".pptx", ".pdf", ".txt", ".md", ".py", ".docx", ".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz"],
                                "json" : [".json"]}

    from lib.datou.datou_exec import list_datous

    # TODO: mp4 if we want
    list_file_extension_managed = [".mp3", ".ogg", ".amr", ".mp4", ".webm", ".m4a", ".wav", ".jpeg", ".jpg", ".png", ".pages", ".pptx", ".pdf", ".txt", ".md", ".docx", ".json", ".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz"]

    list_content_type_managed = ['image/jpeg', 'audio/webm;codecs=opus']
    # Example upload: <FileStorage: 'blob' ('audio/webm;codecs=opus')>

    # MIME type -> candidate extensions, used when no extension was supplied.
    map_list_content_to_extension = {'image/jpeg' : [".jpeg", ".jpg"],
                                     'audio/webm;codecs=opus':['.ogg'],
                                     'audio/amr':['.amr'],
                                     'audio/mp4':[".webm"]} # and not mp4

    if file_extension == "":
        # NOTE(review): membership is tested with the raw content type but the
        # lookup uses .lower() — a mixed-case type that matches as-is would
        # KeyError on the lowered lookup; confirm callers always pass lowercase.
        if file_content_type in map_list_content_to_extension.keys():
            print(" TODO change the extension of the file : " + file_name + " from " + file_extension + " to " + str(map_list_content_to_extension[file_content_type.lower()]) + " one of them " )

            extension = map_list_content_to_extension[file_content_type.lower()][0]
            file_extension = extension
            is_managed = True

    datou_as_key = ""
    for datou_as_key in map_datou_step_extension:
        if file_extension.lower() in map_datou_step_extension[datou_as_key]:
            # Copy so callers cannot mutate the shared action list.
            actions = list_datous[datou_as_key].copy() # TODO VR: maybe do better so the shared action list can never be edited/modified
            is_managed = True

    # NOTE(review): after the loop `datou_as_key` holds the last dict key, so
    # this guard can only fire when the mapping is empty — likely vestigial.
    if datou_as_key == "" and is_managed:
        print(" We have different problems, due to new extension of type manageable ! ")

    # TODO VR 10-6-23: inconsistency in some cases where the extension is not
    # listed in the expected filetype, so probably to be added

    if is_managed == False:
        print("TODO : check if we can do something with these file or not !")

    return is_managed, actions, extension

113 

114 

115 

def humanize_modified_time(modified_at):
    """Return a French human-readable age ("il y a 3 jours") for a datetime.

    Works with both naive and timezone-aware ``modified_at`` values by
    comparing against a "now" of the matching kind.

    :param modified_at: datetime (naive or aware) to humanize.
    :return: localized relative-time string from ``humanize.naturaltime``.
    """
    import humanize
    from humanize import i18n
    import datetime
    from datetime import timezone
    today = datetime.datetime.now()
    todaynn = datetime.datetime.now(timezone.utc)

    # Switch humanize output to French.
    i18n.activate('fr_FR')

    from lib.lib_github import util_is_naive
    # BUG FIX: the branches were swapped — a *naive* modified_at was subtracted
    # from the timezone-aware now (TypeError: can't subtract offset-naive and
    # offset-aware datetimes), and vice versa.
    if util_is_naive(modified_at):
        time_difference = today - modified_at
    else:
        time_difference = todaynn - modified_at
    human_diff = humanize.naturaltime(time_difference)
    return human_diff

135 

def import_all_for_coverage_with_zero_percent():
    """Import otherwise-unreferenced project modules so coverage counts their
    top-level statements instead of reporting them at 0%.

    This has the side effect of executing each module's top level; it is a
    coverage helper, not application logic.
    """

    # For complete coverage

    #import test.conftest => 18 lines
    #import lib.stockage.lib_abstract_stockage => 0 line
    #import gunicorn.conf #server. => 10 lines

    #import test.func.first_test_func => 34 lines

    # TO PUT BACK
    import auth.lib_privacy
    import auth.lib_stat_usage
    # NOTE(review): the two imports below duplicate the two above; re-importing
    # an already-imported module is a no-op.
    import auth.lib_privacy
    import auth.lib_stat_usage
    import auth.lib_cost
    import auth.lib_auth
    import auth.lib_user_conf
    import lib.lib_www.lib_routes
    import lib.lib_www.lib_html
    import lib.stockage.lib_pg_dataset_pg
    #import prompt
    import lib.stockage.lib_pyfvs
    import lib.import_util.lib_path_to_vec
    import lib.import_util.lib_import_retrieval.models.models
    import lib.import_util.lib_import_retrieval.scripts.process_json.process_json
    import lib.lib_github
    import lib.lib_mail
    # import lib.lib_ocr => due to cv2
    import lib.lib_prompt_issue

# now inserted
# import lib.lib_graph
# import lib.datou.lib_datou_step_template

    print("Imported !")

173 

def compute_token(input):
    """Return the number of tokens *input* encodes to under tiktoken's
    cl100k_base encoding (special tokens disallowed)."""
    import tiktoken
    encoder = tiktoken.get_encoding("cl100k_base") # The encoding scheme to use for tokenization
    encoded = encoder.encode(input, disallowed_special=())
    return len(encoded)

182 

183 

def check_and_truncate_query_max_token(input, number_token_max = 100000, #7000, #8096, # 8096
                                       verbose = False,
                                       strategy_condition = "prop_char_token_marg10"):
    """Check whether *input* fits in ``number_token_max`` tokens.

    :return: (under_limit, truncated) — ``(True, "")`` when the input fits;
        otherwise ``(False, proposal)`` where the proposal is a character-level
        truncation proportional to the token overflow (empty for an unknown
        strategy).
    """
    char_count = len(input)
    token_count = compute_token(input)

    if token_count < number_token_max:
        return True, ""

    # pragma no cover scale
    truncated = ""
    if strategy_condition == "prop_char_token_marg10":
        # Scale the character budget by the token ratio.
        keep_chars = int(float(number_token_max) / float(token_count) * char_count)
        truncated = input[:keep_chars]
    else :
        print("This strategy " + strategy_condition + " is not managed, no truncated query proposed !")

    return False, truncated

204 

# Below is one way to implement these two functions; the struct module is used to pack the values.

206 

207import struct 

208 

# Values written as floats
# - [ ] TODO VR: rename this function

# Values written as floats
# - [ ] TODO VR: rename this function
def write_as_float(list_data_map, filename_prefix, offset_media_id = 0,
                   offset_data_file_id = 0,
                   photo_desc_type=1777,
                   type_data = "float",
                   dim_input = 0):
    """Serialize embedding vectors to packed binary .dat files, rotating to a
    new file every 1000 records, and append one line per record to a
    ``photo_list.index`` file.

    :param list_data_map: iterable of dicts, each with an "embedding" that is
        either a "[...]"-style string of floats or a sequence of numpy scalars
        (assumed — objects exposing .astype; TODO confirm against callers).
    :param filename_prefix: path prefix for both the .dat files and the index.
    :param offset_media_id: starting media id (incremented per record).
    :param offset_data_file_id: starting .dat file index.
    :param photo_desc_type: descriptor-type tag written into the index file.
    :param type_data: "float" (4-byte) or "tinyint" (clamped unsigned byte).
    :param dim_input: if > 0 and smaller than the vector, truncate to this many
        dimensions.
    :return: (number_dimension, offset_media_id, offset_data_file_id) after
        processing, so a caller can chain subsequent batches.
    """
    num_data = 0
    file = open(f'{filename_prefix}{offset_data_file_id}.dat', 'wb')
    # Index is opened in append mode so successive batches accumulate.
    file_index_desc = open(f'{filename_prefix}photo_list.index', 'a')
    display_dimenstion_desc = True
    number_dimension = 0
    for map_data in list_data_map:
        descchaine = map_data["embedding"]

        type_list_desc = str(type(descchaine))

        import sys
        if type_list_desc == "<class 'str'>":
            # Progress marker: 's' = string-encoded embedding.
            sys.stdout.write("s")
            desc = list(map(float, descchaine.lstrip("[").rstrip("]").split(",")))
        else :
            # Progress marker: 'n' = non-string (numeric sequence) embedding.
            sys.stdout.write("n")
#            print(" emb : " + str(emb))
            desc = list(map(lambda x: x.astype('double'), descchaine))
        if display_dimenstion_desc:
            # Report the vector dimension once, from the first record.
            number_dimension = len(desc)
            if number_dimension == 0:
                print("Internal Error")
            print(f" Dimension desc : {number_dimension}")
            display_dimenstion_desc = False
        offset_media_id += 1
        if dim_input > 0 and dim_input < number_dimension:
            desc = desc[:dim_input]
            number_dimension = dim_input
        for value in desc:
            if type_data == "float":
                file.write(struct.pack('f', float(value)))
            elif type_data == "tinyint":
                # Clamp to the unsigned-byte range before packing.
                file.write(struct.pack('B', min(255, max(0, int(value)))))
            else :
                print("Type " + str(type_data) + " not supported ! nothin written !")
        num_data += 1
        # num_data * number_dimension > 10000
        if num_data == 1000:
            # Rotate to a fresh .dat file every 1000 records.
            file.close()
            offset_data_file_id += 1
            num_data = 0
            file = open(f'{filename_prefix}{offset_data_file_id}.dat', 'wb')
        # One index line per record: media id, current .dat file id, type tag.
        # NOTE(review): indentation reconstructed from a flattened dump — this
        # write is assumed to be per-record, not only on rotation; confirm.
        file_index_desc.write(f'{offset_media_id},{offset_data_file_id},{photo_desc_type}\n')
    file.close()
    file_index_desc.close()
    return number_dimension, offset_media_id, offset_data_file_id

262 

# Values written as tiny ints (unsigned bytes)
# - [ ] TODO VR: to delete

def write_as_tiny_int(data_2d, filename_prefix):
    """Serialize a 2-D collection of numeric values as clamped unsigned bytes,
    rotating to a new ``{prefix}{n}.dat`` file every 10000 rows."""
    chunk_index = 0
    rows_in_chunk = 0
    out = open(f'{filename_prefix}{chunk_index}.dat', 'wb')
    for row in data_2d:
        for cell in row:
            # Clamp into [0, 255] before packing as one unsigned byte.
            clamped = min(255, max(0, int(cell)))
            out.write(struct.pack('B', clamped))
        rows_in_chunk += 1
        if rows_in_chunk == 10000:
            out.close()
            chunk_index += 1
            rows_in_chunk = 0
            out = open(f'{filename_prefix}{chunk_index}.dat', 'wb')
    out.close()

279 

280 

281 

282 

def subprocessCommand(command, timeout = 10, verbose = False):
    """Run *command* through the shell and return its captured stdout bytes.

    On timeout the process is killed and whatever output was produced is
    returned.  Only stdout is piped, so ``errs`` is always None here.

    :param command: shell command string.
    :param timeout: seconds to wait before killing the process.
    :param verbose: print the command/output for debugging.
    :return: stdout of the process as bytes.
    """
    # BUG FIX: the docstring was placed *after* the import, so it was a stray
    # string expression rather than the function's docstring.
    import subprocess
    # NOTE(review): shell=True executes the string through the shell — only
    # ever pass trusted, non-user-controlled commands here.
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
    try:
        outs, errs = proc.communicate(timeout=timeout)
        if verbose :
            print(str(command) + " Error : " + str(errs))
            print(" Output : " + str(outs))
    except subprocess.TimeoutExpired:
        proc.kill()
        outs, errs = proc.communicate()
        if verbose :
            print(str(outs) + " : " + str(errs))
    return outs

298 

299 

300# - [ ] TODO VR move in lib.lib_suiviprod.lib_manage_log ? 

301 

def display_real_dict_row_shorten(result_pg_query):
    """Print query rows with any "embedding" value truncated to 50 characters.

    The input is deep-copied first, so the caller's rows are left untouched.
    """
    from copy import deepcopy
    rows_for_display = deepcopy(result_pg_query)
    for row in rows_for_display:
        if "embedding" in row:
            row["embedding"] = row["embedding"][:50]
#            del row["embedding"]

    print(rows_for_display)

311 

def build_gif_from_png(list_pngs, out_folder_image, uuid = None):
    """Build an animated GIF from a list of jpg/png files.

    Each image is resized to a fixed size, saved next to the original as
    ``<name>.small.<ext>`` and appended as a frame (500 ms per frame).

    :param list_pngs: image file paths; non-jpg/png entries are skipped.
    :param out_folder_image: folder where the output GIF is written.
    :param uuid: optional identifier used in the output filename; a random
        UUID4 is generated when None.  (Parameter shadows the ``uuid`` module,
        kept for backward compatibility.)
    :return: path of the written GIF (``one_gif_<uuid>.gif_2.gif``).
    """
    from uuid import uuid4
    import os
    if uuid == None:
        uuid = uuid4()
    movie = os.path.join(out_folder_image, "one_gif_" + str(uuid) + ".gif")
    movie2 = movie + "_2.gif"

    # NOTE(review): these first width/height values are dead — immediately
    # overwritten just below.
    width = int(720 / 4)
    height = 120

    width = int(7200 / 4)
    height = 1200

    import imageio
    from PIL import Image
    # NOTE(review): `frames` is filled but never consumed (the mimsave path
    # below is commented out); the writer receives frames one by one instead.
    frames = []
    with imageio.get_writer(movie2, mode='I', duration=500) as writer:

        for filename in list_pngs:
            if "jpg" in filename.lower():
                ext = "jpg"
            if "png" in filename.lower():
                ext = "png"
            if not "jpg" in filename.lower() and not "png" in filename.lower():
                continue
#            for filename in filenames:
            img = Image.open(filename)
            filename_small = filename + ".small." + ext

#            image = imageio.imread(filename)
#            new_image = image.copy()
#            new_image = np.resize(image, (height, width, 3), Image.ANTIALIAS)

            # Resize through PIL, persist, then re-read with imageio so the
            # writer gets a uniform frame size.
            new_image = img.resize((width, height)) # , Resampling.LANCZOS) # Image.ANTIALIAS)
            new_image.save(filename_small)

            # Image.ANTIALIAS
            # cv2.imwrite(os.path.join(folder_small, f), new_image)

            image = imageio.imread(filename_small)
            frames.append(imageio.imread(filename_small))

            # new_image = imageio.new(new_image)
            # image.resize(width, height)
            import sys
            # Progress marker: one '+' per frame appended.
            sys.stdout.write("+")
            writer.append_data(image)

# imageio.imsave(movie, frames, format='GIF', fps=2)
# kargs = {'duration': 5}
# imageio.mimsave(movie, frames, 'GIF', **kargs)

    # NOTE(review): numpy/matplotlib are imported but unused below — leftovers
    # from the commented-out FuncAnimation experiment.
    import numpy as np
#    from matplotlib.animation import FuncAnimation
#    from IPython import display
    import matplotlib.pyplot as plt
#    anim_created = FuncAnimation(Figure, AnimationFunction, frames=100, interval=5)

#    video = anim_created.to_html5_video()
#    html = display.HTML(video)
#    display.display(html)

    # good practice to close the plt object.
#    plt.close()

    return movie2

379 

380 

381 

# Function duplicated from lib_tent_pyfvs to keep a local copy in safia

# Function duplicated from lib_tent_pyfvs to keep a local copy in safia
def is_port_available(port):
    """Probe localhost:*port* with netcat and report whether it looks free.

    Runs ``/usr/bin/nc -zv`` through the shell, captures its output in a temp
    file and looks for "Connection refused" (meaning nothing listens there).
    Any failure along the way is treated as "available".

    :param port: TCP port number to probe.
    :return: True when the port appears free (or the probe itself failed),
        False when something answered.
    """
    res = ""
    try :
        host = "localhost"
        file_res_fvs = "temp/temp_res_nc.log"
        import os
        if not os.path.exists('temp'):
            os.makedirs('temp')
        # NOTE(review): nc's verbose diagnostics typically go to stderr, so we
        # redirect both streams into the result file — confirm on the target OS.
        list_cmds = ["/usr/bin/nc", "-zv", host, str(port), ">", file_res_fvs, "2>&1"]
        cmd_launch = " ".join(list_cmds)
        print(cmd_launch)
        cmd_launch_and_write = cmd_launch
        os.system(cmd_launch_and_write)
        with open(file_res_fvs, "r") as f:
            res = f.read()
        # ret_sp = subprocess.run(list_cmds, capture_output=True, text=True)
        # res = ret_sp.stdout.strip("\n")

        print(res)
        # nc: connectx to 127.0.0.1 port 45 (tcp) failed: Connection refused

        # BUG FIX: os.rmdir() on a regular file always raises, which sent every
        # call through the except branch and made the function always return
        # True regardless of the probe result.
        os.remove(file_res_fvs)
    except Exception as e:
        print(str(e))
        return True

    if "Connection refused" in res :
        return True
    else :
        return False

414 

# Function duplicated from lib_tent_pyfvs to keep a local copy in safia

# Function duplicated from lib_tent_pyfvs to keep a local copy in safia
def get_random_port():
    """Draw ports uniformly in [5000, 6000] until one tests as available."""
    import random
    candidate = random.randint(5000, 6000)
    while not is_port_available(candidate):
        candidate = random.randint(5000, 6000)
    return candidate

422 

423 

def display_confusion_matrix(df):
    """Render a DataFrame as a 2-D string array with its header row and index
    column included (top-left corner cell left empty).

    :param df: pandas DataFrame (e.g. a confusion matrix).
    :return: numpy array of shape (rows+1, cols+1) of strings.
    """
    import numpy as np
    # Column names as a 1 x n header row.
    columns_array = df.columns.astype(str).values
    header = columns_array.reshape(1, -1)

    # Index labels as an m x 1 leading column.
    index_array = df.index.astype(str).values
    index_as_col = index_array.reshape(-1, 1)

    # Prepend the index column to the stringified data.
    data_with_index = np.concatenate((index_as_col, df.values.astype(str)), axis=1)

    # BUG FIX: the original called np.concatenate([""], header), which passes
    # `header` as the *axis* argument and always raises (the caller's
    # try/except hid it).  Build the header row with an empty corner cell and
    # stack it on top of the data.
    corner = np.array([[""]], dtype=str)
    header_row = np.concatenate((corner, header), axis=1)
    full_array = np.concatenate((header_row, data_with_index), axis=0)
    return full_array

440 

441 

442 

443 

def from_pdf_to_list_pngs(in_files, tempfolder = "temp", dpi = 72,
                          hash_id_treatment = None,
                          only_count = False):
    """Render every page of every input PDF to a PNG file.

    :param in_files: list of PDF paths; pages are numbered cumulatively.
    :param tempfolder: base output folder (a random UUID subfolder is created
        unless ``hash_id_treatment`` overrides the location).
    :param dpi: target rendering resolution (rescaled per page, see below).
    :param hash_id_treatment: when set, output goes next to the first input
        file, in a subfolder with this name.
    :param only_count: when True, PNGs are not actually written but the page
        paths and counts are still computed and returned.
    :return: (list_images, count_per_batch, list_of_list_of_pages) — flattened
        PNG paths, per-PDF page counts, and per-PDF lists of cumulative page
        numbers.  NOTE(review): the empty-input early return yields a bare []
        instead of a 3-tuple — callers must cope with both shapes.
    """
    if len(in_files) == 0:
        return []

    list_of_list_of_pages = []

    import fitz, os # PyMuPDF # TODO add in list install

    from uuid import uuid4
    # VR 6-5-24 : in order to have the correct link when uploading from interface
    tempfolder = os.path.join(os.path.dirname(in_files[0]), hash_id_treatment) if hash_id_treatment != None else os.path.join(tempfolder, str(uuid4()))
    if not os.path.exists(tempfolder):
        os.makedirs(tempfolder)

#    from pypdf import PdfReader
#    >> > reader = PdfReader('example.pdf')
#    >> > box = reader.pages[0].mediabox

    list_images = []
    count_per_batch = []
    cum_page_number = 0
    id_page_number_to_list = 1
    for in_file in in_files:
        # Open the PDF file
        pdf_document = fitz.open(in_file)

        # Debug output: dump the PDF's page-0 and trailer keys.
        xref = pdf_document.page_xref(0) # xref of page 0
        # pprint(doc.xref_get_keys(xref)) # primary level keys of a page
        print(pdf_document.xref_get_keys(xref))
        # ('Type', 'Contents', 'Resources', 'MediaBox', 'Parent')
        # pprint(doc.xref_get_keys(-1)) # primary level keys of the trailer
        # ('Type', 'Index', 'Size', 'W', 'Root', 'Info', 'ID', 'Length', 'Filter')
        print(pdf_document.xref_get_keys(-1))
        nb_page_this_batch = len(pdf_document)

        list_of_pages = []
        # Walk every page of this PDF.
        for page_number in range(len(pdf_document)):
            # Get the page object.
            page = pdf_document[page_number]

            # if portrait
            # NOTE(review): the height-based value is immediately overwritten
            # by the width-based one — only the width is effectively used.
            size_expected_by_fitz = page.mediabox.height / 72
            size_expected_by_fitz = page.mediabox.width / 72
            print(" size_expected_by_fitz : " + str(size_expected_by_fitz))
            # Since it should be 21 (cm — presumably an A4 width assumption;
            # TODO confirm the unit/intent).
            apply_factor_dpi_correct_size = dpi / 72 * 21 / size_expected_by_fitz
            used_dpi = int(apply_factor_dpi_correct_size * 72)
            # used_dpi = dpi

            # Render the page as an image.
            pix = page.get_pixmap(dpi=used_dpi)
            # Output path uses the cumulative page number across all PDFs.
            output_image_path = f"{tempfolder}/page_{cum_page_number + page_number + 1}.png"
            # Save the image as a PNG (skipped when only counting).
            if not only_count:
                pix.save(output_image_path)
                # import cv2
                # im = cv2.imread(output_image_path)
                # output_image_path_png = f"{tempfolder}/page_png_{page_number + 1}.png"
                # cv2.imwrite(output_image_path, im)
            # Paths/page ids are recorded even in only_count mode.
            list_images.append(output_image_path)
            list_of_pages.append(id_page_number_to_list)
            id_page_number_to_list = id_page_number_to_list + 1

        list_of_list_of_pages.append(list_of_pages)
        count_per_batch.append(nb_page_this_batch)
        cum_page_number += nb_page_this_batch
        # Close the PDF document.
        pdf_document.close()

    return list_images, count_per_batch, list_of_list_of_pages

518 

def split_text(text, max_length=10000, overlap = 1000):
    """Split text into chunks of maximum length.

    Consecutive chunks share ``overlap`` characters so context is not lost at
    chunk boundaries.

    :param text: the text to split.
    :param max_length: maximum characters per chunk.
    :param overlap: characters shared between consecutive chunks.
    :return: list of chunks (a single-element list for short or empty input).
    """
    # Preserve the historical behavior of returning [""] for empty input.
    if not text:
        return [text]
    # BUG FIX: the original advanced by (max_length - overlap) only from the
    # second boundary on, so the first two chunks never overlapped; it also
    # looped forever when overlap >= max_length.
    step = max_length - overlap
    if step <= 0:
        # Degenerate configuration — fall back to disjoint chunks.
        step = max_length
    list_texts = []
    start = 0
    while start < len(text):
        list_texts.append(text[start:start + max_length])
        start += step

    return list_texts

528 

def parse_list_page_as_begin_end_separated(l):
    """Parse a "begin<sep>end" page spec (e.g. "3-5") into the inclusive
    page list [3, 4, 5].

    A small state machine scans the string: phase 0 reads the first number,
    phase 1 skips the separator, phase 2 reads the second number.  Any
    malformed input is reported and yields [].
    """
    try:
        print(" parse_list_page_as_begin_end_separated : " + str(l))
        first_digits = ""
        second_digits = ""
        # 0 = reading first number, 1 = inside separator, 2 = reading second number
        phase = 0
        for ch in l:
            # explicit digit test (not str.isdigit, which accepts unicode digits)
            ch_is_digit = ch in "0123456789"
            if phase == 0:
                if ch_is_digit:
                    first_digits += ch
                else:
                    phase = 1
            elif phase == 1:
                if ch_is_digit:
                    phase = 2
                    second_digits += ch
                # non-digit while in the separator: keep skipping
            elif phase == 2 and ch_is_digit:
                second_digits += ch
            else :
                # non-digit after the second number started
                print("Unexpected behavior while parsing " + str(l))

        if first_digits == "" or second_digits == "" or phase != 2:
            print("Unexpected behavior while parsing " + str(l))

        begin = int(first_digits)
        end = int(second_digits)
        if begin > end:
            print("Unexpected begin > end : " + str(begin) + " > " + str(end))
            return []
        return list(range(begin, end + 1))
    except Exception as e:
        print("In parsing begin end : " + str(e))
        return []

565 

566 

567 

def read_list_one_doc_csv_with_sometime_tiret(list_page):
    """Convert a list of page tokens to ints; tokens that are not plain
    integers (e.g. dash ranges like "3-5") are expanded through
    parse_list_page_as_begin_end_separated."""
    pages = []
    for token in list_page:
        try :
            pages.append(int(token))
        except Exception as e:
            pages.extend(parse_list_page_as_begin_end_separated(token))
    return pages

577 

578 

# [x] TODO 27-12-23: rename the parameters since "page" is also a list
# - [ ] TODO: darn, this function should be deduplicated

def split_text_by_doc(list_page_content, list_page_per_doc):
    """Concatenate page contents into one text per document.

    ``list_page_per_doc`` is a ';'-separated list of documents, each a
    ','-separated list of 1-based page numbers (dash ranges allowed).
    """
    list_texts = []
    for doc_spec in list_page_per_doc.split(";"):
        if doc_spec == ";":
            print("WARNING TREATED THAT COULD NEED AUDIT")
            continue
        try:
            doc_pages = read_list_one_doc_csv_with_sometime_tiret(doc_spec.split(","))
        except Exception as e:
            print("ERROR treated as warning, trying to parse list_page_per_doc as begin and end separated : " + str(e))
            doc_pages = parse_list_page_as_begin_end_separated(doc_spec)
        doc_text = ""
        for page_no in doc_pages:
            # pages are 1-based in the spec
            doc_text += list_page_content[page_no - 1]
        list_texts.append(doc_text)
    return list_texts

598 

# VR 16-6-25: this looks buggy and unused to me

def split_list_page_by_page(list_page_content_text):
    """Wrap each page's content in its own single-element list."""
    return [[page] for page in list_page_content_text]

605 

def split_list_page_by_doc(list_page_content_text, list_page_per_doc):
    """Group page contents into one list per document.

    ``list_page_per_doc`` is a ';'-separated list of documents, each a
    ','-separated list of 1-based page numbers (dash ranges like "3-5"
    allowed).

    :return: list of lists of page contents, one inner list per document.
    """
    list_documents = list_page_per_doc.split(";")
    list_list_page_content = []
    for l in list_documents:
        if l == ";":
            print("WARNING TREATED THAT COULD NEED AUDIT")
            continue
        try:
            # BUG FIX: a dead `list(map(int, l.split(",")))` ran first and
            # raised on any range token ("3-5"), diverting those specs to the
            # fallback parser — which mis-reads mixed specs such as "1,3-5".
            list_page_one_document = read_list_one_doc_csv_with_sometime_tiret(l.split(","))
        except Exception as e:
            print("ERROR treated as warning, trying to parse list_page_per_doc as begin and end separated : " + str(e))
            list_page_one_document = parse_list_page_as_begin_end_separated(l)
        aux_list_page_content = []
        for ll in list_page_one_document:
            # pages are 1-based in the spec
            aux_list_page_content.append(list_page_content_text[ll - 1])
        list_list_page_content.append(aux_list_page_content)
    return list_list_page_content

624 

def list_file_anon(folder):
    """Inventory a folder of PDFs and their anonymization artifacts.

    For each (lower-cased) ``*.pdf`` path found, collects when present:
    its ``*_anon.pdf`` companion, its ``*_anon.json`` (path and content),
    and its ``*_content.txt`` (path and content length).

    :return: {pdf_path: {"pdf_anon", "json_anon", "json_anon_content",
        "content_pdf_file", "content_pdf"}} with only the keys that exist.
    """
    import os
    entries = [os.path.join(folder, name) for name in os.listdir(folder)]
    pdf_files = []
    pdf_anon_files = []
    content_pdf_files = []
    anon_json_files = []
    for entry in entries:
        lowered = entry.lower()
        if lowered.endswith(".pdf") and not lowered.endswith("_anon.pdf"):
            pdf_files.append(lowered)
        if lowered.endswith("_anon.pdf"):
            pdf_anon_files.append(lowered)
        if lowered.endswith("_content.txt"):
            content_pdf_files.append(lowered)
        if lowered.endswith("_anon.json"):
            anon_json_files.append(lowered)

    map_pdf_file_anon_pdf_and_anon_strat = {}
    for pdf in pdf_files:
        record = {}
        map_pdf_file_anon_pdf_and_anon_strat[pdf] = record
        pdf_anon = pdf + "_anon.pdf"
        if pdf_anon in pdf_anon_files:
            record["pdf_anon"] = pdf_anon
        json_anon = pdf + "_anon.json"
        if json_anon in anon_json_files:
            record["json_anon"] = json_anon
            with open(json_anon, "r") as f:
                record["json_anon_content"] = f.read()
        content_pdf = pdf + "_content.txt"
        if content_pdf in content_pdf_files:
            record["content_pdf_file"] = content_pdf
            with open(content_pdf, "r") as f:
                record["content_pdf"] = len(f.read())

    return map_pdf_file_anon_pdf_and_anon_strat

663 

664#from pydantic import BaseModel, Field 

665 

666from typing import Optional 

class SubDocPage: #(BaseModel):
    """One page of a (sub)document: its text content plus optional OCR layout
    information (token boxes from tesseract, line/paragraph/token blocks from
    gcp_doc_ai) and page geometry."""
    page_number : Optional[int]
    content : Optional[str]
    source_image : Optional[str]
    list_boxes : Optional[list]
    maxx : Optional[int]
    maxy : Optional[int]
    list_blocks : Optional[dict]

    def __init__(self, page_number : Optional[int] = 0,
                 content : Optional[str] = "",
                 source_image : Optional[str] = None,
                 list_boxes : Optional[list] = None,
                 maxx : Optional[int] = None,
                 maxy : Optional[int] = None,
                 list_blocks : Optional[dict] = None):
        # BUG FIX: the mutable defaults ([] and {}) were shared across every
        # instance created without explicit arguments; use None sentinels and
        # build a fresh container per instance.
        self.page_number = page_number
        self.content = content
        self.source_image = source_image
        self.list_boxes = [] if list_boxes is None else list_boxes # list of boxes only by token (word) from tesseract
        self.list_blocks = {} if list_blocks is None else list_blocks # by line, paragraph or token (word) from gcp_doc_ai
        self.maxx = maxx
        self.maxy = maxy

    def toJSON(self):
        """Return a plain-dict view of the page; "paragraphs"/"tokens" default
        to [] when absent from list_blocks."""
        return {"page_number" : self.page_number,
                "content" : self.content,
                "source_image" : self.source_image,
                "paragraphs" : self.list_blocks["paragraphs"] if "paragraphs" in self.list_blocks else [],
                "tokens" : self.list_blocks["tokens"] if "tokens" in self.list_blocks else []}
#        return json.dumps(self, default=lambda o: o.__dict__,
#                          sort_keys=True, indent=4)

    def toText(self):
        """Return the page's raw text content."""
        return self.content

704 

def parse_date(str_date, current_date = None, settings = None):
    """Parse a day-first date string with dateparser, falling back to
    ``current_date`` when parsing fails.

    :param str_date: date string (a list is tolerated: its first element is used).
    :param current_date: fallback datetime; defaults to "now" at call time.
    :param settings: dateparser settings; defaults to {'DATE_ORDER': 'DMY'}.
    :return: (parsed_datetime, parsed_or_forced) — the flag is False when the
        fallback had to be used.
    """
    import dateparser
    # BUG FIX: the default `current_date=datetime.datetime.now()` was evaluated
    # once at import time, so the fallback date was stale; the mutable
    # `settings` default is replaced by a None sentinel for the same reason.
    if current_date is None:
        current_date = datetime.datetime.now()
    if settings is None:
        settings = {'DATE_ORDER': 'DMY'}
    # BUG FIX: the original printed str(parse_date) — the function's own repr —
    # instead of the input string.
    print(" parse_date : " + str(str_date))
    if type(str_date) == list: # str
        print(" Unexpected list of date str_date : " + str(str_date))
        str_date = str_date[0]

    parsed_or_forced = True

    try:
        date_parsed = dateparser.parse(str_date, settings=settings)
        # '2018-10-25'
        # '12/09/2017'
        # '12-01-2023'
    except Exception as e:
        print(str(e))
        print("str_date not parsed : " + str_date)
        date_parsed = current_date
        parsed_or_forced = False
    if date_parsed == None:
        print("str_date not parsed : " + str_date)
        date_parsed = current_date
        parsed_or_forced = False
    return date_parsed, parsed_or_forced

730 

def remove_comment_end_of_line_and_print_them(s):
    """Strip C++-style '//' end-of-line comments from *s*; print a notice
    whenever something was actually removed."""
    import re
    stripped = re.sub(r'//.*', '', s)
    if stripped != s:
        print("Comment removed : " + s + " DO BETTER PLEASE !")
    return stripped

737 

def parse_json_from_prompt_result(result, verbose = False, lazy = False):
    """Extract JSON objects from an LLM answer containing ```json ...``` fences.

    The text is sanitized (null -> "", backslashes stripped, "..." -> ""),
    each fenced block is scanned for a {...} object, known date fields are
    parsed into companion "<field>t" datetime keys, and "nan" values are
    zeroed.

    :param result: LLM output — str expected; None becomes "", a list is
        reduced to its first element, a dict is re-serialized.
    :param verbose: print each extracted object.
    :param lazy: use a greedy fence regex (breaks if the string contains
        multiple fences).
    :return: list of parsed dicts, one per fenced block ({} placeholders keep
        positions aligned when a block fails to parse).
    """
    import json, re
    json_obj = {}

    pattern = r"```json\s*(\{.*?\})\s*```"
    #pattern = r"```json\s*(\{[^\{\}]\})\s*```"
#    pattern = r"```json\s*(\[?[\{.*?\}]*\]?)\s*```"
    if lazy: # TODO: must check there is no occurrence of ``````json in the string, otherwise this breaks
        pattern = r"```json\s*(.*)\s*```"
    else:
        pattern = r"```json\s*(.*?)\s*```"
#    pattern = r"""(\{(?:(?>[^{}"'\/]+)|(?>"(?:(?>[^\\"]+)|\\.)*")|(?>'(?:(?>[^\\']+)|\\.)*')|(?>\/\/.*\n)|(?>\/\*.*?\*\/)|(?-1))*\})"""

    # Search the document content for our pattern
#    match = re.search(pattern, result, re.DOTALL)

    # Normalize the input type before the regex scan.
    if type(result) == type(None):
        print("Error treated as warning, we have a None result, we set to '' ")
        result = ""
    if type(result) == list and len(result) > 0:
        print("WARNING we could have extract better information by collecting from the different part of the folder !")
        result = result[0]
    if type(result) == list and len(result) == 0:
        print("ERROR in parse_json_from_prompt_result !")
    if type(result) == dict:
        print("WARNING in parse_json_from_prompt_result, we have a dict, we convert it to string !")
        result = json.dumps(result)
    # Sanitize common LLM artifacts before json.loads.
    result = result.replace("null", "\"\"")
    result = result.replace("\\", "")
    result = result.replace("...", "\"\"") # should be protected properly in a second pass

    match = re.findall(pattern, result, re.DOTALL)

    all_results = []
#    if match:

    # Fallback date far in the past, used by parse_date when a field fails.
    current_date = datetime.datetime.now() - datetime.timedelta(days=100000)

    for m in match:
        # Extract the JSON-like string from the matched content

        # Convert the JSON-like string into a Python dictionary object (JSON object)
        internal_match_pattern = r"(\{.*?\})"
        internal_matches = re.findall(internal_match_pattern, m, re.DOTALL)
        list_internal_json = [] # We expect one !
        for mm in internal_matches:
            try:
                mm = remove_comment_end_of_line_and_print_them(mm)
                json_obj = json.loads(mm)
                # For each known date field, add a parsed "<field>t" datetime.
                if "date" in json_obj:
                    current_date, parsed_or_forced = parse_date(json_obj["date"])
                    json_obj["datet"] = current_date
                    json_obj["date_parsed_or_forced"] = parsed_or_forced
                if "date_fin_arret_travail" in json_obj:
                    current_date, parsed_or_forced = parse_date(json_obj["date_fin_arret_travail"])
                    json_obj["date_fin_arret_travailt"] = current_date
                if "date_entree_hospitalisation" in json_obj:
                    current_date, parsed_or_forced = parse_date(json_obj["date_entree_hospitalisation"])
                    json_obj["date_entree_hospitalisationt"] = current_date
                if "date_sortie_hospitalisation" in json_obj:
                    current_date, parsed_or_forced = parse_date(json_obj["date_sortie_hospitalisation"])
                    json_obj["date_sortie_hospitalisationt"] = current_date
                if "date_debut_arret_travail" in json_obj:
                    current_date, parsed_or_forced = parse_date(json_obj["date_debut_arret_travail"])
                    json_obj["date_debut_arret_travailt"] = current_date
                # Zero out "nan" values.
                for k in json_obj:
                    if str(json_obj[k]).lower() == "nan":
                        json_obj[k] = 0
                if verbose:
                    print("Extracted JSON:", json_obj)
                # NOTE(review): this nan-zeroing loop duplicates the one above.
                for k in json_obj:
                    if str(json_obj[k]).lower() == "nan":
                        json_obj[k] = 0
                list_internal_json.append(json_obj)
            except json.JSONDecodeError as e:
                print("ERROR Failed to parse JSON:", e)
                print("JSON content:", mm)
                print("WE add an empty json to avoid decalage in the list of results !")
                # Placeholder keeps positions aligned with the fenced blocks.
                list_internal_json.append({})
        if len(list_internal_json) == 1:
            all_results.append(list_internal_json[0])
        elif len(list_internal_json) == 0:
            print("Internal error parsing json")
        else :
            print("ERROR We keep only the first one : Inconsistent split by document to be reported : " + str(list_internal_json))
            all_results.append(list_internal_json[0])
#        else:
#            print("No JSON content found matching the pattern")

#    if len(all_results) == 1:
#        return all_results[0]
#    else :
    return all_results

831 

def append_id_by_order(list_json):
    """Stamp each dict with its list position as "id" (in place) and return
    the same list."""
    for position, entry in enumerate(list_json):
        entry["id"] = position
    return list_json

836 

def complete_date_and_order_json_to_mettre_en_forme(list_json):
    """Fill in missing "datet" values and sort the entries chronologically.

    Undated entries *before* the first dated one are back-dated one day apart;
    undated entries after it inherit the most recent date seen.

    :param list_json: list of dicts, some carrying a "datet" datetime.
    :return: None for an empty list; the list unchanged when no entry has a
        date; otherwise a new list sorted by "datet".
    """
    if len(list_json) == 0:
        return None

    # Locate the first entry that already carries a date.
    first_with_date = 0
    while first_with_date < len(list_json) and not "datet" in list_json[first_with_date]:
        first_with_date += 1
    if first_with_date == len(list_json):
        print(" No Dates ! ")
        return list_json
    # BUG FIX: start_date was read from list_json[0], which raises KeyError
    # whenever the first dated entry is not the first element.
    start_date = list_json[first_with_date]["datet"]
    import datetime
    for i in range(first_with_date):
        # Back-date the leading undated entries, one day apart, before start_date.
        current_date = start_date - datetime.timedelta(days=first_with_date - i)
        list_json[i]["datet"] = current_date

    current_date = start_date

    for i in range(first_with_date, len(list_json)):
        if "datet" not in list_json[i]:
            list_json[i]["datet"] = current_date
        else :
            current_date = list_json[i]["datet"]

    list_json = sorted(list_json, key=lambda x : x["datet"])

    return list_json

866 

def reorder_paragraph_by_order_lex_token(one_paragraph, list_tokens):
    # NOTE(review): unfinished stub — it only reads the paragraph id into a
    # local and returns None; list_tokens is never used. Left as-is; the
    # actual reordering lives in order_token_and_concat below.
    id_paragraph = one_paragraph["id_paragraph"]

869 

def find_list_tokens_to_keep(list_ids_paragraph_to_keep, list_tokens):
    """Return the tokens whose "id_paragraph" is in *list_ids_paragraph_to_keep*.

    Order of *list_tokens* is preserved.
    """
    # A set turns the filter from O(n*m) list scans into O(n+m).
    ids_to_keep = set(list_ids_paragraph_to_keep)
    return [token for token in list_tokens if token["id_paragraph"] in ids_to_keep]

877 

878 

879 

def order_token_and_concat(list_tokens):  # seems not to change anything
    """Rebuild a text string from positioned OCR tokens.

    Each token is assumed to be a dict with pixel geometry ("x", "y", "w",
    "h") and a "text" field — TODO confirm against the OCR producer.
    Tokens are grouped into visual lines, each line is sorted left to
    right, and all token texts are joined with spaces (note: the result
    starts with a leading space). Returns "" for an empty token list.
    """
    # I want to reorder the tokens by lexicographic (y, x) order.
    # Actually a strict positional sort does not work: positions must be
    # compared at roughly one-line granularity; tokens seem to be ordered
    # lexicographically by line.
#    list_tokens_ordered = sorted(list_tokens, key=lambda x : (x["y"], x["x"]))

    if list_tokens == []:
        return ""

    import math
    import numpy as np
    width = max(list(map(lambda x: x["x"] + x["w"], list_tokens)))
    end_token = list(map(lambda x: x["x"] + x["w"], list_tokens))  # NOTE(review): never used
    mean_height = np.mean(list(map(lambda x: x["h"], list_tokens)))
    # NOTE(review): never used below; would raise ZeroDivisionError for a
    # token whose text is only "\n" — confirm whether such tokens can occur.
    mean_width_char = np.mean(list(map(lambda x : float(x["w"])/float(len(x["text"].lstrip("\n"))), list_tokens)))

    # First approximation: sort by a combined line/column score.
    list_tokens_ordered = sorted(list_tokens, key=lambda x: x["y"] * float(width) / float(mean_height) + x["x"])

    # This value is recomputed by the line-grouping algorithm below.
    new_text = " ".join(list(map(lambda x : x["text"], list_tokens_ordered)))
#    new_text = " ".join(list(map(lambda x: x["text"], list_tokens)))

    # Group by lines if difference lower than 50% of mean line

    type_algo = "line_next_line"
    if type_algo == "stric":
        # Strict algo => TODO tests on the remainders (or checks afterwards)
        # to know whether the document is tilted (correction possible beforehand)
        map_line_list_token = {}
        for token in list_tokens:
            # modf splits y/mean_height into (fractional, integral) parts;
            # the integral part is used as the line bucket.
            (rest, mod_line) = math.modf(token["y"] / mean_height)
            if mod_line not in map_line_list_token:
                map_line_list_token[mod_line] = []
            map_line_list_token[mod_line].append(token)

        # Sort each line bucket left-to-right.
        for mod_line in map_line_list_token:
            map_line_list_token[mod_line] = sorted(map_line_list_token[mod_line], key = lambda x : x["x"])

        new_text = ""
        for mod_line in map_line_list_token:
            for token in map_line_list_token[mod_line]:
                new_text += " " + token["text"]
    elif type_algo == "line_next_line":
        # A token belongs to the current line while its y stays within half
        # a mean token height of the line anchor.
        prop_next_line = 0.5

        list_tokens_ordered_by_y = sorted(list_tokens, key=lambda x: x["y"])
        list_token_by_line = []
        list_current_line = []
        current_line_y = list_tokens_ordered_by_y[0]["y"]
        for token in list_tokens_ordered_by_y:
            this_y = token["y"]
            if this_y < current_line_y + prop_next_line * mean_height:
                list_current_line.append(token)
            else :
                # Close the current line (sorted left-to-right) and anchor a new one.
                list_token_by_line.append(sorted(list_current_line, key = lambda x : x["x"]))
                current_line_y = this_y
                list_current_line = [token]
        # Flush the last open line.
        list_token_by_line.append(sorted(list_current_line, key=lambda x: x["x"]))

        new_text = ""
        for one_line in list_token_by_line:
            for token in one_line:
                new_text += " " + token["text"]

    else:
        print ("Type algo not supported " + type_algo)

    return new_text

945 

946 

947 

def concat_content_from_list_page_doc(list_page_doc,
                                      reproduce_format_new_page=False,
                                      height_line = 0,
                                      reorder_paragraph_by_order_lex_token = False,
                                      smart_new_line_from_token_pos = False,
                                      list_class_copy = None):
    """Concatenate paragraph text from a list of pages into one string.

    Each page is an iterable of paragraph dicts ("id", "text", "h", "y",
    optionally "class"). By default only paragraphs classified as body
    content (or unclassified) are kept; list_class_copy may be "all" or a
    comma-separated list of classes to keep instead. When
    reproduce_format_new_page is True, newlines are inserted based on the
    paragraphs' vertical positions; otherwise one newline per paragraph.
    When reorder_paragraph_by_order_lex_token is True, the page text is
    rebuilt from page.list_blocks["tokens"] via order_token_and_concat
    (best-effort: failures are logged, the plain text is kept).
    """
    from_json_content_copy = ""
    current_position_vertical = 0
    for page in list_page_doc:
        list_ids_paragraph_to_keep = []
        text_one_page = ""
        for paragraph in page:
            # Default: keep body content and unclassified paragraphs.
            condition_keep = "class" not in paragraph or paragraph["class"] == "content" or paragraph["class"] == "unknown" or paragraph["class"] == "undefined" # or paragraph["class"] == "autre" # buggy case
            if list_class_copy == "all":
                condition_keep = True
            # BUG FIX: guard against list_class_copy being None (the default)
            # before testing for a comma — `"," in None` raises TypeError.
            if list_class_copy is not None and "," in list_class_copy:
                list_class_copy_as_list = list_class_copy.split(",")
                condition_keep = "class" not in paragraph or paragraph["class"] in list_class_copy_as_list
            if condition_keep:
                list_ids_paragraph_to_keep.append(paragraph["id"])

                # First kept paragraph fixes the reference line height.
                if height_line == 0:
                    height_line = paragraph["h"]
                text_one_page += paragraph["text"]# + "\n"
                if reproduce_format_new_page:
                    if current_position_vertical == 0:
                        current_position_vertical = paragraph["y"]
                    else:
                        # New line only when the paragraph sits clearly
                        # below the previous one (80% of a line height).
                        if paragraph["y"] > current_position_vertical + 0.8 * height_line:
                            text_one_page += "\n"
                            current_position_vertical = paragraph["y"]
                        else:
                            text_one_page += " "
                else:
                    text_one_page += "\n"

        if reorder_paragraph_by_order_lex_token:
            try:
                text_one_page = order_token_and_concat(find_list_tokens_to_keep(list_ids_paragraph_to_keep, page.list_blocks["tokens"]))
            except Exception as e:
                print(" Error in order_token_and_concat : " + str(e) + " for page : " + str(page) + " and list_ids_paragraph_to_keep : " + str(list_ids_paragraph_to_keep))

        if smart_new_line_from_token_pos:
            print(" TODO BOUH c'est complique")

        from_json_content_copy += text_one_page
    return from_json_content_copy

995 

def order_df_by_date(df):
    """Sort *df* in place by its "datet" column when present; return df.

    Frames without a "datet" column are returned untouched.
    """
    if "datet" not in df.columns:
        return df
    df.sort_values(by="datet", ascending=True, inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df

1003 

def order_by_document_type(df):
    """Reorder rows so that 'certif_at' rows come after the ordinary ones
    and invoice rows ('facture*') come last.

    Anything that is not a DataFrame with a 'document_type' column is
    returned unchanged. Original indices are discarded.
    """
    import pandas as pd
    if type(df) != pd.DataFrame or 'document_type' not in df.columns:
        print("No document_type in the DataFrame")
        return df
    # We just want the certif_at rows near the end, invoices last.
    doc_type = df['document_type']
    is_certif = doc_type == 'certif_at'
    is_facture = doc_type.isin(['facture_inutile', 'facture_utile', 'facture'])
    ordinary = df[~is_certif & ~is_facture]
    certif = df[is_certif]
    fact = df[is_facture]
    # ignore_index=True renumbers rows; drop it to keep original indices.
    return pd.concat([ordinary, certif, fact], ignore_index=True)

1021 

def add_blank_line(df, nb_blank_line):
    """Append *nb_blank_line* empty rows to *df*.

    Every cell of a new row is "" except "id", which is set to the row's
    positional index. Non-DataFrame input and nb_blank_line == 0 return
    *df* unchanged; otherwise a new DataFrame is returned.
    """
    import pandas as pd
    # isinstance is the idiomatic type check (and accepts subclasses).
    if not isinstance(df, pd.DataFrame):
        print("No DataFrame")
        return df
    if nb_blank_line == 0:
        return df
    nb_current_line = len(df)
    blanks = pd.DataFrame([[""] * len(df.columns) for _ in range(nb_blank_line)], columns=df.columns)
    df = pd.concat([df, blanks], ignore_index=True)
    # Give each appended row a sequential "id" (creates the column if absent).
    for i in range(nb_current_line, nb_current_line + nb_blank_line):
        df.loc[i, "id"] = i
    return df

1034 

def add_parsing_meta_info_to_table(df, list_json_to_mettre_en_forme, verbose = False):
    """Append the per-document parsing metadata as extra columns of *df*.

    The metadata dicts are loaded into their own DataFrame, columns that
    would duplicate ones already present in *df* are dropped or renamed,
    and both frames are concatenated column-wise (axis=1). Rows are
    therefore matched by position — both inputs are assumed to be in the
    same document order (TODO confirm against callers). Extraction
    problems are folded into the "Commentaires" column and the raw
    problem/indication columns are hidden. Returns the merged DataFrame
    (or *df* unchanged when the metadata list is empty).
    """
    import pandas as pd
    if len(list_json_to_mettre_en_forme) == 0:
        return df
    # Frankly brutal, but fine!
    df_from_json_parsed = pd.DataFrame(list_json_to_mettre_en_forme)

    # Drop metadata columns that already exist in df, to avoid duplicated
    # columns after the axis=1 concat below.
    if "date_parsed_or_forced" in df.columns:
        df_from_json_parsed.drop("date_parsed_or_forced", axis=1, inplace=True)
    if "document_type" in df.columns:
        df_from_json_parsed.drop("document_type", axis=1, inplace=True)
    # date_parsed_or_forced

    # df = df.reset_index()
    # df_from_json_parsed = df_from_json_parsed.reset_index()
#    df = df.reset_index(drop=True)
#    df.reset_index(inplace=True, drop=True)

#    df = pd.concat([df, df_from_json_parsed], ignore_index=True)
    if verbose:
        print(" df : " + df.to_string()) if type(df) == pd.DataFrame else print(" df : " + str(df))
        print(" df_from_json_parsed : " + df_from_json_parsed.to_string()) if type(df_from_json_parsed) == pd.DataFrame else print(" df_from_json_parsed : " + str(df_from_json_parsed))
        if type(df) == pd.DataFrame and type(df_from_json_parsed) == pd.DataFrame:
            print(" Merge : " + str(df.shape) + " " + str(df_from_json_parsed.shape))
    # Keep both title columns distinguishable after the merge.
    if "Titre" in df.columns and "Titre" in df_from_json_parsed.columns:
        df_from_json_parsed = df_from_json_parsed.rename(columns={'Titre': 'TitreMeta'})
    df = pd.concat([df, df_from_json_parsed], axis=1) #.reset_index(drop=True)
    # we will rather hide it! VR 25-4-24
#    if "compte_rendu_complet_medecin" in df.columns: # since it comes from json and we do not ask for compte_rendu_complet_medecin in split_by_doc which creates the markdown table to be loaded in dataframe and we don't want to display it here from the parsing by document
#        df = df.drop("compte_rendu_complet_medecin", axis=1)#, inplace=True)

    # Fold extraction problems into the visible comments column, then hide
    # the raw problem column.
    if "Commentaires" in df and "probleme_rencontre_extraction" in df:
        for i in range(len(df)):
            if df["probleme_rencontre_extraction"][i] != "":
                # df["Commentaires"][i] = str(df["Commentaires"][i]) + "\n" + str(df["probleme_rencontre_extraction"][i]) => that is chained indexing, pandas dislikes it, so what now?
                df.loc[i, "Commentaires"] = str(df.loc[i, "Commentaires"]) + "\n" + str(df.loc[i, "probleme_rencontre_extraction"])
#        df["Commentaires"] = df["Commentaires"] + "\n" + df["probleme_rencontre_extraction"]
        df.drop("probleme_rencontre_extraction", axis=1, inplace=True)

    if "indication_medecin" in df.columns:
        df.drop(columns=["indication_medecin"], axis=1, inplace=True)

    return df

1078 

1079 

1080 

def create_json_match_date(intro, list_dates):
    """Split *intro* into [{"text", "type"}] parts, where each occurrence
    of a string from *list_dates* becomes a "date" part and the
    surrounding text becomes "text" parts.

    With an empty *list_dates* the whole intro is one "text" part.
    Concatenating the "text" fields in order reproduces *intro*.
    """
    intro_read = intro
    json_match_date = []
    if len(list_dates) == 0:
        return [{"text" : intro, "type" : "text"}]
    while len(intro_read) > 0:
        # Locate the earliest next occurrence of any known date.
        # BUG FIX: the sentinel for "no match yet" is -1. The old code
        # used first_date == 0, so a date found at position 0 was
        # overwritten by any later match and mislabelled as plain text.
        first_date = -1
        idx_first_date_arg_min = -1
        for idx_date in range(len(list_dates)):
            next_char = intro_read.find(list_dates[idx_date])
            if next_char != -1 and (first_date == -1 or next_char < first_date):
                first_date = next_char
                idx_first_date_arg_min = idx_date
        if idx_first_date_arg_min == -1:
            # No more dates: the remainder is plain text.
            json_match_date.append({"text" : intro_read, "type" : "text"})
            break
        if first_date > 0:
            json_match_date.append({"text" : intro_read[:first_date], "type" : "text"})
        json_match_date.append({"text" : list_dates[idx_first_date_arg_min], "type" : "date"})
        # BUG FIX: always consume the matched prefix. The old code only
        # sliced when characters remained AFTER the date, so an intro
        # ending exactly on a date looped forever.
        intro_read = intro_read[first_date + len(list_dates[idx_first_date_arg_min]):]
    return json_match_date

1114 

# Adds a left border to a paragraph
def add_border_left(paragraph):
    """Attach a thin solid left border to a python-docx paragraph.

    Works at the raw OOXML level: builds a <w:pPr><w:pBdr><w:left/>
    structure and inserts it into the paragraph's XML element.
    """
    from docx.oxml import OxmlElement
    from docx.oxml.ns import qn
    # Create a new 'pBdr' (paragraph border) element
    p_bdr = OxmlElement('w:pBdr')
    # Create a 'left' element for the left border
    left_bdr = OxmlElement('w:left')
    # Set the left border attributes - here a solid line ('single') and the border size
    left_bdr.set(qn('w:val'), 'single')
    left_bdr.set(qn('w:sz'), '4') # 4/8" border size, where 8 is the unit of measure
    left_bdr.set(qn('w:space'), '4') # 4/20" of space between the border and the text
    left_bdr.set(qn('w:color'), 'auto') # 000000 border color, in hexadecimal (black here)
    # Add the left border to the 'pBdr' element
    p_bdr.append(left_bdr)

    # Wrap the border in paragraph properties (w:pPr).
    p_pPr = OxmlElement('w:pPr')
    p_pPr.append(p_bdr)

    # Add the border to the paragraph by modifying its XML element
#    paragraph._element.insert(1, p_bdr)
    paragraph._element.insert(1, p_pPr)

1137 

1138 

1139 

def write_table_list_inner_document_0424_bis(df, input_col_intro,
                                             input_col_cr, out_file,
                                             hash_id_treatment, out_folder,
                                             format_info = {},
                                             verbose = False,
                                             content_resume = "", append_resume = False):
    """Write one Word (.docx) document from the per-document rows of *df*.

    For each row, the *input_col_intro* text is rendered (optionally with
    dates in bold/underline, located via a regexp derived from the
    configured date format) followed by the *input_col_cr* "compte rendu"
    quoted between « and ». The document is saved both under *out_file*
    and under "<hash_id_treatment>.docx" in *out_folder*.

    Returns (total_text, basename of the saved file, audit_info_write)
    where audit_info_write holds per-row word counts, document types and
    the ;-joined lists of page numbers.

    NOTE(review): format_info uses a mutable default dict; it is only
    read here, so the shared default is harmless.
    """
    audit_info_write = {}
    # Formatting options, all optional, read from format_info / its
    # nested "document" dict.
    list_document_type_no_cr = format_info["list_type_doc_no_cr"] if "list_type_doc_no_cr" in format_info else []
    document_format = format_info["document"] if "document" in format_info else {}
    left_indent = document_format["left_indent"] if "left_indent" in document_format else 0
    new_line_intro_cr = document_format["new_line_intro_cr"] if "new_line_intro_cr" in document_format else False
    size_intro = document_format["size_intro"] if "size_intro" in document_format else 12
    size_cr = document_format["size_cr"] if "size_cr" in document_format else 11
    italic_cr = document_format["italic_cr"] if "italic_cr" in document_format else False
    list_variable_underline = format_info["list_variable_underline"] if "list_variable_underline" in format_info else []
    list_variable_bold = format_info["list_variable_bold"] if "list_variable_bold" in format_info else []
    cr_left_border = document_format["cr_left_border"] if "cr_left_border" in document_format else 0

    # The date-matching regexp is overridden according to the configured
    # date format (the explicit regexp_date entry is then ignored).
    format_date = format_info["format_date"] if "format_date" in format_info else "%d %B %Y"
    regexp_date = format_info["regexp_date"] if "regexp_date" in format_info else r"(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})"
    if format_date == "%d %B %Y":
        regexp_date = r'(1er \w+ \d{4}|\d{1,2} \w+ \d{4})'
    if format_date == "%d/%m/%Y" or format_date == "%d-%m-%y":
        regexp_date = r'(1er[-/]\d{1,2}[-/]\d{4}|\d{1,2} \w+ \d{4})'

    total_text = ""

    # Argument to modularize : df, input_col_intro, input_col_cr, out_file, hash_id_treatment
    # Output : nb_file, nb_page, nb_modif_manual, total_text

    # Loop over df data and add the content of the document
    # VR TO MOVE
    from docx import Document
    # from docx.shared import Inches
    from docx.shared import Pt
    # Try to locate a template next to the code; fall back to an empty
    # document (the "and False" below currently disables the template).
    path_template = ""
    try :
        import os
        git_safia = os.getenv("GITSAFIA")
        input_file = "template_justif.docx"
        input_file = "O_DocumentEcritParGHetJustifier_sur_MSWORD.docx"
        path_template = os.path.join(git_safia, "prompt/python/data/template", input_file)
    except Exception as e:
        print(str(e))
    if os.path.exists(path_template) and False:  # NOTE(review): template loading deliberately disabled
        document = Document(path_template)
    else:
        document = Document()

    if verbose:
        print(document.settings._element.xml)

    # Reference settings.xml kept for comparison/debugging; currently NOT
    # applied to the document (the assignment below is commented out).
    correct_settings_str = """<w:settings xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" xmlns:o="urn:schemas-microsoft-com:office:office" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main">
    <w:view w:val="print"/>
    <w:mirrorMargins w:val="0"/>
    <w:bordersDoNotSurroundHeader w:val="0"/>
    <w:bordersDoNotSurroundFooter w:val="0"/>
    <w:displayBackgroundShape/>
    <w:revisionView w:markup="1" w:comments="1" w:insDel="1" w:formatting="0"/>
    <w:defaultTabStop w:val="720"/>
    <w:autoHyphenation w:val="0"/>
    <w:evenAndOddHeaders w:val="0"/>
    <w:bookFoldPrinting w:val="0"/>
    <w:noLineBreaksAfter w:lang="français" w:val="‘“(〔[{〈《「『【⦅〘〖«〝︵︷︹︻︽︿﹁﹃﹇﹙﹛﹝「"/>
    <w:noLineBreaksBefore w:lang="français" w:val="’”)〕]}〉"/>
    <w:doNotExpandShiftReturn />
    <w:compat>
    <w:compatSetting w:name="compatibilityMode" w:uri="http://schemas.microsoft.com/office/word" w:val="15"/>
    </w:compat>
    <w:clrSchemeMapping w:bg1="light1" w:t1="dark1" w:bg2="light2" w:t2="dark2" w:accent1="accent1" w:accent2="accent2" w:accent3="accent3" w:accent4="accent4" w:accent5="accent5" w:accent6="accent6" w:hyperlink="hyperlink" w:followedHyperlink="followedHyperlink"/>
    </w:settings>"""

    from docx.oxml import parse_xml
    correct_settings_xml = parse_xml(correct_settings_str)
#    document.settings._element = correct_settings_xml

    if verbose:
        print(document.settings._element.xml)

    # Audit accumulators, keyed by the df row index.
    map_nb_word_per_doc = {}
    map_type_document_per_doc = {}
    map_list_page_per_doc = {}

    # run = document.add_paragraph().add_run()
    '''Apply style'''
    style = document.styles['Normal']
    style.paragraph_format.line_spacing = 1
    style.paragraph_format.space_after = Pt(0)
    font = style.font
    font.name = document_format["font"] if "font" in document_format else 'Times New Roman'
    font.size = Pt(size_cr)

    # <w:pPr>
    # <w:pBdr>
    # <w:left w:val="single" w:sz="4" w:space="4" w:color="auto"/>
    # </w:pBdr>
    # <w:rPr>
    # <w:lang w:val="fr-FR"/>
    # </w:rPr>
    # </w:pPr>

    cr_justify = False
    if "cr_alignment" in document_format:
        if document_format["cr_alignment"] == "justify":
            cr_justify = True

    #paragraph.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY

#    table.rows[1].cells[1].add_paragraph(item['description'].replace('\n', ' ')).alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY

    # paragraph = document.add_paragraph('Some text\n')
    '''Add another sentence to the paragraph'''
    # sentence = paragraph.add_run('A new line that should have a different font')
    '''Then format the sentence'''
    # sentence.font.name = 'Arial'
    # sentence.font.size = docx.shared.Pt(10)

    # Optional leading paragraph holding the raw resume text.
    if append_resume:
        p = document.add_paragraph()
        p.add_run("Resume fourni tel quel : " + content_resume + "\n\n").bold = True


    # VR 22/1/26 TODO : Add an index ("bordereau") of the list of attachments



#    document.add_heading('Compte Rendu de dossier medical', 0)
    list_of_pages_as_csv_list = []
    list_of_pages_as_map_csv_min_int = []
    for index, row in df.iterrows():
        map_type_document_per_doc[index] = row["document_type"]
        liste_of_page = row["Liste des pages"]

        # Rows without a page list are skipped entirely.
        if liste_of_page == "":
            continue

        map_list_page_per_doc[index] = liste_of_page

        # Record the page list as "n,n,..." together with its smallest
        # page number so the lists can later be sorted by first page.
        if liste_of_page != "" and liste_of_page != None:
            liste_of_page = str(liste_of_page)
            if liste_of_page.replace(" ", "").replace(",", "").isdigit():
                list_of_pages_as_map_csv_min_int.append({"csv" : liste_of_page.replace(" ", ""), "min_int" : min(list(map(int, liste_of_page.split(","))))})
                list_of_pages_as_csv_list.append(liste_of_page.replace(" ", ""))
        if input_col_intro in df.columns:
            intro = row[input_col_intro]
        else:
            intro = "Missing data wtf"
        if input_col_cr in df.columns:
            cr = row[input_col_cr]
        else :
            cr = "No Data Provided"
        # intro = df.loc[index, input_col_intro]
        # cr = df.loc[index, input_col_cr]

        # paragraph = document.add_paragraph(intro)
        '''Add another sentence to the paragraph'''
        # sentence = paragraph.add_run(cr)
        '''Then format the sentence'''
        # sentence.font.name = 'Arial'
        # sentence.font.size = docx.shared.Pt(10)

        p = document.add_paragraph()
        # A float here is presumably a NaN cell from the dataframe — TODO confirm.
        if type(intro) == float:
            print("WTF : " + str(intro))
            intro = str(intro)

        if len(list_variable_underline) == 0 and len(list_variable_bold) == 0:
            # No date styling requested: the whole intro is one run.
            p.add_run(intro).font.size = Pt(size_intro)
        else:
            # VR TODO 9-8-24 Here we adapt the style of the dates: either we have
            # the list of dates (but unclear where to build it), or we parse — then
            # the regexps must be derived from the date format, which can be done
            # case by case even if not ideal.
            all_dates = re.findall(regexp_date, intro)
            list_dates = []
            if all_dates:
                for date in all_dates:
                    list_dates.append(date)

            # We could handle the special cases where only one date is present,
            # or where each date appears exactly once.
            # VR TODO 9-8-24 : this could also be a helper function
            from lib.lib_util import create_json_match_date
            json_match_date = create_json_match_date(intro, list_dates)

            underline_date = len(list_variable_underline) > 0 and "date" in list_variable_underline
            bold_date = len(list_variable_bold) > 0 and "date" in list_variable_bold
            if underline_date and bold_date:
                print("Only bold will be done !")

            # One run per text/date part, dates styled as requested.
            for part in json_match_date:
                if part["type"] == "text":
                    p.add_run(part["text"]).font.size = Pt(size_intro)
                else:
                    if underline_date:
                        p.add_run(part["text"]).underline = True
                    if bold_date:
                        p.add_run(part["text"]).bold = True

        # p = document.add_paragraph()
        # p.add_run(cr_content).underline = True
        # p.add_run(cr_content).bold = True

#        p.add_run(intro).font.size = Pt(12)

        try:
            doc_type = row["document_type"]
        except Exception as e:
            doc_type = "unknown"
            print(str(e))
        if doc_type in list_document_type_no_cr:
            # Document types without a "compte rendu": just a blank line.
            cr = "\n"
            p.add_run(cr)
        else:
            cr_strip = cr.strip("\n")
            if cr_strip == "":
                continue
            # Quote the compte rendu with guillemets, de-duplicating them.
            cr_content = "«" + cr_strip + "»" + "\n"
            print("Warning on pourrait faire cela plus propre enfin bon")
            cr_content = cr_content.replace("««", "«")
            cr_content = cr_content.replace("»»", "»")

            if new_line_intro_cr:
                cr_content = "\n" + cr_content

            # One docx paragraph per line of the compte rendu.
            for one_cr_content_line in cr_content.split("\n"):
                p = document.add_paragraph()
                if cr_justify:
                    from docx.enum.text import WD_ALIGN_PARAGRAPH
                    # p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
                    p.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
                if left_indent > 0:
                    p.paragraph_format.left_indent = Pt(left_indent)
                #cr_content = "\t" + cr_content.replace("\n", "\n\t")

                if italic_cr:
                    p.add_run(one_cr_content_line).italic = True
                else:
                    p.add_run(one_cr_content_line)

                if cr_left_border:
                    add_border_left(p)

        # Rough word count: intro + cr plus 2 for the separators.
        map_nb_word_per_doc[index] = len(intro.split(" ")) + len(cr.split(" ")) + 2

        total_text += intro
        total_text += str(cr)

    # Order the page-list dicts by their smallest page number, ascending.
    list_of_pages_as_map_csv_min_int = sorted(list_of_pages_as_map_csv_min_int, key=lambda x : x["min_int"])
    list_of_pages_as_csv_list = list(map(lambda x : x["csv"], list_of_pages_as_map_csv_min_int))
    liste_of_page_as_ccsv = ";".join(list_of_pages_as_csv_list)
    outfile_name_docx = hash_id_treatment + ".docx"
    link_for_manax_temp = os.path.join(out_folder, outfile_name_docx)
    if out_file == None:
        out_file = link_for_manax_temp
    else:
        out_file = os.path.join(out_folder, out_file)
    print("About to out_file : " + out_file)
    if not os.path.exists(os.path.dirname(out_file)):
        os.makedirs(os.path.dirname(out_file))
    # Saved twice: once under the requested name, once under the hash name.
    document.save(out_file)
    document.save(link_for_manax_temp)
    print("out_file : " + out_file)
    os.system("chmod 755 " + out_file)
    os.system("chmod 755 " + link_for_manax_temp)

    audit_info_write["map_nb_word_per_doc"] = map_nb_word_per_doc
    audit_info_write["map_type_document_per_doc"] = map_type_document_per_doc
    audit_info_write["list_of_pages_as_sccsv"] = liste_of_page_as_ccsv

    return total_text, os.path.basename(out_file), audit_info_write #outfile_name_docx

1408 

# deprecated 15/5/24
def write_table_list_inner_document(df, doc, with_hyperlink = False):
    """Render *df* as a bordered table inside a python-docx document.

    Creates the document when *doc* is None, adds an 'Index For Documents'
    heading, one header row plus one row per dataframe row, and optionally
    a crude list of intra-document hyperlinks. Returns the document.
    """
    import docx
    from docx.oxml.shared import OxmlElement
    if doc == None:
        from docx import Document
        # Create a new Word document
        doc = Document()

    # Add a heading
    doc.add_heading('Index For Documents', level=1)
#    toc_paragraph = document.add_paragraph("Table des Matières\n", style='Heading1')

    # Add a table to the Word document, with one row for the headers
    table = doc.add_table(rows=1, cols=len(df.columns))

    # Set the table style
    table.style = 'Table Grid'

    styles = doc.styles
    print(str(len(styles)))

    # Add the column headers
    hdr_cells = table.rows[0].cells
    for i, col_name in enumerate(df.columns):
        hdr_cells[i].text = col_name

    # Add the data rows to the table
    for index, row in df.iterrows():
        row_cells = table.add_row().cells
        for i, value in enumerate(row):
            row_cells[i].text = str(value)

    if with_hyperlink:
        toc_paragraph = doc.add_paragraph("Listes des liens (un peu rustique du coup)\n", style='Heading1')

        # One hyperlink-styled run per row, pointing at bookmark _Ref<index>.
        for index, row in df.iterrows():
            anchor = f"#_Ref{index}"
            # add_hyperlink(toc_paragraph, row['titre'], anchor)
            hyperlink = docx.oxml.shared.OxmlElement('w:hyperlink')
            hyperlink.set(docx.oxml.shared.qn('w:anchor'), anchor)
            # NOTE(review): the w:hyperlink element is built but never
            # attached to the paragraph — only the styled run below is added.

            run = toc_paragraph.add_run()
            run.text = "Lien vers un paragraph\n"
            rPr = run._r.get_or_add_rPr()

            rStyle = OxmlElement('w:rStyle')
            rStyle.set(docx.oxml.shared.qn('w:val'), 'Hyperlink') # Use the hyperlink style here
            rPr.append(rStyle)



    # [ ] TODO VR : the document-creation architecture is really messy — do we
    # need a class or something else, notably to have a web interface etc.?
#    doc.add_heading('Compte Rendu de dossier medical', 0)
    return doc

1466 

def to_save_work_use_or_suppress_with_hyperlink(document, index_for_hyperlink = 0):
    """Scratch helper: append an empty paragraph followed by a Word
    bookmark pair named "_Ref<index_for_hyperlink>".

    Kept as working material for the hyperlink experiment in
    write_table_list_inner_document; returns None.
    """
    p = document.add_paragraph()

    import docx
    from docx.oxml.shared import OxmlElement

    # p = document.add_paragraph(style='Heading2')
    # Add a bookmark element for navigation
    bookmark_start = OxmlElement('w:bookmarkStart')
    bookmark_start.set(docx.oxml.shared.qn('w:id'), str(index_for_hyperlink))
    bookmark_start.set(docx.oxml.shared.qn('w:name'), f"_Ref{index_for_hyperlink}")
    # Insert the bookmark right after the paragraph's XML element.
    p._p.addnext(bookmark_start)

    bookmark_end = OxmlElement('w:bookmarkEnd')
    bookmark_end.set(docx.oxml.shared.qn('w:id'), str(index_for_hyperlink))
    bookmark_start.addnext(bookmark_end)

def format_one_res(input,
                   new_format_info, format_premier, format_date,
                   verbose = False, list_variable_bold = [], list_variable_underline = []):
    """Fill the {placeholder} slots of *new_format_info* with the values of *input*.

    Date-typed keys (datet, date_*t) are rendered through strftime with
    *format_date* (falling back to "%d %B %Y"); when the day is 1 and
    format_premier == "jd", "01 " is rewritten as a superscript "1ᴱᴿ ".
    All other values are substituted via str(). Returns the filled string.
    """
    print("format_one_res")
    # Best-effort French locale so %B month names come out in French.
    try:
        import locale
        locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
    except Exception as e:
        print(" Pb setting local")
        print(str(e))

    format_date_hc = "%d %B %Y"
    format_date_used = format_date if format_date != "" else format_date_hc

    date_keys = ("datet", "date_fin_arret_travailt", "date_entree_hospitalisationt",
                 "date_sortie_hospitalisationt", "date_debut_arret_travailt")

    new_new_content = new_format_info
    for k in input:
        placeholder = "{" + k + "}"
        if k in date_keys:
            replace = input[k].strftime(format_date_used)
            # "premier" rendering: 01 -> 1 with superscript ER.
            if input[k].day == 1 and format_premier == "jd":
                replace = replace.replace("01 ", "1\u1D31\u1D3F ")
            print("-² 1er")
            print(" U+2091 U+02B3 U+1D31 U+1D3F 1\u1D31\u1D3F \u2091 \u02B3")
            new_new_content = new_new_content.replace(placeholder, str(replace))
        else:
            new_new_content = new_new_content.replace(placeholder, str(input[k]))

    print(" new_new_content : " + new_new_content)

    return new_new_content

1523 

1524 

1525 

def compute_list_input_to_format_per_document(format_info):
    """Map each document type to the placeholder names its template uses.

    Placeholders are the words written between curly braces in the
    template string; each name is kept once, in first-seen order.
    """
    import re
    map_list_input_by_document = {}
    # Rather than relying on a list of metadata names, extract the words
    # between curly braces with a regexp.
    for doc_type, template in format_info.items():
        needed = []
        for match in re.findall(r'\{(.*?)\}', template):
            if match in needed:
                print("Awkward !")
            else:
                needed.append(match)
        map_list_input_by_document[doc_type] = needed
    return map_list_input_by_document

1544 

def show_frame(frame, bbox_list=None, text=None,
               save_flag=False, save_name=None, wait_flag=False):
    """Display *frame* with optional bounding boxes and a text banner.

    frame     -- grayscale or BGR image as used by cv2 (numpy array)
    bbox_list -- iterable of ((x1, y1), (x2, y2)) boxes, drawn in order
    text      -- string drawn inside a white banner at the top-left
    save_flag -- when True, write the annotated frame to *save_name*
    wait_flag -- when True, block on cv2.waitKey(0) after display
    """
    import cv2

    # A list of colors to indicate the order of bounding boxes drawn.
    color_list = [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0],
                  [255, 0, 255], [0, 255, 255]]
    # BUG FIX: pad with 20 extra white color TRIPLES. The previous
    # `color_list + [255, 255, 255]*20` appended 60 bare ints, so
    # color_list[i] for i >= 6 was an int, not a BGR color.
    color_list = color_list + [[255, 255, 255]] * 20

    # Convert the frame to a BGR image if the input is grayscale.
    if len(frame.shape) == 2:
        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

    # Draw a bounding box, if a bounding box was given.
    if bbox_list:
        for i, bbox in enumerate(bbox_list):
            tl, br = bbox[0], bbox[1]
            frame = cv2.rectangle(frame, tl, br, color_list[i], 4)

    # Draw a text box, if a text string given. Add rectangle to emphasize text.
    if text:
        tbox_tl, tbox_br = (0, 0), (220, 25)
        frame = cv2.rectangle(frame, tbox_tl, tbox_br, (255, 255, 255), -1)

        # Add the text on top of the rectangle to the displayed frame. The
        # cv2.putText() function places text based on the bottom left corner.
        text_bl = (tbox_tl[0] + 5, tbox_br[1] - 5)
        frame = cv2.putText(frame, text, text_bl,
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)

    # Display the frame and wait for input if the wait flag is enabled.
    cv2.imshow('frame', frame)
    if wait_flag:
        cv2.waitKey(0)

    # Save the frame if the save_flag is enabled.
    if save_flag:
        cv2.imwrite(save_name, frame)

1584 

1585 

1586import re 

def parse_id_date_nb_page_folder(text):
    """Extract the (nb, id, date) markers from a folder or file name.

    Expected fragments look like ``nb_5_id_3_d_210224`` — case insensitive,
    with ``_`` or ``:`` as separator after each marker.

    Args:
        text: Path or bare name; only the basename is inspected.

    Returns:
        tuple: (nb, id, date) as strings when found; ``0``, ``0`` and
        ``None`` respectively for absent fields.
    """
    import os
    text = os.path.basename(text)

    # Patterns for the NB, ID and D markers.
    nb_pattern = r'NB[_:]\s*(\d+)'
    id_pattern = r'ID[_:]\s*(\d+)'
    # BUGFIX: the original alternation r'(\d{2,5,6,7,8}|\d{2,8})' started with
    # an invalid quantifier — re treats '{2,5,6,7,8}' as literal text, so that
    # branch could never match and only r'\d{2,8}' ever did. Keep just the
    # working branch. The (?:^|[^I]) guard stops the 'D' of 'ID' matching.
    date_pattern = r'(?:^|[^I])D[_:]\s*(\d{2,8})'  # e.g. ddmmyy or ddmmyyyy

    upper_text = text.upper()
    nb_result = re.search(nb_pattern, upper_text)
    id_result = re.search(id_pattern, upper_text)
    date_result = re.search(date_pattern, upper_text)

    # Fall back to 0 / 0 / None when a marker is missing.
    nb = nb_result.group(1) if nb_result else 0
    id = id_result.group(1) if id_result else 0
    date = date_result.group(1) if date_result else None

    return nb, id, date

1610 

1611 

1612 

def create_prefix_file_name_from_json_prefix(json_prefix):
    """Build the canonical file-name prefix from a {'nb', 'id', 'date'} dict.

    Missing keys fall back to 0 / 0 / "" respectively.
    """
    nb = json_prefix.get("nb", 0)
    id = json_prefix.get("id", 0)
    date = json_prefix.get("date", "")

    return create_prefix_file_name_from_id_date_nb_page_folder(nb, id, date)

1619 

def create_prefix_file_name_from_id_date_nb_page_folder(nb, id, date):
    """Format the canonical file-name prefix, e.g. 'id_3_nb_5_d_210224'."""
    return f"id_{id}_nb_{nb}_d_{date}"

1623 

def get_id_order_document(document_id):
    """Return the trailing '_<n>' order number of a document id, or -1 if none.

    Raises ValueError if the text after the last underscore is not an int.
    """
    if "_" not in document_id:
        return -1
    # Everything after the last underscore is the order number.
    return int(document_id.rsplit("_", 1)[1])

1630 

1631import requests 

1632# from https://www.tutorialspoint.com/how-to-check-whether-user-s-internet-is-on-or-off-using-python 

def internet_connection():
    """Best-effort connectivity probe.

    Returns:
        bool: True iff an HTTPS GET to fotonower.com succeeds within 5 s.
    """
    try:
        requests.get("https://www.fotonower.com", timeout=5)
        return True
    # BUGFIX: the original caught only requests.ConnectionError, so e.g. a
    # requests.exceptions.ReadTimeout (not a ConnectionError subclass) would
    # propagate instead of reporting "offline". RequestException is the
    # common base of all requests failures.
    except requests.RequestException:
        return False

1639#if internet_connection(): 

1640# print("The Internet is connected.") 

1641#else: 

1642# print("The Internet is not connected.") 

1643 

def prepare_pagination(nb_page, page, max = 5):
    """Build a pagination widget list, e.g. [1, '...', 4, 5, 6, '...', 20].

    `max` is the number of page links shown in the sliding window; "..."
    marks elided ranges.
    """
    half = max // 2
    if nb_page <= max:
        # Few enough pages: show them all.
        return list(range(1, nb_page + 1))
    if page <= half:
        # Near the beginning: leading window, then the last page.
        return list(range(1, max + 1)) + ["...", nb_page]
    if page > nb_page - half:
        # Near the end: first page, then the trailing window.
        return [1, "..."] + list(range(nb_page - max + 1, nb_page + 1))
    # Somewhere in the middle: window centred on the current page.
    return [1, "..."] + list(range(page - half, page + half + 1)) + ["...", nb_page]

1654 

1655 

def remove_circular_refs(ob, _seen=None, verbose = False):
    """Return a copy of `ob` with circular references replaced by None.

    Containers (dict, list, tuple, set, frozenset) are rebuilt recursively;
    any object already present on the current descent path is cut (-> None).
    `_seen` is internal: the set of id()s on the path from the root.
    """
    if _seen is None:
        _seen = set()
    if id(ob) in _seen:
        # This object is an ancestor of itself: cut the cycle here.
        if verbose:
            print(" Remove circular reference with id " + str(id(ob)) + " : ob " + str(ob))
        return None
    _seen.add(id(ob))
    if isinstance(ob, dict):
        result = {remove_circular_refs(key, _seen, verbose=verbose):
                      remove_circular_refs(val, _seen, verbose=verbose)
                  for key, val in ob.items()}
    elif isinstance(ob, (list, tuple, set, frozenset)):
        result = type(ob)(remove_circular_refs(item, _seen, verbose=verbose) for item in ob)
    else:
        result = ob
    # Pop on the way back up: only *nested* (ancestor) references count.
    _seen.remove(id(ob))
    return result

1675 

1676 

def change_nan_to_string(ob):
    """Recursively replace NaN/inf floats with the string "NaN".

    Useful to make nested structures JSON-serialisable; note that +/-inf is
    also mapped to "NaN", matching the original contract.
    """
    import math

    if isinstance(ob, dict):
        return {k: change_nan_to_string(v) for k, v in ob.items()}
    if isinstance(ob, (list, tuple, set, frozenset)):
        return type(ob)(change_nan_to_string(v) for v in ob)
    if isinstance(ob, float) and (math.isnan(ob) or math.isinf(ob)):
        return "NaN"
    # Any other scalar passes through unchanged.
    return ob

1695 

1696 

def load_json(file_path):
    """Read `file_path` and return its parsed JSON content."""
    import json
    with open(file_path, 'r') as json_file:
        return json.load(json_file)

1702 

def load_csv_as_dict(input_csv):
    """Parse a 'k1=v1,k2=v2' string into a dict of strings.

    Empty items are skipped; items without '=' are reported and skipped.
    FIX: split on the *first* '=' only, so values may themselves contain
    '=' — the original str.split("=") raised on 'a=b=c' and dropped the
    whole item.

    Args:
        input_csv: Comma-separated key=value string (may be empty).

    Returns:
        dict: Parsed key -> value mapping.
    """
    input_from_csv = {}
    for item in input_csv.split(","):
        if item == "":
            continue
        try:
            key, value = item.split("=", 1)
        except Exception as e:
            # Item had no '=' at all: report and keep going (best effort).
            print("Error in input_csv : " + str(e))
            continue
        input_from_csv[key] = value
    return input_from_csv

1714 

1715 

def aux_parse_date_in_text(input_text):
    """Find date-like substrings in `input_text` and return them as a list.

    Handles DD/MM/YYYY, DD-MM-YYYY, DD/MM, DD-MM and 'DD <french month>'
    forms. Results are gathered pattern by pattern, so a full date may also
    be reported a second time by a shorter pattern (e.g. '23-05-2022' and
    '23-05').

    FIX: removed dead code — an unused `list_dates` accumulator and a
    4-line hard-coded example text that was built but never read.
    """
    # Patterns for the supported date formats (kept byte-identical).
    date_patterns = [
        r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',  # DD/MM/YYYY or DD-MM-YYYY
        r'\b\d{1,2}[/-]\d{1,2}\b',  # DD/MM or DD-MM
        r'\b\d{1,2}\s+janvier|\b\d{1,2}\s+février|\b\d{1,2}\s+mars|\b\d{1,2}\s+avril|\b\d{1,2}\s+mai|\b\d{1,2}\s+juin|\b\d{1,2}\s+juillet|\b\d{1,2}\s+août|\b\d{1,2}\s+septembre|\b\d{1,2}\s+octobre|\b\d{1,2}\s+novembre|\b\d{1,2}\s+décembre',  # DD <french month>
        r'\b\d{1,2}\s+mois\s+\d{4}',  # DD mois YYYY with literal 'mois'
    ]

    dates = []
    for pattern in date_patterns:
        dates.extend(re.findall(pattern, input_text))

    return dates

1742 

def parse_date_test_before_own_datou_step(list_page_content, verbose = True):
    """Run the date extractor on each page and map file name -> found dates.

    Each element of `list_page_content` must expose `.content`,
    `.page_number` and `.source_image` attributes.
    """
    map_res_page_date = {}
    for page in list_page_content:
        found_dates = aux_parse_date_in_text(page.content)
        if verbose:
            print(" sdp.page_number : " + str(page.page_number))
            print(" sdp.file : " + str(page.source_image))
            print(" dates : " + str(found_dates))

        map_res_page_date[os.path.basename(page.source_image)] = found_dates

    return map_res_page_date

1758 

1759 

1760# refacto for smart split 

# refacto for smart split
def create_transcript_group_of_pages(list_of_list_of_pages,
                                     map_text = {},
                                     list_of_sub_doc_page_with_content = None,
                                     begin_page = True,
                                     end_page = True,
                                     verbose = False):
    """Concatenate per-page texts into one transcript per group of pages.

    Each group in `list_of_list_of_pages` yields one string: every page is
    looked up in `map_text` and optionally wrapped in "Begin Page"/"End Page"
    marker lines. `list_of_sub_doc_page_with_content` is currently unused.
    """
    print(" begin_page : " + str(begin_page) + " end_page : " + str(end_page))
    complete_texts = []
    begin_marker = ""
    end_marker = ""
    for group in list_of_list_of_pages:
        group_text = ""
        for page in group:
            page_text = map_text[page]
            if begin_page:
                print(" begin_page is true : ")
                begin_marker = "\n------\nBegin Page " + str(page) + "\n------\n"
                if verbose:
                    print(" begin_page_text was set ")
                    print(" begin_page_txt : " + str(begin_marker) + " end_page_txt : " + str(end_marker))
                    print("\n------\nBegin Page " + str(page) + "\n------\n")
                    print(str(page))
            else:
                print(" begin_page is false : ")
            if end_page:
                end_marker = "\n------\nEnd Page " + str(page) + "\n------\n"
            print(" begin_page_txt : " + str(begin_marker[:50]).replace("\n", "§§") + " end_page_txt : " + str(end_marker[:50]).replace("\n", "§§"))
            group_text += begin_marker + page_text + end_marker
        complete_texts.append(group_text)

    return complete_texts

1793 

1794 

1795 

1796# pages above nb_page will be ignored 

def build_list_of_list_from_split(end_page_as_csv, nb_page):
    """Split pages 1..nb_page into consecutive groups ending at given pages.

    Args:
        end_page_as_csv: Comma-separated "last page of a group" numbers,
            e.g. "3,7" with nb_page=9 -> [[1,2,3],[4,5,6,7],[8,9]].
            May be empty.
        nb_page: Total number of pages; it is appended as a final boundary
            if not already listed.

    Returns:
        list[list[int]]: The groups of consecutive page numbers.

    NOTE(review): the empty/error fallbacks return list(range(1, nb_page)),
    which EXCLUDES page nb_page itself — looks like an off-by-one, but
    callers may rely on it; confirm before changing.
    """
    if end_page_as_csv == "":
        return [list(range(1, nb_page))]

    end_page_as_list = list(map(int, end_page_as_csv.split(",")))

    # Ensure the final group is closed at the last page.
    if nb_page not in end_page_as_list:
        end_page_as_list.append(nb_page)

    end_page_as_list_ordered = sorted(end_page_as_list)
    # A 0 boundary would create an empty leading group; drop it.
    if 0 in end_page_as_list_ordered:
        end_page_as_list_ordered.remove(0)

    if len(end_page_as_list_ordered) == 0:
        print("Internal error in build_list_of_list_from_split ! ")
        # would be done by the loop since we had nb_page !
        return [list(range(1, nb_page))]

    id_page = 1
    id_end_input = 0
    id_page_end = end_page_as_list_ordered[id_end_input]
    # Smallest boundary below 1 should be impossible after the 0-removal above.
    if id_page > id_page_end:
        print("Internal error in build_list_of_list_from_split on id_page ! : " + str(id_page))
        # would be done by last condition, ehh
        return [list(range(1, nb_page))]

    list_of_list_of_page = []
    current_list = [id_page]
    # Walk pages 1..nb_page, closing the current group each time the page
    # matches the next boundary in end_page_as_list_ordered.
    while id_page <= nb_page and id_end_input < len(end_page_as_list_ordered):
        id_page_end = end_page_as_list_ordered[id_end_input]
        if id_page == id_page_end:
            if len(current_list) > 0:
                list_of_list_of_page.append(current_list)
            current_list = []
            id_end_input = id_end_input + 1

        id_page += 1
        if id_page <= nb_page:
            current_list.append(id_page)
    # VR TODO 26-4-25 : not clear why we need to do this !
    if len(list_of_list_of_page) == 0:
        list_of_list_of_page.append(current_list)

    return list_of_list_of_page

1842 

def director_cut(id_file,
                 df_by_doc,
                 paragraphs):
    """Render a graphviz diagram of the expertise-file treatment pipeline.

    Args:
        id_file: Case identifier, used in the graph name and its comment.
        df_by_doc: Table-like mapping with "Liste des pages",
            "document_type" and "medecin_nom" columns indexed by doc number.
        paragraphs: Per-page list of {'id', 'text'} paragraph dicts,
            indexed by page - 1.

    Returns:
        int: Always 0. Side effect: renders the graph under 'doctest-output'.

    NOTE(review): graphviz.Digraph.subgraph(name=...) called without a
    `graph=` argument returns a context manager, not a graph object, so the
    .node() calls on all_pages/all_text_cr/all_docs below may fail at
    runtime — confirm against the installed graphviz version.
    """
    import graphviz

    dot = graphviz.Digraph(id_file + "_treatment", comment='Traitement du dossier d\'expertise ' + id_file)

    all_pages = dot.subgraph(name='All Pages') #, label='all_pages')
    all_text_cr = dot.subgraph(name='All Text CR') #, label='all_text_cr')
    all_docs = dot.subgraph(name='All Doc') #, label='all_docs')

    dot.node('pdf', 'Dossier anonymisé')
    dot.node('poubelle', 'Texte caviardé')
    dot.node('docx', 'Données extraites pour traitement')

    id_doc = 0
    for list_page in df_by_doc["Liste des pages"]:
        list_of_page = list(map(int, list_page.split(","))) if list_page != "" else []
        # Placeholder labels, overwritten below when the columns exist.
        medecin_nom = "medecin_nom"
        document_type = "document_type"
        try:
            document_type = df_by_doc["document_type"][id_doc]
            medecin_nom = df_by_doc["medecin_nom"][id_doc]
        except Exception as e:
            print(str(e))
        all_docs.node("doc_" + str(id_doc), 'Document ' + str(id_doc) + " : " + document_type + " : " + medecin_nom)
        for page in list_of_page:
            all_pages.node("page_" + str(page), 'Page ' + str(page))
            dot.edge('pdf', "page_" + str(page), constraint='false')
            # One node and two edges per paragraph: page -> paragraph -> doc.
            for p in paragraphs[page - 1]:
                all_text_cr.node("par_" + str(page) + "_" + str(p['id']), p["text"])
                dot.edge("page_" + str(page), "par_" + str(page) + "_" + str(p['id']), constraint='false')
                dot.edge("par_" + str(page) + "_" + str(p['id']), "doc_" + str(id_doc), constraint='false')
#            dot.edge("page_" + str(page), "poubelle", constraint='false')
            # dot.edge("page_" + str(page), "doc_" + str(id_doc), constraint='false')
            # dot.edge("page_" + str(page), "doc_" + str(id_doc), constraint='false')
        dot.edge("doc_" + str(id_doc), 'docx')

        id_doc = id_doc + 1

    dot.attr('node', shape='oval', fontname='Helvetica')
    dot.attr('edge', fontsize='12')
    dot.attr('graph', splines='true', overlap='false')

    # graph [splines=true overlap=false];

    # candidate layout engines: neato, fdp (needs overlap=prism ?) , sfdp
    dot.attr(layout='fdp')
    dot.attr(overlap='prism')

#    dot.source
    dot.render(directory='doctest-output').replace('\\', '/')

    return 0

1897 

1898# util parser usage devops coverage pytest et local_storage pour graph pourcentage coverage 

def parse_coverage_from_python(file_contents):
    """Extract the total coverage percentage from a coverage.py HTML report.

    Args:
        file_contents: Raw HTML of the report page.

    Returns:
        str: Text of the <span class="pc_cov"> element (e.g. "45%"), or the
        deliberately absurd sentinel "1666%" when parsing fails.
    """
    # BUGFIX: the original did `import BeautifulSoup`, which fails on any
    # current install — the class lives in the bs4 package.
    from bs4 import BeautifulSoup

    # Parse the HTML content with BeautifulSoup.
    soup = BeautifulSoup(file_contents, 'html.parser')

    span_value = "-1"
    try:
        # Look for the span with class 'pc_cov' and take its text.
        span_value = soup.find('span', {'class': 'pc_cov'}).text

        # Show the extracted value.
        print(span_value)
    except Exception as e:
        print(str(e))
        # Sentinel so a parse failure is obvious downstream.
        span_value = "1666%"

    return span_value

1916 

1917 

1918 

def humanize_size_file(value_in_byte):
    """Format a byte count as a short human-readable string (e.g. "1.5 MB").

    Args:
        value_in_byte: Size in bytes (int or float). None or NaN -> "n/c".

    Returns:
        str: Value with a B/KB/MB/GB/TB unit. MB keeps one decimal below
        100 MB. (Note: "TB" has no leading space, unlike the other units —
        kept as-is for backward compatibility with existing output.)
    """
    import math
    # BUGFIX: identity check for None instead of `== None` (PEP 8; avoids
    # surprises with objects overriding __eq__).
    if value_in_byte is None or math.isnan(value_in_byte):
        return "n/c"
    if value_in_byte < 1024:
        return str(value_in_byte) + " B"
    elif value_in_byte < 1048576:
        return str(int(value_in_byte / 1024)) + " KB"
    elif value_in_byte < 1048576 * 1024:
        round_size = value_in_byte / 1048576
        if round_size < 100:
            # One decimal of precision below 100 MB.
            return str(int(10 * round_size) / 10) + " MB"
        else :
            return str(int(round_size)) + " MB"
    elif value_in_byte < 1048576 * 1048576:
        return str(int(value_in_byte / (1024 * 1048576))) + " GB"
    elif value_in_byte < 1048576 * 1048576 * 1024:
        return str(int(value_in_byte / (1048576 * 1048576))) + "TB"
    else :
        return "TOO BIG, WILL FAIL !"

1939 

1940from io import BytesIO 

1941import qrcode 

1942from base64 import b64encode 

1943 

1944 

def get_b64encoded_qr_image(data):
    """Encode `data` as a QR-code PNG and return it base64-encoded (utf-8 str)."""
    print(data)
    qr_code = qrcode.QRCode(version=1, box_size=10, border=5)
    qr_code.add_data(data)
    qr_code.make(fit=True)
    image = qr_code.make_image(fill_color='black', back_color='white')
    buffer = BytesIO()
    image.save(buffer)
    return b64encode(buffer.getvalue()).decode("utf-8")

1954 

def from_list_page_per_doc_ccsv_to_list_of_list_of_page(list_page_per_doc):
    """
    Convert a semicolon-separated string of page numbers into a list of lists.
    Each sublist contains the page numbers for a specific document.

    Returns a (list_of_list_of_page, total page count, highest page seen)
    tuple; on a parse error the partial result built so far is returned.
    """
    list_of_list_of_page = []
    nb_page = 0
    max_page = 0
    try:
        if list_page_per_doc == "":
            return list_of_list_of_page, nb_page, max_page

        for doc_pages in list_page_per_doc.split(";"):
            if doc_pages.strip() == "":
                continue
            pages = [int(p) for p in doc_pages.split(",")]
            max_page = max(max_page, max(pages))
            nb_page += len(pages)
            list_of_list_of_page.append(pages)
    except Exception as e:
        print("Error in from_list_page_per_doc_ccsv_to_list_of_list_of_page : " + str(e))
        return list_of_list_of_page, nb_page, max_page
    if nb_page < max_page:
        # Fewer pages listed than the highest page number: suspicious input.
        print("ERROR treated as Warning : nb_page < max_page, this is not expected, check your input list_page_per_doc : " + str(list_page_per_doc))
    return list_of_list_of_page, nb_page, max_page

1979 

1980# peut-etre deprecated, mais faudrait utiliser from_list_page_per_doc_ccsv_to_list_of_list_of_page 

def managing_deprecated_input_text_concat_into_list(text, list_page_per_doc):
    """Re-split a '------ / Begin Page N / ------' concatenated transcript
    into one text per document, following the groups in `list_page_per_doc`.
    """
    marker = "------\nBegin Page"
    list_of_list_of_page, nb_page, max_page = from_list_page_per_doc_ccsv_to_list_of_list_of_page(list_page_per_doc)
    if nb_page < max_page:
        print("ERROR treated as Warning : nb_page < max_page, this is not expected, check your input list_page_per_doc : " + str(list_page_per_doc))
    # Cut on the page markers, drop the empty leading chunk, then put the
    # marker back at the start of every page text.
    page_chunks = text.split(marker)[1:]
    page_chunks = [marker + chunk for chunk in page_chunks]
    if len(page_chunks) != nb_page:
        print("ERROR treated as Warning : len(list_text_per_page) != nb_page, this is not expected, check your input text : " + str(text))
    list_texts_per_doc = []
    try:
        for list_of_page in list_of_list_of_page:
            list_texts_per_doc.append("\n".join([page_chunks[page - 1] for page in list_of_page]))
    except Exception as e:
        print("Error in from_list_page_per_doc_ccsv_to_list_of_list_of_page : " + str(e))
        return []
    return list_texts_per_doc

2004 

2005