I have entries like this in my Elasticsearch index:
It's unstructured data; in this case the content of a PDF that was split into chunks, then a LangChain document was created for each chunk and pushed to the index as a separate vector.
I ran into an issue: each time I load the PDF and push it, new entries are added to the index (with the same content). The code used for that purpose:
def push_to_elasticsearch(es_index_name, embeddings, docs):
    elastic_vector_search = ElasticsearchStore(
        index_name=es_index_name,
        embedding=embeddings,
        es_connection=es_connection,
    )
    docs_hash_ids = [doc.metadata["hash_id"] for doc in docs]
    vector_exists_dict = check_vectors_exist_by_hash_id(es_index_name, docs_hash_ids)
    # Keep only the chunks whose hash_id is not already in the index
    idempotency_docs = [doc for doc in docs if not vector_exists_dict.get(doc.metadata["hash_id"], False)]
    print('Len of docs:', len(docs))
    print('Len of idempotency_docs:', len(idempotency_docs))
    if idempotency_docs:
        elastic_vector_search.add_documents(documents=idempotency_docs)
    return elastic_vector_search
To check whether vectors exist before pushing them, I don't think I can use the existing _id field (the documents haven't been pushed yet), so I added a hash_id field to each document's metadata (a hash of the content) and want to use it to search the index before pushing. I still don't know exactly how to implement it; I thought about this implementation:
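For context, the hash_id is just a digest of the chunk's text. A minimal sketch of the helper (I use sha256 here, but any stable hash works):

```python
import hashlib

def calculate_content_hash(page_content: str) -> str:
    """Deterministic hex digest of a chunk's text, used as its hash_id."""
    return hashlib.sha256(page_content.encode("utf-8")).hexdigest()

# Identical chunk text always maps to the same hash_id, so re-loading
# the same PDF reproduces the same ids instead of minting new ones.
```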
def check_vectors_exist_by_hash_id(index_name, docs_hash_ids):
    """
    Check if vectors exist for a list of document hash ids.
    Args:
        docs_hash_ids (list): List of hash_id values to check.
    Returns:
        dict: Keys are hash ids, values are booleans (True if the vector exists, False otherwise).
    """
    vector_exists_dict = {}
    try:
        # Fetch documents by ids (this is the part that doesn't work:
        # mget only resolves documents by _id, not by an arbitrary field)
        responses = es_connection.mget(index=index_name, body={"hash_ids": docs_hash_ids})
        for response in responses["docs"]:
            doc_id = response["hash_id"]
            vector_exists_dict[doc_id] = "embedding" in response["_source"]
    except Exception as e:
        print(f"Error checking vector existence for doc_ids: {e}")
    return vector_exists_dict
but I haven't yet figured out how to filter by these hash_ids!