I have a use case where I need to write data from a list to S3 in parallel.
The list I have is a list of lists -> [[guid1, guid2], [guid3, guid4], ...]
The function get_guids_combined() is responsible for returning the above list.
I have to parallelize the write for each inner list by filtering the main DataFrame on its GUIDs.
I am facing issues when using sparkContext (sc): it ends up being executed on a worker node, whereas it is only supposed to be used on the driver. How can I achieve this while working around that restriction?
Code:
from pyspark.sql.functions import col

def group_and_write_data_to_s3(main_df):
    guid_list_with_count = (
        main_df.groupby('call_guid').count().sort('count').collect())
    list_groups = get_guids_combined(guid_list_with_count)
    list_groups_broadcast = sc.broadcast(list_groups)

    # Define the function to process each group and write to S3
    def process_group(group_index):
        list_groups = list_groups_broadcast.value
        single_group = list_groups[group_index]
        file_num = group_index + 1
        # Filter the DataFrame by the GUIDs in the current group
        filtered_df = main_df.filter(col('call_guid').isin(single_group))
        output_path = f's3://{NAME_BUCKET}/{PAT_GUID}/go/cr/{file_num}'
        filtered_df.write.json(output_path)

    spark.sparkContext.parallelize(range(len(list_groups_broadcast.value))).foreach(process_group)
The error happens here:
spark.sparkContext.parallelize(range(len(list_groups_broadcast.value))).foreach(process_group)
Error:
It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
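For illustration, this is the kind of driver-only approach I assume would avoid the error: a ThreadPoolExecutor on the driver submits one write job per group, so no SparkContext or DataFrame call ever runs on a worker. The reuse of main_df, list_groups, NAME_BUCKET and PAT_GUID from the code above, and the max_workers value, are assumptions on my part. Is something like this the right direction?

from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.functions import col

def write_group(args):
    # Unpack the group index and the list of GUIDs for this group.
    group_index, single_group = args
    file_num = group_index + 1
    # Filtering happens on the driver; the write itself is still distributed by Spark.
    filtered_df = main_df.filter(col('call_guid').isin(single_group))
    output_path = f's3://{NAME_BUCKET}/{PAT_GUID}/go/cr/{file_num}'
    filtered_df.write.json(output_path)

# Threads only overlap job submission; all Spark calls stay on the driver.
# max_workers=8 is an arbitrary choice for the sketch.
with ThreadPoolExecutor(max_workers=8) as executor:
    list(executor.map(write_group, enumerate(list_groups)))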