I am using the CSV file below to capture corrupt records and store them for later use:
id,name,age,salary,address,nominee
1,Manish,26,75000,bihar,nominee1
2,Nikita,23,100000,uttarpradesh,nominee2
3,Pritam,22,150000,Bangalore,India,nominee3
4,Prantosh,17,200000,Kolkata,India,nominee4
5,Vikash,31,300000,,nominee5
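Note that the rows with IDs 3 and 4 carry an extra address field ("Bangalore,India" / "Kolkata,India"), so they split into seven values against the six-column header. A quick standalone check (plain Python, assuming a local copy of the file) confirms this:

# Count fields per line against the header of a local copy of the file
with open("corrupt-2.csv") as f:
    header = f.readline().rstrip("\n").split(",")
    for line in f:
        fields = line.rstrip("\n").split(",")
        if len(fields) != len(header):
            print(f"{len(fields)} fields (expected {len(header)}): {line.strip()}")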
The PySpark code I am using is as follows:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema with an extra _corrupt_record column to capture malformed rows
emp_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("salary", IntegerType(), True),
        StructField("address", StringType(), True),
        StructField("nominee", StringType(), True),
        StructField("_corrupt_record", StringType(), True),
    ]
)

df_after = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(emp_schema)
    .option("mode", "PERMISSIVE")
    .option("badrecords", "/FileStore/tables/badrecords")
    .load("/FileStore/tables/corrupt-2.csv")
)
When I run this code I expect to get only the records with IDs 1, 2 and 5 as output, plus a file created under the bad-records path. Instead, the output contains all the records and no file is created.
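For context, the behaviour I am after is roughly the following manual workaround (just a sketch: it stays in PERMISSIVE mode and filters on _corrupt_record instead of relying on a bad-records option):

from pyspark.sql.functions import col

# Cache first: Spark refuses queries that reference only the internal
# _corrupt_record column on an uncached CSV DataFrame.
df_after.cache()

good = df_after.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
bad = df_after.filter(col("_corrupt_record").isNotNull()).select("_corrupt_record")

good.show()  # expected: the rows with IDs 1, 2 and 5
bad.write.mode("overwrite").text("/FileStore/tables/badrecords")  # persist raw bad rows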
Output:
+---+-------+---+------+------------+--------------------+--------------------+
| id|   name|age|salary|     address|             nominee|     _corrupt_record|
+---+-------+---+------+------------+--------------------+--------------------+
|  1| Manish| 26| 75000|       bihar|            nominee1|                null|
|  2| Nikita| 23|100000|uttarpradesh|        nominee2 ...|                null|
|  3| Pritam| 22|150000|   Bangalore|               India|3,Pritam,22,15000...|
|  4|Prantos| 17|200000|     Kolkata|               India|4,Prantos,17,2000...|
|  5| Vikash| 31|300000|        null|        nominee5 ...|                null|
+---+-------+---+------+------------+--------------------+--------------------+