We have a cloud API that accepts audio recordings and returns features. I would like to send n audio recordings to the API in such a way that, whenever the features of one recording come back, a request for the next recording is sent right away to take its place. This should be possible with a queue, but I haven't figured out how to do it yet.
With my current method, the recordings are processed one chunk at a time, and the next chunk is only sent after the previous one has finished (see below). How can I ensure that new requests are continuously refilled from the back of the queue while the ones at the front are still being processed?
URL = 'https://url_post/post'
URL_features =
'https://url_get/features'
file_paths = file_paths_names_background['file_paths']
chunk_size = 30
nr_of_chunks = math.ceil(len(file_paths) / chunk_size)
file_paths_chunks = lst_in_chunks(file_paths, chunk_size)
res = []
error_lst = []
rest_time = 1
t0 = time.time()
count = 0
verbose = -1
for file_paths_chunk in tqdm(file_paths_chunks,
file=sys.stdout
):
print(f'nProcess chunk {count} of {nr_of_chunks}.')
client_session = aiohttp.ClientSession()
res_post = await post_audio_files(URL,
file_paths_chunk,
client_session,
tqdm_switch=False)
error_lst += check_resp_lst_for_erros(res_post)
print(f'ntPost: {len(error_lst)} errors.')
client_session = aiohttp.ClientSession()
resp_features = await get_features_via_tokens(url = URL_features,
resp_dict_in =
[res_dict for res_dict in res_post if res_dict['status_code'] == 200],
client_session = client_session)
res = res + concat_post_feat_resp(resp_posts = res_post,
resp_feats = resp_features)
if verbose > -1:
print(f'ntRest for {rest_time} seconds.')
time.sleep(rest_time)
print('# ------------------------------------ #')
count += 1
t1 = time.time()
print('----------------------------------')
print(f'nUploaded all {len(file_paths)} files.')
print(f"n--> Elapsed time: {t1 - t0} s.")
print('----------------------------------')
async def post_audio_files(url: str = None,
file_paths: list = None,
client_session: 'aiohttp.ClientSession()' = None,
tqdm_switch: bool = False,
verbose: int = -1):
t0 = time.time()
if verbose > -1:
print('n# --------------------------------------- #')
print(f"nThe posting of {len(file_paths)} audio files started at : {datetime.now()}.")
if tqdm_switch:
async with client_session as session:
res = await tqdm_asyncio.gather(*[post_audio_file(url = url,
file_path = file_path,
session = session)
for file_path in file_paths])
else:
async with client_session as session:
res = await asyncio.gather(*[post_audio_file(url = url,
file_path = file_path,
session = session)
for file_path in file_paths])
if verbose > -1:
print(f'ntAll {len(res)} files have been uploaded.')
t1 = time.time()
if verbose > -1:
print(f"nUploading the files and gathering the tokens ended at {datetime.now()}.")
print(f"nt--> Elapsed time: {t1 - t0} s.")
print('n# --------------------------------------- #')
return res
async def get_features_stat200(url: str = None,
                               token_file_name_lst: list = None,
                               client_session: 'aiohttp.ClientSession' = None,
                               rest_count: int = 1,
                               verbose: int = -1):
    """Poll the feature endpoint until no token is left to retry.

    In every round ``get_feature`` is called concurrently for all outstanding
    tokens. Tokens answered with status 200 are done; all others are queried
    again in the next round. Polling stops when every response of a round has
    status 200, or when every response of a round has status 500.

    Args:
        url: Endpoint the features are requested from.
        token_file_name_lst: ``(token, file_name)`` pairs to request.
        client_session: Session used for all requests. It is entered with
            ``async with`` and is therefore closed when this function returns.
        rest_count: Passed to ``show_countdown`` between two polling rounds.
        verbose: The start message is printed when greater than -1.

    Returns:
        The response dicts of all rounds, in the order they were gathered.
        This includes the non-200 responses of tokens that were retried, so a
        token can appear more than once.
    """
    t0 = time.time()
    if verbose > -1:
        print(f"\nThe gathering of the features of {len(token_file_name_lst)} files "
              f"started at: {datetime.now()}.")

    animation = "|/-\\"
    idx = 0
    results = []
    async with client_session as session:
        while True:
            print(f'\n---- {inspect.currentframe().f_code.co_name}')
            print(f'---- Iteration {idx+1} ---- \nGather and check for results: ',
                  animation[idx % len(animation)], end="\r")
            idx += 1

            results_current = await tqdm_asyncio.gather(
                *[get_feature(url=url,
                              file_name=file_name,
                              token=token,
                              session=session)
                  for token, file_name in token_file_name_lst])

            # Split the round once: everything that is not a 200 is retried.
            pending = [resp_dict for resp_dict in results_current
                       if resp_dict['status_code'] != 200]
            n_success = len(results_current) - len(pending)
            status_codes = {resp_dict['status_code'] for resp_dict in pending}
            token_file_name_lst = [(resp_dict['token'], resp_dict['file_name'])
                                   for resp_dict in pending]

            # Every response is kept, not only the successful ones.
            results += results_current

            print(f"\n\tCurrent successful responses: {n_success}")
            print(f"\n\tCurrent unsuccessful responses: {len(token_file_name_lst)}")
            print(f'\t\tStatus codes: {status_codes}')

            # If every response of this round has status 500 (this is also
            # true for an empty round), retrying is pointless: stop polling.
            if all(resp_dict['status_code'] == 500 for resp_dict in results_current):
                print('Current results: ', len(results_current))
                break
            if not pending:
                break
            # NOTE(review): presumably waits rest_count seconds before the next
            # round; if it sleeps synchronously it blocks the event loop.
            show_countdown(rest_count)

    # Count the successes of all rounds, not only those of the last one.
    n_received = sum(1 for resp_dict in results if resp_dict['status_code'] == 200)
    print(f'\n\tReceived features from {n_received} tokens.')
    t1 = time.time()
    print(f"\nGathering the features ended at {datetime.now()}.")
    print(f"\n\t--> Elapsed time: {t1 - t0} s.")
    return results