I am trying to parse JSON messages from a Kafka topic with Spark Structured Streaming, using the code below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType
spark = (
    SparkSession.builder
    .appName("StructuredSocketRead")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
# Read the topic as a streaming DataFrame; key and value arrive as binary columns
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "***.compute-1.amazonaws.com:9092")
    .option("subscribe", "kafka-new-topic")
    .load()
)
# Transform to output DataFrame
schema = StructType([
    StructField("card_id", LongType()),
    StructField("member_id", LongType()),
    StructField("amount", LongType()),
    StructField("pos_id", LongType()),
    StructField("postcode", LongType()),
    StructField("transaction_dt", StringType()),
])
# Cast the binary value to a string and parse it as JSON with the schema above
value_df = lines.select(from_json(col("value").cast("string"), schema).alias("value"))
exploded_df = value_df.select("value.*")
query = (
    exploded_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)
query.awaitTermination()
But every column in the output is null.
To inspect the raw messages, I cast the key and value to strings using the code below:
# Cast the binary key and value columns to strings to see the raw messages
kafkaDF = lines.selectExpr("cast(key as string)", "cast(value as string)")
query = (
    kafkaDF.writeStream
    .outputMode("append")
    .format("console")
    .start()
)
query.awaitTermination()
And the result is:
Batch: 1
-------------------------------------------
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|"{"card_id":348702330256514,"member_id":37495066290,"amount":4380912,"postcode":96774,"pos_id":248063406800722,"transaction_dt":"01-03-2018 08:24:29"}n" |
|"{"card_id":348702330256514,"member_id":37495066290,"amount":6703385,"postcode":84758,"pos_id":786562777140812,"transaction_dt":"02-06-2018 04:15:03"}n" |
|"{"card_id":348702330256514,"member_id":37495066290,"amount":7454328,"postcode":93645,"pos_id":466952571393508,"transaction_dt":"12-02-2018 09:56:42"}n" |
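The rows above begin with `"{` and end with `}n"`. Assuming some backslashes were lost when the output was copied, this suggests each Kafka value is a JSON *string* that contains the object's JSON text plus a trailing `\n`, i.e. the message was JSON-encoded twice by the producer. `from_json` with a struct schema cannot map a top-level string onto the struct's fields, which would explain why every column comes out null. The sketch below simulates that assumed double encoding with Python's standard `json` module, using the first row's values; it is a diagnosis aid, not the producer's actual code.

```python
import json

# Sample record copied from the first row of the console output above.
record = {
    "card_id": 348702330256514,
    "member_id": 37495066290,
    "amount": 4380912,
    "postcode": 96774,
    "pos_id": 248063406800722,
    "transaction_dt": "01-03-2018 08:24:29",
}

# Assumption: the producer serialized the record to compact JSON text,
# appended a newline, and then JSON-encoded that text a second time.
inner_text = json.dumps(record, separators=(",", ":")) + "\n"
raw_value = json.dumps(inner_text)

# The raw value starts with a quote followed by escaped quotes, like the rows above.
print(raw_value[:14])

first = json.loads(raw_value)
print(type(first).__name__)   # → str: the top-level JSON value is a string, not an object

second = json.loads(first)
print(type(second).__name__)  # → dict: only a second decode recovers the object
print(second == record)       # → True
```

If decoding a real message once also gives a `str`, the payload is double-encoded and the schema itself is not the problem.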
I don't know what is wrong here. Can someone explain why this happens and how to fix it?
I also tried changing the data types in the schema, with no luck.
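Under the same assumption (the producer JSON-encodes each message twice), the cleanest fix is to have the producer serialize the record only once. If that is not possible, one option is to unwrap the outer string before calling `from_json`. The sketch below is a plain-Python version of such an unwrap step; `unwrap_json` and `fits_long` are hypothetical helper names, not Spark APIs. In PySpark, `unwrap_json` could be wrapped with `pyspark.sql.functions.udf(unwrap_json, StringType())` and applied to `col("value").cast("string")` before `from_json`, though that integration is untested here. The `fits_long` check also shows that the sample numeric values fit in Spark's `LongType` (a signed 64-bit integer), which is consistent with changing the schema types not helping.

```python
import json
from typing import Optional

# Spark's LongType is a signed 64-bit integer.
LONG_MIN, LONG_MAX = -(2**63), 2**63 - 1


def unwrap_json(raw: Optional[str]) -> Optional[str]:
    """Hypothetical helper: return the JSON object text inside a value that
    may have been JSON-encoded twice. Returns None for null or invalid input,
    loosely mirroring how from_json yields null for records it cannot parse."""
    if raw is None:
        return None
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if isinstance(decoded, str):
        # Double-encoded: the first decode yields the object's JSON text.
        return decoded.strip()
    # Already a JSON object (or other value): re-serialize it compactly.
    return json.dumps(decoded, separators=(",", ":"))


def fits_long(value: int) -> bool:
    """True if value fits in Spark's LongType (signed 64-bit)."""
    return LONG_MIN <= value <= LONG_MAX


# Simulated double-encoded message built from the first row of the output above.
record = {
    "card_id": 348702330256514,
    "member_id": 37495066290,
    "amount": 4380912,
    "postcode": 96774,
    "pos_id": 248063406800722,
    "transaction_dt": "01-03-2018 08:24:29",
}
raw_value = json.dumps(json.dumps(record, separators=(",", ":")) + "\n")

inner = unwrap_json(raw_value)
parsed = json.loads(inner)
print(parsed == record)  # → True
print(all(fits_long(v) for v in parsed.values() if isinstance(v, int)))  # → True
```

Returning `None` on invalid input, rather than raising, avoids failing the whole streaming query on one malformed message; whether that is the right trade-off depends on how bad records should be handled.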