I have an S3 bucket with parquet files partitioned by a column that serves as an “id” for the different JSON payloads we receive. The catch is that, even within a single id, the JSON documents have variable schemas: they follow a “pattern”, but there is no proper fixed schema, so each one can differ slightly.
What I’m trying:
1. Read the parquet files one partition at a time, so schema inference has less data to sift through when building a “final schema” (see the sketch after this list)
2. With that schema, explode the JSON to surface all the columns of the nested structure
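For reference, the partition-filtered read looks roughly like this (the bucket path and partition column name below are placeholders, not my real names):

from pyspark.sql.functions import col

# Read only one "id" partition so schema inference sees less data;
# s3://my-bucket/data/ and id_col are made-up names
df_test_filter_partition = (
    spark.read.parquet("s3://my-bucket/data/")
    .filter(col("id_col") == "some_id")
)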
I’m running it as a test on an EMR cluster with two r5a.16xlarge instances.
Here’s the code that I’m having trouble getting to work:
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import *
from copy import deepcopy
from collections import Counter

# Pull the raw JSON strings out of the 'data_new' column, then have Spark
# infer a single schema across all of them
json_strings = df_test_filter_partition.select('data_new').rdd.flatMap(list)
json_df = spark.read.option("multiline", "true").json(json_strings)
json_schema = json_df.schema
I don’t get an error from the code itself, per se; the job just seems to “time out”. Here’s the stack trace:
An error was encountered: An error occurred while calling o138.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 4 times, most recent failure: Lost task 1.3 in stage 9.0 (TID 128) ([2600:1f18:2888:3501:c535:130a:72dd:d39b] executor 16): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Unknown executor exit code (137) (died from signal 9?)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1318)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3271)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1041)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2406)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2501)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.infer(JsonInferSchema.scala:116)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$inferFromDataset$5(JsonDataSource.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:109)
    at org.apache.spark.sql.DataFrameReader.$anonfun$json$4(DataFrameReader.scala:416)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:416)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:377)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:840)
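Exit code 137 looks like the OS killing the executor over memory (signal 9), so I suspect the full-scan schema inference is what's blowing up. One thing I haven't tried yet is the JSON reader's samplingRatio option, which infers the schema from only a fraction of the records; a minimal sketch of what I mean (the 0.1 ratio is an arbitrary guess, and I'm not sure it avoids the OOM):

# Infer the schema from ~10% of the JSON strings instead of all of them
sampled_schema = (
    spark.read
    .option("multiline", "true")
    .option("samplingRatio", 0.1)
    .json(json_strings)
    .schema
)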
How should I approach this? Can I explode the JSON without having a fixed schema up front?
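To clarify what I mean by “explode”: once I have some inferred schema, I'd flatten the nesting one level at a time, roughly like the sketch below. It's illustrative only, and it assumes at most one array column per level, since Spark allows only one generator per select.

from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, ArrayType

def flatten_once(df):
    # Promote one level of nesting to top-level columns; call repeatedly
    # until no StructType/ArrayType columns remain
    cols, exploded = [], False
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            # lift each struct field up, prefixing with the parent name
            cols += [
                col(f"{field.name}.{sub.name}").alias(f"{field.name}_{sub.name}")
                for sub in field.dataType.fields
            ]
        elif isinstance(field.dataType, ArrayType) and not exploded:
            # one output row per array element, keeping empty/null arrays
            cols.append(explode_outer(col(field.name)).alias(field.name))
            exploded = True
        else:
            cols.append(col(field.name))
    return df.select(cols)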