Coverage for lib/stockage/lib_pgvector.py: 87%

181 statements  

coverage.py v7.9.1, created at 2026-02-10 01:10 +0100

# --debug --job=safia.query --query_safia="Identify how many people are present in Victor's dream." -v

import os
import uuid
from typing import Any, Dict, List, Optional, Tuple

from lib.import_util.lib_import_retrieval.models.models import DocumentChunk

from datetime import datetime
import numpy as np

from psycopg2 import connect
from psycopg2.extras import DictCursor
from pgvector.psycopg2 import register_vector

import arrow


def to_unix_timestamp(date_str: str) -> int:
    """
    Convert a date string to a unix timestamp (seconds since epoch).

    Args:
        date_str: The date string to convert.

    Returns:
        The unix timestamp corresponding to the date string.

    If the date string cannot be parsed as a valid date format, returns the current unix timestamp and prints a warning.
    """
    # Try to parse the date string using arrow, which supports many common date formats
    try:
        date_obj = arrow.get(date_str)
        return int(date_obj.timestamp())
    except arrow.parser.ParserError:
        # If the parsing fails, return the current unix timestamp and print a warning
        print(f"Invalid date format: {date_str}")
        return int(arrow.now().timestamp())

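# Hedged usage sketch (not part of the original module): illustrates the two paths of
# to_unix_timestamp above. The exact value assumes arrow parses the bare date as
# midnight UTC; the function name _example_to_unix_timestamp is purely illustrative.
def _example_to_unix_timestamp():
    assert to_unix_timestamp("2023-05-31") == 1685491200
    # An unparsable string falls back to the current time and prints a warning.
    assert isinstance(to_unix_timestamp("not-a-date"), int)
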

# from datastore.providers.pgvector_datastore import PGClient, PgVectorDataStore
# from models.models import ( DocumentMetadataFilter,)
# from services.date import to_unix_timestamp

# Not yet used
PG_HOST_env = os.environ.get("PG_HOST", "localhost")
PG_PORT_env = int(os.environ.get("PG_PORT", 5432))
PG_DB_env = os.environ.get("PG_DB", "postgres")
PG_USER_env = os.environ.get("PG_USER", "postgres")
PG_PASSWORD_env = os.environ.get("PG_PASSWORD", "postgres")


# class that implements the DataStore interface for the Postgres Datastore provider
# In the safia abstract-layer system, this embryonic class should be called LayerVectorSearchEnginePostgres
# The client could be built from the PG_DB system parameters, or by putting them in complete_param_json (whole_context system, user, exec/query)
# And it should expose the following functions:
# - record/upsert => upsert makes sense!
# - search
# - initialize
# - export/import (optional)
# And for fvs, what does that correspond to?
# - init create
# And we want a 1-1 table project <-> param_vector_search_engine, or a generic JSON field instead (to be decided)
#class PostgresDataStore(PgVectorDataStore):
class PostgresDataStore():
    def create_db_client(self):
        # NOTE: PostgresClient (below) requires an existing psycopg2 connection, so this call is missing its argument.
        return PostgresClient()

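# Hedged sketch of the abstract layer described in the comments above: the operation
# names (initialize, upsert/record, search, optional export/import) come from those
# comments, but the signatures below are assumptions, not an existing interface.
class _LayerVectorSearchEngineSketch:
    def initialize(self):
        raise NotImplementedError

    def upsert(self, table: str, record: dict):
        raise NotImplementedError

    def search(self, embedding, match_count: int = 5):
        raise NotImplementedError
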

#class PostgresClient(PGClient):
class PostgresClient():

    # Uses the client from pg_safia_sys; it may be useful to keep these classes in a separate file
    def __init__(self, the_client):
        super().__init__()
        print("CONNECT TO DB with " + str(the_client))
        self.client = the_client

        register_vector(self.client)


    def rpc(self, function_name: str, params: dict[str, Any]):
        """
        Calls a stored procedure in the database with the given parameters.
        """
        data = []
        # print("pg rpc")
        params["in_embedding"] = np.array(params["in_embedding"])
        # print(" params : " + str(params))
        with self.client.cursor(cursor_factory=DictCursor) as cur:
            cur.callproc(function_name, params)
            rows = cur.fetchall()
            self.client.commit()
            for row in rows:
                # row["created_at"] = to_unix_timestamp(row["created_at"])
                data.append(dict(row))
        return data


    #async
    def upsert_(self, table: str, chunks: Dict[str, List[DocumentChunk]], verbose: bool = False) -> List[str]:
        """
        Takes in a dict of document_ids to list of document chunks and inserts them into the database.
        Return a list of document ids.
        """
        for document_id, document_chunks in chunks.items():
            for chunk in document_chunks:
                # Replace NUL bytes, which Postgres text columns reject, with the Unicode replacement character
                chunk_text_replace = chunk.text.replace("\x00", "\uFFFD")  # s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")

                if chunk.text != chunk_text_replace:
                    print(" Text was replaced (x00 -> uFFFD) ")
                else:
                    print("No replacement!")

                if verbose:
                    print(str(chunk.text))
                    print(str(chunk_text_replace))

                json = {
                    "id": chunk.id,
                    "content": chunk_text_replace,
                    "embedding": chunk.embedding,
                    "document_id": document_id,
                    "source": chunk.metadata.source,
                    "source_id": chunk.metadata.source_id,
                    "url": chunk.metadata.url,
                    "author": chunk.metadata.author,
                }
                if chunk.metadata.created_at:
                    # Note: no trailing comma here, otherwise created_at would become a 1-tuple instead of a datetime
                    json["created_at"] = datetime.fromtimestamp(
                        to_unix_timestamp(chunk.metadata.created_at)
                    )
                try:
                    self.upsert(table, json)
                except Exception as e:
                    print(str(e))
                    print("Exception with this document_id : " + str(document_id))

        return list(chunks.keys())


# async
    def upsert(self, table: str, json: dict[str, Any]):
        """
        Takes a single record (as a dict) and inserts or updates it in the given table.
        """
        print("pg upsert in " + str(table))
        with self.client.cursor() as cur:
            if not json.get("created_at"):
                json["created_at"] = datetime.now()
            json["embedding"] = np.array(json["embedding"])
            # Note: the table name is interpolated directly into the SQL string,
            # so it must come from trusted code, not from user input.
            cur.execute(
                f"INSERT INTO {table} (id, content, embedding, document_id, source, source_id, url, author, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (id) DO UPDATE SET content = %s, embedding = %s, document_id = %s, source = %s, source_id = %s, url = %s, author = %s, created_at = %s",
                (
                    json["id"],
                    json["content"],
                    json["embedding"],
                    json["document_id"],
                    json["source"],
                    json["source_id"],
                    json["url"],
                    json["author"],
                    json["created_at"],
                    json["content"],
                    json["embedding"],
                    json["document_id"],
                    json["source"],
                    json["source_id"],
                    json["url"],
                    json["author"],
                    json["created_at"],
                ),
            )
        self.client.commit()

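# Hedged sketch (the real schema is not defined in this file): the INSERT in
# PostgresClient.upsert assumes a table with the columns below, with "embedding" as a
# pgvector column. The table name, the vector dimension (1536) and the use of TEXT
# columns are assumptions for illustration only.
def _example_create_documents_table(client, table="documents", dim=1536):
    with client.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
        cur.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id TEXT PRIMARY KEY,
                content TEXT,
                embedding vector({dim}),
                document_id TEXT,
                source TEXT,
                source_id TEXT,
                url TEXT,
                author TEXT,
                created_at TIMESTAMP
            )
            """
        )
    client.commit()
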


def find_docs(embedding_input,
              lib_data,
              function = "match_page_sections",
              in_match_count = 5,
              verbose = False):

    one_embedding = embedding_input[0][0]

    params = {
        "in_embedding": one_embedding,
        "in_match_count": in_match_count
    }
    # function = "match_page_sections"

    # Which part of this code still needs cleanup?

    pgc = PostgresClient(lib_data.client)
    result = pgc.rpc(function, params)

    return result

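# Hedged sketch: the stored procedure called by find_docs ("match_page_sections") lives
# in the database, not in this file. The SQL below is one plausible pgvector
# implementation, assuming a "documents" table like the one sketched above with a
# 1536-dimensional embedding column; it is illustrative, not the actual definition.
def _example_create_match_page_sections(client):
    with client.cursor() as cur:
        cur.execute(
            """
            CREATE OR REPLACE FUNCTION match_page_sections(
                in_embedding vector(1536),
                in_match_count int DEFAULT 5
            )
            RETURNS TABLE (id text, content text, similarity float)
            LANGUAGE sql STABLE
            AS $$
                SELECT documents.id, documents.content,
                       1 - (documents.embedding <=> in_embedding) AS similarity
                FROM documents
                ORDER BY documents.embedding <=> in_embedding
                LIMIT in_match_count;
            $$
            """
        )
    client.commit()


# Hedged usage sketch for find_docs: "lib_data" stands for any object exposing a
# psycopg2 connection as .client, and the nested list reproduces the
# embedding_input[0][0] access above.
def _example_find_docs(lib_data, query_embedding):
    return find_docs([[query_embedding]], lib_data, in_match_count=5)
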

from abc import ABC, abstractmethod
import asyncio

from lib.import_util.lib_import_retrieval.models.models import (
    Document,
    DocumentChunk,
    DocumentChunkMetadata,
    DocumentMetadataFilter,
    Query,
    QueryResult,
    QueryWithEmbedding,
)


def upsert(
    documents: List[Document], chunk_token_size: Optional[int] = None,
    lpgss = None,
    OPENAI_API_KEY = None, PG_TABLE = "documents",
    verbose = False
) -> Tuple[List[str], int, str]:
    """
    Takes in a list of documents and inserts them into the database.
    First deletes all the existing vectors with the document id (if necessary, depends on the vector db), then inserts the new ones.
    Return a list of document ids.
    """
    # Delete any existing vectors for documents with the input document ids

    # VR TODO 31-5-23 : needs delete also !
    # await asyncio.gather(
    #     *[
    #         delete(
    #             filter=DocumentMetadataFilter(
    #                 document_id=document.id,
    #             ),
    #             delete_all=False,
    #         )
    #         for document in documents
    #         if document.id
    #     ]
    # )

    chunks, total_nb_token, used_model = get_document_chunks(documents, chunk_token_size, OPENAI_API_KEY, verbose = verbose)

    pgc = PostgresClient(lpgss.client)

    # pgc = PostgresClient(PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT)

    res = pgc.upsert_(PG_TABLE, chunks)
    return res, total_nb_token, used_model

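# Hedged usage sketch: Document comes from the models import above; the example assumes
# it accepts id and text keyword arguments, that "lpgss" exposes a psycopg2 connection
# as .client, and that the "documents" table already exists.
def _example_upsert_documents(lpgss, api_key):
    docs = [Document(id="doc-1", text="Victor describes his dream to three people.")]
    ids, nb_tokens, model = upsert(docs, lpgss=lpgss, OPENAI_API_KEY=api_key, PG_TABLE="documents")
    print(ids, nb_tokens, model)
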


# Constants
CHUNK_SIZE = 200  # The target size of each text chunk in tokens
MIN_CHUNK_SIZE_CHARS = 350  # The minimum size of each text chunk in characters
MIN_CHUNK_LENGTH_TO_EMBED = 5  # Discard chunks shorter than this
EMBEDDINGS_BATCH_SIZE = int(os.environ.get("OPENAI_EMBEDDING_BATCH_SIZE", 128))  # The number of embeddings to request at a time
MAX_NUM_CHUNKS = 10000  # The maximum number of chunks to generate from a text
MIN_NB_TOKEN_CHUNK = MIN_CHUNK_LENGTH_TO_EMBED


import tiktoken

# from services.openai import get_embeddings
from lib.lib_openai import get_embeddings, get_embeddingss


# Global variables
tokenizer = tiktoken.get_encoding(
    "cl100k_base"
)  # The encoding scheme to use for tokenization



def get_text_chunks(text: str, chunk_token_size: Optional[int], OPENAI_API_KEY = None) -> List[str]:
    """
    Split a text into chunks of ~CHUNK_SIZE tokens, based on punctuation and newline boundaries.

    Args:
        text: The text to split into chunks.
        chunk_token_size: The target size of each chunk in tokens, or None to use the default CHUNK_SIZE.

    Returns:
        A list of text chunks, each of which is a string of ~CHUNK_SIZE tokens.
    """
    # Return an empty list if the text is empty or whitespace
    if not text or text.isspace():
        return []

    # Tokenize the text
    tokens = tokenizer.encode(text, disallowed_special=())

    # Initialize an empty list of chunks
    chunks = []

    # Use the provided chunk token size or the default one
    chunk_size = chunk_token_size or CHUNK_SIZE

    # Initialize a counter for the number of chunks
    num_chunks = 0

    # Loop until all tokens are consumed
    while tokens and num_chunks < MAX_NUM_CHUNKS:
        # Take the first chunk_size tokens as a chunk
        chunk = tokens[:chunk_size]

        # s.decode("utf-8", errors="replace").replace("\x00", "\uFFFD")

        # Decode the chunk into text
        chunk_text = tokenizer.decode(chunk)

        # Skip the chunk if it is empty or whitespace
        if not chunk_text or chunk_text.isspace():
            # Remove the tokens corresponding to the chunk text from the remaining tokens
            tokens = tokens[len(chunk):]
            # Continue to the next iteration of the loop
            continue

        # Find the last period or punctuation mark in the chunk
        last_punctuation = max(
            chunk_text.rfind("."),
            chunk_text.rfind("?"),
            chunk_text.rfind("!"),
            chunk_text.rfind("\n"),
        )

        # If there is a punctuation mark, and it falls after the first MIN_CHUNK_SIZE_CHARS characters
        if last_punctuation != -1 and last_punctuation > MIN_CHUNK_SIZE_CHARS:
            # Truncate the chunk text at the punctuation mark
            chunk_text = chunk_text[: last_punctuation + 1]

        # Remove any newline characters and strip any leading or trailing whitespace
        # TODO VR 2-7 to avoid saving original document somewhere else !
        # chunk_text_to_append = chunk_text.replace("\n", " ").strip()

        # TODO VR 2-7 and we want all the data !
        # if len(chunk_text) > MIN_CHUNK_LENGTH_TO_EMBED:
        # Append the chunk text to the list of chunks

        # Remove the tokens corresponding to the chunk text from the remaining tokens
        # At minimum, everything that differs from chunk_text_to_append should also be added back
        chunk_token_aux = tokenizer.encode(chunk_text, disallowed_special=())
        nb_same_chunk = 0
        for i in range(len(chunk_token_aux)):
            if chunk_token_aux[i] == chunk[i]:
                nb_same_chunk += 1
            else:
                break

        if nb_same_chunk < MIN_NB_TOKEN_CHUNK:
            print(" Will cause a problem if this happens at the end !")
            nb_same_chunk = MIN_NB_TOKEN_CHUNK

        if nb_same_chunk < len(chunk_token_aux):
            print("Difference IN CHUNK TOKENIZATION treated as warning but may be harmful !")
            chunk_text = tokenizer.decode(chunk[:nb_same_chunk])

        nb_same_chunk_after_endecode = len(tokenizer.encode(chunk_text, disallowed_special=()))
        if nb_same_chunk < len(chunk_token_aux) and nb_same_chunk_after_endecode == nb_same_chunk:
            print("We should be safe by now")

        tokens = tokens[nb_same_chunk_after_endecode:]

        chunks.append(chunk_text)

        # Increment the number of chunks
        num_chunks += 1

    # Handle the remaining tokens
    if tokens:
        remaining_text = tokenizer.decode(tokens).replace("\n", " ").strip()
        if len(remaining_text) > MIN_CHUNK_LENGTH_TO_EMBED:
            chunks.append(remaining_text)

    return chunks

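# Hedged usage sketch: with the default CHUNK_SIZE of 200 tokens, a short two-sentence
# text fits in a single chunk, so the list below has length 1.
def _example_get_text_chunks():
    text = "First sentence about the dream. Second sentence about the dreamer."
    chunks = get_text_chunks(text, chunk_token_size=None)
    print(len(chunks), chunks)
    return chunks
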


from typing import Tuple  # Dict, List, Optional,

def create_document_chunks(
    doc: Document, chunk_token_size: Optional[int], OPENAI_API_KEY = None
) -> Tuple[List[DocumentChunk], str]:
    """
    Create a list of document chunks from a document object and return the document id.

    Args:
        doc: The document object to create chunks from. It should have a text attribute and optionally an id and a metadata attribute.
        chunk_token_size: The target size of each chunk in tokens, or None to use the default CHUNK_SIZE.

    Returns:
        A tuple of (doc_chunks, doc_id), where doc_chunks is a list of document chunks, each of which is a DocumentChunk object with an id, a document_id, a text, and a metadata attribute,
        and doc_id is the id of the document object, generated if not provided. The id of each chunk is generated from the document id and a sequential number, and the metadata is copied from the document object.
    """
    # Check if the document text is empty or whitespace
    if not doc.text or doc.text.isspace():
        return [], doc.id or str(uuid.uuid4())

    # Generate a document id if not provided
    doc_id = doc.id or str(uuid.uuid4())

    # Split the document text into chunks
    text_chunks = get_text_chunks(doc.text, chunk_token_size, OPENAI_API_KEY = OPENAI_API_KEY)

    metadata = (
        DocumentChunkMetadata(**doc.metadata.__dict__)
        if doc.metadata is not None
        else DocumentChunkMetadata()
    )

    metadata.document_id = doc_id

    # Initialize an empty list of chunks for this document
    doc_chunks = []

    # Assign each chunk a sequential number and create a DocumentChunk object
    for i, text_chunk in enumerate(text_chunks):
        chunk_id = f"{doc_id}_{i}"
        doc_chunk = DocumentChunk(
            id=chunk_id,
            text=text_chunk,
            metadata=metadata,
        )
        # Append the chunk object to the list of chunks for this document
        doc_chunks.append(doc_chunk)

    # Return the list of chunks and the document id
    return doc_chunks, doc_id

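# Hedged usage sketch: a Document without an explicit id gets a generated uuid, and each
# chunk id takes the form "<doc_id>_<n>". Assumes Document accepts a text keyword argument.
def _example_create_document_chunks():
    doc = Document(text="A short dream. Then a longer description of the same dream.")
    doc_chunks, doc_id = create_document_chunks(doc, chunk_token_size=None)
    print(doc_id, [c.id for c in doc_chunks])
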


def get_document_chunks(
    documents: List[Document], chunk_token_size: Optional[int], OPENAI_API_KEY = None, verbose = False
) -> Tuple[Dict[str, List[DocumentChunk]], int, str]:
    """
    Convert a list of documents into a dictionary from document id to list of document chunks.

    Args:
        documents: The list of documents to convert.
        chunk_token_size: The target size of each chunk in tokens, or None to use the default CHUNK_SIZE.

    Returns:
        A tuple of (chunks, total_nb_token, used_model), where chunks maps each document id to a list of
        document chunks (DocumentChunk objects with text, metadata, and embedding attributes),
        total_nb_token is the number of tokens sent for embedding, and used_model is the embedding model used.
    """
    # Initialize an empty dictionary of lists of chunks
    chunks: Dict[str, List[DocumentChunk]] = {}

    # Initialize an empty list of all chunks
    all_chunks: List[DocumentChunk] = []

    # Loop over each document and create chunks
    for doc in documents:
        doc_chunks, doc_id = create_document_chunks(doc, chunk_token_size, OPENAI_API_KEY = OPENAI_API_KEY)

        # Append the chunks for this document to the list of all chunks
        all_chunks.extend(doc_chunks)

        # Add the list of chunks for this document to the dictionary with the document id as the key
        chunks[doc_id] = doc_chunks

    # Check if there are no chunks; return a full 3-tuple so callers can unpack safely
    if not all_chunks:
        return {}, 0, ""

    total_nb_token = 0
    used_model = ""

    # Get all the embeddings for the document chunks in batches, using get_embeddings
    embeddings: List[List[float]] = []
    for i in range(0, len(all_chunks), EMBEDDINGS_BATCH_SIZE):
        # Get the text of the chunks in the current batch
        batch_texts = [
            chunk.text for chunk in all_chunks[i : i + EMBEDDINGS_BATCH_SIZE]
        ]

        if verbose:
            print(" batch_texts : " + str(batch_texts))

        if OPENAI_API_KEY is not None:
            # Get the embeddings for the batch texts
            batch_embeddings, nb_token, model = get_embeddingss(batch_texts, openai_token = OPENAI_API_KEY, verbose = verbose)
        else:
            print(" Document not indexed (no API key) => a lot of refactoring is still needed to give all of this a clean interface !")
            batch_embeddings = [[] for _ in range(len(batch_texts))]
            nb_token = 0
            model = "no_embeddings"

        total_nb_token += nb_token
        if used_model == "":
            used_model = model
        if model != used_model:
            print("ERROR IN COST ESTIMATION (TREATED AS WARNING, USING FIRST MODEL)")

        if verbose:
            print(" batch_embeddings : " + str(batch_embeddings))

        # Append the batch embeddings to the embeddings list
        embeddings.extend(batch_embeddings)

    print(" LEN embeddings : " + str(len(embeddings)))
    if verbose:
        if len(embeddings) > 0:
            print(" one begin " + str(embeddings[0][:10]))

    # Update the document chunk objects with the embeddings
    for i, chunk in enumerate(all_chunks):
        # Assign the embedding from the embeddings list to the chunk object
        chunk.embedding = embeddings[i]

    return chunks, total_nb_token, used_model

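# Hedged usage sketch: without an OpenAI key the chunks are still created, but their
# embeddings stay empty and the reported model is "no_embeddings".
def _example_get_document_chunks():
    docs = [Document(id="doc-1", text="Victor dreams of three visitors standing by the door.")]
    chunks, nb_tokens, model = get_document_chunks(docs, chunk_token_size=None, OPENAI_API_KEY=None)
    print(len(chunks["doc-1"]), nb_tokens, model)
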
