Coverage for lib/datou/lib_parallel.py: 92%

112 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-01-26 23:58 +0100

1 

2 

3 

4import os 

5 

6from lib.datou.lib_datou_step_template import aux_map_reduce_loop 

def datou_parallel_aux(list_texts_aux, res_json_field,
                       aux_input_var, list_steps, list_param_json_steps,
                       param_json, verbose, privacy, with_audit, strat_reduce, hit_main,
                       id_step_incomplete_args, i, q):
    """Thread worker: run ``aux_map_reduce_loop`` on one chunk of texts and queue the outcome.

    The reduction starts from an empty string and a fresh, empty audit list.
    When ``aux_map_reduce_loop`` returns, the tuple
    ``(reduced_result, audit_list, i)`` is put on ``q`` so the caller can
    reassemble the chunks by their index ``i``.

    Args:
        list_texts_aux: the chunk of texts handled by this worker.
        res_json_field, aux_input_var, list_steps, list_param_json_steps,
        param_json, verbose, privacy, with_audit, strat_reduce, hit_main,
        id_step_incomplete_args: forwarded unchanged to ``aux_map_reduce_loop``.
        i: index of this chunk, echoed back in the queued tuple.
        q: queue that receives the ``(reduced_result, audit_list, i)`` tuple.
    """
    # Presumably filled in place by aux_map_reduce_loop (it is passed in and
    # then reported as-is) — confirm against its implementation.
    audit_entries = []
    chunk_result = aux_map_reduce_loop(
        list_texts_aux, res_json_field, aux_input_var, list_steps, list_param_json_steps,
        param_json,
        verbose, privacy, with_audit, strat_reduce,
        "",  # initial reduced_result
        audit_entries,
        hit_main, id_step_incomplete_args,
    )
    q.put((chunk_result, audit_entries, i))

17 

def datou_parallel_map_reduce(list_texts, res_json_field,
                              aux_input_var, list_steps, list_param_json_steps, param_json, verbose,
                              privacy, with_audit, strat_reduce,
                              nb_thread=10,
                              hit_main="unknown_hit",
                              id_step_incomplete_args=None):
    """Run the datou map/reduce step loop over ``list_texts`` using ``nb_thread`` threads.

    ``list_texts`` is cut into ``nb_thread`` contiguous chunks and each chunk is
    handed to ``datou_parallel_aux`` in its own thread. Once every thread has
    been joined, the per-chunk results are put back together in chunk order.

    Args:
        list_texts: the texts to process; sliced into contiguous chunks.
        res_json_field, aux_input_var, list_steps, list_param_json_steps,
        param_json, verbose, privacy, with_audit, strat_reduce, hit_main,
        id_step_incomplete_args: forwarded unchanged to every worker.
        nb_thread: number of worker threads (and of chunks); must be >= 1.

    Returns:
        ``(reduced_result, list_audit_map_reduce)``: the per-chunk results
        concatenated in chunk order, and the per-chunk audit lists (one entry
        per chunk), also in chunk order.

    Raises:
        ValueError: if ``nb_thread`` is smaller than 1.
        RuntimeError: if a worker thread finished without reporting a result
            (e.g. because it raised an exception).
    """
    import queue as Queue
    import threading

    if nb_thread < 1:
        raise ValueError("nb_thread must be >= 1, got " + str(nb_thread))

    q = Queue.Queue()
    print(" queue : " + str(len(q.queue)))

    # `//` keeps the chunk size an int (slice bounds must be ints); the `+ 1`
    # over-estimates it so the last chunk is never larger than the others.
    # Trailing chunks may therefore be empty.
    each_thread = len(list_texts) // nb_thread + 1
    sub_list_pids = [list_texts[j * each_thread:(j + 1) * each_thread]
                     for j in range(nb_thread - 1)]
    sub_list_pids.append(list_texts[(nb_thread - 1) * each_thread:])
    print(" queue : " + str(len(q.queue)))

    threads = []
    for i, chunk in enumerate(sub_list_pids):
        print(" queue : " + str(len(q.queue)))
        thread = threading.Thread(
            target=datou_parallel_aux,
            args=(chunk, res_json_field,
                  aux_input_var, list_steps, list_param_json_steps,
                  param_json, verbose, privacy, with_audit, strat_reduce, hit_main,
                  id_step_incomplete_args, i, q),
        )
        thread.start()
        threads.append(thread)
    print(" queue : " + str(len(q.queue)))

    for thread in threads:
        thread.join()
    print(" queue : " + str(len(q.queue)))

    # Every producer has been joined, so draining until Empty sees every item.
    chunk_results = {}
    chunk_audits = {}
    while True:
        try:
            reduced_result_aux, list_audit_map_reduce_aux, i = q.get_nowait()
        except Queue.Empty:
            break
        chunk_results[i] = reduced_result_aux
        chunk_audits[i] = list_audit_map_reduce_aux

    # A worker that raised never queued anything; fail with a clear message
    # rather than an opaque KeyError during reassembly.
    missing = [i for i in range(nb_thread) if i not in chunk_results]
    if missing:
        raise RuntimeError("datou_parallel_map_reduce: worker(s) " + str(missing)
                           + " did not report a result")

    # Audits are ordered by chunk index, like the text results, rather than by
    # the (non-deterministic) order in which threads finished.
    reduced_result = "".join(chunk_results[i] for i in range(nb_thread))
    list_audit_map_reduce = [chunk_audits[i] for i in range(nb_thread)]
    return reduced_result, list_audit_map_reduce

81 

def test_parrallel_read_ocr(sub_list_pids, q, model, verbose, map_file_size, map_file_text,
                            folder_export_boxes, begin_page, end_page, file_output,
                            layer_api=None, vllm_model=None, request_used=None):
    """Thread worker: OCR every image of one chunk and queue one tuple per image.

    For each path in ``sub_list_pids``, ``sub_func_read_ocr`` is called and
    ``(path, "some result vraiment bien", sdp, text, count)`` is put on ``q``;
    ``count`` is the running counter threaded through ``sub_func_read_ocr``
    calls, starting at 0 for each worker. An empty chunk instead queues the
    single placeholder ``("XXXXXX", "some result vraiment bien", None, None, 0)``.

    Args:
        sub_list_pids: image paths handled by this worker.
        q: queue receiving the per-image tuples.
        model, verbose, map_file_size, map_file_text, folder_export_boxes,
        begin_page, end_page, file_output, layer_api, vllm_model, request_used:
            forwarded unchanged to ``sub_func_read_ocr``.
    """
    from lib.datou.lib_datou_step_template import sub_func_read_ocr

    filler = "some result vraiment bien"
    counter = 0

    # Guard clause: an empty chunk only emits the placeholder tuple.
    if not sub_list_pids:
        q.put(("XXXXXX", filler, None, None, counter))
        return

    for image_path in sub_list_pids:
        print(image_path)
        counter, page_text, page_sdp = sub_func_read_ocr(
            image_path, counter, model, verbose, map_file_size, map_file_text,
            folder_export_boxes, begin_page, end_page, file_output,
            layer_api=layer_api, vllm_model=vllm_model, request_used=request_used,
        )
        q.put((image_path, filler, page_sdp, page_text, counter))

96 

97 

def _page_number_from_path(page_name):
    """Return the page number written at the end of an image file name.

    The page number is the run of digits immediately before the extension,
    whatever the extension length and however many ``_`` the name contains::

        "out/doc_3.png"       -> 3
        "out/my_doc_12.png"   -> 12
        "out/page_7.jpeg"     -> 7

    Raises:
        ValueError: if the file name (without extension) does not end with digits.
    """
    import re

    stem = os.path.splitext(os.path.basename(page_name))[0]
    match = re.search(r"(\d+)$", stem)
    if match is None:
        raise ValueError("no page number at the end of " + repr(page_name))
    return int(match.group(1))


def multi_thread_image_read(model, verbose, map_file_size, map_file_text,
                            folder_export_boxes, begin_page, end_page, file_output,
                            nb_thread=10,
                            list_pngs=None,
                            layer_api=None, vllm_model=None,
                            request_used=None):
    """OCR page images in ``nb_thread`` threads and index the results by page number.

    ``list_pngs`` is cut into ``nb_thread`` contiguous chunks; each chunk is
    processed by ``test_parrallel_read_ocr`` in its own thread, which calls
    ``sub_func_read_ocr`` on every image and queues one tuple per image (or a
    ``"XXXXXX"`` placeholder when its chunk is empty).

    Args:
        model, verbose, map_file_size, map_file_text, folder_export_boxes,
        begin_page, end_page, file_output, layer_api, vllm_model, request_used:
            forwarded unchanged to ``sub_func_read_ocr`` through the workers.
        nb_thread: number of worker threads (and of chunks); must be >= 1.
        list_pngs: paths of the page images; ``None`` means no images.

    Returns:
        ``(map_pids_path, map_sdp, map_text)``: three dicts keyed by the page
        number parsed from each image file name, giving the image path, the
        ``sdp`` value and the text produced by ``sub_func_read_ocr``. Images
        whose name does not end with a page number are skipped (a message is
        printed). If two images share a page number, the last one dequeued wins.

    Raises:
        ValueError: if ``nb_thread`` is smaller than 1.
    """
    import queue as Queue
    import threading

    # Imported here as well as in the worker so that an import failure is
    # raised in the caller instead of inside every worker thread.
    from lib.datou.lib_datou_step_template import sub_func_read_ocr  # noqa: F401

    if list_pngs is None:
        list_pngs = []
    if nb_thread < 1:
        raise ValueError("nb_thread must be >= 1, got " + str(nb_thread))

    q = Queue.Queue()
    print(" queue : " + str(len(q.queue)))

    # `//` keeps the chunk size an int (slice bounds must be ints); the `+ 1`
    # over-estimates it so the last chunk is never larger than the others.
    # Trailing chunks may therefore be empty.
    each_thread = len(list_pngs) // nb_thread + 1
    sub_list_pids = [list_pngs[j * each_thread:(j + 1) * each_thread]
                     for j in range(nb_thread - 1)]
    sub_list_pids.append(list_pngs[(nb_thread - 1) * each_thread:])

    # The original comment (GW 170118) says this folder is for downloading
    # photos. Nothing in this function uses it; kept in case the OCR helpers
    # rely on it — TODO confirm.
    os.makedirs("temp", exist_ok=True)

    print(" queue : " + str(len(q.queue)))
    threads = []
    for chunk in sub_list_pids:
        print(" queue : " + str(len(q.queue)))
        thread = threading.Thread(
            target=test_parrallel_read_ocr,
            args=(chunk, q, model, verbose, map_file_size, map_file_text,
                  folder_export_boxes, begin_page, end_page, file_output,
                  layer_api, vllm_model, request_used),
        )
        thread.start()
        threads.append(thread)
    print(" queue : " + str(len(q.queue)))

    for thread in threads:
        thread.join()
    print(" queue : " + str(len(q.queue)))

    map_pids_path = {}
    map_sdp = {}
    map_text = {}
    print(" queue : " + str(len(q.queue)))

    # Every worker has been joined, so draining until Empty sees every item.
    while True:
        try:
            item = q.get_nowait()
        except Queue.Empty:
            break
        page_name = item[0]
        try:
            print(" parsing item[0] " + str(page_name[-10:-1]))
            if page_name == "XXXXXX":
                # Placeholder queued by a worker whose chunk was empty.
                print(" page_name == XXXXXX")
                continue
            page_number = _page_number_from_path(page_name)
        except Exception as e:
            # Best effort: report and skip items whose name cannot be parsed.
            print("erreur in queue, sans doute des thread sans données à traité" + str(e))
            continue
        map_pids_path[page_number] = page_name
        map_sdp[page_number] = item[2]
        map_text[page_number] = item[3]

    return map_pids_path, map_sdp, map_text