I have written a Python program that converts a PDF into text files, cleans those text files, converts the cleaned files into CSV files, and finally merges them into a single CSV file.
The problem is that the whole program does not execute in one go. When I run it once, all the text files are created, but the CSV files are only generated when I run the program a second time, after the text files already exist. How do I fix this? I have tried using async/await, but apparently it's not working.
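For reference, this is the overall flow I'm trying to achieve. Below is a simplified sketch with stubbed-out steps (the step names mirror my real functions further down); each step is supposed to finish before the next one starts, inside a single event loop:

```python
import asyncio

def extract_pages():
    print("1. keep only the PDF pages that contain my keywords")

def extract_raw_text():
    print("2. extracted PDF -> one raw text file for all patients")

async def segregate_patient_data():
    print("3. raw text -> one text file per patient")

async def convert_to_csv():
    print("4. cleaned per-patient tables -> one CSV per patient")

async def merge_csvs():
    print("5. individual CSVs -> a single master CSV")

async def main():
    # Run the synchronous steps first, then await each async step in order.
    extract_pages()
    extract_raw_text()
    await segregate_patient_data()
    await convert_to_csv()
    await merge_csvs()

asyncio.run(main())
```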
As per my assessment, the problem could be in one of these functions:

- `async def merge_csvs`
- `async def convert_to_csv(clean_tables_dir)`
- `async def segregate_patient_data(all_patients_raw_data, output_dir)`
- `async def compare_patients_data`
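One thing I'm unsure about: at the top level I mix plain calls with `asyncio.run()` (see the bottom of the full code), so `extractPages(...)` is invoked without ever being awaited. Here is a minimal reproduction of that pattern (hypothetical step names, not my real functions):

```python
import asyncio

async def step_one():
    print("step one ran")

async def step_two():
    print("step two ran")

# Calling an async function like a normal function only creates a coroutine
# object; its body never executes, and Python emits
# "RuntimeWarning: coroutine 'step_one' was never awaited".
step_one()

# Only this step actually runs, inside its own short-lived event loop.
asyncio.run(step_two())
```

Could the missing `await` explain why later steps only succeed on the second run, once the files from the previous run already exist on disk?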
Any help would be greatly appreciated
Here is my code:

```python
from PyPDF2 import PdfReader, PdfWriter
import re
from pdfminer.high_level import extract_text
import os
import csv
import fitz
import pandas as pd
import codecs
import asyncio
from functools import partial
from concurrent.futures import ThreadPoolExecutor
current_directory = os.getcwd()
print(f"Current Working Directory {current_directory}")
clean_data_file = f"{current_directory}/data/extracted_clean_data.txt"
patient_raw_file_dir = f"{current_directory}/patient_raw_data"
csv_dir = f"{current_directory}/data/output_csvs"
all_patients_raw_data = f'{current_directory}/src/all_patients_raw_data.txt'
input_pdf = f'{current_directory}/eg.pdf'
keywords = ['PatientName'] # List containing the multi-word keyword
extracted_pdf = f"{current_directory}/data/extracted_pages.pdf"
output_dir = f'{current_directory}/data/splitpdfs'
output_text_dir = f'{current_directory}/data/textfiles'
output_folder_path = f"{current_directory}/data/extracted_clean_tables" # Folder to save text files
patient_text_files_rl = f"{current_directory}/data/Revised_Text_Files"
print(f"Successfully processed PDFs in '{output_dir}'. Check '{output_text_dir}' for text files.")
if not os.path.exists(all_patients_raw_data):
    # Ensure the directory exists
    os.makedirs(os.path.dirname(all_patients_raw_data), exist_ok=True)
# Step 1: Extracting pages containing required keywords from the pdf file and making a separate PDF for the same
async def extractPages(input_pdf, keywords, extracted_pdf):
    try:
        # Initializing Reader and Writer
        reader = PdfReader(open(input_pdf, 'rb'))
        writer = PdfWriter()
        # Iterate through each page
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            text = page.extract_text()
            # Search for any keyword in the page text
            if any(keyword in text for keyword in keywords):
                writer.add_page(page)
        # Save the new PDF with the required pages only after iterating all pages
        with open(extracted_pdf, 'wb') as stripped_clean_data_file:
            writer.write(stripped_clean_data_file)
    except Exception as e:
        print(f"Error {e}")
import tabula
def extract_patients_clean_data(folder_path, output_file_path):
    text_data = ""  # Accumulate text data across all PDFs
    table_counter = 0
    pdf_to_text_output_folder = f'{current_directory}/data/Revised_Text_Files'
    patient_names_list = []
    for patient_names in os.listdir(f'{current_directory}/patient_raw_data'):
        if patient_names.endswith('.txt'):
            patient_file_name = os.path.basename(patient_names)
            patient_names_list.append(patient_file_name)
    patient_names_list.sort()
    pdf_path_dir = f'{current_directory}/eg.pdf'
    pdf_data = tabula.read_pdf(pdf_path_dir, pages="all")
    patient_names = []
    for index, (page_data, patient_name) in enumerate(zip(pdf_data, patient_names_list)):
        print(f"Number of Tables {len(page_data)}")
        table_counter += 1
        # Check if the index is within the range of patient_names_list
        if index < len(patient_names_list):
            # Construct the filename using the patient name at the current index
            clean_table_filename = f"{patient_names_list[index]}.txt"
            clean_table_filepath = os.path.join(pdf_to_text_output_folder, clean_table_filename)
            # Write the page data to the text file
            with open(clean_table_filepath, 'w') as clean_table_text_file_fs:
                clean_table_text_file_fs.write(page_data.to_string(index=False))
        else:
            print("Error: Index out of range for patient_names_list")
        # remove_headers_from_clean_data(clean_table_filepath)
        text_data += page_data.to_string(index=False) + "\n\n\n\n\n-------------------------------------------------------------------------------------\n\n\n\n"  # Add separator between pages
    # Write accumulated text data to the output file
    with open(output_file_path, "w", encoding="utf-8") as f:
        f.write(text_data)
    print(f"Master table file created and saved to '{output_file_path}'")
    print(f"NO OF TABLES IN TABULA {table_counter}")
# Step 6 : Remove Headers from the text file
def remove_headers(input_tables):
    with open(input_tables, 'r') as f:
        lines = f.readlines()
    # Rewrite the file, dropping table-header lines
    with open(input_tables, 'w') as g:
        for line in lines:
            if ' SERVICE' not in line and ' DATES' not in line:
                g.write(line)
def remove_headers_from_clean_data(input_tables):
    with open(input_tables, 'r') as f:
        lines = f.readlines()
    # Rewrite the file, dropping table-header lines
    with open(input_tables, 'w') as g:
        for line in lines:
            if ' SERVICE' not in line and ' DATE' not in line:
                g.write(line)
    print("Headers removed from Master Table file\n")
def extract_patients_raw_data(pdf_path):
    """Extracts the raw text of the PDF and writes it to a single text file.

    Args:
        pdf_path (str): Path to the extracted PDF to read.
    """
    with fitz.open(pdf_path) as doc:
        text = ""
        stripped_clean_data_file = all_patients_raw_data
        for page in doc:
            text += page.get_text("text")  # No need for joining here
    with open(stripped_clean_data_file, 'w') as f:  # Open outside the loop, write once
        f.write(text)
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
# Segregating the patients' PDF text data into individual patient text files based on the number of tables
async def segregate_patient_data(all_patients_raw_data, output_dir):
    """
    Segregates patient data from a text file and writes to individual files.

    Args:
        all_patients_raw_data: The path to the text file.
        output_dir: The directory to store individual patient files.
    """
    current_patient = ""
    current_patient_data = ""
    patient_data_started = False  # Flag to indicate if patient data has started
    patient_data_txt_index = 0
    patient_raw_file_dir = f"{current_directory}/patient_raw_data"
    with open(all_patients_raw_data, 'r') as f:
        for line in f:
            line = line.rstrip()  # Remove trailing newline character
            if line.startswith("Patient Name:"):
                if current_patient and current_patient_data:  # Write data if previous patient data exists
                    with open(f"{output_dir}/{patient_data_txt_index}_{current_patient}.txt", 'w') as patient_raw_file:
                        patient_raw_file.write(current_patient_data)
                else:
                    patient_data_txt_index = patient_data_txt_index + 1
                current_patient = line[len("Patient Name: "):].strip()  # Extract name without prefix
                current_patient_refined = current_patient.replace(' ', '-')
                current_patient_data = f"\nPatient Name : {current_patient}\n"  # Reset data for new patient
                patient_data_started = True  # Set flag to indicate patient data has started
            elif line.startswith("For Questions Regarding This Claim"):
                if current_patient and current_patient_data:
                    current_patient_data += line + "\n"  # Add ending line
                    with open(f"{output_dir}/{patient_data_txt_index}_{current_patient_refined}.txt", 'w') as patient_raw_file:
                        patient_raw_file.write(current_patient_data)
                    current_patient = ""
                    current_patient_data = ""
                    patient_data_started = False  # Reset flag as patient data ended
            elif patient_data_started:  # Only process lines if patient data has started
                current_patient_data += line + "\n"  # Append line to current patient data
    # Write any remaining patient data after reaching end of file
    if current_patient and current_patient_data:
        with open(f"{output_dir}/{patient_data_txt_index}_{current_patient}.txt", 'a') as patient_file:
            patient_file.write(current_patient_data)
    # Remove patient files that contain none of the expected table keywords
    for index, patient_txt_file in enumerate(os.listdir(patient_raw_file_dir), start=0):
        if patient_txt_file.endswith(".txt"):
            patient_filepath = os.path.join(patient_raw_file_dir, patient_txt_file)
            with open(patient_filepath, 'r') as f:
                f.seek(0, 0)
                file_content = f.read()
            words_to_scan = ['NEGOTIATED', 'AMOUNT', 'COPAY', 'SUBMITTED', 'CHARGES', 'AMOUNT']
            if not any(word in file_content for word in words_to_scan):
                os.remove(patient_filepath)
patient_output_dir = f"{current_directory}/patient_raw_data"  # Adjust as needed
from collections import defaultdict
def sort_words_in_file(clean_data_file, stripped_clean_data_file):
    with open(clean_data_file, 'r') as f:
        lines = f.readlines()
    sorted_lines = []
    for line in lines:
        words = line.split()  # Split line into words
        sorted_words = sorted(words, key=lambda x: x.strip())  # Sort words alphabetically after removing white space
        sorted_line = '\n'.join(sorted_words)  # Join sorted words back into a line
        sorted_lines.append(sorted_line)
    with open(stripped_clean_data_file, 'w') as f:
        f.writelines(sorted_lines)
stripped_clean_data_file = f'{current_directory}/stripped_clean_data.txt'
sort_words_in_file(clean_data_file,stripped_clean_data_file)
def extract_and_remove_data(file_path, output_dir):
    try:
        if not os.path.isfile(file_path):
            print(f"Error: {file_path} does not exist.")
            return
        with open(file_path, 'r') as file:
            lines = file.readlines()
        separator_index = -1
        for i, line in enumerate(lines):
            if line.count('-') >= 10:
                separator_index = i
                break
        if separator_index == -1:
            print("Separator not found in the file.")
            return
        copied_data = lines[:separator_index]
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        file_name = os.path.basename(file_path)
        output_file_path = os.path.join(output_dir, file_name)
        with open(output_file_path, 'w') as stripped_clean_data_file:
            stripped_clean_data_file.writelines(copied_data)
        with open(file_path, 'w') as file:
            file.writelines(lines[separator_index + 1:])
        return output_file_path
    except Exception as e:
        print(f"Error in extract_and_remove_data: {e}")
def verify_patient_raw_data_content(patient_raw_file_dir, patient_came_extra, patient_data_txt_index):
    try:
        if not os.path.isdir(patient_raw_file_dir):
            print(f"Error: {patient_raw_file_dir} is not a valid directory.")
            return
        patient_raw_file_dir = f"{current_directory}/patient_raw_data"
        for index, patient_raw_file in enumerate(os.listdir(patient_raw_file_dir)):
            if patient_raw_file.endswith(".txt"):
                patient_filepath = os.path.join(patient_raw_file_dir, patient_raw_file)
                with open(patient_filepath, 'r') as patient_raw_text_file:
                    patient_raw_text_data = patient_raw_text_file.read()
                words_to_scan = ['NEGOTIATED', 'AMOUNT', 'COPAY', 'SUBMITTED', 'CHARGES', 'AMOUNT']
                if not any(word in patient_raw_text_data for word in words_to_scan):
                    os.remove(patient_filepath)
                    patient_came_extra = True
                    patient_data_txt_index = patient_data_txt_index - 1
                    print('COUNTER DECREMENTED')
    except Exception as e:
        print(f"Error {e}")
async def async_read_file(file_path):
    """Read a text file in a worker thread and return its stripped lines."""
    loop = asyncio.get_event_loop()

    def read_lines():
        with open(file_path, 'r') as file:
            return [line.strip() for line in file]

    return await loop.run_in_executor(None, read_lines)
async def compare_patients_data(patient_raw_file_dir, stripped_clean_data_file, separator='-----------', output_dir='final-csvs'):
    try:
        if not os.path.isdir(patient_raw_file_dir):
            print(f"Error: {patient_raw_file_dir} is not a valid directory.")
            return
        if not os.path.isfile(stripped_clean_data_file):
            print(f"Error: {stripped_clean_data_file} does not exist.")
            return
        # Read clean data
        with open(stripped_clean_data_file, 'r') as clean_data_file_var:
            clean_data_master_list = [line.strip() for line in clean_data_file_var.readlines()]
        clean_data_list_array = split_sublist(clean_data_master_list, separator)
        mismatch = 'true'
        for clean_data_list in clean_data_list_array:
            mismatch_counter = 0
            # Create a list to hold patient data tasks
            tasks = []
            # Iterate over patient raw files
            for index, patient_raw_file in enumerate(os.listdir(patient_raw_file_dir)):
                if patient_raw_file.endswith(".txt"):
                    patient_filepath = os.path.join(patient_raw_file_dir, patient_raw_file)
                    # Create tasks for reading patient raw data asynchronously
                    tasks.append(async_read_file(patient_filepath))
            # Execute tasks concurrently
            patient_raw_data_lists = await asyncio.gather(*tasks)
            for patient_raw_data_list in patient_raw_data_lists:
                for word in clean_data_list:
                    if word in patient_raw_data_list:
                        mismatch = 'false'
                    elif "NaN" in word:
                        pass
                    else:
                        mismatch = 'true'
                        with open(f'{current_directory}/mismatched_data.txt', 'a') as file:
                            file.writelines(word + '\n')
            if mismatch == 'false':
                await process_iteration(f'{current_directory}/data/extracted_clean_data.txt', f'{current_directory}/data/CSV', patient_name)
            elif mismatch == 'true':
                mismatch_counter = mismatch_counter + 1
                stripped_clean_data_file = os.path.join(output_dir, f"{patient_name}.csv")
    except Exception as e:
        print(f"Error in compare_patients_data: {e}")
async def convert_to_csv(clean_tables_dir):
    await asyncio.sleep(1)
    patient_data = {}
    patient_raw_data_dir = f"{current_directory}/patient_raw_data"
    csv_output_folder = f"{current_directory}/data/Revised_Text_Files"
    for raw_patient_text_files in os.listdir(patient_raw_data_dir):
        raw_patient_text_files_path = os.path.join(patient_raw_data_dir, raw_patient_text_files)
        print(f"IM GETTING ALL THE RAW DATA TEXT FILES {raw_patient_text_files}")
        with open(raw_patient_text_files_path, 'r') as read_patientfile:
            lines = read_patientfile.readlines()
            print("YOHOO I AM READING ALL THE PATIENT TEXT FILES\n")
            for i, line in enumerate(lines):
                if line.startswith("Patient Name") or "Patient Name" in line:
                    patient_name_line = line.strip()
                    patient_data[patient_name_line] = None
                elif line.startswith("Patient Account") or "Patient Account" in line:
                    if i + 4 < len(lines):
                        patient_account_info = lines[i + 4].strip()
                        patient_account_info = patient_account_info.replace("Patient Name : ", "")
                        if patient_name_line:
                            patient_data[patient_name_line] = patient_account_info
    # print(patient_data)
    for clean_tables in os.listdir(clean_tables_dir):
        print("YOHOO I AM INTO THE CLEAN TABLES LOOP")
        if clean_tables.endswith('.txt'):
            clean_tables_filename = os.path.basename(clean_tables)
            filename_without_extension = os.path.splitext(clean_tables_filename)[0]
            filename_without_extension = filename_without_extension[2:]  # Drop the "N_" index prefix
            clean_tables_path = os.path.join(clean_tables_dir, clean_tables_filename)
            added_pname = f"Patient Name : {filename_without_extension}"
            print(f"Filename without extension {filename_without_extension} ")
            # remove_headers(clean_tables_path)
            with codecs.open(clean_tables_path, 'r', encoding='utf-8') as codec_txtfile:
                print("YOHOO I AM OPENING THE CLEAN TABLE FILES")
                codec_txtfile.seek(0)
                first_char = codec_txtfile.read(1)
                if not first_char:
                    print(f"The file {clean_tables_filename} is empty")
                else:
                    df = pd.read_fwf(clean_tables_path)
                    # Construct the output CSV file path
                    csv_output_path = os.path.splitext(clean_tables_path)[0] + '.csv'
                    column_names = ['SERVICE_DATES', 'PL', 'SERVICE_CODE', 'NUM_SVCS', 'SUBMITTED_CHARGES', 'NEGOTIATED_AMOUNT', 'COPAY_AMOUNT', 'NOT_PAYABLE', 'SEE_REMARKS', 'DEDUCTIBLE', 'CO_INSURANCE', 'PATIENT_RESP', 'PAYABLE_AMOUNT', 'Patient_Name', 'Account_Id']
                    df['Patient_Name'] = f"{filename_without_extension}"
                    for key, value in patient_data.items():
                        if key == added_pname:
                            # If a match is found, assign the corresponding account ID
                            account_id = value
                            print("IT MATCHED\n")
                            print(f"Key {key}")
                            print(f"{added_pname}")
                            break
                        else:
                            print("DID NOT MATCH")
                            print(f"Key : {key} , pNAME : {added_pname}")
                    df['Account_Id'] = account_id
                    df.columns = column_names
                    df.to_csv(csv_output_path, sep=',', index=False)
                    os.chmod(csv_output_path, 0o777)
header_pattern = "SERVICE_DATES"
async def remove_headers_csv(input_csv_dir, header_pattern):
    await asyncio.sleep(1)
    for csv_file in os.listdir(input_csv_dir):
        if csv_file.endswith(".csv"):
            csv_file_path = os.path.join(input_csv_dir, csv_file)
            # Read the CSV file
            with open(csv_file_path, 'r') as file:
                lines = file.readlines()
            # Remove lines containing the header pattern
            lines = [line for line in lines if header_pattern not in line]
            # Write back to the CSV file
            with open(csv_file_path, 'w') as file:
                file.writelines(lines)
async def merge_csvs(csv_output_directory):
    await asyncio.sleep(1)
    loop = asyncio.get_running_loop()
    # Initialize an empty list to store DataFrames
    data_frames = []
    # Iterate through each file in the directory
    for clean_csv_files in os.listdir(csv_output_directory):
        if clean_csv_files.endswith('.csv'):
            print(clean_csv_files)
            # Load the CSV file into a DataFrame in a worker thread
            file_path = os.path.join(csv_output_directory, clean_csv_files)
            read_csv_partial = partial(pd.read_csv, file_path)
            print(f"Reading CSV file: {file_path}")
            data = await loop.run_in_executor(None, read_csv_partial)
            print(f"Number of rows read from {file_path}: {len(data)}")
            # Append the DataFrame to the list
            data_frames.append(data)
    # Debug print to check the contents of data_frames
    print("Data frames:", data_frames)
    # Check if data_frames is empty
    if not data_frames:
        print("No CSV files found to merge.")
        return
    # Concatenate all DataFrames in the list along the rows
    combined_data = pd.concat(data_frames, ignore_index=True)
    # Define headers
    headers = ['SERVICE_DATES', 'PL', 'SERVICE_CODE', 'NUM_SVCS', 'SUBMITTED_CHARGES', 'NEGOTIATED_AMOUNT',
               'COPAY_AMOUNT', 'NOT_PAYABLE', 'SEE_REMARKS', 'DEDUCTIBLE', 'CO_INSURANCE', 'PATIENT_RESP',
               'PAYABLE_AMOUNT', 'Patient_Name', 'Account_Id']
    # Assign headers to the combined DataFrame
    combined_data.columns = headers
    # Define the path for the combined CSV file
    combined_file_path = f'{current_directory}/master_csv.csv'
    write_csv_partial = partial(combined_data.to_csv, combined_file_path, sep=',', index=False)
    await loop.run_in_executor(None, write_csv_partial)
    print("Operation completed successfully. Combined CSV file saved at:", combined_file_path)
# asyncio.run(radar_scan())
extractPages(input_pdf,keywords,extracted_pdf)
extract_patients_raw_data(extracted_pdf)
extract_patients_clean_data(output_dir,f'{current_directory}/data/extracted_clean_data.txt')
remove_headers_from_clean_data(f'{current_directory}/data/extracted_clean_data.txt')
asyncio.run(compare_patients_data(patient_raw_file_dir,stripped_clean_data_file))
asyncio.run(segregate_patient_data(f"{current_directory}/src/all_patients_raw_data.txt", patient_output_dir))
asyncio.run(convert_to_csv(f'{current_directory}/data/Revised_Text_Files'))
asyncio.run(merge_csvs(f'{current_directory}/data/Revised_Text_Files'))
```