Coverage for lib/stockage/lib_pgvector.py: 87%
181 statements
coverage.py v7.9.1, created at 2026-02-10 01:10 +0100
# --debug --job=safia.query --query_safia="Identifie combien de personnes sont présente dans le reve de Victor." -v
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple

from lib.import_util.lib_import_retrieval.models.models import DocumentChunk

from datetime import datetime
import numpy as np

from psycopg2 import connect
from psycopg2.extras import DictCursor
from pgvector.psycopg2 import register_vector

import arrow
def to_unix_timestamp(date_str: str) -> int:
    """
    Convert a date string to a unix timestamp (seconds since epoch).

    Args:
        date_str: The date string to convert.

    Returns:
        The unix timestamp corresponding to the date string.

        If the date string cannot be parsed as a valid date format,
        returns the current unix timestamp and prints a warning.
    """
    # Try to parse the date string using arrow, which supports many common date formats
    try:
        date_obj = arrow.get(date_str)
        return int(date_obj.timestamp())
    except arrow.parser.ParserError:
        # If parsing fails, return the current unix timestamp and print a warning
        print(f"Invalid date format: {date_str}")
        return int(arrow.now().timestamp())
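# Usage sketch (illustrative, not from the original file): an ISO-8601 string
# parses directly, and the epoch start maps to 0; anything unparseable falls
# back to "now" with a warning.
#
#   to_unix_timestamp("1970-01-01T00:00:00+00:00")  # -> 0
#   to_unix_timestamp("not a date")                 # -> current timestamp, prints a warning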
#from datastore.providers.pgvector_datastore import PGClient, PgVectorDataStore
#from models.models import ( DocumentMetadataFilter,)
#from services.date import to_unix_timestamp
# Not yet used.
PG_HOST_env = os.environ.get("PG_HOST", "localhost")
PG_PORT_env = int(os.environ.get("PG_PORT", 5432))
PG_DB_env = os.environ.get("PG_DB", "postgres")
PG_USER_env = os.environ.get("PG_USER", "postgres")
PG_PASSWORD_env = os.environ.get("PG_PASSWORD", "postgres")
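# A minimal sketch of how these environment defaults could be used to build the
# psycopg2 connection that PostgresClient expects (assumed wiring; the original
# code obtains the connection elsewhere, e.g. from pg_safia_sys):
#
#   conn = connect(
#       host=PG_HOST_env,
#       port=PG_PORT_env,
#       dbname=PG_DB_env,
#       user=PG_USER_env,
#       password=PG_PASSWORD_env,
#   )
#   pgc = PostgresClient(conn)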
# Class that implements the DataStore interface for the Postgres datastore provider.
# In the safia abstract-layer system, this embryonic class should be called
# LayerVectorSearchEnginePostgres.
# The client could be built from the PG_DB system parameters, or by putting them
# in complete_param_json (whole_context system, user, exec/query).
# Desired functions (a minimal sketch follows just below):
# - record/upsert => upsert makes sense!
# - search
# - initialize
# - export/import (optional)
# And for fvs, what does this correspond to?
# - init create
# We also want either a 1-1 table project <-> param_vector_search_engine, or a
# JSON field to stay generic (to be decided).
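# A minimal sketch of the abstract layer proposed above (assumed names; this is
# only suggested by the comments, not implemented in this file):
#
#   class LayerVectorSearchEngine(ABC):
#       @abstractmethod
#       def initialize(self): ...
#       @abstractmethod
#       def upsert(self, table: str, row: dict): ...
#       @abstractmethod
#       def search(self, embedding, match_count: int = 5): ...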
#class PostgresDataStore(PgVectorDataStore):
class PostgresDataStore():
    def create_db_client(self):
        # NOTE: PostgresClient.__init__ now requires a connection argument, so
        # this stale factory would raise a TypeError if called.
        return PostgresClient()
#class PostgresClient(PGClient):
class PostgresClient():

    # Uses the client from pg_safia_sys; it could be useful to keep these
    # classes in a separate file.
    def __init__(self, the_client):
        super().__init__()
        print("CONNECT TO DB with " + str(the_client))
        self.client = the_client
        # Register the pgvector type adapter on this connection so numpy
        # arrays can be passed as vector parameters.
        register_vector(self.client)
    def rpc(self, function_name: str, params: dict[str, Any]):
        """
        Calls a stored procedure in the database with the given parameters.
        """
        data = []
#        print("pg rpc")
        # pgvector expects the embedding parameter as a numpy array.
        params["in_embedding"] = np.array(params["in_embedding"])
#        print(" params : " + str(params))
        with self.client.cursor(cursor_factory=DictCursor) as cur:
            cur.callproc(function_name, params)
            rows = cur.fetchall()
            self.client.commit()
            for row in rows:
                # row["created_at"] = to_unix_timestamp(row["created_at"])
                data.append(dict(row))
        return data
    # async
    def upsert_(self, table: str, chunks: Dict[str, List[DocumentChunk]], verbose: bool = False) -> List[str]:
        """
        Takes in a dict of document_ids to lists of document chunks and inserts them into the database.
        Returns a list of document ids.
        """
        for document_id, document_chunks in chunks.items():
            for chunk in document_chunks:
                # Postgres text columns cannot store NUL bytes, so replace them
                # with the Unicode replacement character.
                chunk_text_replace = chunk.text.replace("\x00", "\uFFFD")

                if chunk.text != chunk_text_replace:
                    print(" Replaced \\x00 characters with \\uFFFD in text ")
                else:
                    print("No replacement!")

                if verbose:
                    print(str(chunk.text))
                    print(str(chunk_text_replace))

                json = {
                    "id": chunk.id,
                    "content": chunk_text_replace,
                    "embedding": chunk.embedding,
                    "document_id": document_id,
                    "source": chunk.metadata.source,
                    "source_id": chunk.metadata.source_id,
                    "url": chunk.metadata.url,
                    "author": chunk.metadata.author,
                }
                if chunk.metadata.created_at:
                    # Note: the original wrapped this value in a one-element
                    # tuple (trailing comma), which psycopg2 cannot adapt;
                    # store the datetime directly.
                    json["created_at"] = datetime.fromtimestamp(
                        to_unix_timestamp(chunk.metadata.created_at)
                    )
                try:
                    self.upsert(table, json)
                except Exception as e:
                    print(str(e))
                    print("Exception with this document_id : " + str(document_id))

        return list(chunks.keys())
    # async
    def upsert(self, table: str, json: dict[str, Any]):
        """
        Takes a single row dict and upserts it into the given table
        (insert, or update on id conflict).
        """
        print("pg upsert in " + str(table))
        with self.client.cursor() as cur:
            if not json.get("created_at"):
                json["created_at"] = datetime.now()
            json["embedding"] = np.array(json["embedding"])
            # The table name is interpolated directly into the SQL, so it must
            # come from trusted code, never from user input.
            cur.execute(
                f"INSERT INTO {table} (id, content, embedding, document_id, source, source_id, url, author, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (id) DO UPDATE SET content = %s, embedding = %s, document_id = %s, source = %s, source_id = %s, url = %s, author = %s, created_at = %s",
                (
                    json["id"],
                    json["content"],
                    json["embedding"],
                    json["document_id"],
                    json["source"],
                    json["source_id"],
                    json["url"],
                    json["author"],
                    json["created_at"],
                    json["content"],
                    json["embedding"],
                    json["document_id"],
                    json["source"],
                    json["source_id"],
                    json["url"],
                    json["author"],
                    json["created_at"],
                ),
            )
        self.client.commit()
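    # Note on the statement above: the second half of the parameter tuple just
    # repeats the first. A shorter, equivalent formulation (a sketch, not the
    # original statement) would reference the proposed row via EXCLUDED:
    #
    #   ON CONFLICT (id) DO UPDATE SET
    #       content = EXCLUDED.content, embedding = EXCLUDED.embedding, ...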
def find_docs(embedding_input,
              lib_data,
              function = "match_page_sections",
              in_match_count = 5,
              verbose = False):

    one_embedding = embedding_input[0][0]

    params = {
        "in_embedding": one_embedding,
        "in_match_count": in_match_count
    }
#    function = "match_page_sections"

    # Which part of this contains code to clean up?
    pgc = PostgresClient(lib_data.client)
    result = pgc.rpc(function, params)

    return result
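# Usage sketch for find_docs (assumptions: embedding_input has the nested
# [[embedding]] shape produced by the embedding helpers, lib_data carries an
# open psycopg2 connection as .client, and match_page_sections is a stored
# procedure defined in the database, not in this file):
#
#   rows = find_docs(embedding_input, lib_data, in_match_count=5)
#   for row in rows:
#       print(row)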
from abc import ABC, abstractmethod
import asyncio

from lib.import_util.lib_import_retrieval.models.models import (
    Document,
    DocumentChunk,
    DocumentChunkMetadata,
    DocumentMetadataFilter,
    Query,
    QueryResult,
    QueryWithEmbedding,
)
def upsert(
        documents: List[Document], chunk_token_size: Optional[int] = None,
        lpgss = None,
        OPENAI_API_KEY = None, PG_TABLE = "documents",
        verbose = False
        ) -> Tuple[List[str], int, str]:
    """
    Takes in a list of documents and inserts them into the database.
    First deletes all the existing vectors with the document id (if necessary,
    depends on the vector db), then inserts the new ones.
    Returns a list of document ids, the total number of tokens used, and the
    embedding model name.
    """
    # Delete any existing vectors for documents with the input document ids
    # VR TODO 31-5-23: needs delete as well!
#    await asyncio.gather(
#        *[
#            delete(
#                filter=DocumentMetadataFilter(
#                    document_id=document.id,
#                ),
#                delete_all=False,
#            )
#            for document in documents
#            if document.id
#        ]
#    )

    chunks, total_nb_token, used_model = get_document_chunks(documents, chunk_token_size, OPENAI_API_KEY, verbose = verbose)

    pgc = PostgresClient(lpgss.client)
#    pgc = PostgresClient(PG_HOST, PG_USER, PG_PASSWORD, PG_DB, PG_PORT)

    res = pgc.upsert_(PG_TABLE, chunks)
    return res, total_nb_token, used_model
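# Usage sketch (assumptions: lpgss is the pg_safia_sys wrapper exposing an open
# psycopg2 connection as .client, and Document accepts id/text as below):
#
#   doc = Document(id="doc-1", text="Some text to index.")
#   ids, nb_token, model = upsert([doc], lpgss=lpgss,
#                                 OPENAI_API_KEY=os.environ.get("OPENAI_API_KEY"),
#                                 PG_TABLE="documents")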
# Constants
CHUNK_SIZE = 200  # The target size of each text chunk in tokens
MIN_CHUNK_SIZE_CHARS = 350  # The minimum size of each text chunk in characters
MIN_CHUNK_LENGTH_TO_EMBED = 5  # Discard chunks shorter than this
EMBEDDINGS_BATCH_SIZE = int(os.environ.get("OPENAI_EMBEDDING_BATCH_SIZE", 128))  # The number of embeddings to request at a time
MAX_NUM_CHUNKS = 10000  # The maximum number of chunks to generate from a text
MIN_NB_TOKEN_CHUNK = MIN_CHUNK_LENGTH_TO_EMBED
import tiktoken

#from services.openai import get_embeddings
from lib.lib_openai import get_embeddings, get_embeddingss

# Global variables
tokenizer = tiktoken.get_encoding(
    "cl100k_base"
)  # The encoding scheme to use for tokenization
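# Sanity sketch of the tokenizer round-trip the chunking code below relies on:
#
#   toks = tokenizer.encode("Hello world", disallowed_special=())
#   assert tokenizer.decode(toks) == "Hello world"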
def get_text_chunks(text: str, chunk_token_size: Optional[int], OPENAI_API_KEY = None) -> List[str]:
    """
    Split a text into chunks of ~CHUNK_SIZE tokens, based on punctuation and newline boundaries.

    Args:
        text: The text to split into chunks.
        chunk_token_size: The target size of each chunk in tokens, or None to use the default CHUNK_SIZE.

    Returns:
        A list of text chunks, each of which is a string of ~CHUNK_SIZE tokens.
    """
    # Return an empty list if the text is empty or whitespace
    if not text or text.isspace():
        return []

    # Tokenize the text
    tokens = tokenizer.encode(text, disallowed_special=())

    # Initialize an empty list of chunks
    chunks = []

    # Use the provided chunk token size or the default one
    chunk_size = chunk_token_size or CHUNK_SIZE

    # Initialize a counter for the number of chunks
    num_chunks = 0

    # Loop until all tokens are consumed
    while tokens and num_chunks < MAX_NUM_CHUNKS:
        # Take the first chunk_size tokens as a chunk
        chunk = tokens[:chunk_size]

        # Decode the chunk into text
        chunk_text = tokenizer.decode(chunk)

        # Skip the chunk if it is empty or whitespace
        if not chunk_text or chunk_text.isspace():
            # Remove the tokens corresponding to the chunk text from the remaining tokens
            tokens = tokens[len(chunk):]
            # Continue to the next iteration of the loop
            continue

        # Find the last period or punctuation mark in the chunk
        last_punctuation = max(
            chunk_text.rfind("."),
            chunk_text.rfind("?"),
            chunk_text.rfind("!"),
            chunk_text.rfind("\n"),
        )

        # If there is a punctuation mark past the MIN_CHUNK_SIZE_CHARS threshold
        if last_punctuation != -1 and last_punctuation > MIN_CHUNK_SIZE_CHARS:
            # Truncate the chunk text at the punctuation mark
            chunk_text = chunk_text[: last_punctuation + 1]

        # Remove any newline characters and strip any leading or trailing whitespace
        # TODO VR 2-7: disabled, to avoid having to save the original document somewhere else!
        # chunk_text_to_append = chunk_text.replace("\n", " ").strip()

        # TODO VR 2-7: and we want all the data!
        # if len(chunk_text) > MIN_CHUNK_LENGTH_TO_EMBED:
        # Append the chunk text to the list of chunks

        # Remove the tokens corresponding to the chunk text from the remaining tokens.
        # We should at least re-add everything that differs from chunk_text_to_append.
        # Re-encoding the truncated text may not reproduce the original token
        # boundaries, so count how many leading tokens still match.
        chunk_token_aux = tokenizer.encode(chunk_text, disallowed_special=())
        nb_same_chunk = 0
        for i in range(len(chunk_token_aux)):
            if chunk_token_aux[i] == chunk[i]:
                nb_same_chunk += 1
            else:
                break

        if nb_same_chunk < MIN_NB_TOKEN_CHUNK:
            print(" Too few matching tokens; this can misbehave if it happened at the end of the text!")
            nb_same_chunk = MIN_NB_TOKEN_CHUNK

        if nb_same_chunk < len(chunk_token_aux):
            print("Difference in chunk tokenization, treated as a warning, but may be harmful!")
            chunk_text = tokenizer.decode(chunk[:nb_same_chunk])

        nb_same_chunk_after_endecode = len(tokenizer.encode(chunk_text, disallowed_special=()))
        if nb_same_chunk < len(chunk_token_aux) and nb_same_chunk_after_endecode == nb_same_chunk:
            print("We should be safe by now")

        tokens = tokens[nb_same_chunk_after_endecode:]

        chunks.append(chunk_text)

        # Increment the number of chunks
        num_chunks += 1

    # Handle the remaining tokens
    if tokens:
        remaining_text = tokenizer.decode(tokens).replace("\n", " ").strip()
        if len(remaining_text) > MIN_CHUNK_LENGTH_TO_EMBED:
            chunks.append(remaining_text)

    return chunks
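# Usage sketch: a short text fits in one chunk, since it is far below the
# default CHUNK_SIZE of 200 tokens:
#
#   get_text_chunks("One sentence. Another sentence.", None)
#   # -> ["One sentence. Another sentence."]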
def create_document_chunks(
    doc: Document, chunk_token_size: Optional[int], OPENAI_API_KEY = None
) -> Tuple[List[DocumentChunk], str]:
    """
    Create a list of document chunks from a document object and return the document id.

    Args:
        doc: The document object to create chunks from. It should have a text attribute and optionally an id and a metadata attribute.
        chunk_token_size: The target size of each chunk in tokens, or None to use the default CHUNK_SIZE.

    Returns:
        A tuple of (doc_chunks, doc_id), where doc_chunks is a list of document chunks, each of which is a DocumentChunk object with an id, a document_id, a text, and a metadata attribute,
        and doc_id is the id of the document object, generated if not provided. The id of each chunk is generated from the document id and a sequential number, and the metadata is copied from the document object.
    """
    # Check if the document text is empty or whitespace
    if not doc.text or doc.text.isspace():
        return [], doc.id or str(uuid.uuid4())

    # Generate a document id if not provided
    doc_id = doc.id or str(uuid.uuid4())

    # Split the document text into chunks
    text_chunks = get_text_chunks(doc.text, chunk_token_size, OPENAI_API_KEY = OPENAI_API_KEY)

    metadata = (
        DocumentChunkMetadata(**doc.metadata.__dict__)
        if doc.metadata is not None
        else DocumentChunkMetadata()
    )

    metadata.document_id = doc_id

    # Initialize an empty list of chunks for this document
    doc_chunks = []

    # Assign each chunk a sequential number and create a DocumentChunk object
    for i, text_chunk in enumerate(text_chunks):
        chunk_id = f"{doc_id}_{i}"
        doc_chunk = DocumentChunk(
            id=chunk_id,
            text=text_chunk,
            metadata=metadata,
        )
        # Append the chunk object to the list of chunks for this document
        doc_chunks.append(doc_chunk)

    # Return the list of chunks and the document id
    return doc_chunks, doc_id
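# Usage sketch (assumed Document constructor; note that, as written above, all
# chunks of a document share a single metadata object):
#
#   doc_chunks, doc_id = create_document_chunks(Document(text="Hello. World."), None)
#   # doc_chunks[0].id == f"{doc_id}_0"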
def get_document_chunks(
    documents: List[Document], chunk_token_size: Optional[int], OPENAI_API_KEY = None, verbose = False
) -> Tuple[Dict[str, List[DocumentChunk]], int, str]:
    """
    Convert a list of documents into a dictionary from document id to list of document chunks.

    Args:
        documents: The list of documents to convert.
        chunk_token_size: The target size of each chunk in tokens, or None to use the default CHUNK_SIZE.

    Returns:
        A tuple of (chunks, total_nb_token, used_model), where chunks maps each
        document id to a list of DocumentChunk objects with text, metadata, and
        embedding attributes, total_nb_token is the number of tokens embedded,
        and used_model is the embedding model used.
    """
    # Initialize an empty dictionary of lists of chunks
    chunks: Dict[str, List[DocumentChunk]] = {}

    # Initialize an empty list of all chunks
    all_chunks: List[DocumentChunk] = []

    # Loop over each document and create chunks
    for doc in documents:
        doc_chunks, doc_id = create_document_chunks(doc, chunk_token_size, OPENAI_API_KEY = OPENAI_API_KEY)

        # Append the chunks for this document to the list of all chunks
        all_chunks.extend(doc_chunks)

        # Add the list of chunks for this document to the dictionary with the document id as the key
        chunks[doc_id] = doc_chunks

    # Check if there are no chunks; keep the return shape consistent with the
    # normal path (the original returned a bare {} here, which callers cannot unpack)
    if not all_chunks:
        return {}, 0, ""

    total_nb_token = 0
    used_model = ""

    # Get all the embeddings for the document chunks in batches, using get_embeddings
    embeddings: List[List[float]] = []
    for i in range(0, len(all_chunks), EMBEDDINGS_BATCH_SIZE):
        # Get the text of the chunks in the current batch
        batch_texts = [
            chunk.text for chunk in all_chunks[i : i + EMBEDDINGS_BATCH_SIZE]
        ]

        if verbose:
            print(" batch_texts : " + str(batch_texts))

        if OPENAI_API_KEY is not None:
            # Get the embeddings for the batch texts
            batch_embeddings, nb_token, model = get_embeddingss(batch_texts, openai_token = OPENAI_API_KEY, verbose = verbose)
        else:
            print(" Document not indexed! => a lot of refactoring to do to get a nice interface for all of this!")
            batch_embeddings = [[] for _ in range(len(batch_texts))]
            nb_token = 0
            model = "no_embeddings"

        total_nb_token += nb_token
        if used_model == "":
            used_model = model
        if model != used_model:
            print("ERROR IN COST ESTIMATION (TREATED AS WARNING, USING FIRST MODEL)")

        if verbose:
            print(" batch_embeddings : " + str(batch_embeddings))

        # Append the batch embeddings to the embeddings list
        embeddings.extend(batch_embeddings)

    print(" LEN embeddings : " + str(len(embeddings)))
    if verbose:
        if len(embeddings) > 0:
            print(" one begin " + str(embeddings[0][:10]))

    # Update the document chunk objects with the embeddings
    for i, chunk in enumerate(all_chunks):
        # Assign the embedding from the embeddings list to the chunk object
        chunk.embedding = embeddings[i]

    return chunks, total_nb_token, used_model
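# A minimal end-to-end sketch (assumptions: Document accepts id/text; with no
# API key the chunks get empty embeddings and the model is "no_embeddings"):
if __name__ == "__main__":
    demo_doc = Document(id="demo", text="One sentence. Another sentence.")
    demo_chunks, demo_nb_token, demo_model = get_document_chunks([demo_doc], None)
    print(len(demo_chunks), demo_nb_token, demo_model)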