How to make I/O operations asynchronous using async/await?

I have written a Python program that converts a PDF into text files, cleans those text files, converts the cleaned files into CSV files, and finally merges them into a single CSV file.
The problem is that the whole program does not execute in one go. When I run it once, all the text files are created, but the CSV files are only generated when I run the program a second time, after the text files already exist. How do I fix this? I have tried using async/await, but apparently it's not working.

As per my assessment, the problem could be in:

- `async def merge_csvs`
- `async def convert_to_csv(clean_tables_dir)`
- `async def segregate_patient_data(all_patients_raw_data, output_dir)`
- `async def compare_patients_data`

Any help would be greatly appreciated.
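For context, this is the single entry point I assumed would run every stage strictly in order (a minimal sketch reusing the function and path names from my code below, not working code from my program):

```python
import asyncio

async def main():
    # Each `await` finishes completely before the next stage starts,
    # so the CSV steps should only ever see text files that already exist
    await compare_patients_data(patient_raw_file_dir, stripped_clean_data_file)
    await segregate_patient_data(f"{current_directory}/src/all_patients_raw_data.txt", patient_output_dir)
    await convert_to_csv(f'{current_directory}/data/Revised_Text_Files')
    await merge_csvs(f'{current_directory}/data/Revised_Text_Files')

asyncio.run(main())  # one event loop instead of four separate asyncio.run() calls
```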

Here is my code:

```python
from PyPDF2 import PdfReader, PdfWriter
import re
from pdfminer.high_level import extract_text
import os
import csv
import fitz
import pandas as pd
import codecs
import asyncio
import tabula
from functools import partial
from concurrent.futures import ThreadPoolExecutor

current_directory = os.getcwd()
print(f"Current Working Directory {current_directory}")

clean_data_file = f"{current_directory}/data/extracted_clean_data.txt"
patient_raw_file_dir = f"{current_directory}/patient_raw_data"
csv_dir = f"{current_directory}/data/output_csvs"
all_patients_raw_data = f'{current_directory}/src/all_patients_raw_data.txt'
input_pdf = f'{current_directory}/eg.pdf'
keywords = ['PatientName']  # List containing the multi-word keyword
extracted_pdf = f"{current_directory}/data/extracted_pages.pdf"
output_dir = f'{current_directory}/data/splitpdfs'
output_text_dir = f'{current_directory}/data/textfiles'
output_folder_path = f"{current_directory}/data/extracted_clean_tables"  # Folder to save text files
patient_text_files_rl = f"{current_directory}/data/Revised_Text_Files"

print(f"Successfully processed PDFs in '{output_dir}'. Check '{output_text_dir}' for text files.")

if not os.path.exists(all_patients_raw_data):
    # Ensure the parent directory exists
    os.makedirs(os.path.dirname(all_patients_raw_data), exist_ok=True)

# Step 1: Extracting pages containing required keywords from the pdf file and making a separate PDF for the same
async def extractPages(input_pdf, keywords, extracted_pdf):
    try:
        # Initializing Reader and Writer
        reader = PdfReader(open(input_pdf, 'rb'))
        writer = PdfWriter()
        # Iterate through each page
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            text = page.extract_text()
            # Search for any keyword in the text
            if any(keyword in text for keyword in keywords):
                writer.add_page(page)

        # Save the new PDF with the required pages only after iterating all pages
        with open(extracted_pdf, 'wb') as stripped_clean_data_file:
            writer.write(stripped_clean_data_file)

    except Exception as e:
        print(f"Error {e}")


def extract_patients_clean_data(folder_path, output_file_path):
    text_data = ""  # Accumulate text data across all PDFs
    table_counter = 0
    pdf_to_text_output_folder = f'{current_directory}/data/Revised_Text_Files'
    patient_names_list = []

    for patient_names in os.listdir(f'{current_directory}/patient_raw_data'):
        if patient_names.endswith('.txt'):
            patient_file_name = os.path.basename(patient_names)
            patient_names_list.append(patient_file_name)

    patient_names_list.sort()

    pdf_path_dir = f'{current_directory}/eg.pdf'

    pdf_data = tabula.read_pdf(pdf_path_dir, pages="all")
    patient_names = []

    for index, (page_data, patient_name) in enumerate(zip(pdf_data, patient_names_list)):
        print(f"Number of Tables {len(page_data)}")
        table_counter += 1

        # Check if the index is within the range of patient_names_list
        if index < len(patient_names_list):
            # Construct the filename using the patient name at the current index
            clean_table_filename = f"{patient_names_list[index]}.txt"
            clean_table_filepath = os.path.join(pdf_to_text_output_folder, clean_table_filename)

            # Write the page data to the text file
            with open(clean_table_filepath, 'w') as clean_table_text_file_fs:
                clean_table_text_file_fs.write(page_data.to_string(index=False))
        else:
            print("Error: Index out of range for patient_names_list")

        # remove_headers_from_clean_data(clean_table_filepath)

        # Add a separator between pages
        text_data += page_data.to_string(index=False) + "\n\n\n\n\n-------------------------------------------------------------------------------------\n\n\n\n"

    # Write accumulated text data to the output file
    with open(output_file_path, "w", encoding="utf-8") as f:
        f.write(text_data)
    print(f"Master table file created and saved to '{output_file_path}'")
    print(f"NO OF TABLES IN TABULA {table_counter}")

# Step 6: Remove headers from the text file

def remove_headers(input_tables):
    with open(input_tables, 'r') as f:
        lines = f.readlines()

    # Rewrite the file, skipping header lines
    with open(input_tables, 'w') as g:
        for line in lines:
            if ('    SERVICE' not in line and '      DATES' not in line
                    and ' SERVICE' not in line and '  DATES' not in line):
                g.write(line)


def remove_headers_from_clean_data(input_tables):
    with open(input_tables, 'r') as f:
        lines = f.readlines()

    # Rewrite the file, skipping header lines
    with open(input_tables, 'w') as g:
        for line in lines:
            if ('    SERVICE' not in line and '   DATE' not in line
                    and ' SERVICE' not in line and '  DATES' not in line):
                g.write(line)

    print("Headers removed from Master Table file\n")

def extract_patients_raw_data(pdf_path):
    """Extracts the raw page text from the PDF and writes it to a single text file.

    Args:
        pdf_path (str): Path to the extracted PDF.
    """
    with fitz.open(pdf_path) as doc:
        text = ""
        stripped_clean_data_file = all_patients_raw_data
        for page in doc:
            text += page.get_text("text")

    with open(stripped_clean_data_file, 'w') as f:  # Open outside the loop, write once
        f.write(text)

import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)

# Segregating the patients' PDF text data into individual patient text files based on the number of tables

async def segregate_patient_data(all_patients_raw_data, output_dir):
    """
    Segregates patient data from a text file and writes to individual files.

    Args:
        all_patients_raw_data: The path to the text file.
        output_dir: The directory to store individual patient files.
    """
    current_patient = ""
    current_patient_data = ""
    patient_data_started = False  # Flag to indicate if patient data has started
    patient_data_txt_index = 0
    patient_raw_file_dir = f"{current_directory}/patient_raw_data"

    with open(all_patients_raw_data, 'r') as f:
        for line in f:
            line = line.rstrip()  # Remove trailing newline character

            if line.startswith("Patient Name:"):

                if current_patient and current_patient_data:  # Write data if previous patient data exists
                    with open(f"{output_dir}/{patient_data_txt_index}_{current_patient}.txt", 'w') as patient_raw_file:
                        patient_raw_file.write(current_patient_data)
                else:
                    patient_data_txt_index = patient_data_txt_index + 1

                current_patient = line[len("Patient Name: "):].strip()  # Extract name without prefix
                current_patient_refined = current_patient.replace(' ', '-')
                current_patient_data = f"\nPatient Name : {current_patient}\n"  # Reset data for new patient
                patient_data_started = True  # Set flag to indicate patient data has started

            elif line.startswith("For Questions Regarding This Claim"):
                if current_patient and current_patient_data:
                    current_patient_data += line + "\n"  # Add ending line
                    with open(f"{output_dir}/{patient_data_txt_index}_{current_patient_refined}.txt", 'w') as patient_raw_file:
                        patient_raw_file.write(current_patient_data)
                current_patient = ""
                current_patient_data = ""
                patient_data_started = False  # Reset flag as patient data ended

            elif patient_data_started:  # Only process lines if patient data has started
                current_patient_data += line + "\n"  # Append line to current patient data

        # Write any remaining patient data after reaching end of file
        if current_patient and current_patient_data:
            with open(f"{output_dir}/{patient_data_txt_index}_{current_patient}.txt", 'a') as patient_file:
                patient_file.write(current_patient_data)

        for index, patient_txt_file in enumerate(os.listdir(patient_raw_file_dir), start=0):
            if patient_txt_file.endswith(".txt"):
                patient_filepath = os.path.join(patient_raw_file_dir, patient_txt_file)
                with open(patient_filepath, 'r') as f:
                    file_content = f.read()

                    # Delete files that contain none of the expected table keywords
                    words_to_scan = ['NEGOTIATED', 'AMOUNT', 'COPAY', 'SUBMITTED', 'CHARGES', 'AMOUNT']
                    if not any(word in file_content for word in words_to_scan):
                        os.remove(patient_filepath)

patient_output_dir = f"{current_directory}/patient_raw_data"  # Adjust as needed

from collections import defaultdict

def sort_words_in_file(clean_data_file, stripped_clean_data_file):
    with open(clean_data_file, 'r') as f:
        lines = f.readlines()

    sorted_lines = []
    for line in lines:
        words = line.split()  # Split line into words
        sorted_words = sorted(words, key=lambda x: x.strip())  # Sort words alphabetically after removing whitespace
        sorted_line = '\n'.join(sorted_words)  # Join sorted words, one per line
        sorted_lines.append(sorted_line)

    with open(stripped_clean_data_file, 'w') as f:
        f.writelines(sorted_lines)

stripped_clean_data_file = f'{current_directory}/stripped_clean_data.txt'
sort_words_in_file(clean_data_file, stripped_clean_data_file)

def extract_and_remove_data(file_path, output_dir):
    try:
        if not os.path.isfile(file_path):
            print(f"Error: {file_path} does not exist.")
            return

        with open(file_path, 'r') as file:
            lines = file.readlines()

        # Find the first separator line (a run of at least ten dashes)
        separator_index = -1
        for i, line in enumerate(lines):
            if line.count('-') >= 10:
                separator_index = i
                break

        if separator_index == -1:
            print("Separator not found in the file.")
            return

        copied_data = lines[:separator_index]
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        file_name = os.path.basename(file_path)
        output_file_path = os.path.join(output_dir, file_name)

        # Copy everything above the separator out; keep everything below it
        with open(output_file_path, 'w') as stripped_clean_data_file:
            stripped_clean_data_file.writelines(copied_data)
        with open(file_path, 'w') as file:
            file.writelines(lines[separator_index + 1:])
        return output_file_path

    except Exception as e:
        print(f"Error in extract_and_remove_data: {e}")

def verify_patient_raw_data_content(patient_raw_file_dir, patient_came_extra, patient_data_txt_index):
    try:
        if not os.path.isdir(patient_raw_file_dir):
            print(f"Error: {patient_raw_file_dir} is not a valid directory.")
            return
        patient_raw_file_dir = f"{current_directory}/patient_raw_data"

        for index, patient_raw_file in enumerate(os.listdir(patient_raw_file_dir)):
            if patient_raw_file.endswith(".txt"):
                patient_filepath = os.path.join(patient_raw_file_dir, patient_raw_file)
                with open(patient_filepath, 'r') as patient_raw_text_file:
                    patient_raw_text_data = patient_raw_text_file.read()
                    words_to_scan = ['NEGOTIATED', 'AMOUNT', 'COPAY', 'SUBMITTED', 'CHARGES', 'AMOUNT']
                    if not any(word in patient_raw_text_data for word in words_to_scan):
                        os.remove(patient_filepath)
                        patient_came_extra = True
                        patient_data_txt_index = patient_data_txt_index - 1
                        print('COUNTER DECREMENTED')
    except Exception as e:
        print(f"Error {e}")

# NOTE: an earlier version tried `async with loop.run_in_executor(pool, open, ...)`
# and `async for line in file`, but `run_in_executor` returns a future (not an
# async context manager) and regular file objects are not async iterables, so
# that version raises TypeError. This definition is the one that actually runs;
# the read itself is still blocking.
async def async_read_file(file_path):
    with open(file_path, 'r') as file:
        return [line.strip() for line in file.readlines()]

async def compare_patients_data(patient_raw_file_dir, stripped_clean_data_file, separator='-----------', output_dir='final-csvs'):
    try:
        if not os.path.isdir(patient_raw_file_dir):
            print(f"Error: {patient_raw_file_dir} is not a valid directory.")
            return

        if not os.path.isfile(stripped_clean_data_file):
            print(f"Error: {stripped_clean_data_file} does not exist.")
            return

        # Read clean data
        with open(stripped_clean_data_file, 'r') as clean_data_file_var:
            clean_data_master_list = [line.strip() for line in clean_data_file_var.readlines()]
        clean_data_list_array = split_sublist(clean_data_master_list, separator)

        mismatch = 'true'

        for clean_data_list in clean_data_list_array:
            mismatch_counter = 0
            # Create a list to hold patient data tasks
            tasks = []
            # Queue one read task per patient raw file
            for index, patient_raw_file in enumerate(os.listdir(patient_raw_file_dir)):
                if patient_raw_file.endswith(".txt"):
                    patient_filepath = os.path.join(patient_raw_file_dir, patient_raw_file)

                    # Create tasks for reading patient raw data asynchronously
                    tasks.append(async_read_file(patient_filepath))

            # Execute tasks concurrently
            patient_raw_data_lists = await asyncio.gather(*tasks)

            for patient_raw_data_list in patient_raw_data_lists:
                for word in clean_data_list:
                    if word in patient_raw_data_list:
                        mismatch = 'false'
                    elif "NaN" in word:
                        pass
                    else:
                        mismatch = 'true'
                        with open(f'{current_directory}/mismatched_data.txt', 'a') as file:
                            file.writelines(word + '\n')

                if mismatch == 'false':
                    await process_iteration(f'{current_directory}/data/extracted_clean_data.txt', f'{current_directory}/data/CSV', patient_name)

                elif mismatch == 'true':
                    mismatch_counter = mismatch_counter + 1

            stripped_clean_data_file = os.path.join(output_dir, f"{patient_name}.csv")

    except Exception as e:
        print(f"Error in compare_patients_data: {e}")

async def convert_to_csv(clean_tables_dir):
    await asyncio.sleep(1)
    patient_data = {}

    patient_raw_data_dir = f"{current_directory}/patient_raw_data"
    csv_output_folder = f"{current_directory}/data/Revised_Text_Files"

    # First pass: map each "Patient Name" line to its account info
    for raw_patient_text_files in os.listdir(patient_raw_data_dir):
        raw_patient_text_files_path = os.path.join(patient_raw_data_dir, raw_patient_text_files)

        print(f"IM GETTING ALL THE RAW DATA TEXT FILES {raw_patient_text_files}")

        with open(raw_patient_text_files_path, 'r') as read_patientfile:
            lines = read_patientfile.readlines()

            print("YOHOO I AM READING ALL THE PATIENT TEXT FILES\n")
            for i, line in enumerate(lines):
                if line.startswith("Patient Name") or "Patient Name" in line:
                    patient_name_line = line.strip()
                    patient_data[patient_name_line] = None
                elif line.startswith("Patient Account") or "Patient Account" in line:
                    if i + 4 < len(lines):
                        patient_account_info = lines[i + 4].strip()
                        patient_account_info = patient_account_info.replace("Patient Name : ", "")
                        if patient_name_line:
                            patient_data[patient_name_line] = patient_account_info

    # Second pass: convert each clean table text file into a CSV
    for clean_tables in os.listdir(clean_tables_dir):

        print("YOHOO I AM INTO THE CLEAN TABLES LOOP")
        if clean_tables.endswith('.txt'):
            clean_tables_filename = os.path.basename(clean_tables)
            filename_without_extension = os.path.splitext(clean_tables_filename)[0]
            filename_without_extension = filename_without_extension[2:]  # Drop the two-character index prefix
            clean_tables_path = os.path.join(clean_tables_dir, clean_tables_filename)
            added_pname = f"Patient Name : {filename_without_extension}"

            print(f"Filename without extension {filename_without_extension}")

            # remove_headers(clean_tables_path)
            with codecs.open(clean_tables_path, 'r', encoding='utf-8') as codec_txtfile:

                print("YOHOO I AM OPENING THE CLEAN TABLE FILES")
                codec_txtfile.seek(0)
                first_char = codec_txtfile.read(1)

                if not first_char:
                    print(f"The file {clean_tables_filename} is empty")
                else:
                    df = pd.read_fwf(clean_tables_path)
                    # Construct the output CSV file path
                    csv_output_path = os.path.splitext(clean_tables_path)[0] + '.csv'

                    column_names = ['SERVICE_DATES', 'PL', 'SERVICE_CODE', 'NUM_SVCS', 'SUBMITTED_CHARGES',
                                    'NEGOTIATED_AMOUNT', 'COPAY_AMOUNT', 'NOT_PAYABLE', 'SEE_REMARKS',
                                    'DEDUCTIBLE', 'CO_INSURANCE', 'PATIENT_RESP', 'PAYABLE_AMOUNT',
                                    'Patient_Name', 'Account_Id']

                    df['Patient_Name'] = f"{filename_without_extension}"

                    for key, value in patient_data.items():
                        if key == added_pname:
                            # If a match is found, assign the corresponding account ID
                            account_id = value
                            print("IT MATCHED\n")
                            print(f"Key {key}")
                            print(f"{added_pname}")
                            break
                        else:
                            print("DID NOT MATCH")
                            print(f"Key : {key} , pNAME : {added_pname}")
                    df['Account_Id'] = account_id
                    df.columns = column_names
                    df.to_csv(csv_output_path, sep=',', index=False)
                    os.chmod(csv_output_path, 0o777)

header_pattern = "SERVICE_DATES"

async def remove_headers_csv(input_csv_dir, header_pattern):
    await asyncio.sleep(1)
    for csv_file in os.listdir(input_csv_dir):
        if csv_file.endswith(".csv"):
            csv_file_path = os.path.join(input_csv_dir, csv_file)
            # Read the CSV file
            with open(csv_file_path, 'r') as file:
                lines = file.readlines()
            # Remove lines containing the header pattern
            lines = [line for line in lines if header_pattern not in line]
            # Write back to the CSV file
            with open(csv_file_path, 'w') as file:
                file.writelines(lines)

async def merge_csvs(csv_output_directory):
    await asyncio.sleep(1)
    loop = asyncio.get_running_loop()
    # Initialize an empty list to store DataFrames
    data_frames = []

    # Iterate through each file in the directory
    for clean_csv_files in os.listdir(csv_output_directory):

        if clean_csv_files.endswith('.csv'):
            print(clean_csv_files)
            # Load the CSV file into a DataFrame in a worker thread
            file_path = os.path.join(csv_output_directory, clean_csv_files)
            read_csv_partial = partial(pd.read_csv, file_path)
            print(f"Reading CSV file: {file_path}")
            data = await loop.run_in_executor(None, read_csv_partial)
            print(f"Number of rows read from {file_path}: {len(data)}")

            # Append the DataFrame to the list
            data_frames.append(data)

    # Debug print to check the contents of data_frames
    print("Data frames:", data_frames)

    # Check if data_frames is empty
    if not data_frames:
        print("No CSV files found to merge.")
        return

    # Concatenate all DataFrames in the list along the rows
    combined_data = pd.concat(data_frames, ignore_index=True)

    # Define headers
    headers = ['SERVICE_DATES', 'PL', 'SERVICE_CODE', 'NUM_SVCS', 'SUBMITTED_CHARGES', 'NEGOTIATED_AMOUNT',
               'COPAY_AMOUNT', 'NOT_PAYABLE', 'SEE_REMARKS', 'DEDUCTIBLE', 'CO_INSURANCE', 'PATIENT_RESP',
               'PAYABLE_AMOUNT', 'Patient_Name', 'Account_Id']
    # Assign headers to the combined DataFrame
    combined_data.columns = headers

    # Define the path for the combined CSV file
    combined_file_path = f'{current_directory}/master_csv.csv'
    write_csv_partial = partial(combined_data.to_csv, combined_file_path, sep=',', index=False)
    await loop.run_in_executor(None, write_csv_partial)
    print("Operation completed successfully. Combined CSV file saved at:", combined_file_path)

# asyncio.run(radar_scan())
extractPages(input_pdf, keywords, extracted_pdf)
extract_patients_raw_data(extracted_pdf)
extract_patients_clean_data(output_dir, f'{current_directory}/data/extracted_clean_data.txt')
remove_headers_from_clean_data(f'{current_directory}/data/extracted_clean_data.txt')
asyncio.run(compare_patients_data(patient_raw_file_dir, stripped_clean_data_file))
asyncio.run(segregate_patient_data(f"{current_directory}/src/all_patients_raw_data.txt", patient_output_dir))
asyncio.run(convert_to_csv(f'{current_directory}/data/Revised_Text_Files'))
asyncio.run(merge_csvs(f'{current_directory}/data/Revised_Text_Files'))
```
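
For reference, this is the kind of non-blocking read I was originally going for in the first `async_read_file` definition (a minimal sketch, assuming Python 3.9+ for `asyncio.to_thread`; plain file objects support neither `async with` nor `async for`):

```python
import asyncio

async def async_read_file(file_path):
    # Do the blocking read inside a helper and hand it to a worker thread,
    # so the event loop stays free while the file is being read
    def _read():
        with open(file_path, 'r') as f:
            return [line.strip() for line in f]
    return await asyncio.to_thread(_read)
```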
