I have a DataFrame[Rand: double, Flag: int, Value: int]. I want to do the following with this data.
- In col ‘Flag’, count the total number of occasions where there are more than 3 consecutive 0s.
- Compute the distribution of the number of occasions as a function of the length of the consecutive block of 0s.
my code:
<code>
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, lag, when, sum as _sum
from pyspark.sql.window import Window
# Script purpose (per the accompanying question): count the occasions where
# the 'Flag' column contains more than 3 consecutive 0s.

# Get or create a Spark session whose application name is "ConsecutiveZeros".
spark = SparkSession.builder.appName("ConsecutiveZeros").getOrCreate()
# Load file.csv, using the first row as the header and letting Spark infer column types.
df = spark.read.csv("file.csv", header=True, inferSchema=True)
# Window over the whole DataFrame (no partitionBy), ordered by the Flag value.
# NOTE(review): ordering by Flag sorts rows by their flag value, so "previous
# row" below means previous in sorted order, not in the original row sequence.
# Detecting consecutive runs presumably needs a window ordered by a
# row-position column; none is visible in this script -- confirm.
window = Window.orderBy("Flag")
df = df.withColumn(
# start_streak = Flag * Flag - (Flag of the previous row in `window`).
# `*` binds tighter than `-`, so only the current Flag is squared.
# lag() has no default here, so the first row of the window yields null.
# NOTE(review): nothing in this expression tests for Flag == 0 -- confirm it
# marks the start of a run of 0s as intended.
"start_streak", F.col("Flag").cast("int") * F.col("Flag").cast("int") - F.lag("Flag").over(window).cast("int")
)
df = df.withColumn(
# streak_id = running sum of start_streak over a window again ordered by Flag.
"streak_id", F.sum("start_streak").over(Window.orderBy("Flag"))
)
# Keep the distinct streak_id values that are greater than 3.
# NOTE(review): this compares the id value itself with 3, not the number of
# rows that share a streak_id; a run-length test would presumably need a
# per-streak row count -- confirm.
filtered_df = df.filter(F.col("streak_id") > 3).select("streak_id").distinct()
# Number of distinct streak_id values that passed the filter.
total_occurrences = filtered_df.count()
print(f"Total occasions with more than 3 consecutive 0s: {total_occurrences}")
</code>
<code>
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, lag, when, sum as _sum
from pyspark.sql.window import Window
# Script purpose (per the accompanying question): count the occasions where
# the 'Flag' column contains more than 3 consecutive 0s.

# Get or create a Spark session whose application name is "ConsecutiveZeros".
spark = SparkSession.builder.appName("ConsecutiveZeros").getOrCreate()
# Load file.csv, using the first row as the header and letting Spark infer column types.
df = spark.read.csv("file.csv", header=True, inferSchema=True)
# Window over the whole DataFrame (no partitionBy), ordered by the Flag value.
# NOTE(review): ordering by Flag sorts rows by their flag value, so "previous
# row" below means previous in sorted order, not in the original row sequence.
# Detecting consecutive runs presumably needs a window ordered by a
# row-position column; none is visible in this script -- confirm.
window = Window.orderBy("Flag")
df = df.withColumn(
# start_streak = Flag * Flag - (Flag of the previous row in `window`).
# `*` binds tighter than `-`, so only the current Flag is squared.
# lag() has no default here, so the first row of the window yields null.
# NOTE(review): nothing in this expression tests for Flag == 0 -- confirm it
# marks the start of a run of 0s as intended.
"start_streak", F.col("Flag").cast("int") * F.col("Flag").cast("int") - F.lag("Flag").over(window).cast("int")
)
df = df.withColumn(
# streak_id = running sum of start_streak over a window again ordered by Flag.
"streak_id", F.sum("start_streak").over(Window.orderBy("Flag"))
)
# Keep the distinct streak_id values that are greater than 3.
# NOTE(review): this compares the id value itself with 3, not the number of
# rows that share a streak_id; a run-length test would presumably need a
# per-streak row count -- confirm.
filtered_df = df.filter(F.col("streak_id") > 3).select("streak_id").distinct()
# Number of distinct streak_id values that passed the filter.
total_occurrences = filtered_df.count()
print(f"Total occasions with more than 3 consecutive 0s: {total_occurrences}")
</code>
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, lag, when, sum as _sum
from pyspark.sql.window import Window
# Script purpose (per the accompanying question): count the occasions where
# the 'Flag' column contains more than 3 consecutive 0s.

# Get or create a Spark session whose application name is "ConsecutiveZeros".
spark = SparkSession.builder.appName("ConsecutiveZeros").getOrCreate()
# Load file.csv, using the first row as the header and letting Spark infer column types.
df = spark.read.csv("file.csv", header=True, inferSchema=True)
# Window over the whole DataFrame (no partitionBy), ordered by the Flag value.
# NOTE(review): ordering by Flag sorts rows by their flag value, so "previous
# row" below means previous in sorted order, not in the original row sequence.
# Detecting consecutive runs presumably needs a window ordered by a
# row-position column; none is visible in this script -- confirm.
window = Window.orderBy("Flag")
df = df.withColumn(
# start_streak = Flag * Flag - (Flag of the previous row in `window`).
# `*` binds tighter than `-`, so only the current Flag is squared.
# lag() has no default here, so the first row of the window yields null.
# NOTE(review): nothing in this expression tests for Flag == 0 -- confirm it
# marks the start of a run of 0s as intended.
"start_streak", F.col("Flag").cast("int") * F.col("Flag").cast("int") - F.lag("Flag").over(window).cast("int")
)
df = df.withColumn(
# streak_id = running sum of start_streak over a window again ordered by Flag.
"streak_id", F.sum("start_streak").over(Window.orderBy("Flag"))
)
# Keep the distinct streak_id values that are greater than 3.
# NOTE(review): this compares the id value itself with 3, not the number of
# rows that share a streak_id; a run-length test would presumably need a
# per-streak row count -- confirm.
filtered_df = df.filter(F.col("streak_id") > 3).select("streak_id").distinct()
# Number of distinct streak_id values that passed the filter.
total_occurrences = filtered_df.count()
print(f"Total occasions with more than 3 consecutive 0s: {total_occurrences}")
But it is giving me incorrect results. Can someone help me here?