I am reading streaming data and applying a function to it. I want to write the function's return value to a Kafka topic. Below is a sample of my code that tries to write to the console instead.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
.....
spark = (SparkSession.builder
    .appName("test_mc_uni")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate())
jsonSchema = StructType([
    StructField("abc", StringType(), True),
    StructField("bcd", StringType(), True),
    StructField("cde", StringType(), True),
])
df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", inputBrokers)
    .option("subscribe", inputTopics)
    .option("kafka.group.id", groupid)
    .option("maxOffsetsPerTrigger", 1000)
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), jsonSchema).alias("parsed_data"))
    .select("parsed_data.*")
    .select("abc", "bcd", "cde"))
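For context, each Kafka message value is expected to be a JSON object matching jsonSchema; the sample value below is made up for illustration:

```python
import json

# Hypothetical sample of one message value on the input topic
sample = '{"abc": "a1", "bcd": "b1", "cde": "c1"}'

# Parse it the way from_json does, yielding one field per schema column
rec = json.loads(sample)
print(rec["abc"], rec["bcd"], rec["cde"])
```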
try:
    def predict_on_batch(batch_df):
        pdf = batch_df.toPandas()
        pdf = pdf.fillna(0)
        .........
        dff = pdf.to_json(orient='records', lines=True)
        return dff

    query = (
        predict_on_batch(df)
        .writeStream
        .outputMode("append")
        .format("console")
        # .foreachBatch(predict_on_batch)
        .start()
        .awaitTermination()
    )
When I apply the function directly to the streaming DataFrame like this, I get the error: Queries with streaming sources must be executed with writeStream.start();
I then rewrote the code as follows to apply the function to each micro-batch instead.
try:
    def predict_on_batch(batch_df, epochid):
        pdf = batch_df.toPandas()
        pdf = pdf.fillna(0)
        .........
        dff = pdf.to_json(orient='records', lines=True)
        dff.write.format("console")

    query = (
        df
        .writeStream
        .outputMode("append")
        .format("console")
        .foreachBatch(predict_on_batch)
        .start()
        .awaitTermination()
    )
But now the error is: dff.write.format("console") AttributeError: 'str' object has no attribute 'write'
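The same failure can be reproduced outside Spark with plain pandas, since to_json(orient='records', lines=True) returns a plain str rather than a DataFrame (the column names below match my schema; the values are made up):

```python
import pandas as pd

# Toy batch with the same columns as the stream; values are made up
pdf = pd.DataFrame({"abc": ["a1"], "bcd": ["b1"], "cde": [None]})
pdf = pdf.fillna(0)

# to_json with no path argument returns a str, not a DataFrame
dff = pdf.to_json(orient="records", lines=True)
print(type(dff))   # <class 'str'> -- so dff.write... cannot work
print(dff)         # printing the string is enough to see it on the console
```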
How can I write dff (a JSON string) to the console?