I am trying to process data from Kinesis stream using Spark streaming.
I am using https://spark.apache.org/docs/latest/streaming-kinesis-integration.html as reference to write code
object TimeAndLanFixer {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[6]").setAppName("LangAndTimeZoneFixer")
val ssc = new StreamingContext(conf, Seconds(3))
val time = System.currentTimeMillis()
println(s"Current time in millis: $time")
val lang = System.getProperty("user.language")
println(s"Current language: $lang")
val initialPositionTimestamp = "2024-06-27 00:00:00"
val date: Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(initialPositionTimestamp)
val kinesisStream = KinesisInputDStream.builder
.streamName(System.getenv("STREAM_NAME"))
.streamingContext(ssc)
.endpointUrl("https://kinesis.us-west-2.amazonaws.com")
.regionName("us-west-2")
.checkpointAppName("DebugApp").checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.initialPosition(new AtTimestamp(date))
.metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
.build()
kinesisStream.start()
kinesisStream.foreachRDD(rdd => {
rdd.foreach(println)
})
ssc.awaitTermination()
}
}
When I run this,I don’t see any output on console in Intellij. I just see two println that i added in program. My input kinesis stream has 5 shards and based on some of reading that I did on internet,if testing locally,we need to supply a number more than number of shards (local[6]) to process data locally. Program is still not moving ahead
pom.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.1</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
<version>3.5.1</version>
</dependency>