Coverage for lib/lib_util.py: 45%
1285 statements
coverage.py v7.9.1, created at 2026-02-18 02:40 +0100
import datetime
import os.path
import time
import uuid
def parse_key_and_size(sub_key = "", data = {}, min_size_iterate = 1000000, min_size_display = 10000):
    if type(data) == list or type(data) == str:
        return
    for k in data:
        try:
            size = len(str(data[k]))
        except Exception as e:
            print(str(e))
            continue  # size is undefined for this key, skip it
        if size > min_size_display:
            print(" k : " + str(sub_key + "." + str(k)) + " size : " + str(size))
        if size > min_size_iterate:
            parse_key_and_size(sub_key + "." + str(k), data[k], min_size_iterate = min_size_iterate, min_size_display = min_size_display)
20def filter_key_deep(sub_key = "", data = {}, filter = "input.list_audit_map_reduce"):
21 import sys
22 sys.stdout.write(">ç")
23 if type(data) == list:
24 return data
26 if sub_key.endswith(".text.text.text"):
27 print(" Here comes the circularity ! gasps, How come does it happen sometime and sometime not !")
28 print(" sub_key : " + str(sub_key))
29 print("CORRECTING !")
30 return {}
32 elif type(data) == dict:
33 new_data = {}
34 for k in data:
35 sub_sub_key = sub_key + "." + str(k)
36 if sub_sub_key.endswith(filter):
37 continue
38 else :
39 new_data[k] = filter_key_deep(sub_sub_key, data[k])
40 return new_data
41 else:
42 return data
44def get_unique_id():
45 return uuid.uuid4()
46# return ':'.join(['{:02x}'.format((uuid.getnode() >> i) & 0xff) for i in range(0,8*6,8)][::-1])
48def count_and_display_elapsed_time(begin_time, message = "", verbose = False, min_time = 1) -> (float, str):
49 end_time = time.time()
50 elapsed = end_time - begin_time
51 message_with_time = str(message) + " elapsed time : " + str(elapsed)
52 if verbose or elapsed > min_time:
53 print (message_with_time)
54 return time.time(), message_with_time
56def replace_non_alpha_with_underscore(s):
57 import re
58 return re.sub(r'[^a-zA-Z0-9]', '_', s)
60def select_datou_step_from_extension_type_upload(file_name, file_extension, file_content_type):
61 is_managed = False
62 actions = ""
63 extension = ""
65 the_zips = [".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz"]
67 map_datou_step_extension = {"image_to_text,request_gpt,send_mail" : [".jpeg", ".jpg", ".png"],
68 "speech_to_text,request_gpt,send_mail" : [".mp3", ".ogg", ".amr", ".m4a", ".wav"],
69 "prepare_json_for_safia,import_safia_from_json" : [".pdf,.txt"],
70 "import_safia_from_json" : [".json"]}
72 map_datou_step_extension = {"jpg" : [".jpeg", ".jpg", ".png"],
73 "amr" : [".mp3", ".ogg", ".amr", ".mp4", ".m4a", ".wav", ".webm"],
74 "pdf" : [".pages", ".pptx", ".pdf", ".txt", ".md", ".py", ".docx", ".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz"],
75 "json" : [".json"]}
77 from lib.datou.datou_exec import list_datous
79 # TODO mp4 if we want
80 list_file_extension_managed = [".mp3", ".ogg", ".amr", ".mp4", ".webm", ".m4a", ".wav", ".jpeg", ".jpg", ".png", ".pages", ".pptx", ".pdf", ".txt", ".md", ".docx", ".json", ".zip", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz"]
82 list_content_type_managed = ['image/jpeg', 'audio/webm;codecs=opus']
83 # <FileStorage: 'blob' ('audio/webm;codecs=opus')>
85 map_list_content_to_extension = {'image/jpeg' : [".jpeg", ".jpg"],
86 'audio/webm;codecs=opus':['.ogg'],
87 'audio/amr':['.amr'],
88 'audio/mp4':[".webm"]} # and not mp4
90 if file_extension == "":
91 if file_content_type in map_list_content_to_extension.keys():
92 print(" TODO change the extension of the file : " + file_name + " from " + file_extension + " to " + str(map_list_content_to_extension[file_content_type.lower()]) + " one of them " )
94 extension = map_list_content_to_extension[file_content_type.lower()][0]
95 file_extension = extension
96 is_managed = True
98 datou_as_key = ""
99 for datou_as_key in map_datou_step_extension:
100 if file_extension.lower() in map_datou_step_extension[datou_as_key]:
101 actions = list_datous[datou_as_key].copy() # TODO VR : we may need something better so the list of actions is never edited/modified in place
102 is_managed = True
104 if actions == "" and is_managed:
105 print(" Inconsistency : the file was marked manageable from its content type, but no datou step matches this extension ! ")
107 # TODO VR 10-6-23 : inconsistency in some cases where the extension is not listed in the expected filetype, so it probably needs to be added there
109 if is_managed == False:
110 print("TODO : check if we can do something with these files or not !")
112 return is_managed, actions, extension
def humanize_modified_time(modified_at):
    import humanize
    from humanize import i18n
    import datetime
    from datetime import timezone
    today = datetime.datetime.now()
    todaynn = datetime.datetime.now(timezone.utc)

    # Switch humanize output to French
    i18n.activate('fr_FR')

    from lib.lib_github import util_is_naive
    # Assumes proj.modified_at is a datetime object
    # A naive datetime must be compared with the naive `today`, an aware one with the UTC-aware `todaynn`
    if util_is_naive(modified_at):
        time_difference = today - modified_at
    else:
        time_difference = todaynn - modified_at
    human_diff = humanize.naturaltime(time_difference)
    return human_diff
136def import_all_for_coverage_with_zero_percent():
138 # For complete coverage
141 #import test.conftest => 18 lines
142 #import lib.stockage.lib_abstract_stockage => 0 lines
143 #import gunicorn.conf #server. => 10 lines
145 #import test.func.first_test_func => 34 lines
147 # TO PUT BACK
148 import auth.lib_privacy
149 import auth.lib_stat_usage
152 import auth.lib_cost
153 import auth.lib_auth
154 import auth.lib_user_conf
155 import lib.lib_www.lib_routes
156 import lib.lib_www.lib_html
157 import lib.stockage.lib_pg_dataset_pg
158 #import prompt
159 import lib.stockage.lib_pyfvs
160 import lib.import_util.lib_path_to_vec
161 import lib.import_util.lib_import_retrieval.models.models
162 import lib.import_util.lib_import_retrieval.scripts.process_json.process_json
163 import lib.lib_github
164 import lib.lib_mail
165 # import lib.lib_ocr => due to cv2
166 import lib.lib_prompt_issue
168# now inserted
169# import lib.lib_graph
170# import lib.datou.lib_datou_step_template
172 print("Imported !")
174def compute_token(input):
176 import tiktoken
177 tokenizer = tiktoken.get_encoding("cl100k_base") # The encoding scheme to use for tokenization
178 tokens = tokenizer.encode(input, disallowed_special=())
180 nb_tokens = len(tokens)
181 return nb_tokens
184def check_and_truncate_query_max_token(input, number_token_max = 100000, #7000, #8096, # 8096
185 verbose = False,
186 strategy_condition = "prop_char_token_marg10"):
188 len_char = len(input)
189 nb_tokens = compute_token(input)
191 under_limit = True
192 if nb_tokens < number_token_max:
193 return under_limit, ""
194 else : # pragma no cover scale
195 under_limit = False
196 input_truncated = ""
197 if strategy_condition == "prop_char_token_marg10":
198 new_size = int(float(number_token_max) / float(nb_tokens) * len_char)
199 input_truncated = input[:new_size]
200 else :
201 print("This strategy " + strategy_condition + " is not managed, no truncated query proposed !")
203 return under_limit, input_truncated
# Here is how one could proceed to create these two functions. Here, I use the struct module to
import struct

# Values stored as floats
# - [ ] TODO VR refacto name
211def write_as_float(list_data_map, filename_prefix, offset_media_id = 0,
212 offset_data_file_id = 0,
213 photo_desc_type=1777,
214 type_data = "float",
215 dim_input = 0):
216 num_data = 0
217 file = open(f'{filename_prefix}{offset_data_file_id}.dat', 'wb')
218 file_index_desc = open(f'{filename_prefix}photo_list.index', 'a')
219 display_dimenstion_desc = True
220 number_dimension = 0
221 for map_data in list_data_map:
222 descchaine = map_data["embedding"]
224 type_list_desc = str(type(descchaine))
226 import sys
227 if type_list_desc == "<class 'str'>":
228 sys.stdout.write("s")
229 desc = list(map(float, descchaine.lstrip("[").rstrip("]").split(",")))
230 else :
231 sys.stdout.write("n")
232# print(" emb : " + str(emb))
233 desc = list(map(lambda x: x.astype('double'), descchaine))
234 if display_dimenstion_desc:
235 number_dimension = len(desc)
236 if number_dimension == 0:
237 print("Internal Error")
238 print(f" Dimension desc : {number_dimension}")
239 display_dimenstion_desc = False
240 offset_media_id += 1
241 if dim_input > 0 and dim_input < number_dimension:
242 desc = desc[:dim_input]
243 number_dimension = dim_input
244 for value in desc:
245 if type_data == "float":
246 file.write(struct.pack('f', float(value)))
247 elif type_data == "tinyint":
248 file.write(struct.pack('B', min(255, max(0, int(value)))))
249 else :
250 print("Type " + str(type_data) + " not supported ! nothin written !")
251 num_data += 1
252 # num_data * number_dimension > 10000
253 if num_data == 1000:
254 file.close()
255 offset_data_file_id += 1
256 num_data = 0
257 file = open(f'{filename_prefix}{offset_data_file_id}.dat', 'wb')
258 file_index_desc.write(f'{offset_media_id},{offset_data_file_id},{photo_desc_type}\n')
259 file.close()
260 file_index_desc.close()
261 return number_dimension, offset_media_id, offset_data_file_id
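# A minimal read-back sketch for the .dat layout produced above: write_as_float packs
# number_dimension consecutive 'f' values (4 bytes each) per record. The helper below is
# illustrative only, an assumption rather than an existing function of this module.
def _example_read_one_float_record(dat_path, number_dimension, record_index=0):
    import struct
    record_size = 4 * number_dimension  # struct 'f' packs 4 bytes per value
    with open(dat_path, 'rb') as f:
        f.seek(record_index * record_size)
        raw = f.read(record_size)
    return list(struct.unpack(f'{number_dimension}f', raw))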
# Values stored as tiny ints
264# - [ ] TODO VR to delete
265def write_as_tiny_int(data_2d, filename_prefix):
266 file_index = 0
267 num_lines = 0
268 file = open(f'{filename_prefix}{file_index}.dat', 'wb')
269 for sublist in data_2d:
270 for value in sublist:
271 file.write(struct.pack('B', min(255, max(0, int(value)))))
272 num_lines += 1
273 if num_lines == 10000:
274 file.close()
275 file_index += 1
276 num_lines = 0
277 file = open(f'{filename_prefix}{file_index}.dat', 'wb')
278 file.close()
283def subprocessCommand(command, timeout = 10, verbose = False):
284 """ Runs a shell command and returns its stdout. """
285 import subprocess
286 proc = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
287 try:
288 outs, errs = proc.communicate(timeout=timeout)
289 if verbose :
290 print(str(command) + " Error : " + str(errs))
291 print(" Output : " + str(outs))
292 except subprocess.TimeoutExpired:
293 proc.kill()
294 outs, errs = proc.communicate()
295 if verbose :
296 print(str(outs) + " : " + str(errs))
297 return outs
300# - [ ] TODO VR move in lib.lib_suiviprod.lib_manage_log ?
302def display_real_dict_row_shorten(result_pg_query):
303 from copy import deepcopy
304 result_pg_query_to_display = deepcopy(result_pg_query)
305 for r in result_pg_query_to_display:
306 if "embedding" in r:
307 r["embedding"] = r["embedding"][:50]
308# del r["embedding"]
310 print(result_pg_query_to_display)
312def build_gif_from_png(list_pngs, out_folder_image, uuid = None):
313 from uuid import uuid4
314 import os
315 if uuid == None:
316 uuid = uuid4()
317 movie = os.path.join(out_folder_image, "one_gif_" + str(uuid) + ".gif")
318 movie2 = movie + "_2.gif"
320 width = int(720 / 4)
321 height = 120
323 width = int(7200 / 4)
324 height = 1200
326 import imageio
327 from PIL import Image
328 frames = []
329 with imageio.get_writer(movie2, mode='I', duration=500) as writer:
331 for filename in list_pngs:
332 if "jpg" in filename.lower():
333 ext = "jpg"
334 if "png" in filename.lower():
335 ext = "png"
336 if not "jpg" in filename.lower() and not "png" in filename.lower():
337 continue
338 # for filename in filenames:
339 img = Image.open(filename)
340 filename_small = filename + ".small." + ext
342 # image = imageio.imread(filename)
343 # new_image = image.copy()
344 # new_image = np.resize(image, (height, width, 3), Image.ANTIALIAS)
346 new_image = img.resize((width, height)) # , Resampling.LANCZOS) # Image.ANTIALIAS)
347 new_image.save(filename_small)
349 # Image.ANTIALIAS
350 # cv2.imwrite(os.path.join(folder_small, f), new_image)
352 image = imageio.imread(filename_small)
353 frames.append(image)
355 # new_image = imageio.new(new_image)
356 # image.resize(width, height)
357 import sys
358 sys.stdout.write("+")
359 writer.append_data(image)
361# imageio.imsave(movie, frames, format='GIF', fps=2)
362# kargs = {'duration': 5}
363# imageio.mimsave(movie, frames, 'GIF', **kargs)
365 import numpy as np
366# from matplotlib.animation import FuncAnimation
367# from IPython import display
368 import matplotlib.pyplot as plt
369# anim_created = FuncAnimation(Figure, AnimationFunction, frames=100, interval=5)
371# video = anim_created.to_html5_video()
372# html = display.HTML(video)
373# display.display(html)
375 # good practice to close the plt object.
376# plt.close()
378 return movie2
# Function duplicated from lib_tent_pyfvs to keep a copy of it inside safia
383def is_port_available(port):
384 res = ""
385 try :
386 host = "localhost"
387 file_res_fvs = "temp/temp_res_nc.log"
388 import os
389 if not os.path.exists('temp'):
390 os.makedirs('temp')
391 list_cmds = ["/usr/bin/nc", "-zv", host, str(port), ">", file_res_fvs]
392 cmd_launch = " ".join(list_cmds)
393 print(cmd_launch)
394 cmd_launch_and_write = cmd_launch
395 import os
396 os.system(cmd_launch_and_write)
397 with open(file_res_fvs, "r") as f:
398 res = f.read()
399 # ret_sp = subprocess.run(list_cmds, capture_output=True, text=True)
400 # res = ret_sp.stdout.strip("\n")
402 print(res)
403 # nc: connectx to 127.0.0.1 port 45 (tcp) failed: Connection refused
405 os.remove(file_res_fvs)
406 except Exception as e:
407 print(str(e))
408 return True
410 if "Connection refused" in res :
411 return True
412 else :
413 return False
# Function duplicated from lib_tent_pyfvs to keep a copy of it inside safia
416def get_random_port():
417 import random
418 port = random.randint(5000, 6000)
419 while not is_port_available(port):
420 port = random.randint(5000, 6000)
421 return port
def display_confusion_matrix(df):
    import numpy as np
    columns_array = df.columns.astype(str).values
    header = columns_array.reshape(1, -1)

    # Convert the index into an array and add it as the first column
    index_array = df.index.astype(str).values
    index_as_col = index_array.reshape(-1, 1)

    # Concatenate the row names with the DataFrame values
    data_with_index = np.concatenate((index_as_col, df.values.astype(str)), axis=1)

    # Prepend the column names as a header row, with an empty top-left cell
    # (a rough VR tweak on rough GPT-generated code; the callers' try/except used to paper over failures here)
    full_array = np.concatenate((np.concatenate(([[""]], header), axis=1), data_with_index), axis=0)
    return full_array
444def from_pdf_to_list_pngs(in_files, tempfolder = "temp", dpi = 72,
445 hash_id_treatment = None,
446 only_count = False):
447 if len(in_files) == 0:
448 return []
450 list_of_list_of_pages = []
452 import fitz, os # PyMuPDF # TODO add in list install
454 from uuid import uuid4
455 # VR 6-5-24 : in order to have the correct link when uploading from interface
456 tempfolder = os.path.join(os.path.dirname(in_files[0]), hash_id_treatment) if hash_id_treatment != None else os.path.join(tempfolder, str(uuid4()))
457 if not os.path.exists(tempfolder):
458 os.makedirs(tempfolder)
460# from pypdf import PdfReader
461# >> > reader = PdfReader('example.pdf')
462# >> > box = reader.pages[0].mediabox
464 list_images = []
465 count_per_batch = []
466 cum_page_number = 0
467 id_page_number_to_list = 1
468 for in_file in in_files:
469 # Ouvrir le fichier PDF
470 pdf_document = fitz.open(in_file)
472 xref = pdf_document.page_xref(0) # xref of page 0
473 # pprint(doc.xref_get_keys(xref)) # primary level keys of a page
474 print(pdf_document.xref_get_keys(xref))
475 # ('Type', 'Contents', 'Resources', 'MediaBox', 'Parent')
476 # pprint(doc.xref_get_keys(-1)) # primary level keys of the trailer
477 # ('Type', 'Index', 'Size', 'W', 'Root', 'Info', 'ID', 'Length', 'Filter')
478 print(pdf_document.xref_get_keys(-1))
479 nb_page_this_batch = len(pdf_document)
481 list_of_pages = []
482 # Parcourir chaque page
483 for page_number in range(len(pdf_document)):
484 # Obtenir la page
485 page = pdf_document[page_number]
487 # if portrait
488 size_expected_by_fitz = page.mediabox.height / 72
489 size_expected_by_fitz = page.mediabox.width / 72
490 print(" size_expected_by_fitz : " + str(size_expected_by_fitz))
491 # Since it should be 21
492 apply_factor_dpi_correct_size = dpi / 72 * 21 / size_expected_by_fitz
493 used_dpi = int(apply_factor_dpi_correct_size * 72)
494 # used_dpi = dpi
496 # Rendre la page en tant qu'image
497 pix = page.get_pixmap(dpi=used_dpi)
498 # Définir le chemin de sortie de l'image PNG
499 output_image_path = f"{tempfolder}/page_{cum_page_number + page_number + 1}.png"
500 # Sauvegarder l'image en tant que PNG
501 if not only_count:
502 pix.save(output_image_path)
503 # import cv2
504 # im = cv2.imread(output_image_path)
505 # output_image_path_png = f"{tempfolder}/page_png_{page_number + 1}.png"
506 # cv2.imwrite(output_image_path, im)
507 list_images.append(output_image_path)
508 list_of_pages.append(id_page_number_to_list)
509 id_page_number_to_list = id_page_number_to_list + 1
511 list_of_list_of_pages.append(list_of_pages)
512 count_per_batch.append(nb_page_this_batch)
513 cum_page_number += nb_page_this_batch
514 # Fermer le document PDF
515 pdf_document.close()
517 return list_images, count_per_batch, list_of_list_of_pages
def split_text(text, max_length=10000, overlap = 1000):
    """Split text into chunks of at most max_length characters, consecutive chunks overlapping by `overlap`."""
    end = max_length - overlap
    list_texts = [text[:max_length]]
    while end < len(text):
        list_texts.append(text[end:end + max_length])
        end = end + max_length - overlap
    return list_texts
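# A minimal usage sketch: with max_length=10000 and overlap=1000, a 25000-character string
# yields three chunks of 10000, 10000 and 7000 characters, consecutive chunks sharing 1000
# characters at their boundary. The helper below is illustrative, not an existing function.
def _example_split_text():
    chunks = split_text("a" * 25000, max_length=10000, overlap=1000)
    assert [len(c) for c in chunks] == [10000, 10000, 7000]
    return chunks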
529def parse_list_page_as_begin_end_separated(l):
530 try:
531 print(" parse_list_page_as_begin_end_separated : " + str(l))
532 # one could write a regular expression to detect an integer, followed by arbitrary characters, followed by another integer
533 begin_page = ""
534 end_page = ""
535 # instead we loop over the characters looking for the first integer, then the separator, then the second integer; a state variable tracks the three phases of the parse (reading the first integer, the separator, then the second integer)
536 state = 0
537 for c in l:
538 is_digit_je_fais_pas_confiance = c in "0123456789"
539 if state == 0 and is_digit_je_fais_pas_confiance:
540 begin_page += c
541 elif state == 0 and not is_digit_je_fais_pas_confiance:
542 state = 1
543 elif state == 1 and not is_digit_je_fais_pas_confiance:
544 pass
545 elif state == 1 and is_digit_je_fais_pas_confiance:
546 state = 2
547 end_page += c
548 elif state == 2 and is_digit_je_fais_pas_confiance:
549 end_page += c
550 else :
551 print("Unexpected behavior while parsing " + str(l))
553 if begin_page == "" or end_page == "" or state != 2:
554 print("Unexpected behavior while parsing " + str(l))
556 begin = int(begin_page)
557 end = int(end_page)
558 if begin > end:
559 print("Unexpected begin > end : " + str(begin) + " > " + str(end))
560 return []
561 return list(range(begin, end + 1))
562 except Exception as e:
563 print("In parsing begin end : " + str(e))
564 return []
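# A minimal usage sketch: the state machine above expands a "begin<separator>end" string into
# the inclusive page range. The helper below is illustrative only, not an existing function.
def _example_parse_begin_end():
    assert parse_list_page_as_begin_end_separated("3-7") == [3, 4, 5, 6, 7]
    assert parse_list_page_as_begin_end_separated("12 a 14") == [12, 13, 14]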
568def read_list_one_doc_csv_with_sometime_tiret(list_page):
569 list_page_one_document = []
570 for l in list_page:
571 try :
572 list_page_one_document.append(int(l))
573 except Exception as e:
574 sub_list_page = parse_list_page_as_begin_end_separated(l)
575 list_page_one_document.extend(sub_list_page)
576 return list_page_one_document
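# A minimal usage sketch: plain integers are kept as-is, while "tiret" entries such as "5-7"
# are expanded through parse_list_page_as_begin_end_separated. Illustrative only.
def _example_read_list_one_doc():
    assert read_list_one_doc_csv_with_sometime_tiret(["1", "2", "5-7"]) == [1, 2, 5, 6, 7]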
579# [x] TODO 27-12-23 : rename the parameters since page is also a list and
# - [ ] TODO ugh, function to deduplicate
581def split_text_by_doc(list_page_content, list_page_per_doc):
582 list_documents = list_page_per_doc.split(";")
583 list_texts = []
584 for l in list_documents:
585 if l == ";":
586 print("WARNING TREATED THAT COULD NEED AUDIT")
587 continue
588 try:
589 list_page_one_document = read_list_one_doc_csv_with_sometime_tiret(l.split(","))
590 except Exception as e:
591 print("ERROR treated as warning, trying to parse list_page_per_doc as begin and end separated : " + str(e))
592 list_page_one_document = parse_list_page_as_begin_end_separated(l)
593 content = ""
594 for ll in list_page_one_document:
595 content += list_page_content[ll - 1]
596 list_texts.append(content)
597 return list_texts
# VR 16-6-25 : this looks buggy and unused to me
600def split_list_page_by_page(list_page_content_text):
601 list_list_page_content = []
602 for l in list_page_content_text:
603 list_list_page_content.append([l])
604 return list_list_page_content
606def split_list_page_by_doc(list_page_content_text, list_page_per_doc):
607 list_documents = list_page_per_doc.split(";")
608 list_list_page_content = []
609 for l in list_documents:
610 if l == ";":
611 print("WARNING TREATED THAT COULD NEED AUDIT")
612 continue
613 try:
614 list_page_one_document = list(map(int, l.split(",")))
615 list_page_one_document = read_list_one_doc_csv_with_sometime_tiret(l.split(","))
616 except Exception as e:
617 print("ERROR treated as warning, trying to parse list_page_per_doc as begin and end separated : " + str(e))
618 list_page_one_document = parse_list_page_as_begin_end_separated(l)
619 aux_list_page_content = []
620 for ll in list_page_one_document:
621 aux_list_page_content.append(list_page_content_text[ll - 1])
622 list_list_page_content.append(aux_list_page_content)
623 return list_list_page_content
625def list_file_anon(folder):
626 import os
627 list_file = os.listdir(folder)
628 list_file = list(map(lambda x: os.path.join(folder, x), list_file))
629 pdf_files = []
630 pdf_anon_files = []
631 content_pdf_files = []
632 anon_json_files = []
633 for f in list_file:
634 if f.lower().endswith(".pdf") and not f.lower().endswith("_anon.pdf"):
635 pdf_files.append(f.lower())
636 if f.lower().endswith("_anon.pdf"):
637 pdf_anon_files.append(f.lower())
638 if f.lower().endswith("_content.txt"):
639 content_pdf_files.append(f.lower())
640 if f.lower().endswith("_anon.json"):
641 anon_json_files.append(f.lower())
643 map_pdf_file_anon_pdf_and_anon_strat = {}
644 for pdf in pdf_files:
645 map_pdf_file_anon_pdf_and_anon_strat[pdf] = {}
646 pdf_anon = pdf + "_anon.pdf"
647 if pdf_anon in pdf_anon_files:
648 map_pdf_file_anon_pdf_and_anon_strat[pdf]["pdf_anon"] = pdf_anon
649 json_anon = pdf + "_anon.json"
650 if json_anon in anon_json_files:
651 map_pdf_file_anon_pdf_and_anon_strat[pdf]["json_anon"] = json_anon
652 with open(json_anon, "r") as f:
653 map_pdf_file_anon_pdf_and_anon_strat[pdf]["json_anon_content"] = f.read()
654 content_pdf = pdf + "_content.txt"
655 if content_pdf in content_pdf_files:
656 map_pdf_file_anon_pdf_and_anon_strat[pdf]["content_pdf_file"] = content_pdf
657 with open(content_pdf, "r") as f:
658 map_pdf_file_anon_pdf_and_anon_strat[pdf]["content_pdf"] = len(f.read())
662 return map_pdf_file_anon_pdf_and_anon_strat
664#from pydantic import BaseModel, Field
666from typing import Optional
667class SubDocPage() : #BaseModel):
668 page_number : Optional[int] = 0 #Field(0)
669 content : Optional[str] = "" #Field("")
670 source_image : Optional[str] = None #Field(None)
671 list_boxes : Optional[list] = [] #Field([])
672 maxx : Optional[int] = None
673 maxy : Optional[int] = None
674 list_blocks : Optional[dict] = {} #Field()
676 def __init__(self, page_number : Optional[int] = 0,
677 content : Optional[str] = "",
678 source_image : Optional[str] = None,
679 list_boxes : Optional[list] = [],
680 maxx : Optional[int] = None,
681 maxy : Optional[int] = None,
682 list_blocks : Optional[dict] = {}):
683# super()
684 self.page_number = page_number
685 self.content = content
686 self.source_image = source_image
687 self.list_boxes = list_boxes # list of boxes only by token (word) from tesseract
688 self.list_blocks = list_blocks # by line, paragraph or token (word) from gcp_doc_ai
689 self.maxx = maxx
690 self.maxy = maxy
692 def toJSON(self):
693 import json
694 return {"page_number" : self.page_number,
695 "content" : self.content,
696 "source_image" : self.source_image,
697 "paragraphs" : self.list_blocks["paragraphs"] if "paragraphs" in self.list_blocks else [],
698 "tokens" : self.list_blocks["tokens"] if "tokens" in self.list_blocks else []}
699# return json.dumps(self, default=lambda o: o.__dict__,
700# sort_keys=True, indent=4)
702 def toText(self):
703 return self.content
705def parse_date(str_date, current_date = datetime.datetime.now(), settings={'DATE_ORDER': 'DMY'}):
706 import dateparser
707 print(" parse_date : " + str(parse_date))
708 if type(str_date) == list: # str
709 print(" Unexpected list of date str_date : " + str(str_date))
710 str_date = str_date[0]
712 parsed_or_forced = True
714 try:
715 date_parsed = dateparser.parse(str_date, settings=settings)
716 # '2018-10-25'
717 # '12/09/2017'
718 # '12-01-2023'
719 except Exception as e:
720 print(str(e))
721 print("str_date not parsed : " + str_date)
722 import datetime
723 date_parsed = current_date
724 parsed_or_forced = False
725 if date_parsed == None:
726 print("str_date not parsed : " + str_date)
727 date_parsed = current_date
728 parsed_or_forced = False
729 return date_parsed, parsed_or_forced
731def remove_comment_end_of_line_and_print_them(s):
732 import re
733 res = re.sub(r'//.*', '', s)
734 if res != s:
735 print("Comment removed : " + s + " DO BETTER PLEASE !")
736 return res
738def parse_json_from_prompt_result(result, verbose = False, lazy = False):
739 import json, re
740 json_obj = {}
742 pattern = r"```json\s*(\{.*?\})\s*```"
743 #pattern = r"```json\s*(\{[^\{\}]\})\s*```"
744# pattern = r"```json\s*(\[?[\{.*?\}]*\]?)\s*```"
745 if lazy: # TODO we must check that there is no occurrence of ``````json in the string, otherwise this breaks
746 pattern = r"```json\s*(.*)\s*```"
747 else:
748 pattern = r"```json\s*(.*?)\s*```"
749# pattern = r"""(\{(?:(?>[^{}"'\/]+)|(?>"(?:(?>[^\\"]+)|\\.)*")|(?>'(?:(?>[^\\']+)|\\.)*')|(?>\/\/.*\n)|(?>\/\*.*?\*\/)|(?-1))*\})"""
751 # Search the document content for our pattern
752# match = re.search(pattern, result, re.DOTALL)
754 if type(result) == type(None):
755 print("Error treated as warning, we have a None result, we set to '' ")
756 result = ""
757 if type(result) == list and len(result) > 0:
758 print("WARNING we could have extract better information by collecting from the different part of the folder !")
759 result = result[0]
760 if type(result) == list and len(result) == 0:
761 print("ERROR in parse_json_from_prompt_result !")
762 if type(result) == dict:
763 print("WARNING in parse_json_from_prompt_result, we have a dict, we convert it to string !")
764 result = json.dumps(result)
765 result = result.replace("null", "\"\"")
766 result = result.replace("\\", "")
767 result = result.replace("...", "\"\"") # this should be protected properly in a second pass
769 match = re.findall(pattern, result, re.DOTALL)
771 all_results = []
772# if match:
774 current_date = datetime.datetime.now() - datetime.timedelta(days=100000)
776 for m in match:
777 # Extract the JSON-like string from the matched content
779 # Convert the JSON-like string into a Python dictionary object (JSON object)
780 internal_match_pattern = r"(\{.*?\})"
781 internal_matches = re.findall(internal_match_pattern, m, re.DOTALL)
782 list_internal_json = [] # We expect one !
783 for mm in internal_matches:
784 try:
785 mm = remove_comment_end_of_line_and_print_them(mm)
786 json_obj = json.loads(mm)
787 if "date" in json_obj:
788 current_date, parsed_or_forced = parse_date(json_obj["date"])
789 json_obj["datet"] = current_date
790 json_obj["date_parsed_or_forced"] = parsed_or_forced
791 if "date_fin_arret_travail" in json_obj:
792 current_date, parsed_or_forced = parse_date(json_obj["date_fin_arret_travail"])
793 json_obj["date_fin_arret_travailt"] = current_date
794 if "date_entree_hospitalisation" in json_obj:
795 current_date, parsed_or_forced = parse_date(json_obj["date_entree_hospitalisation"])
796 json_obj["date_entree_hospitalisationt"] = current_date
797 if "date_sortie_hospitalisation" in json_obj:
798 current_date, parsed_or_forced = parse_date(json_obj["date_sortie_hospitalisation"])
799 json_obj["date_sortie_hospitalisationt"] = current_date
800 if "date_debut_arret_travail" in json_obj:
801 current_date, parsed_or_forced = parse_date(json_obj["date_debut_arret_travail"])
802 json_obj["date_debut_arret_travailt"] = current_date
803 for k in json_obj:
804 if str(json_obj[k]).lower() == "nan":
805 json_obj[k] = 0
806 if verbose:
807 print("Extracted JSON:", json_obj)
808 for k in json_obj:
809 if str(json_obj[k]).lower() == "nan":
810 json_obj[k] = 0
811 list_internal_json.append(json_obj)
812 except json.JSONDecodeError as e:
813 print("ERROR Failed to parse JSON:", e)
814 print("JSON content:", mm)
815 print("WE add an empty json to avoid decalage in the list of results !")
816 list_internal_json.append({})
817 if len(list_internal_json) == 1:
818 all_results.append(list_internal_json[0])
819 elif len(list_internal_json) == 0:
820 print("Internal error parsing json")
821 else :
822 print("ERROR We keep only the first one : Inconsistent split by document to be reported : " + str(list_internal_json))
823 all_results.append(list_internal_json[0])
824# else:
825# print("No JSON content found matching the pattern")
827# if len(all_results) == 1:
828# return all_results[0]
829# else :
830 return all_results
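# A minimal usage sketch: a flat JSON object wrapped in a ```json fence is extracted and
# decoded, and the function returns one dict per fenced block. The sample string below is
# illustrative only, not real prompt output.
def _example_parse_json_from_prompt_result():
    sample = 'Reponse du modele :\n```json\n{"document_type": "certif_at", "nb_pages": 2}\n```'
    assert parse_json_from_prompt_result(sample) == [{"document_type": "certif_at", "nb_pages": 2}]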
832def append_id_by_order(list_json):
833 for i in range(len(list_json)):
834 list_json[i]["id"] = i
835 return list_json
837def complete_date_and_order_json_to_mettre_en_forme(list_json):
839 if len(list_json) == 0:
840 return None
842 first_with_date = 0
843 while first_with_date < len(list_json) and not "datet" in list_json[first_with_date]:
844 first_with_date += 1
845 if first_with_date == len(list_json):
846 print(" No Dates ! ")
847 return list_json
848 start_date = list_json[first_with_date]["datet"]
849 for i in range(first_with_date):
850 # Subtract first_with_date - i days from the start date
851 import datetime
852 current_date = start_date - datetime.timedelta(days=first_with_date - i)
853 list_json[i]["datet"] = current_date
855 current_date = start_date
857 for i in range(first_with_date, len(list_json)):
858 if "datet" not in list_json[i]:
859 list_json[i]["datet"] = current_date
860 else :
861 current_date = list_json[i]["datet"]
863 list_json = sorted(list_json, key=lambda x : x["datet"])
865 return list_json
867def reorder_paragraph_by_order_lex_token(one_paragraph, list_tokens):
868 id_paragraph = one_paragraph["id_paragraph"]
870def find_list_tokens_to_keep(list_ids_paragraph_to_keep, list_tokens):
871 list_token_to_keep = []
872 for token in list_tokens:
873 if token["id_paragraph"] in list_ids_paragraph_to_keep:
874 list_token_to_keep.append(token)
876 return list_token_to_keep
880def order_token_and_concat(list_tokens): # this does not seem to change the result
881 # I want to re-order the tokens by lexicographic (y, x) order
882 # In fact that does not work: the position must not be used strictly but only to within about one line, and the tokens seem to be ordered lexicographically by line
883# list_tokens_ordered = sorted(list_tokens, key=lambda x : (x["y"], x["x"]))
885 if list_tokens == []:
886 return ""
888 import math
889 import numpy as np
890 width = max(list(map(lambda x: x["x"] + x["w"], list_tokens)))
891 end_token = list(map(lambda x: x["x"] + x["w"], list_tokens))
892 mean_height = np.mean(list(map(lambda x: x["h"], list_tokens)))
893 mean_width_char = np.mean(list(map(lambda x : float(x["w"])/float(len(x["text"].lstrip("\n"))), list_tokens)))
895 list_tokens_ordered = sorted(list_tokens, key=lambda x: x["y"] * float(width) / float(mean_height) + x["x"])
897 new_text = " ".join(list(map(lambda x : x["text"], list_tokens_ordered)))
898# new_text = " ".join(list(map(lambda x: x["text"], list_tokens)))
900 # Group by lines if difference lower than 50% of mean line
902 type_algo = "line_next_line"
903 if type_algo == "strict":
904 # Strict algorithm => TODO test on the leftovers, or test afterwards, to know whether the document is skewed (a correction would be possible beforehand)
905 map_line_list_token = {}
906 for token in list_tokens:
907 (rest, mod_line) = math.modf(token["y"] / mean_height)
908 if mod_line not in map_line_list_token:
909 map_line_list_token[mod_line] = []
910 map_line_list_token[mod_line].append(token)
912 for mod_line in map_line_list_token:
913 map_line_list_token[mod_line] = sorted(map_line_list_token[mod_line], key = lambda x : x["x"])
915 new_text = ""
916 for mod_line in map_line_list_token:
917 for token in map_line_list_token[mod_line]:
918 new_text += " " + token["text"]
919 elif type_algo == "line_next_line":
920 prop_next_line = 0.5
922 list_tokens_ordered_by_y = sorted(list_tokens, key=lambda x: x["y"])
923 list_token_by_line = []
924 list_current_line = []
925 current_line_y = list_tokens_ordered_by_y[0]["y"]
926 for token in list_tokens_ordered_by_y:
927 this_y = token["y"]
928 if this_y < current_line_y + prop_next_line * mean_height:
929 list_current_line.append(token)
930 else :
931 list_token_by_line.append(sorted(list_current_line, key = lambda x : x["x"]))
932 current_line_y = this_y
933 list_current_line = [token]
934 list_token_by_line.append(sorted(list_current_line, key=lambda x: x["x"]))
936 new_text = ""
937 for one_line in list_token_by_line:
938 for token in one_line:
939 new_text += " " + token["text"]
941 else:
942 print ("Type algo not supported " + type_algo)
944 return new_text
948def concat_content_from_list_page_doc(list_page_doc,
949 reproduce_format_new_page=False,
950 height_line = 0,
951 reorder_paragraph_by_order_lex_token = False,
952 smart_new_line_from_token_pos = False,
953 list_class_copy = None):
954 from_json_content_copy = ""
955 current_position_vertical = 0
956 for page in list_page_doc:
957 list_ids_paragraph_to_keep = []
958 text_one_page = ""
959 for paragraph in page:
960 condition_keep = "class" not in paragraph or paragraph["class"] == "content" or paragraph["class"] == "unknown" or paragraph["class"] == "undefined" # or paragraph["class"] == "autre" # cas de bug
961 if list_class_copy == "all":
962 condition_keep = True
963 if "," in list_class_copy:
964 list_class_copy_as_list = list_class_copy.split(",")
965 condition_keep = "class" not in paragraph or paragraph["class"] in list_class_copy_as_list
966 if condition_keep:
967 list_ids_paragraph_to_keep.append(paragraph["id"])
969 if height_line == 0:
970 height_line = paragraph["h"]
971 text_one_page += paragraph["text"]# + "\n"
972 if reproduce_format_new_page:
973 if current_position_vertical == 0:
974 current_position_vertical = paragraph["y"]
975 else:
976 if paragraph["y"] > current_position_vertical + 0.8 * height_line:
977 text_one_page += "\n"
978 current_position_vertical = paragraph["y"]
979 else:
980 text_one_page += " "
981 else:
982 text_one_page += "\n"
984 if reorder_paragraph_by_order_lex_token:
985 try:
986 text_one_page = order_token_and_concat(find_list_tokens_to_keep(list_ids_paragraph_to_keep, page.list_blocks["tokens"]))
987 except Exception as e:
988 print(" Error in order_token_and_concat : " + str(e) + " for page : " + str(page) + " and list_ids_paragraph_to_keep : " + str(list_ids_paragraph_to_keep))
990 if smart_new_line_from_token_pos:
991 print(" TODO BOUH c'est complique")
993 from_json_content_copy += text_one_page
994 return from_json_content_copy
996def order_df_by_date(df):
997 if "datet" in df.columns:
998 df.sort_values(by="datet", ascending=True, inplace=True)
999 df.reset_index(drop=True, inplace=True)
1000 return df
1001 else:
1002 return df
def order_by_document_type(df):
    import pandas as pd
    if type(df) != pd.DataFrame or 'document_type' not in df.columns:
        print("No document_type in the DataFrame")
        return df
    # we just want to set the certif_at at the end
    # Select the rows whose document_type is neither 'certif_at' nor an invoice
    non_certif_fact = df[(df['document_type'] != 'certif_at') & (df['document_type'] != 'facture_inutile') & (df['document_type'] != 'facture_utile') & (df['document_type'] != 'facture')]
    # Select the rows whose document_type is 'certif_at'
    certif = df[df['document_type'] == 'certif_at']
    fact = df[(df['document_type'] == 'facture_inutile') | (df['document_type'] == 'facture_utile') | (df['document_type'] == 'facture')]
    # Concatenate the DataFrames, putting the 'certif_at' rows at the end
    df_sorted = pd.concat([non_certif_fact, certif, fact], ignore_index=True)
    # To keep the original indexes in the resulting DataFrame, omit `ignore_index=True`

    return df_sorted
1022def add_blank_line(df, nb_blank_line):
1023 import pandas as pd
1024 if type(df) != pd.DataFrame:
1025 print("No DataFrame")
1026 return df
1027 if nb_blank_line == 0:
1028 return df
1029 nb_current_line = len(df)
1030 df = pd.concat([df, pd.DataFrame([[""] * len(df.columns) for i in range(nb_blank_line)], columns=df.columns)], ignore_index=True)
1031 for i in range(nb_current_line, nb_current_line + nb_blank_line):
1032 df.loc[i, "id"] = i
1033 return df
1035def add_parsing_meta_info_to_table(df, list_json_to_mettre_en_forme, verbose = False):
1036 import pandas as pd
1037 if len(list_json_to_mettre_en_forme) == 0:
1038 return df
1039 # Frankly brutal, but fine !
1040 df_from_json_parsed = pd.DataFrame(list_json_to_mettre_en_forme)
1042 if "date_parsed_or_forced" in df.columns:
1043 df_from_json_parsed.drop("date_parsed_or_forced", axis=1, inplace=True)
1044 if "document_type" in df.columns:
1045 df_from_json_parsed.drop("document_type", axis=1, inplace=True)
1046 # date_parsed_or_forced
1048 # df = df.reset_index()
1049 # df_from_json_parsed = df_from_json_parsed.reset_index()
1050# df = df.reset_index(drop=True)
1051# df.reset_index(inplace=True, drop=True)
1053# df = pd.concat([df, df_from_json_parsed], ignore_index=True)
1054 if verbose:
1055 print(" df : " + df.to_string()) if type(df) == pd.DataFrame else print(" df : " + str(df))
1056 print(" df_from_json_parsed : " + df_from_json_parsed.to_string()) if type(df_from_json_parsed) == pd.DataFrame else print(" df_from_json_parsed : " + str(df_from_json_parsed))
1057 if type(df) == pd.DataFrame and type(df_from_json_parsed) == pd.DataFrame:
1058 print(" Merge : " + str(df.shape) + " " + str(df_from_json_parsed.shape))
1059 if "Titre" in df.columns and "Titre" in df_from_json_parsed.columns:
1060 df_from_json_parsed = df_from_json_parsed.rename(columns={'Titre': 'TitreMeta'})
1061 df = pd.concat([df, df_from_json_parsed], axis=1) #.reset_index(drop=True)
1062 # on va plutot le cacher ! VR 25-4-24
1063# if "compte_rendu_complet_medecin" in df.columns: # since it comes from json and we do not ask for compte_rendu_complet_medecin in split_by_doc which creates the markdown table to be loaded in dataframe and we don't want to display it here from the parsing by document
1064# df = df.drop("compte_rendu_complet_medecin", axis=1)#, inplace=True)
1066 if "Commentaires" in df and "probleme_rencontre_extraction" in df:
1067 for i in range(len(df)):
1068 if df["probleme_rencontre_extraction"][i] != "":
1069 # df["Commentaires"][i] = str(df["Commentaires"][i]) + "\n" + str(df["probleme_rencontre_extraction"][i]) => ca c'est du chained indexing, pandas n'aime pas et puis quoi encore ?
1070 df.loc[i, "Commentaires"] = str(df.loc[i, "Commentaires"]) + "\n" + str(df.loc[i, "probleme_rencontre_extraction"])
1071# df["Commentaires"] = df["Commentaires"] + "\n" + df["probleme_rencontre_extraction"]
1072 df.drop("probleme_rencontre_extraction", axis=1, inplace=True)
1074 if "indication_medecin" in df.columns:
1075 df.drop(columns=["indication_medecin"], axis=1, inplace=True)
1077 return df
1081def create_json_match_date(intro, list_dates):
1082 intro_read = intro
1083 json_match_date = []
1084 if len(list_dates) == 0:
1085 return [{"text" : intro, "type" : "text"}]
1086 idx_char_wo_date = 0
1087 len_previous_part_intro = 0
1088 while idx_char_wo_date < len(intro_read):
1089 first_date = 0
1090 idx_first_date_arg_min = -1
1091 for idx_date in range(len(list_dates)):
1092 # Since I do not want to handle an error case, I will always stay in the case where idx_char_wo_date == 0, use len_previous_part_intro, and shrink intro as I go
1093 next_char = intro_read.find(list_dates[idx_date])
1094 if next_char != -1:
1095 if first_date == 0 or next_char < first_date:
1096 first_date = next_char
1097 idx_first_date_arg_min = idx_date
1099 if idx_first_date_arg_min == -1:
1100 json_match_date.append({"text" : intro_read[idx_char_wo_date:], "type" : "text"})
1101 break
1102 else :
1103 if idx_char_wo_date < first_date:
1104 json_match_date.append({"text" : intro_read[idx_char_wo_date:first_date], "type" : "text"})
1105 json_match_date.append({"text" : list_dates[idx_first_date_arg_min], "type" : "date"})
1106 idx_char_wo_date = 0
1107 len_previous_part_intro += first_date + len(list_dates[idx_first_date_arg_min])
1108 if len(intro_read) > first_date + len(list_dates[idx_first_date_arg_min]):
1109 intro_read = intro_read[first_date + len(list_dates[idx_first_date_arg_min]):]
1110 else:
1111 print("Internal error, avoiding the rest of the intro !")
1113 return json_match_date
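# A minimal usage sketch: the intro is split into alternating "text" and "date" parts, in
# reading order. The sample sentence and dates below are illustrative assumptions.
def _example_create_json_match_date():
    intro = "Consultation du 12/09/2017 puis controle le 14/09/2017 au CHU."
    parts = create_json_match_date(intro, ["12/09/2017", "14/09/2017"])
    assert [p["type"] for p in parts] == ["text", "date", "text", "date", "text"]
    assert parts[1]["text"] == "12/09/2017" and parts[3]["text"] == "14/09/2017"
    return parts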
# Adds a left border to a paragraph
def add_border_left(paragraph):
    from docx.oxml import OxmlElement
    from docx.oxml.ns import qn
    # Create a new 'pBdr' element (paragraph border)
    p_bdr = OxmlElement('w:pBdr')
    # Create a 'left' element for the left border
    left_bdr = OxmlElement('w:left')
    # Set the attributes of the left border - here a solid line ('single') and the border size
    left_bdr.set(qn('w:val'), 'single')
    left_bdr.set(qn('w:sz'), '4') # 4/8" border size, where 8 is the unit of measure
    left_bdr.set(qn('w:space'), '4') # 4/20" of space between the border and the text
    left_bdr.set(qn('w:color'), 'auto') # 000000 border color, in hexadecimal (black here)
    # Append the left border to the 'pBdr' element
    p_bdr.append(left_bdr)

    p_pPr = OxmlElement('w:pPr')
    p_pPr.append(p_bdr)

    # Add the border to the paragraph by modifying its XML element
#    paragraph._element.insert(1, p_bdr)
    paragraph._element.insert(1, p_pPr)
1140def write_table_list_inner_document_0424_bis(df, input_col_intro,
1141 input_col_cr, out_file,
1142 hash_id_treatment, out_folder,
1143 format_info = {},
1144 verbose = False,
1145 content_resume = "", append_resume = False):
1146 audit_info_write = {}
1147 list_document_type_no_cr = format_info["list_type_doc_no_cr"] if "list_type_doc_no_cr" in format_info else []
1148 document_format = format_info["document"] if "document" in format_info else {}
1149 left_indent = document_format["left_indent"] if "left_indent" in document_format else 0
1150 new_line_intro_cr = document_format["new_line_intro_cr"] if "new_line_intro_cr" in document_format else False
1151 size_intro = document_format["size_intro"] if "size_intro" in document_format else 12
1152 size_cr = document_format["size_cr"] if "size_cr" in document_format else 11
1153 italic_cr = document_format["italic_cr"] if "italic_cr" in document_format else False
1154 list_variable_underline = format_info["list_variable_underline"] if "list_variable_underline" in format_info else []
1155 list_variable_bold = format_info["list_variable_bold"] if "list_variable_bold" in format_info else []
1156 cr_left_border = document_format["cr_left_border"] if "cr_left_border" in document_format else 0
1158 format_date = format_info["format_date"] if "format_date" in format_info else "%d %B %Y"
1159 regexp_date = format_info["regexp_date"] if "regexp_date" in format_info else r"(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})"
1160 if format_date == "%d %B %Y":
1161 regexp_date = r'(1er \w+ \d{4}|\d{1,2} \w+ \d{4})'
1162 if format_date == "%d/%m/%Y" or format_date == "%d-%m-%y":
1163 regexp_date = r'(1er[-/]\d{1,2}[-/]\d{4}|\d{1,2} \w+ \d{4})'
1165 total_text = ""
1167 # Argument to modularize : df, input_col_intro, input_col_cr, out_file, hash_id_treatment
1168 # Output : nb_file, nb_page, nb_modif_manual, total_text
1170 # Loop over df data and add the content of the document
1171 # VR TO MOVE
1172 from docx import Document
1173 # from docx.shared import Inches
1174 from docx.shared import Pt
1175 path_template = ""
1176 try :
1177 import os
1178 git_safia = os.getenv("GITSAFIA")
1179 input_file = "template_justif.docx"
1180 input_file = "O_DocumentEcritParGHetJustifier_sur_MSWORD.docx"
1181 path_template = os.path.join(git_safia, "prompt/python/data/template", input_file)
1182 except Exception as e:
1183 print(str(e))
1184 if os.path.exists(path_template) and False:
1185 document = Document(path_template)
1186 else:
1187 document = Document()
1189 if verbose:
1190 print(document.settings._element.xml)
1192 correct_settings_str = """<w:settings xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" xmlns:o="urn:schemas-microsoft-com:office:office" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main">
1193 <w:view w:val="print"/>
1194 <w:mirrorMargins w:val="0"/>
1195 <w:bordersDoNotSurroundHeader w:val="0"/>
1196 <w:bordersDoNotSurroundFooter w:val="0"/>
1197 <w:displayBackgroundShape/>
1198 <w:revisionView w:markup="1" w:comments="1" w:insDel="1" w:formatting="0"/>
1199 <w:defaultTabStop w:val="720"/>
1200 <w:autoHyphenation w:val="0"/>
1201 <w:evenAndOddHeaders w:val="0"/>
1202 <w:bookFoldPrinting w:val="0"/>
1203 <w:noLineBreaksAfter w:lang="français" w:val="‘“(〔[{〈《「『【⦅〘〖«〝︵︷︹︻︽︿﹁﹃﹇﹙﹛﹝「"/>
1204 <w:noLineBreaksBefore w:lang="français" w:val="’”)〕]}〉"/>
1205 <w:doNotExpandShiftReturn />
1206 <w:compat>
1207 <w:compatSetting w:name="compatibilityMode" w:uri="http://schemas.microsoft.com/office/word" w:val="15"/>
1208 </w:compat>
1209 <w:clrSchemeMapping w:bg1="light1" w:t1="dark1" w:bg2="light2" w:t2="dark2" w:accent1="accent1" w:accent2="accent2" w:accent3="accent3" w:accent4="accent4" w:accent5="accent5" w:accent6="accent6" w:hyperlink="hyperlink" w:followedHyperlink="followedHyperlink"/>
1210 </w:settings>"""
1212 from docx.oxml import parse_xml
1213 correct_settings_xml = parse_xml(correct_settings_str)
1214# document.settings._element = correct_settings_xml
1216 if verbose:
1217 print(document.settings._element.xml)
1219 map_nb_word_per_doc = {}
1220 map_type_document_per_doc = {}
1221 map_list_page_per_doc = {}
1223 # run = document.add_paragraph().add_run()
1224 '''Apply style'''
1225 style = document.styles['Normal']
1226 style.paragraph_format.line_spacing = 1
1227 style.paragraph_format.space_after = Pt(0)
1228 font = style.font
1229 font.name = document_format["font"] if "font" in document_format else 'Times New Roman'
1230 font.size = Pt(size_cr)
1232 # <w:pPr>
1233 # <w:pBdr>
1234 # <w:left w:val="single" w:sz="4" w:space="4" w:color="auto"/>
1235 # </w:pBdr>
1236 # <w:rPr>
1237 # <w:lang w:val="fr-FR"/>
1238 # </w:rPr>
1239 # </w:pPr>
1241 cr_justify = False
1242 if "cr_alignment" in document_format:
1243 if document_format["cr_alignment"] == "justify":
1244 cr_justify = True
1246 #paragraph.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
1248# table.rows[1].cells[1].add_paragraph(item['description'].replace('\n', ' ')).alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
1250 # paragraph = document.add_paragraph('Some text\n')
1251 '''Add another sentence to the paragraph'''
1252 # sentence = paragraph.add_run('A new line that should have a different font')
1253 '''Then format the sentence'''
1254 # sentence.font.name = 'Arial'
1255 # sentence.font.size = docx.shared.Pt(10)
1257 if append_resume:
1258 p = document.add_paragraph()
1259 p.add_run("Resume fourni tel quel : " + content_resume + "\n\n").bold = True
1262 # VR 22/1/26 TODO : Add a bordereau (cover list) of the supporting documents
1266# document.add_heading('Compte Rendu de dossier medical', 0)
1267 list_of_pages_as_csv_list = []
1268 list_of_pages_as_map_csv_min_int = []
1269 for index, row in df.iterrows():
1270 map_type_document_per_doc[index] = row["document_type"]
1271 liste_of_page = row["Liste des pages"]
1273 if liste_of_page == "":
1274 continue
1276 map_list_page_per_doc[index] = liste_of_page
1278 if liste_of_page != "" and liste_of_page != None:
1279 liste_of_page = str(liste_of_page)
1280 if liste_of_page.replace(" ", "").replace(",", "").isdigit():
1281 list_of_pages_as_map_csv_min_int.append({"csv" : liste_of_page.replace(" ", ""), "min_int" : min(list(map(int, liste_of_page.split(","))))})
1282 list_of_pages_as_csv_list.append(liste_of_page.replace(" ", ""))
1283 if input_col_intro in df.columns:
1284 intro = row[input_col_intro]
1285 else:
1286 intro = "Missing data wtf"
1287 if input_col_cr in df.columns:
1288 cr = row[input_col_cr]
1289 else :
1290 cr = "No Data Provided"
1291 # intro = df.loc[index, input_col_intro]
1292 # cr = df.loc[index, input_col_cr]
1294 # paragraph = document.add_paragraph(intro)
1295 '''Add another sentence to the paragraph'''
1296 # sentence = paragraph.add_run(cr)
1297 '''Then format the sentence'''
1298 # sentence.font.name = 'Arial'
1299 # sentence.font.size = docx.shared.Pt(10)
1301 p = document.add_paragraph()
1302 if type(intro) == float:
1303 print("WTF : " + str(intro))
1304 intro = str(intro)
1306 if len(list_variable_underline) == 0 and len(list_variable_bold) == 0:
1307 p.add_run(intro).font.size = Pt(size_intro)
1308 else:
1309 # VR TODO 9-8-24 Here we adapt the style of the dates: either we have the list of dates (but I do not know exactly where to build it), or we parse them, but then the regular expressions must be built from the date format; that can be done case by case even if it is not ideal
1310 all_dates = re.findall(regexp_date, intro)
1311 list_dates = []
1312 if all_dates:
1313 for date in all_dates:
1314 list_dates.append(date)
1316 # We could handle the special cases where only one date is present, or where each date appears exactly once
1317 # VR TODO 9-8-24 : we could also write a helper function for this
1318 from lib.lib_util import create_json_match_date
1319 json_match_date = create_json_match_date(intro, list_dates)
1321 underline_date = len(list_variable_underline) > 0 and "date" in list_variable_underline
1322 bold_date = len(list_variable_bold) > 0 and "date" in list_variable_bold
1323 if underline_date and bold_date:
1324 print("Only bold will be done !")
1326 for part in json_match_date:
1327 if part["type"] == "text":
1328 p.add_run(part["text"]).font.size = Pt(size_intro)
1329 else:
1330 if underline_date:
1331 p.add_run(part["text"]).underline = True
1332 if bold_date:
1333 p.add_run(part["text"]).bold = True
1335 # p = document.add_paragraph()
1336 # p.add_run(cr_content).underline = True
1337 # p.add_run(cr_content).bold = True
1339# p.add_run(intro).font.size = Pt(12)
1341 try:
1342 doc_type = row["document_type"]
1343 except Exception as e:
1344 doc_type = "unknown"
1345 print(str(e))
1346 if doc_type in list_document_type_no_cr:
1347 cr = "\n"
1348 p.add_run(cr)
1349 else:
1350 cr_strip = cr.strip("\n")
1351 if cr_strip == "":
1352 continue
1353 cr_content = "«" + cr_strip + "»" + "\n"
1354 print("Warning on pourrait faire cela plus propre enfin bon")
1355 cr_content = cr_content.replace("««", "«")
1356 cr_content = cr_content.replace("»»", "»")
1358 if new_line_intro_cr:
1359 cr_content = "\n" + cr_content
1361 for one_cr_content_line in cr_content.split("\n"):
1362 p = document.add_paragraph()
1363 if cr_justify:
1364 from docx.enum.text import WD_ALIGN_PARAGRAPH
1365 # p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
1366 p.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
1367 if left_indent > 0:
1368 p.paragraph_format.left_indent = Pt(left_indent)
1369 #cr_content = "\t" + cr_content.replace("\n", "\n\t")
1371 if italic_cr:
1372 p.add_run(one_cr_content_line).italic = True
1373 else:
1374 p.add_run(one_cr_content_line)
1376 if cr_left_border:
1377 add_border_left(p)
1379 map_nb_word_per_doc[index] = len(intro.split(" ")) + len(cr.split(" ")) + 2
1381 total_text += intro
1382 total_text += str(cr)
1384 # sort list_of_pages_as_map_csv_min_int by the min_int key to get increasing order
1385 list_of_pages_as_map_csv_min_int = sorted(list_of_pages_as_map_csv_min_int, key=lambda x : x["min_int"])
1386 list_of_pages_as_csv_list = list(map(lambda x : x["csv"], list_of_pages_as_map_csv_min_int))
1387 liste_of_page_as_ccsv = ";".join(list_of_pages_as_csv_list)
1388 outfile_name_docx = hash_id_treatment + ".docx"
1389 link_for_manax_temp = os.path.join(out_folder, outfile_name_docx)
1390 if out_file == None:
1391 out_file = link_for_manax_temp
1392 else:
1393 out_file = os.path.join(out_folder, out_file)
1394 print("About to out_file : " + out_file)
1395 if not os.path.exists(os.path.dirname(out_file)):
1396 os.makedirs(os.path.dirname(out_file))
1397 document.save(out_file)
1398 document.save(link_for_manax_temp)
1399 print("out_file : " + out_file)
1400 os.system("chmod 755 " + out_file)
1401 os.system("chmod 755 " + link_for_manax_temp)
1403 audit_info_write["map_nb_word_per_doc"] = map_nb_word_per_doc
1404 audit_info_write["map_type_document_per_doc"] = map_type_document_per_doc
1405 audit_info_write["list_of_pages_as_sccsv"] = liste_of_page_as_ccsv
1407 return total_text, os.path.basename(out_file), audit_info_write #outfile_name_docx
# deprecated 15/5/24
1410def write_table_list_inner_document(df, doc, with_hyperlink = False):
1412 import docx
1413 from docx.oxml.shared import OxmlElement
1414 if doc == None:
1415 from docx import Document
1416 # Créer un nouveau document Word
1417 doc = Document()
1419 # Ajouter un titre
1420 doc.add_heading('Index For Documents', level=1)
1421# toc_paragraph = document.add_paragraph("Table des Matières\n", style='Heading1')
1423 # Ajouter une table au document Word avec une ligne pour les en-têtes
1424 table = doc.add_table(rows=1, cols=len(df.columns))
1426 # Définir le style de la table
1427 table.style = 'Table Grid'
1429 styles = doc.styles
1430 print(str(len(styles)))
1432 # Ajouter les en-têtes de colonnes
1433 hdr_cells = table.rows[0].cells
1434 for i, col_name in enumerate(df.columns):
1435 hdr_cells[i].text = col_name
1437 # Ajouter les lignes de données à la table
1438 for index, row in df.iterrows():
1439 row_cells = table.add_row().cells
1440 for i, value in enumerate(row):
1441 row_cells[i].text = str(value)
1443 if with_hyperlink:
1444 toc_paragraph = doc.add_paragraph("Listes des liens (un peu rustique du coup)\n", style='Heading1')
1446 for index, row in df.iterrows():
1447 anchor = f"#_Ref{index}"
1448 # add_hyperlink(toc_paragraph, row['titre'], anchor)
1449 hyperlink = docx.oxml.shared.OxmlElement('w:hyperlink')
1450 hyperlink.set(docx.oxml.shared.qn('w:anchor'), anchor)
1453 run = toc_paragraph.add_run()
1454 run.text = "Lien vers un paragraph\n"
1455 rPr = run._r.get_or_add_rPr()
1457 rStyle = OxmlElement('w:rStyle')
1458 rStyle.set(docx.oxml.shared.qn('w:val'), 'Hyperlink') # Utilisez le style de lien hypertexte ici
1459 rPr.append(rStyle)
1463 # [ ] TODO VR : the architecture of document creation is really messy; do we need a class or something else, notably to get a web interface and so on ?
1464# doc.add_heading('Compte Rendu de dossier medical', 0)
1465 return doc
1467def to_save_work_use_or_suppress_with_hyperlink(document, index_for_hyperlink = 0):
1469 p = document.add_paragraph()
1471 import docx
1472 from docx.oxml.shared import OxmlElement
1474 # p = document.add_paragraph(style='Heading2')
1475 # Ajouter un élément de signet pour la navigation
1476 bookmark_start = OxmlElement('w:bookmarkStart')
1477 bookmark_start.set(docx.oxml.shared.qn('w:id'), str(index_for_hyperlink))
1478 bookmark_start.set(docx.oxml.shared.qn('w:name'), f"_Ref{index_for_hyperlink}")
1479 p._p.addnext(bookmark_start)
1481 bookmark_end = OxmlElement('w:bookmarkEnd')
1482 bookmark_end.set(docx.oxml.shared.qn('w:id'), str(index_for_hyperlink))
1483 bookmark_start.addnext(bookmark_end)
1485def format_one_res(input,
1486 new_format_info, format_premier, format_date,
1487 verbose = False, list_variable_bold = [], list_variable_underline = []): # we could just set it to -1 when we do not have it
1489 print("format_one_res")
1490 try:
1491 import locale
1492# loc = locale.getlocale(locale.LC_ALL) # get current locale
1493 locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8') # use French locale; the name might vary with the platform
1494 except Exception as e:
1495 print(" Pb setting local")
1496 print(str(e))
1498 format_date_hc = "%d %B %Y"
1499 format_date_used = format_date if format_date != "" else format_date_hc
1501 new_new_content = new_format_info
1502 for k in input: # something special needs to be done for the dates
1503 if k == "datet" or k == "date_fin_arret_travailt" or k == "date_entree_hospitalisationt" or k == "date_sortie_hospitalisationt" or k == "date_debut_arret_travailt":
1504 replace = input[k].strftime(format_date_used)
1505 if input[k].day == 1 and format_premier == "jd":
1506 replace = replace.replace("01 ", "1\u1D31\u1D3F ")
1507# if k in list_variable_bold:
1508# replace = "\033[1m{}\033[0m".format(replace)
1509            # Check whether the key should be underlined
1510# if k in list_variable_underline:
1511# replace = "\033[4m{}\033[0m".format(replace)
1513# replace = "1\u1D31\u1D3F " + input[k].strftime("%B %Y")
1514 print("-² 1er")
1515 print(" U+2091 U+02B3 U+1D31 U+1D3F 1\u1D31\u1D3F \u2091 \u02B3")
1516 new_new_content = new_new_content.replace("{" + k + "}", str(replace))
1517 else :
1518 new_new_content = new_new_content.replace("{" + k + "}", str(input[k]))
1520 print(" new_new_content : " + new_new_content)
1522 return new_new_content
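# Usage sketch (hypothetical values, added for illustration; assumes the fr_FR locale is available):
# placeholders in the template are replaced by the matching keys of `input`, and keys such as "datet" are formatted as dates.
#     import datetime
#     format_one_res({"nom": "Dupont", "datet": datetime.date(2024, 5, 1)},
#                    "Vu le {datet}, dossier de {nom}", format_premier = "jd", format_date = "%d %B %Y")
#     -> "Vu le 1ᴱᴿ mai 2024, dossier de Dupont"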
1526def compute_list_input_to_format_per_document(format_info):
1528 map_list_input_by_document = {}
1529 for doc_type in format_info:
1530 list_needed_input = []
1531        # Rather than using the list of metadata names, use a regexp to look for the words between curly braces
1532        import re
1533        # Look for the words between curly braces
1534 matchs = re.findall(r'\{(.*?)\}', format_info[doc_type])
1535 if matchs:
1536 for match in matchs:
1537 if match not in list_needed_input:
1538 list_needed_input.append(match)
1539 else :
1540 print("Awkward !")
1541 map_list_input_by_document[doc_type] = list_needed_input
1543 return map_list_input_by_document
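# Usage sketch (hypothetical values, added for illustration):
#     compute_list_input_to_format_per_document({"cr": "Vu le {datet}, dossier de {nom}"})
#     -> {"cr": ["datet", "nom"]}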
1545def show_frame(frame, bbox_list=None, text=None,
1546 save_flag=False, save_name=None, wait_flag=False):
1547 import cv2
1549 # A list of colors to indicate the order of bounding boxes drawn.
1550 color_list = [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0],
1551 [255, 0, 255], [0, 255, 255]]
1552    color_list = color_list + [[255, 255, 255]] * 20  # pad with white triplets so long bbox lists do not run out of colors
1554 # Convert the frame to a BGR image if the input is grayscale.
1555 if len(frame.shape) == 2:
1556 frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
1558 # Draw a bounding box, if a bounding box was given.
1559 if bbox_list:
1560 for i, bbox in enumerate(bbox_list):
1561 tl, br = bbox[0], bbox[1]
1562 frame = cv2.rectangle(frame, tl, br, color_list[i], 4)
1564 # Draw a text box, if a text string given. Add rectangle to emphasize text.
1565 if text:
1566 tbox_tl, tbox_br = (0, 0), (220, 25)
1567 frame = cv2.rectangle(frame, tbox_tl, tbox_br, (255, 255, 255), -1)
1569 # Add the text on top of the rectangle to the displayed frame. The
1570 # cv2.putText() function places text based on the bottom left corner.
1571 text_bl = (tbox_tl[0] + 5, tbox_br[1] - 5)
1572 frame = cv2.putText(frame, text, text_bl,
1573 cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)
1575 # Display the frame and wait for input if the wait flag is enabled.
1576 cv2.imshow('frame', frame)
1577 if wait_flag:
1578 cv2.waitKey(0)
1580 # Save the frame if the save_flag is enabled.
1581 if save_flag:
1582 cv2.imwrite(save_name, frame)
1586import re
1587def parse_id_date_nb_page_folder(text):
1588 # nb_5_id_3_d_210224
1590 import os
1591 text = os.path.basename(text)
1593    # Build the patterns for NB, ID and D
1594    nb_pattern = r'NB[_:]\s*(\d+)'
1595    id_pattern = r'ID[_:]\s*(\d+)'
1596    date_pattern = r'(?:^|[^I])D[_:]\s*(\d{2,8})' # assume a date format such as ddmmyy or ddmmyyyy (2 to 8 digits)
1597# date_pattern = r'D[_:]\s*(\d{8})' # Supposition d'un format de date comme ddmmyy ou ddmmyyyy
1599    # Search for the patterns in the text
1600 nb_result = re.search(nb_pattern, text.upper())
1601 id_result = re.search(id_pattern, text.upper())
1602 date_result = re.search(date_pattern, text.upper())
1604    # Extract the results
1605 nb = nb_result.group(1) if nb_result else 0
1606 id = id_result.group(1) if id_result else 0
1607 date = date_result.group(1) if date_result else None
1609 return nb, id, date
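# Usage sketch (hypothetical path, added for illustration), using the folder name from the comment above:
#     parse_id_date_nb_page_folder("/tmp/nb_5_id_3_d_210224")
#     -> ('5', '3', '210224')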
1613def create_prefix_file_name_from_json_prefix(json_prefix):
1614 nb = json_prefix["nb"] if "nb" in json_prefix else 0
1615 id = json_prefix["id"] if "id" in json_prefix else 0
1616 date = json_prefix["date"] if "date" in json_prefix else ""
1618 return create_prefix_file_name_from_id_date_nb_page_folder(nb, id, date)
1620def create_prefix_file_name_from_id_date_nb_page_folder(nb, id, date):
1621 prefix_file_name = "id_" + str(id) + "_nb_" + str(nb) + "_d_" + str(date)
1622 return prefix_file_name
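# Usage sketch (hypothetical values, added for illustration):
#     create_prefix_file_name_from_json_prefix({"nb": 5, "id": 3, "date": "210224"})
#     -> "id_3_nb_5_d_210224"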
1624def get_id_order_document(document_id):
1625 if "_" not in document_id:
1626 return -1
1627 else:
1628        last_info = document_id.split("_")[-1]
1629 return int(last_info)
1631import requests
1632# from https://www.tutorialspoint.com/how-to-check-whether-user-s-internet-is-on-or-off-using-python
1633def internet_connection():
1634 try:
1635 response = requests.get("https://www.fotonower.com", timeout=5)
1636 return True
1637    except requests.exceptions.RequestException:  # also catches timeouts, not only connection errors
1638 return False
1639#if internet_connection():
1640# print("The Internet is connected.")
1641#else:
1642# print("The Internet is not connected.")
1644def prepare_pagination(nb_page, page, max = 5):
1645 if nb_page <= max:
1646 return list(range(1, nb_page + 1))
1647 else:
1648 if page <= max // 2:
1649 return list(range(1, max + 1)) + ["..."] + [nb_page]
1650 elif page > nb_page - max // 2:
1651 return [1] + ["..."] + list(range(nb_page - max + 1, nb_page + 1))
1652 else:
1653 return [1] + ["..."] + list(range(page - max // 2, page + max // 2 + 1)) + ["..."] + [nb_page]
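# Usage sketch (hypothetical values, added for illustration):
#     prepare_pagination(nb_page = 3, page = 1)   -> [1, 2, 3]
#     prepare_pagination(nb_page = 20, page = 10) -> [1, '...', 8, 9, 10, 11, 12, '...', 20]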
1656def remove_circular_refs(ob, _seen=None, verbose = False):
1657 if _seen is None:
1658 _seen = set()
1659 if id(ob) in _seen:
1660 if verbose:
1661 print(" Remove circular reference with id " + str(id(ob)) + " : ob " + str(ob))
1662 # circular reference, remove it.
1663 return None
1664 _seen.add(id(ob))
1665 res = ob
1666 if isinstance(ob, dict):
1667 res = {
1668 remove_circular_refs(k, _seen, verbose = verbose): remove_circular_refs(v, _seen, verbose = verbose)
1669 for k, v in ob.items()}
1670 elif isinstance(ob, (list, tuple, set, frozenset)):
1671 res = type(ob)(remove_circular_refs(v, _seen, verbose = verbose) for v in ob)
1672 # remove id again; only *nested* references count
1673 _seen.remove(id(ob))
1674 return res
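# Usage sketch (added for illustration): a self-referencing dict becomes serialisable again.
#     d = {"a": 1}
#     d["self"] = d
#     remove_circular_refs(d)
#     -> {'a': 1, 'self': None}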
1677def change_nan_to_string(ob):
1678 import math
1680 res = ob
1681 if isinstance(ob, dict):
1682 res = {k: change_nan_to_string(v)
1683 for k, v in ob.items()}
1684 elif isinstance(ob, (list, tuple, set, frozenset)):
1685 res = type(ob)(change_nan_to_string(v) for v in ob)
1686 else:
1687 if isinstance(ob, float) and (math.isnan(ob) or math.isinf(ob)):
1688 res = "NaN"
1689 elif isinstance(ob, str):
1690 res = ob
1691 else:
1692 res = ob
1694 return res
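# Usage sketch (added for illustration); note that infinities are mapped to "NaN" as well:
#     change_nan_to_string({"a": float("nan"), "b": [1.0, float("inf")]})
#     -> {'a': 'NaN', 'b': [1.0, 'NaN']}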
1697def load_json(file_path):
1698 import json
1699 with open(file_path, 'r') as json_file:
1700 data = json.load(json_file)
1701 return data
1703def load_csv_as_dict(input_csv):
1704 input_from_csv = {}
1705 for k in input_csv.split(","):
1706 if k != "":
1707 try:
1708 key, value = k.split("=")
1709 except Exception as e:
1710 print("Error in input_csv : " + str(e))
1711 continue
1712 input_from_csv[key] = value
1713 return input_from_csv
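# Usage sketch (hypothetical values, added for illustration); malformed items are skipped with a warning:
#     load_csv_as_dict("client=safia,id=3,oops")
#     -> {'client': 'safia', 'id': '3'}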
1716def aux_parse_date_in_text(input_text):
1717 list_dates = []
1719# Below is an example of input text containing dates in various formats (sample kept for reference; the function itself works on input_text)
1720 texte = """
1721Le rendez-vous aura lieu le 12/04/2021 et sera suivi d'une autre rencontre le 23-05-2022.
1722Il y a aussi événement prévu pour le 01/08, sans oublier le 15 janvier 2023.
1723Nous avons aussi noté une réunion le 2 février et un appel le mercredi 10 mars 2021.
1724"""
1726# List of patterns for dates in various formats
1727    motifs_dates = [
1728        r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', # Format DD/MM/YYYY or DD-MM-YYYY
1729        r'\b\d{1,2}[/-]\d{1,2}\b', # Format DD/MM or DD-MM
1730        r'\b\d{1,2}\s+janvier|\b\d{1,2}\s+février|\b\d{1,2}\s+mars|\b\d{1,2}\s+avril|\b\d{1,2}\s+mai|\b\d{1,2}\s+juin|\b\d{1,2}\s+juillet|\b\d{1,2}\s+août|\b\d{1,2}\s+septembre|\b\d{1,2}\s+octobre|\b\d{1,2}\s+novembre|\b\d{1,2}\s+décembre', # Format DD <month name>
1731        r'\b\d{1,2}\s+mois\s+\d{4}', # Format DD mois YYYY, with the literal word 'mois' as separator
1732]
1734# Search for and collect all the dates found in the text
1736 dates = []
1737 for motif in motifs_dates:
1738 correspondances = re.findall(motif, input_text)
1739 dates.extend(correspondances)
1741 return dates
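# Usage sketch (added for illustration); note that the looser pattern can fire on the same substring:
#     aux_parse_date_in_text("rencontre le 23-05-2022")
#     -> ['23-05-2022', '23-05']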
1743def parse_date_test_before_own_datou_step(list_page_content, verbose = True):
1744 map_res_page_date = {}
1745 for sdp in list_page_content:
1746# if verbose:
1747# print(" sdp : " + str(sdp))
1748 dates = aux_parse_date_in_text(sdp.content)
1749 if verbose:
1750 print(" sdp.page_number : " + str(sdp.page_number))
1751 print(" sdp.file : " + str(sdp.source_image))
1752 print(" dates : " + str(dates))
1754 filename = os.path.basename(sdp.source_image)
1755 map_res_page_date[filename] = dates
1757 return map_res_page_date
1760# refactored for smart split
1761def create_transcript_group_of_pages(list_of_list_of_pages,
1762 map_text = {},
1763 list_of_sub_doc_page_with_content = None,
1764 begin_page = True,
1765 end_page = True,
1766 verbose = False):
1767 print(" begin_page : " + str(begin_page) + " end_page : " + str(end_page))
1768 complete_texts = []
1769 begin_page_txt = ""
1770 end_page_txt = ""
1771 for list_of_pages in list_of_list_of_pages:
1772 complete_text = ""
1773 for page in list_of_pages:
1774 text = map_text[page]
1775 # list_of_sub_doc_page_with_content[page].content
1776 if begin_page:
1777 print(" begin_page is true : ")
1778 begin_page_txt = "\n------\nBegin Page " + str(page) + "\n------\n"
1779 if verbose:
1780 print(" begin_page_text was set ")
1781 print(" begin_page_txt : " + str(begin_page_txt) + " end_page_txt : " + str(end_page_txt))
1782 print("\n------\nBegin Page " + str(page) + "\n------\n")
1783 print(str(page))
1784 else:
1785 print(" begin_page is false : ")
1786 if end_page:
1787 end_page_txt = "\n------\nEnd Page " + str(page) + "\n------\n"
1788 print(" begin_page_txt : " + str(begin_page_txt[:50]).replace("\n", "§§") + " end_page_txt : " + str(end_page_txt[:50]).replace("\n", "§§"))
1789 complete_text += begin_page_txt + text + end_page_txt
1790 complete_texts.append(complete_text)
1792 return complete_texts
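# Usage sketch (hypothetical values, added for illustration): each inner list of pages is concatenated
# into one transcript, with Begin/End Page markers around every page.
#     create_transcript_group_of_pages([[1, 2]], map_text = {1: "texte page 1", 2: "texte page 2"})
#     -> one string per group, e.g. ["\n------\nBegin Page 1\n------\ntexte page 1\n------\nEnd Page 1\n------\n" + ...]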
1796# pages above nb_page will be ignored
1797def build_list_of_list_from_split(end_page_as_csv, nb_page):
1798 if end_page_as_csv == "":
1799        return [list(range(1, nb_page + 1))]  # a single group containing every page
1801 end_page_as_list = list(map(int, end_page_as_csv.split(",")))
1803 if nb_page not in end_page_as_list:
1804 end_page_as_list.append(nb_page)
1806 end_page_as_list_ordered = sorted(end_page_as_list)
1807 if 0 in end_page_as_list_ordered:
1808 end_page_as_list_ordered.remove(0)
1810 if len(end_page_as_list_ordered) == 0:
1811 print("Internal error in build_list_of_list_from_split ! ")
1812        # the loop below would have produced this anyway, since nb_page was added to the list
1813        return [list(range(1, nb_page + 1))]
1815 id_page = 1
1816 id_end_input = 0
1817 id_page_end = end_page_as_list_ordered[id_end_input]
1818 if id_page > id_page_end:
1819 print("Internal error in build_list_of_list_from_split on id_page ! : " + str(id_page))
1820        # the previous check should already have ruled this out
1821        return [list(range(1, nb_page + 1))]
1824 list_of_list_of_page = []
1825 current_list = [id_page]
1826 while id_page <= nb_page and id_end_input < len(end_page_as_list_ordered):
1827 id_page_end = end_page_as_list_ordered[id_end_input]
1828 if id_page == id_page_end:
1829 if len(current_list) > 0:
1830 list_of_list_of_page.append(current_list)
1831 current_list = []
1832 id_end_input = id_end_input + 1
1834 id_page += 1
1835 if id_page <= nb_page:
1836 current_list.append(id_page)
1837 # VR TODO 26-4-25 : not clear why we need to do this !
1838 if len(list_of_list_of_page) == 0:
1839 list_of_list_of_page.append(current_list)
1841 return list_of_list_of_page
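# Usage sketch (hypothetical values, added for illustration): split pages 1..5 after pages 2 and 4.
#     build_list_of_list_from_split("2,4", 5)
#     -> [[1, 2], [3, 4], [5]]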
1843def director_cut(id_file,
1844 df_by_doc,
1845 paragraphs):
1846 import graphviz
1848 dot = graphviz.Digraph(id_file + "_treatment", comment='Traitement du dossier d\'expertise ' + id_file)
1850    all_pages = graphviz.Digraph(name='All Pages') #, label='all_pages')  # standalone Digraphs: dot.subgraph(name=...) returns a context manager, so .node() would fail on it; they are attached to dot before rendering below
1851    all_text_cr = graphviz.Digraph(name='All Text CR') #, label='all_text_cr')
1852    all_docs = graphviz.Digraph(name='All Doc') #, label='all_docs')
1854 dot.node('pdf', 'Dossier anonymisé')
1855 dot.node('poubelle', 'Texte caviardé')
1856 dot.node('docx', 'Données extraites pour traitement')
1858 id_doc = 0
1859 for list_page in df_by_doc["Liste des pages"]:
1860 list_of_page = list(map(int, list_page.split(","))) if list_page != "" else []
1861 medecin_nom = "medecin_nom"
1862 document_type = "document_type"
1863 try:
1864 document_type = df_by_doc["document_type"][id_doc]
1865 medecin_nom = df_by_doc["medecin_nom"][id_doc]
1866 except Exception as e:
1867 print(str(e))
1868 all_docs.node("doc_" + str(id_doc), 'Document ' + str(id_doc) + " : " + document_type + " : " + medecin_nom)
1869 for page in list_of_page:
1870 all_pages.node("page_" + str(page), 'Page ' + str(page))
1871 dot.edge('pdf', "page_" + str(page), constraint='false')
1872 for p in paragraphs[page - 1]:
1873 all_text_cr.node("par_" + str(page) + "_" + str(p['id']), p["text"])
1874 dot.edge("page_" + str(page), "par_" + str(page) + "_" + str(p['id']), constraint='false')
1875 dot.edge("par_" + str(page) + "_" + str(p['id']), "doc_" + str(id_doc), constraint='false')
1876# dot.edge("page_" + str(page), "poubelle", constraint='false')
1877 # dot.edge("page_" + str(page), "doc_" + str(id_doc), constraint='false')
1878 # dot.edge("page_" + str(page), "doc_" + str(id_doc), constraint='false')
1879 dot.edge("doc_" + str(id_doc), 'docx')
1881 id_doc = id_doc + 1
1883 dot.attr('node', shape='oval', fontname='Helvetica')
1884 dot.attr('edge', fontsize='12')
1885 dot.attr('graph', splines='true', overlap='false')
1887 # graph [splines=true overlap=false];
1889 # neato, fdp (needs overlap=prism ?) , sfdp
1890 dot.attr(layout='fdp')
1891 dot.attr(overlap='prism')
1893# dot.source
    # Attach the subgraphs to the main graph before rendering
    dot.subgraph(all_pages)
    dot.subgraph(all_text_cr)
    dot.subgraph(all_docs)
1894    dot.render(directory='doctest-output').replace('\\', '/')
1896 return 0
1898# util parser used by the devops coverage/pytest tooling and by local_storage to graph the coverage percentage
1899def parse_coverage_from_python(file_contents):
1900    from bs4 import BeautifulSoup
1901    # Parse the HTML content with BeautifulSoup
1902    soup = BeautifulSoup(file_contents, 'html.parser')
1904 span_value = "-1"
1905 try:
1906        # Find the span element with the 'pc_cov' class and extract its text
1907        span_value = soup.find('span', {'class': 'pc_cov'}).text
1909        # Print the value
1910 print(span_value)
1911 except Exception as e:
1912 print(str(e))
1913 span_value = "1666%"
1915 return span_value
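# Usage sketch (added for illustration, with a minimal hypothetical HTML snippet mimicking a coverage.py report):
#     parse_coverage_from_python('<html><h1>Coverage <span class="pc_cov">45%</span></h1></html>')
#     -> '45%'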
1919def humanize_size_file(value_in_byte):
1920 import math
1921    if value_in_byte is None or math.isnan(value_in_byte):
1922 return "n/c"
1923 if value_in_byte < 1024:
1924 return str(value_in_byte) + " B"
1925 elif value_in_byte < 1048576:
1926 return str(int(value_in_byte / 1024)) + " KB"
1927 elif value_in_byte < 1048576 * 1024:
1928 round_size = value_in_byte / 1048576
1929 if round_size < 100:
1930 return str(int(10 * round_size) / 10) + " MB"
1931 else :
1932 return str(int(round_size)) + " MB"
1933 elif value_in_byte < 1048576 * 1048576:
1934 return str(int(value_in_byte / (1024 * 1048576))) + " GB"
1935 elif value_in_byte < 1048576 * 1048576 * 1024:
1936        return str(int(value_in_byte / (1048576 * 1048576))) + " TB"
1937 else :
1938 return "TOO BIG, WILL FAIL !"
1940from io import BytesIO
1941import qrcode
1942from base64 import b64encode
1945def get_b64encoded_qr_image(data):
1946 print(data)
1947 qr = qrcode.QRCode(version=1, box_size=10, border=5)
1948 qr.add_data(data)
1949 qr.make(fit=True)
1950 img = qr.make_image(fill_color='black', back_color='white')
1951 buffered = BytesIO()
1952 img.save(buffered)
1953 return b64encode(buffered.getvalue()).decode("utf-8")
1955def from_list_page_per_doc_ccsv_to_list_of_list_of_page(list_page_per_doc):
1956 """
1957 Convert a semicolon-separated string of page numbers into a list of lists.
1958 Each sublist contains the page numbers for a specific document.
1959 """
1960 list_of_list_of_page = []
1961 nb_page = 0
1962 max_page = 0
1963 try:
1964 if list_page_per_doc == "":
1965 return list_of_list_of_page, nb_page, max_page
1967 for doc_pages in list_page_per_doc.split(";"):
1968 if doc_pages.strip() != "":
1969 list_of_page = list(map(int, doc_pages.split(",")))
1970 max_page = max(max_page, max(list_of_page))
1971 nb_page += len(list_of_page)
1972 list_of_list_of_page.append(list_of_page)
1973 except Exception as e:
1974 print("Error in from_list_page_per_doc_ccsv_to_list_of_list_of_page : " + str(e))
1975 return list_of_list_of_page, nb_page, max_page
1976 if nb_page < max_page:
1977 print("ERROR treated as Warning : nb_page < max_page, this is not expected, check your input list_page_per_doc : " + str(list_page_per_doc))
1978 return list_of_list_of_page, nb_page, max_page
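# Usage sketch (hypothetical values, added for illustration): two documents, pages 1-2 and 3-5.
#     from_list_page_per_doc_ccsv_to_list_of_list_of_page("1,2;3,4,5")
#     -> ([[1, 2], [3, 4, 5]], 5, 5)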
1980# maybe deprecated; from_list_page_per_doc_ccsv_to_list_of_list_of_page should be used instead
1981def managing_deprecated_input_text_concat_into_list(text, list_page_per_doc):
1982 list_of_list_of_page, nb_page, max_page = from_list_page_per_doc_ccsv_to_list_of_list_of_page(list_page_per_doc)
1983 if nb_page < max_page:
1984 print("ERROR treated as Warning : nb_page < max_page, this is not expected, check your input list_page_per_doc : " + str(list_page_per_doc))
1985 list_texts_per_doc = []
1986# id_page = 1
1987 list_text_per_page = text.split("""------
1988Begin Page""")
1989    list_text_per_page = list_text_per_page[1:] # drop the first element, which is empty
1990 for i in range(len(list_text_per_page)):
1991 list_text_per_page[i] = '''------
1992Begin Page''' + list_text_per_page[i]
1993 if len(list_text_per_page) != nb_page:
1994 print("ERROR treated as Warning : len(list_text_per_page) != nb_page, this is not expected, check your input text : " + str(text))
1995 try:
1996 for list_of_page in list_of_list_of_page:
1997 one_doc_content_concat = ""
1998 one_doc_content_concat = "\n".join([list_text_per_page[page - 1] for page in list_of_page])
1999 list_texts_per_doc.append(one_doc_content_concat)
2000 except Exception as e:
2001 print("Error in from_list_page_per_doc_ccsv_to_list_of_list_of_page : " + str(e))
2002 return []
2003 return list_texts_per_doc