I am working on a project where I need to process a large amount of Betfair historical betting data supplied as a tar archive. The archive contains many bz2 files, each of which holds a single text file with one JSON object per line.
The current approach extracts the data from these bz2 files, parses the JSON, and loads it into Pandas DataFrames for further analysis. I've used concurrent.futures for parallel processing and have tried to optimize file reading and data handling as much as possible, but I'm looking for ways to optimize the process further, particularly to reduce the overall processing time and improve memory usage.
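For reference, each line is a Betfair stream market-change message. Based only on the fields my parser reads below (as I understand it, pt is the publish time in epoch milliseconds, mc the list of market changes, rc the runner price changes, and ltp the last traded price), the two kinds of lines look roughly like this, with made-up values for illustration:

{"pt": 1640995200000, "mc": [{"id": "1.123456789", "marketDefinition": {"openDate": "2022-01-01T13:00:00.000Z", "runners": [{"id": 111, "name": "Runner A"}, {"id": 222, "name": "Runner B"}]}}]}
{"pt": 1640995260000, "mc": [{"id": "1.123456789", "rc": [{"id": 111, "ltp": 3.45}]}]}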
Here is the relevant part of the code:
import tarfile
import bz2
import json
import pandas as pd
import io
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import time
def process_bz2_data(bundle):
    """Decompress a batch of bz2 payloads and parse the JSON lines they contain."""
    market_definition_list = []
    normal_entry_list = []
    for data in bundle:
        with bz2.open(io.BytesIO(data), 'rt', encoding='utf-8') as f:
            for line in f:
                try:
                    json_data = json.loads(line)
                    pt_time = json_data['pt']  # publish time of the message
                    mc_data = json_data['mc']  # list of market changes
                    for info in mc_data:
                        if 'marketDefinition' in info:
                            # One wide row per market definition, with runner ids/names as columns.
                            market_definition_list.append({
                                'market_id': info['id'],
                                'start_datetime': info['marketDefinition']['openDate'],
                                **{f'runner_{i}_id': v['id'] for i, v in enumerate(info['marketDefinition']['runners'])},
                                **{f'runner_{i}_name': v['name'] for i, v in enumerate(info['marketDefinition']['runners'])},
                            })
                        else:
                            # Otherwise it is a price update: one row per runner change.
                            market_id = info['id']
                            for change in info['rc']:
                                last_traded_price = change.get('ltp', None)
                                runner_id = change['id']
                                normal_entry_list.append({
                                    'time': pt_time,
                                    'market_id': market_id,
                                    'runner_id': runner_id,
                                    'last_traded_price': last_traded_price
                                })
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON: {e}")
                except UnicodeDecodeError as e:
                    print(f"Error decoding Unicode: {e}")
    return market_definition_list, normal_entry_list
def load_json_to_dataframes(tar_file_path, batch_size=10):
    results = []
    with tarfile.open(tar_file_path, 'r') as tar:
        initial = time.perf_counter()
        # Read every .bz2 member into memory up front.
        files_to_process = [tar.extractfile(member).read() for member in tar.getmembers() if member.isfile() and member.name.endswith('.bz2')]  # 41 seconds
        print(f"Elapsed time (Reading Files): {time.perf_counter() - initial:.2f} seconds")
    print(f"We have {len(files_to_process)} files to process.")  # 183598
    batches = [files_to_process[i:i + batch_size] for i in range(0, len(files_to_process), batch_size)]
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(process_bz2_data, batch) for batch in batches]
        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing files"):  # 3 mins 51 seconds
            result = future.result()
            if result:
                results.append(result)
    initial = time.perf_counter()
    # Each result is a (market_definition_list, normal_entry_list) tuple; flatten them into two lists.
    market_definitions = [item for result in results for item in result[0]]
    normal_entries = [item for result in results for item in result[1]]
    print(f"Elapsed time (Splitting Data): {time.perf_counter() - initial:.2f} seconds")  # 0.00 seconds (does not seem right)
    initial = time.perf_counter()
    market_definition_df = pd.DataFrame(market_definitions)
    normal_entry_df = pd.DataFrame(normal_entries)
    print(f"Elapsed time (Creating DataFrames): {time.perf_counter() - initial:.2f} seconds")  # 0.06 seconds (does not seem right)
    # Takes a couple of seconds before displaying the success message, not sure why.
    return market_definition_df, normal_entry_df
if __name__ == '__main__':
    tar_file_path = "./data (6).tar"
    market_definition_df, normal_entry_df = load_json_to_dataframes(tar_file_path)
    print('DataFrames loaded successfully.')
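For clarity, this is the column layout the two DataFrames end up with (taken from the dict keys built in process_bz2_data; the runner_* columns are generated per market, so market_definition_df can be fairly wide and sparse):

# market_definition_df -> ['market_id', 'start_datetime', 'runner_0_id', 'runner_0_name', 'runner_1_id', 'runner_1_name', ...]
#   (one row per marketDefinition message)
# normal_entry_df      -> ['time', 'market_id', 'runner_id', 'last_traded_price']
#   (one row per runner price change)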