My use case is to start a spark job from Dolphin Scheduler or Airflow. This job will read the data from the Apache Pulsar stream for a timespan, process the records, and then shut down or close the application completely.
The above will repeat on a cron schedule. I do not want the job to run full time, since that would consume cluster resources; I want the executors to shut down between runs, while the master can continue to run.
I tried implementing the code below, but it is not working as expected. I am also wondering whether Spark provides a built-in way to do this on its own.
/**
 * One-shot Structured Streaming job: drains all data currently available on a
 * Pulsar topic, writes it to the console, and then shuts the application down.
 *
 * <p>Designed to be launched periodically by a scheduler (Dolphin Scheduler /
 * Airflow): {@code Trigger.AvailableNow()} makes the query process everything
 * available at start time and then stop on its own, so the driver exits and
 * the executors are released between runs.
 */
public class CardTransactionStreamProcessor extends StreamingQueryListener {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public SparkSession sparkSession;

    public static void main(String[] args)
            throws TimeoutException, StreamingQueryException, InterruptedException, AnalysisException {
        CardTransactionStreamProcessor streamProcessor = new CardTransactionStreamProcessor();
        streamProcessor.sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");

        // NOTE(review): "spark.streaming.stopGracefullyOnShutdown" is a legacy
        // DStream (spark.streaming.*) setting and has no effect on Structured
        // Streaming queries; it can be removed. AvailableNow handles shutdown.
        streamProcessor.sparkSession.conf().set("spark.streaming.stopGracefullyOnShutdown", true);

        // Register the listener BEFORE starting the query, otherwise the
        // onQueryStarted event may fire before the listener is attached.
        streamProcessor.sparkSession.streams().addListener(streamProcessor);

        Dataset<Row> lines = streamProcessor.sparkSession.readStream()
                .format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .option("pulsar.reader.readerName", "my-sub-card-txn")
                .load();
        lines.printSchema();

        Dataset<CardTransactionDTO> cardTransactionDTODataset =
                lines.as(ExpressionEncoder.javaBean(CardTransactionDTO.class));
        cardTransactionDTODataset.printSchema();

        // AvailableNow processes all data available right now and then stops
        // the query by itself — the "run, drain, exit" batch-style pattern.
        // Await THIS query's termination (not awaitAnyTermination, which would
        // also return on other queries' failures) and shut down from main.
        cardTransactionDTODataset
                .writeStream()
                .trigger(Trigger.AvailableNow())
                .outputMode("update")
                .format("console")
                .start()
                .awaitTermination();

        // Stop the session from the main thread; the JVM then exits with
        // status 0 so the scheduler records a successful run.
        streamProcessor.sparkSession.stop();
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        System.out.printf("Query started%n");
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        System.out.printf("Query in progress%n");
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // Log only. Do NOT stop the SparkSession or call System.exit() here:
        // listener callbacks run on an internal Spark thread, so stopping the
        // session from this callback can deadlock, and the original
        // System.exit(1) reported failure to the scheduler even on success.
        System.out.printf("Query terminated%n");
    }
}