Coverage for lib/import_util/lib_import_retrieval/scripts/process_json/process_json.py: 91%

90 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-01-26 23:58 +0100

1import uuid 

2import json 

3import argparse 

4import asyncio 

5 

6from lib.import_util.lib_import_retrieval.models.models import Document, DocumentMetadata 

7 

8#from datastore.datastore import DataStore 

9#from datastore.factory import get_datastore 

10 

11#from services.extract_metadata import extract_metadata_from_document 

12#from services.pii_detection import screen_text_for_pii 

13 

# Number of Document objects sent to the vector-store upsert call per batch
# (the upsert layer batches internally too; this granularity is for logging).
DOCUMENT_UPSERT_BATCH_SIZE = 50

15 

16 

17 

async def process_json_dump(
    filepath: str,
    custom_metadata: dict,
    screen_for_pii: bool,
    extract_metadata: bool,
    datastore: dict,  # DataStore: an interface full of @abstractmethod (currently unused downstream)
    lpgss = None,
    OPENAI_API_KEY = None,
    PG_TABLE = "documents",
    verbose = False
) -> "tuple[int, str]":
    """Load a JSON dump from disk and upsert its documents.

    Args:
        filepath: Path to a JSON file containing a list of document dicts.
        custom_metadata: Key/value overrides applied to each document's metadata.
        screen_for_pii: Whether to screen text for PII (feature currently disabled downstream).
        extract_metadata: Whether to extract metadata with an LLM (feature currently disabled downstream).
        datastore: Datastore handle — presumably a DataStore implementation; unused here (TODO confirm).
        lpgss: PG connection helper passed through to the upsert layer.
        OPENAI_API_KEY: API key forwarded to the embedding/upsert layer.
        PG_TABLE: Target Postgres table name.
        verbose: Enable verbose logging.

    Returns:
        (total_nb_token, used_model): tokens consumed by the upserts and the
        model name reported by the upsert layer.
    """
    # Load the JSON file as a list of dictionaries.
    with open(filepath) as json_file:
        data = json.load(json_file)

    # Delegate the actual processing to the auxiliary routine.
    # NOTE: the original annotated the return as ``(int, str)``, which is a
    # tuple literal, not a type hint; fixed to a proper (quoted) tuple hint.
    total_nb_token, used_model = await process_json_dump_aux(
        data,
        custom_metadata,
        screen_for_pii,
        extract_metadata,
        datastore,
        lpgss,
        OPENAI_API_KEY,
        PG_TABLE,
        verbose,
    )
    return total_nb_token, used_model

42 

43 

44 

async def process_json_dump_aux(
    data: list,
    custom_metadata: dict,
    screen_for_pii: bool,
    extract_metadata: bool,
    datastore: dict,  # DataStore: an interface full of @abstractmethod (currently unused here)
    lpgss = None,
    OPENAI_API_KEY = None,
    PG_TABLE = "documents",
    verbose = False
) -> "tuple[int, str]":
    """Convert raw item dicts into Document objects and upsert them in batches.

    Items with no "text" field, or that raise while being parsed, are skipped
    and reported at the end. PII screening and LLM metadata extraction are
    present but disabled (commented out).

    Returns:
        (total_nb_token, used_model): total tokens consumed by the upserts and
        the embedding model name reported by the upsert layer ("" if no batch
        was upserted).
    """
    total_nb_token = 0
    used_model = ""
    documents = []
    skipped_items = []

    # Iterate over the data and create document objects.
    for item in data:
        # Progress logging every 20 accepted documents.
        if len(documents) % 20 == 0:
            print(f"Processed {len(documents)} documents")

        try:
            # Get the id, text, source, source_id, url, created_at and author
            # from the item; default to None when a key is absent.
            doc_id = item.get("id", None)  # renamed from `id` to avoid shadowing the builtin
            text = item.get("text", None)
            source = item.get("source", None)
            source_id = item.get("source_id", None)
            url = item.get("url", None)
            created_at = item.get("created_at", None)
            author = item.get("author", None)

            if not text:
                print("No document text, skipping...")
                continue

            # Create a metadata object with the source, source_id, url,
            # created_at and author.
            metadata = DocumentMetadata(
                source=source,
                source_id=source_id,
                url=url,
                created_at=created_at,
                author=author,
            )
            print("metadata: ", str(metadata))

            # Update metadata with custom values (only keys the model declares).
            for key, value in custom_metadata.items():
                if hasattr(metadata, key):
                    setattr(metadata, key, value)

            # Screen for PII if requested (currently disabled).
#            if screen_for_pii:
#                pii_detected = screen_text_for_pii(text)
#                # if pii detected, print a warning and skip the document
#                if pii_detected:
#                    print("PII detected in document, skipping")
#                    skipped_items.append(item)  # add the skipped item to the list
#                    continue

#           Just a completion query and a parsing of the result.

#            # extract metadata if requested (currently disabled)
#            if extract_metadata:
#                # extract metadata from the document text
#                extracted_metadata = extract_metadata_from_document(
#                    f"Text: {text}; Metadata: {str(metadata)}"
#                )
#                # get a Metadata object from the extracted metadata
#                metadata = DocumentMetadata(**extracted_metadata)
#                metadata = DocumentMetadata()

            # Create a document object with the id (or a random id), text and metadata.
            document = Document(
                id=doc_id or str(uuid.uuid4()),
                text=text,
                metadata=metadata,
            )
            documents.append(document)
        except Exception as e:
            # Log the error and continue with the next item.
            print(f"Error processing {str(item)[:100]}: {e}")
            skipped_items.append(item)  # add the skipped item to the list

    print("Then upserting documents: " + str(len(documents)))

    # Hoisted out of the batch loop: the import only needs to run once
    # (previously it was re-executed on every iteration).
    from lib.stockage.lib_pgvector import upsert

    # Do this in batches; the upsert method already batches documents but this
    # allows us to add more descriptive logging.
    for i in range(0, len(documents), DOCUMENT_UPSERT_BATCH_SIZE):
        # Get the documents in the current batch.
        batch_documents = documents[i : i + DOCUMENT_UPSERT_BATCH_SIZE]
        print(f"Upserting batch of {len(batch_documents)} documents, batch {i}")
        if verbose :
            print("documents: ", documents)

#        upsert(documents: List[Document], chunk_token_size: Optional[int] = None,
#               PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT,
#               verbose = False
        res, nb_token, model = upsert(batch_documents,
                                      lpgss = lpgss,
                                      OPENAI_API_KEY = OPENAI_API_KEY,
                                      PG_TABLE = PG_TABLE,
                                      verbose = verbose)
        print(" outside upsert : ")
        if verbose:
            print(" res : " + str(res))
        total_nb_token += nb_token
        # Track the model used; mixed models across batches would make the
        # downstream cost estimate wrong, so flag that case.
        if used_model == "":
            used_model = model
        if model != used_model:
            print("ERROR ESTIMATION COST")  # typo fixed (was "ESTIMATINO")

#        await datastore.upsert(batch_documents)

    # Print the skipped items.
    print(f"Skipped {len(skipped_items)} items due to import failure or PII detection")
    for item in skipped_items:
        print(str(item)[:100])

    return total_nb_token, used_model

166 

167async def main(): 

168 # parse the command-line arguments 

169 parser = argparse.ArgumentParser() 

170 parser.add_argument("--filepath", required=True, help="The path to the json dump") 

171 parser.add_argument( 

172 "--custom_metadata", 

173 default="{}", 

174 help="A JSON string of key-value pairs to update the metadata of the documents", 

175 ) 

176 parser.add_argument( 

177 "--screen_for_pii", 

178 default=False, 

179 type=bool, 

180 help="A boolean flag to indicate whether to try the PII detection function (using a language model)", 

181 ) 

182 parser.add_argument( 

183 "--extract_metadata", 

184 default=False, 

185 type=bool, 

186 help="A boolean flag to indicate whether to try to extract metadata from the document (using a language model)", 

187 ) 

188 parser.add_argument( 

189 "--verbose", 

190 default=False, 

191 type=bool, 

192 # action=store_true, 

193 help="verbose", 

194 ) 

195 parser.add_argument( 

196 "--pg_table", 

197 default="documents", 

198 type=str, 

199 help="pg_table : documents by default", 

200 ) 

201 parser.add_argument( 

202 "--project_id", 

203 default=0, 

204 type=int, 

205 help="project_id", 

206 ) 

207 

208 args = parser.parse_args() 

209 

210 # get the arguments 

211 filepath = args.filepath 

212 custom_metadata = json.loads(args.custom_metadata) 

213 screen_for_pii = args.screen_for_pii 

214 extract_metadata = args.extract_metadata 

215 verbose = args.verbose 

216 project_id = args.project_id 

217 

218 pg_table = args.pg_table 

219 

220 # initialize the db instance once as a global variable 

221# datastore = await get_datastore() 

222 # process the json dump 

223 datastore = None 

224 

225 from auth.lib_conf_system import lcs_global_singleton 

226 (PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) = lcs_global_singleton.get_pg_conf() 

227 OPENAI_API_KEY = lcs_global_singleton.get_openai_api_key() 

228 

229 from lib.stockage.lib_pg_safia_sys import LibPGSafiaSys 

230 lpgss = LibPGSafiaSys(PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) 

231 

232 total_nb_token, used_model = await process_json_dump( 

233 filepath, custom_metadata, screen_for_pii, extract_metadata, datastore, 

234 lpgss = lpgss, 

235 OPENAI_API_KEY = OPENAI_API_KEY, 

236 PG_TABLE = pg_table, 

237 verbose = verbose 

238 ) 

239 from lib.lib_safia_system import LibSafiaSystem 

240# auth_type = "SCRIPT" 

241 email_user = "admin@opio.fr" 

242 lss = LibSafiaSystem(lib_user_data_internal=lpgss) 

243 

244 lss.update_project_costs(project_id, total_nb_token, used_model) 

245 

# Script entry point: run the async main() on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())