Coverage for lib/datou/lib_parallel.py: 92%
112 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
« prev ^ index » next coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
4import os
6from lib.datou.lib_datou_step_template import aux_map_reduce_loop
def datou_parallel_aux(list_texts_aux, res_json_field,
                       aux_input_var, list_steps, list_param_json_steps,
                       param_json, verbose, privacy, with_audit, strat_reduce, hit_main, id_step_incomplete_args, i, q):
    """Thread worker: run ``aux_map_reduce_loop`` on one slice and post the outcome.

    ``aux_map_reduce_loop`` is called with an empty starting result (``""``)
    and a fresh, empty audit list that it receives as an argument (presumably
    to append audit entries to -- confirm against its implementation).  When it
    returns, the tuple ``(reduced_result, audit_list, i)`` is put on ``q`` so
    the caller can put the slices back in order using ``i``.

    If ``aux_map_reduce_loop`` raises, nothing is put on ``q``.
    """
    audit_entries = []
    outcome = aux_map_reduce_loop(
        list_texts_aux, res_json_field, aux_input_var, list_steps, list_param_json_steps,
        param_json,
        verbose, privacy, with_audit, strat_reduce, "",
        audit_entries, hit_main, id_step_incomplete_args,
    )
    q.put((outcome, audit_entries, i))
def datou_parallel_map_reduce(list_texts, res_json_field,
                              aux_input_var, list_steps, list_param_json_steps, param_json, verbose,
                              privacy, with_audit, strat_reduce,
                              nb_thread=10,
                              hit_main="unknown_hit",
                              id_step_incomplete_args=None):
    """Map-reduce ``list_texts`` in worker threads and concatenate the results.

    ``list_texts`` is cut into ``nb_thread`` contiguous slices; each slice is
    handed to ``datou_parallel_aux`` (and therefore ``aux_map_reduce_loop``)
    in its own thread.  All other arguments are forwarded unchanged.

    Args:
        list_texts: inputs to split; must support ``len`` and slicing.
        nb_thread: number of worker threads / slices, at least 1.
        hit_main, id_step_incomplete_args and the remaining arguments are
            passed through to ``aux_map_reduce_loop``.

    Returns:
        ``(reduced_result, list_audit_map_reduce)``: the per-slice results
        concatenated in slice order (i.e. following ``list_texts``), and one
        audit list per slice, also in slice order.

    Raises:
        ValueError: if ``nb_thread`` is smaller than 1.
        Exception: if a worker fails, the exception of the lowest-indexed
            failing slice is re-raised here (rather than surfacing later as a
            ``KeyError`` on the missing slice).
    """
    import queue as Queue
    import threading

    if nb_thread < 1:
        raise ValueError("nb_thread must be >= 1, got %r" % (nb_thread,))

    # `//` because slice bounds must be ints; the `+ 1` rounds the slice size up
    # so no slice exceeds len // nb_thread + 1 items and the remainder does not
    # pile onto the last slice.  Trailing slices may therefore be short or empty.
    each_thread = len(list_texts) // nb_thread + 1
    sub_list_pids = [list_texts[j * each_thread:(j + 1) * each_thread]
                     for j in range(nb_thread - 1)]
    sub_list_pids.append(list_texts[(nb_thread - 1) * each_thread:])

    q = Queue.Queue()
    worker_errors = []

    def _run_worker(index, chunk):
        # Capture the failure so it can be re-raised in the calling thread;
        # otherwise the slice would just be absent from the queue.
        try:
            datou_parallel_aux(chunk, res_json_field,
                               aux_input_var, list_steps, list_param_json_steps,
                               param_json, verbose, privacy, with_audit, strat_reduce,
                               hit_main, id_step_incomplete_args, index, q)
        except Exception as exc:
            worker_errors.append((index, exc))

    threads = [threading.Thread(target=_run_worker, args=(index, chunk))
               for index, chunk in enumerate(sub_list_pids)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    if worker_errors:
        _, first_error = min(worker_errors, key=lambda entry: entry[0])
        raise first_error

    # Every worker has been joined, so the queue is complete: drain it without
    # blocking.
    results_by_slice = {}
    audits_by_slice = {}
    while True:
        try:
            reduced_aux, audit_aux, index = q.get_nowait()
        except Queue.Empty:
            break
        results_by_slice[index] = reduced_aux
        audits_by_slice[index] = audit_aux

    # Reassemble by slice index, not by arrival order, so both the text and the
    # audit trail are deterministic and follow the order of list_texts.
    reduced_result = "".join(results_by_slice[index] for index in range(nb_thread))
    list_audit_map_reduce = [audits_by_slice[index] for index in range(nb_thread)]
    return reduced_result, list_audit_map_reduce
def test_parrallel_read_ocr(sub_list_pids, q, model, verbose, map_file_size, map_file_text,
                            folder_export_boxes, begin_page, end_page, file_output,
                            layer_api=None, vllm_model=None, request_used=None):
    """Thread worker: OCR every page path in ``sub_list_pids`` and post it on ``q``.

    Each path is printed, then passed to ``sub_func_read_ocr`` together with a
    running ``count`` (starting at 0 for this worker) and the forwarded options.
    The tuple ``(path, "some result vraiment bien", sdp, text, count)`` is put on
    ``q`` for every path.  When ``sub_list_pids`` is empty, one placeholder tuple
    with path ``"XXXXXX"`` and ``None`` sdp/text is posted instead.

    If ``sub_func_read_ocr`` raises, the remaining paths of this slice are not
    processed.

    Despite the ``test_`` prefix this is not a unit test; the name is kept for
    existing callers.  NOTE(review): pytest may collect it if it ever ends up in
    a test module's namespace.
    """
    from lib.datou.lib_datou_step_template import sub_func_read_ocr

    status_tag = "some result vraiment bien"
    running_count = 0
    for page_path in sub_list_pids:
        print(page_path)
        running_count, page_text, page_sdp = sub_func_read_ocr(
            page_path, running_count, model, verbose, map_file_size, map_file_text,
            folder_export_boxes, begin_page, end_page, file_output,
            layer_api=layer_api, vllm_model=vllm_model, request_used=request_used,
        )
        q.put((page_path, status_tag, page_sdp, page_text, running_count))

    if not len(sub_list_pids):
        # Placeholder for an empty slice; the collector skips the "XXXXXX" path.
        q.put(("XXXXXX", status_tag, None, None, running_count))
def _page_number_from_name(page_name):
    """Return the page number written as the trailing digits of a file stem.

    Examples: ``"doc_12.png"`` -> 12, ``"/tmp/a_b_7.jpeg"`` -> 7,
    ``"page10.png"`` -> 10.  Returns ``None`` when the stem does not end with
    an ASCII digit or when ``page_name`` is not a str / str path-like.
    """
    try:
        stem = os.path.splitext(os.path.basename(page_name))[0]
        digits = stem[len(stem.rstrip("0123456789")):]
    except TypeError:
        return None
    return int(digits) if digits else None


def multi_thread_image_read(model, verbose, map_file_size, map_file_text,
                            folder_export_boxes, begin_page, end_page, file_output,
                            nb_thread=10,
                            list_pngs=None,
                            layer_api=None, vllm_model=None,
                            request_used=None):
    """OCR the page images of ``list_pngs`` using ``nb_thread`` worker threads.

    ``list_pngs`` is cut into ``nb_thread`` contiguous slices and each slice is
    processed by ``test_parrallel_read_ocr`` in its own thread; all the other
    arguments are forwarded to it unchanged.  The results are keyed by the page
    number parsed from the trailing digits of each file name (before the
    extension).

    Args:
        nb_thread: number of worker threads / slices, at least 1.
        list_pngs: paths of the page images; ``None`` means no pages.

    Returns:
        ``(map_pids_path, map_sdp, map_text)``: three dicts keyed by page number,
        mapping to the image path, and to the sdp value and the text that
        ``sub_func_read_ocr`` returned for that page.

    Raises:
        ValueError: if ``nb_thread`` is smaller than 1.

    Pages whose file name carries no trailing page number are reported and
    skipped.  If ``sub_func_read_ocr`` raises inside a worker, that worker stops
    and the rest of its slice is absent from the result.
    """
    if nb_thread < 1:
        raise ValueError("nb_thread must be >= 1, got %r" % (nb_thread,))

    # Only the worker calls sub_func_read_ocr; importing it here makes an import
    # problem fail in the caller instead of inside every worker thread.
    from lib.datou.lib_datou_step_template import sub_func_read_ocr  # noqa: F401
    import queue as Queue
    import threading

    if list_pngs is None:
        list_pngs = []

    # `//` because slice bounds must be ints; the `+ 1` rounds the slice size up
    # so the remainder does not pile onto the last slice (trailing slices may be
    # empty, in which case the worker posts an "XXXXXX" placeholder).
    each_thread = len(list_pngs) // nb_thread + 1
    sub_list_pids = [list_pngs[j * each_thread:(j + 1) * each_thread]
                     for j in range(nb_thread - 1)]
    sub_list_pids.append(list_pngs[(nb_thread - 1) * each_thread:])

    # GW 170118: working folder originally created for downloading the photos.
    # exist_ok avoids the exists()/mkdir() race between concurrent callers.
    os.makedirs("temp", exist_ok=True)

    q = Queue.Queue()
    threads = []
    for chunk in sub_list_pids:
        thread = threading.Thread(
            target=test_parrallel_read_ocr,
            args=(chunk, q, model, verbose, map_file_size, map_file_text,
                  folder_export_boxes, begin_page, end_page, file_output,
                  layer_api, vllm_model, request_used),
        )
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

    map_pids_path = {}
    map_sdp = {}
    map_text = {}
    # Every worker has been joined, so the queue is complete: drain it without
    # blocking.  Items are (path, constant tag, sdp, text, count).
    while True:
        try:
            item = q.get_nowait()
        except Queue.Empty:
            break
        page_name = item[0]
        if page_name == "XXXXXX":
            # Placeholder posted by a worker whose slice was empty.
            if verbose:
                print(" page_name == XXXXXX")
            continue
        page_number = _page_number_from_name(page_name)
        if page_number is None:
            print(" cannot parse a page number from " + str(page_name) + ", page skipped")
            continue
        if page_number in map_pids_path:
            print(" page number " + str(page_number) + " already seen ("
                  + str(map_pids_path[page_number]) + "), overwritten by " + str(page_name))
        map_pids_path[page_number] = page_name
        map_sdp[page_number] = item[2]
        map_text[page_number] = item[3]
    return map_pids_path, map_sdp, map_text