Bug Description
I like to create a status_checker api endpoint in fastapi to track the creation of chromadb embeddings. Also I like to create these emebeddings in async mode. Below mentioned the code, but it is giving error. Please do the needful.
Version
llama-index 0.10.12
Code we tried
<code>import os
import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
app = FastAPI()
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
try:
#Ensure 'docs' directory exists
if not os.path.exists("docs"):
os.makedirs("docs")
# Write the file to server with its original filename
file_path = os.path.join("docs", file.filename)
with open(file_path, "wb") as f:
f.write(await file.read())
from rag_define import define_rag
asyncio.create_task(define_rag())
return JSONResponse(content={"message": "File uploaded successfully"})
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
@app.post("/status")
async def status_checker():
return global_variable.upload_in_progress
async def define_rag():
documents = SimpleDirectoryReader(input_dir="./docs",required_exts = [".docx",".doc",".pdf",".txt"]).load_data()
if os.path.exists('./chroma_db'):
print("*************************utilizing pre generated embeddings from chromadb folder")
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model,use_async=True, show_progress=True)
else:
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
#index = VectorStoreIndex.from_documents(documents,storage_context=storage_context)
vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model,storage_context=storage_context,use_async=True, show_progress=True)
vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
global_variable.retriever_chunk = RecursiveRetriever(
"vector",
retriever_dict={"vector": vector_retriever_chunk},
node_dict=all_nodes_dict,
verbose=True,
)
print("Vector store creation done")
global_variable.upload_in_progress = 1
global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(
global_variable.retriever_chunk,
llm=global_variable.llm,
text_qa_template=global_variable.text_qa_template
)
</code>
<code>import os
import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
app = FastAPI()
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
try:
#Ensure 'docs' directory exists
if not os.path.exists("docs"):
os.makedirs("docs")
# Write the file to server with its original filename
file_path = os.path.join("docs", file.filename)
with open(file_path, "wb") as f:
f.write(await file.read())
from rag_define import define_rag
asyncio.create_task(define_rag())
return JSONResponse(content={"message": "File uploaded successfully"})
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
@app.post("/status")
async def status_checker():
return global_variable.upload_in_progress
async def define_rag():
documents = SimpleDirectoryReader(input_dir="./docs",required_exts = [".docx",".doc",".pdf",".txt"]).load_data()
if os.path.exists('./chroma_db'):
print("*************************utilizing pre generated embeddings from chromadb folder")
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model,use_async=True, show_progress=True)
else:
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
#index = VectorStoreIndex.from_documents(documents,storage_context=storage_context)
vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model,storage_context=storage_context,use_async=True, show_progress=True)
vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
global_variable.retriever_chunk = RecursiveRetriever(
"vector",
retriever_dict={"vector": vector_retriever_chunk},
node_dict=all_nodes_dict,
verbose=True,
)
print("Vector store creation done")
global_variable.upload_in_progress = 1
global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(
global_variable.retriever_chunk,
llm=global_variable.llm,
text_qa_template=global_variable.text_qa_template
)
</code>
import os
import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
app = FastAPI()
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
try:
#Ensure 'docs' directory exists
if not os.path.exists("docs"):
os.makedirs("docs")
# Write the file to server with its original filename
file_path = os.path.join("docs", file.filename)
with open(file_path, "wb") as f:
f.write(await file.read())
from rag_define import define_rag
asyncio.create_task(define_rag())
return JSONResponse(content={"message": "File uploaded successfully"})
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
@app.post("/status")
async def status_checker():
return global_variable.upload_in_progress
async def define_rag():
documents = SimpleDirectoryReader(input_dir="./docs",required_exts = [".docx",".doc",".pdf",".txt"]).load_data()
if os.path.exists('./chroma_db'):
print("*************************utilizing pre generated embeddings from chromadb folder")
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model,use_async=True, show_progress=True)
else:
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
#index = VectorStoreIndex.from_documents(documents,storage_context=storage_context)
vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model,storage_context=storage_context,use_async=True, show_progress=True)
vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
global_variable.retriever_chunk = RecursiveRetriever(
"vector",
retriever_dict={"vector": vector_retriever_chunk},
node_dict=all_nodes_dict,
verbose=True,
)
print("Vector store creation done")
global_variable.upload_in_progress = 1
global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(
global_variable.retriever_chunk,
llm=global_variable.llm,
text_qa_template=global_variable.text_qa_template
)
#What we expect
asyncio.create_task(define_rag()):
—– This should be executes successfully
<code>@app.post("/status")
async def status_checker():
return global_variable.upload_in_progress
</code>
<code>@app.post("/status")
async def status_checker():
return global_variable.upload_in_progress
</code>
@app.post("/status")
async def status_checker():
return global_variable.upload_in_progress
This status checker should check and return the statement while executeing create_task.
New contributor
PADALA LIKHITH RISHI is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.