I am using the test code below to try parallel LlamaIndex ingestion. The code is taken from: https://docs.llamaindex.ai/en/stable/examples/ingestion/parallel_execution_ingestion_pipeline/
The code runs as expected with the OpenAI embedding model, but it fails when used with an embedding model deployed either on Sky or locally. The model's logs show that the token count exceeds its maximum limit of 512 (the chunk size given is 256). If I run the same code without parallelization, it runs with a warning that the token count is too high and might cause indexing errors. I inspected the text returned from the splitter and observed that without parallelization the same text is split differently than with parallelization, which might be the cause of the failure. Below is the code:
```python
import multiprocessing as mp

from llama_index.core import SimpleDirectoryReader, Document, set_global_tokenizer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter, LangchainNodeParser
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.text_splitter import TokenTextSplitter

# Set multiprocessing start method
mp.set_start_method("spawn", force=True)


def get_embedding():
    emburl = "http://127.0.0.1:8080/"
    embkey = "dummy"
    batch_size = 10
    headers = {"Authorization": "Bearer " + embkey}
    return OpenAIEmbedding(
        model_name="BAAI/bge-large-en-v1.5",
        api_base=emburl,
        api_key=headers["Authorization"],
        embed_batch_size=batch_size,
        default_headers=headers,
        timeout=300,
        reuse_client=False,
    )


def main():
    documents = SimpleDirectoryReader(input_dir="<path>", recursive=True).load_data(num_workers=4)
    print(len(documents))

    # Create the pipeline with transformations
    from transformers import AutoTokenizer

    set_global_tokenizer(AutoTokenizer.from_pretrained("BAAI/bge-large-en-v1.5").encode)
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(
                chunk_size=256,
                chunk_overlap=0,
                tokenizer=AutoTokenizer.from_pretrained("BAAI/bge-large-en-v1.5").encode,
            ),
            get_embedding(),
        ]
    )
    nodes = pipeline.run(documents=documents, num_workers=2)
    # Disable cache for performance testing
    pipeline.disable_cache = True
    print("done")


if __name__ == "__main__":
    main()
```
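For anyone reproducing this, here is a small helper I would use to spot oversized chunks before they reach the embedding server (a sketch only: the toy whitespace tokenizer stands in for the BGE tokenizer's `encode`, and in the real pipeline `chunks` would be `[n.get_content() for n in nodes]`):

```python
def count_overflowing(chunks, tokenize, max_tokens=512):
    """Return the chunks whose token count (per the given tokenizer)
    exceeds the embedding model's limit."""
    return [c for c in chunks if len(tokenize(c)) > max_tokens]


# Toy demonstration with a whitespace tokenizer; in the real run,
# pass AutoTokenizer.from_pretrained("BAAI/bge-large-en-v1.5").encode.
chunks = ["short chunk", "word " * 600]
bad = count_overflowing(chunks, str.split, max_tokens=512)
print(len(bad))  # -> 1 (only the 600-token chunk overflows)
```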
I tried other splitters, such as TokenTextSplitter, and saw the same error. Any advice on this would be helpful.
I have run the test code above with several worker counts and expect it to behave like the OpenAI embedding model in parallel. If I reduce the chunk size, the code works as expected, so it looks like there is some issue with how the splitter passed to the pipeline is parallelized.
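One guess on my side (an assumption, not verified): with the "spawn" start method each worker re-imports the module, so the tokenizer registered at runtime via set_global_tokenizer in the parent may not be active in the workers, leaving them to measure chunk sizes with a different (default) tokenizer than the one the embedding server enforces. A toy sketch of why such a mismatch would produce exactly this failure, using hypothetical stand-in tokenizers:

```python
# word_tokenize stands in for whatever tokenizer the splitter ends up
# using inside the workers; char_tokenize stands in for the server-side
# BGE tokenizer that enforces the 512-token cap. Both are toy stand-ins.
def word_tokenize(text):
    return text.split()


def char_tokenize(text):
    return list(text)


text = "parallel ingestion splits text differently " * 40

# Sized against one tokenizer the text fits a 256-token budget...
print(len(word_tokenize(text)))  # -> 200
# ...but measured by the other tokenizer it blows past the 512 cap.
print(len(char_tokenize(text)))  # -> 1720
```

So a chunk that looks valid to the splitter can still overflow the deployed model's limit, which would match the 512-token errors seen only in the parallel run.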