I’m using a `threading.Thread` to launch a function that runs a Spark Structured Streaming query. I want to stop that thread
from inside the `process` function when a condition is met (here, when the batch id reaches 2).
import threading
import asyncio
import functools
from pyspark.sql import SparkSession
# Set by the foreachBatch callback to ask the driver loop to shut down.
# Python threads cannot be killed from outside, so the streaming thread is
# stopped cooperatively: the loop in streaming_to_consumer() sees this flag,
# stops the StreamingQuery, and the thread's target then returns normally.
stop_requested = threading.Event()

# Seconds the driver loop blocks in query.awaitTermination() between checks
# of ``stop_requested``.
POLL_INTERVAL_SECONDS = 0.5


async def process(df, df_id):
    """Handle one micro-batch and request shutdown once batch id 2 is seen.

    ``df`` and ``df_id`` are the micro-batch DataFrame and batch id that
    ``foreachBatch`` supplies (via ``process_wrapper``). This only *signals*
    shutdown: calling ``query.stop()`` from inside the batch callback can
    block, because stop() waits for the stream-execution thread, and that
    thread is itself waiting for this callback to return.
    """
    # NOTE: per-batch processing of ``df`` (not shown in the original) goes here.
    if df_id == 2:
        stop_requested.set()


def process_wrapper(df, df_id):
    """Synchronous ``foreachBatch`` callback that runs the async ``process``.

    ``foreachBatch`` calls its function synchronously and discards the result.
    Passing the coroutine function directly would therefore create a
    coroutine that is never awaited, and ``process`` would never run.
    NOTE(review): if another ``process_wrapper`` exists elsewhere in the
    file, reconcile it with this one.
    """
    asyncio.run(process(df, df_id))


def streaming_to_consumer_wrapper():
    """Thread target: run the async streaming driver on a fresh event loop."""
    asyncio.run(streaming_to_consumer())


async def streaming_to_consumer():
    """Run the Iceberg -> foreachBatch stream until asked to stop.

    Waits in bounded slices until either ``process`` sets ``stop_requested``
    or the query terminates on its own. It then stops the query, so the
    thread running this coroutine can finish. If the query fails, the error
    propagates as the exception raised by ``awaitTermination``.
    """
    # getOrCreate() returns the already-active session if there is one.
    spark = SparkSession.builder.getOrCreate()
    # Method chains are wrapped in parentheses: a bare line break would end
    # the Python statement.
    df = spark.readStream.format("iceberg").load("local.db.table")
    query = (
        df.writeStream
        .outputMode("append")
        .foreachBatch(process_wrapper)
        .trigger(processingTime="0.5 seconds")
        .start()
    )
    try:
        # awaitTermination(timeout) returns True once the query has ended,
        # otherwise False after the timeout. A single bounded call (as
        # originally written) just returned and left the query running.
        while not stop_requested.is_set():
            if query.awaitTermination(POLL_INTERVAL_SECONDS):
                break
    finally:
        # Stopping the query is what actually lets this thread end.
        query.stop()


if __name__ == "__main__":
    # Start the thread only after everything above is defined; starting it
    # first raised NameError for streaming_to_consumer_wrapper.
    consumer_thread = threading.Thread(
        target=streaming_to_consumer_wrapper, name="spark-streaming-consumer"
    )
    consumer_thread.start()