I’m trying to compare the data from the same month but previous year and add previous year data to a new column ‘qty_ly’. As well as sum the last 3months of qty_cy for each product.
This is the input:
| dis_id | month | qty |
|——–+——–+—–|
| 1 | jan’22 | 10 |
| 1 | jun’22 | 30 |
| 1 | may’22 | 20 |
| 2 | jan’22 | 100 |
| 2 | feb’22 | 90 |
| 2 | mar’22 | 80 |
| 2 | apr’22 | 110 |
| 2 | may’22 | 90 |
| 2 | jun’22 | 70 |
| 2 | jul’22 | 100 |
| 1 | jan’21 | 8 |
| 1 | jun’21 | 25 |
| 1 | may’21 | 18 |
this is the expected output:
| dis_id | month | qty_cy | qty_3m | qty_ly |
|——–+——–+——–+——–+——–|
| 1 | jan’22 | 10 | 10 | 8 |
| 1 | jun’22 | 30 | 50 | 25 |
| 1 | may’22 | 20 | 20 | 18 |
| 2 | jan’22 | 100 | 100 | |
| 2 | feb’22 | 90 | 190 | |
| 2 | mar’22 | 80 | 270 | |
| 2 | apr’22 | 110 | 280 | |
| 2 | may’22 | 90 | 280 | |
| 2 | jun’22 | 70 | 270 | |
| 2 | jul’22 | 100 | 260 | |
| 1 | jan’21 | 8 | 8 | |
| 1 | jun’21 | 25 | 43 | |
| 1 | may’21 | 18 | 18 | |
I tried creating a new date format (mm/dd/yyyy) and then look back 3m/12m but it’s sum the previous rows.
case class Months(
desc_id: Int,
month: String,
qty_cy: Int,
date: String,
qty_ly: Option[Int] // Option[Int] for nullable qty_ly
)
object Main {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("Months Data")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
val path = "src/test/resources/months.csv"
val schema = StructType(
Array(
StructField("desc_id", IntegerType, nullable = true),
StructField("month", StringType, nullable = true),
StructField("qty_cy", IntegerType, nullable = true),
StructField("date", StringType, nullable = true) // Changed to StringType
)
)
val monthsDf = spark.read
.schema(schema)
.option("header", "true")
.csv(path)
.withColumn("date", date_format(to_date($"month", "MMM''yy"), "MM.dd.yyyy")) // Transform 'month' to 'date'
monthsDf.show()
// Define a window specification partitioned by desc_id and ordered by date
val windowSpec = Window.partitionBy("date").orderBy($"date".cast("date").asc)
// Create a new column qty_3m which sums qty_cy for the past three months
val resultDf = monthsDf
.withColumn("qty_3m", sum($"qty_cy").over(windowSpec.rowsBetween(-90, 0)))
// Show the result
resultDf.show()
// Stop Spark session
spark.stop()
}
}
// Run the Main object to see the contents of the months Dataset with qty_3m
Main.main(Array())
gmb is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.