Coverage for lib/import_util/lib_import_retrieval/scripts/process_json/process_json.py: 91%
90 statements
coverage.py v7.9.1, created at 2026-01-26 23:58 +0100
import uuid
import json
import argparse
import asyncio

from lib.import_util.lib_import_retrieval.models.models import Document, DocumentMetadata

# from datastore.datastore import DataStore
# from datastore.factory import get_datastore

# from services.extract_metadata import extract_metadata_from_document
# from services.pii_detection import screen_text_for_pii

DOCUMENT_UPSERT_BATCH_SIZE = 50
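
# For reference, the assumed shapes of the imported models, inferred from how they are
# used below (not the actual definitions in models.models):
#   DocumentMetadata(source, source_id, url, created_at, author)
#   Document(id, text, metadata)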

async def process_json_dump(
    filepath: str,
    custom_metadata: dict,
    screen_for_pii: bool,
    extract_metadata: bool,
    datastore: dict,  # DataStore: an interface made of @abstractmethod declarations
    lpgss=None,
    OPENAI_API_KEY=None,
    PG_TABLE="documents",
    verbose=False,
) -> tuple[int, str]:
    # Load the JSON file as a list of dictionaries.
    with open(filepath) as json_file:
        data = json.load(json_file)

    total_nb_token, used_model = await process_json_dump_aux(
        data,
        custom_metadata,
        screen_for_pii,
        extract_metadata,
        datastore,
        lpgss,
        OPENAI_API_KEY,
        PG_TABLE,
        verbose,
    )
    return total_nb_token, used_model
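
# Illustrative only: a minimal sketch of the JSON dump this script expects, a list of
# objects whose keys match the item.get(...) lookups below. All values here are made up.
#
# [
#   {
#     "id": "doc-1",
#     "text": "Full text of the first document...",
#     "source": "file",
#     "source_id": "report-2023.pdf",
#     "url": "https://example.com/report-2023.pdf",
#     "created_at": "2023-05-01T12:00:00Z",
#     "author": "Jane Doe"
#   }
# ]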

async def process_json_dump_aux(
    data: list,
    custom_metadata: dict,
    screen_for_pii: bool,
    extract_metadata: bool,
    datastore: dict,  # DataStore: an interface made of @abstractmethod declarations
    lpgss=None,
    OPENAI_API_KEY=None,
    PG_TABLE="documents",
    verbose=False,
) -> tuple[int, str]:
    total_nb_token = 0
    used_model = ""
    documents = []
    skipped_items = []
    # Iterate over the data and create Document objects.
    for item in data:
        if len(documents) % 20 == 0:
            print(f"Processed {len(documents)} documents")

        try:
            # Get the id, text, source, source_id, url, created_at and author from
            # the item, using None as the default when a field is missing.
            id = item.get("id", None)
            text = item.get("text", None)
            source = item.get("source", None)
            source_id = item.get("source_id", None)
            url = item.get("url", None)
            created_at = item.get("created_at", None)
            author = item.get("author", None)

            if not text:
                print("No document text, skipping...")
                continue

            # Create a metadata object with the source, source_id, url, created_at and author.
            metadata = DocumentMetadata(
                source=source,
                source_id=source_id,
                url=url,
                created_at=created_at,
                author=author,
            )
            print("metadata: ", str(metadata))

            # Update metadata with custom values.
            for key, value in custom_metadata.items():
                if hasattr(metadata, key):
                    setattr(metadata, key, value)

            # Screen for PII if requested (currently disabled).
            # if screen_for_pii:
            #     pii_detected = screen_text_for_pii(text)
            #     # If PII is detected, print a warning and skip the document.
            #     if pii_detected:
            #         print("PII detected in document, skipping")
            #         skipped_items.append(item)  # add the skipped item to the list
            #         continue

            # Just a completion query and a parse of the result.
            # # Extract metadata if requested (currently disabled).
            # if extract_metadata:
            #     # Extract metadata from the document text.
            #     extracted_metadata = extract_metadata_from_document(
            #         f"Text: {text}; Metadata: {str(metadata)}"
            #     )
            #     # Get a DocumentMetadata object from the extracted metadata.
            #     metadata = DocumentMetadata(**extracted_metadata)
            #     metadata = DocumentMetadata()

            # Create a Document object with the given id (or a random one), text and metadata.
            document = Document(
                id=id or str(uuid.uuid4()),
                text=text,
                metadata=metadata,
            )
            documents.append(document)
        except Exception as e:
            # Log the error and continue with the next item.
            print(f"Error processing {str(item)[:100]}: {e}")
            skipped_items.append(item)  # add the skipped item to the list
128 print("Then upserting documents: " + str(len(documents)))
130 # do this in batches, the upsert method already batches documents but this allows
131 # us to add more descriptive logging
132 for i in range(0, len(documents), DOCUMENT_UPSERT_BATCH_SIZE):
133 # Get the text of the chunks in the current batch
134 batch_documents = documents[i : i + DOCUMENT_UPSERT_BATCH_SIZE]
135 print(f"Upserting batch of {len(batch_documents)} documents, batch {i}")
136 if verbose :
137 print("documents: ", documents)
139 from lib.stockage.lib_pgvector import upsert
141# upsert(documents: List[Document], chunk_token_size: Optional[int] = None,
142# PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT,
143# verbose = False
144 res, nb_token, model = upsert(batch_documents,
145 lpgss = lpgss,
146 OPENAI_API_KEY = OPENAI_API_KEY,
147 PG_TABLE = PG_TABLE,
148 verbose = verbose)
149 print(" outside upsert : ")
150 if verbose:
151 print(" res : " + str(res))
152 total_nb_token += nb_token
153 if used_model == "":
154 used_model = model
155 if model != used_model:
156 print("ERROR ESTIMATINO COST")

        # await datastore.upsert(batch_documents)

    # Print the skipped items.
    print(f"Skipped {len(skipped_items)} items due to import failure or PII detection")
    for item in skipped_items:
        print(str(item)[:100])

    return total_nb_token, used_model
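
# Illustrative only: a hedged sketch of driving process_json_dump_aux directly with
# in-memory records instead of a JSON file on disk. The connection objects mirror the
# setup in main() below; the record contents are made up.
#
# records = [{"text": "hello world", "source": "file", "author": "Jane Doe"}]
# nb_token, model = asyncio.run(
#     process_json_dump_aux(
#         records,
#         custom_metadata={"author": "Import bot"},
#         screen_for_pii=False,
#         extract_metadata=False,
#         datastore=None,
#         lpgss=lpgss,  # a LibPGSafiaSys instance, built as in main()
#         OPENAI_API_KEY=OPENAI_API_KEY,
#         PG_TABLE="documents",
#         verbose=True,
#     )
# )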

async def main():
    # Parse the command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("--filepath", required=True, help="The path to the json dump")
    parser.add_argument(
        "--custom_metadata",
        default="{}",
        help="A JSON string of key-value pairs to update the metadata of the documents",
    )
    parser.add_argument(
        "--screen_for_pii",
        action="store_true",
        help="A flag to indicate whether to try the PII detection function (using a language model)",
    )
    parser.add_argument(
        "--extract_metadata",
        action="store_true",
        help="A flag to indicate whether to try to extract metadata from the document (using a language model)",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="verbose",
    )
    parser.add_argument(
        "--pg_table",
        default="documents",
        type=str,
        help="pg_table: 'documents' by default",
    )
    parser.add_argument(
        "--project_id",
        default=0,
        type=int,
        help="project_id",
    )

    args = parser.parse_args()

    # Get the arguments.
    filepath = args.filepath
    custom_metadata = json.loads(args.custom_metadata)
    screen_for_pii = args.screen_for_pii
    extract_metadata = args.extract_metadata
    verbose = args.verbose
    project_id = args.project_id
    pg_table = args.pg_table

    # Initialize the db instance once as a global variable.
    # datastore = await get_datastore()
    datastore = None

    from auth.lib_conf_system import lcs_global_singleton
    (PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT) = lcs_global_singleton.get_pg_conf()
    OPENAI_API_KEY = lcs_global_singleton.get_openai_api_key()

    from lib.stockage.lib_pg_safia_sys import LibPGSafiaSys
    lpgss = LibPGSafiaSys(PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT)

    # Process the json dump.
    total_nb_token, used_model = await process_json_dump(
        filepath, custom_metadata, screen_for_pii, extract_metadata, datastore,
        lpgss=lpgss,
        OPENAI_API_KEY=OPENAI_API_KEY,
        PG_TABLE=pg_table,
        verbose=verbose,
    )

    from lib.lib_safia_system import LibSafiaSystem
    # auth_type = "SCRIPT"
    email_user = "admin@opio.fr"
    lss = LibSafiaSystem(lib_user_data_internal=lpgss)
    lss.update_project_costs(project_id, total_nb_token, used_model)

if __name__ == "__main__":
    asyncio.run(main())
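
# Illustrative only: an example invocation, assuming it is run from the repository root
# so that the lib.* and auth.* imports resolve. The file path, metadata and project id
# are made up.
#
#   python lib/import_util/lib_import_retrieval/scripts/process_json/process_json.py \
#       --filepath /tmp/dump.json \
#       --custom_metadata '{"author": "Import bot"}' \
#       --pg_table documents \
#       --project_id 42 \
#       --verbose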