I have a code below.
image_df = session.read.format("image").load(image_path)
# Collect the image metadata
metadata = image_df.select("image.height", "image.width", "image.nChannels").first()
height = metadata["height"]
width = metadata["width"]
## I have to see why this portion gives 4 channels instead of 3.....
nChannels = metadata["nChannels"]
# Broadcast the height, width, and nChannels
height_broadcast = session.sparkContext.broadcast(height)
width_broadcast = session.sparkContext.broadcast(width)
nChannels_broadcast = session.sparkContext.broadcast(nChannels)
# Define a UDF to split the image array into separate R, G, B arrays
def split_rgb(data):
# you need to access the broadcasted value by .value method
height = height_broadcast.value
width = width_broadcast.value
nChannels = nChannels_broadcast.value
# from buffer assumes that the input is byte data. dtype = the data type of the return
image = np.frombuffer(data, dtype=np.uint8).reshape((height, width, nChannels), order = "C") # numpy uses C order, so should be correct, but we must check....
if nChannels == 4: # Remove the alpha channel if present
image = image[:, :, :3]
r = image[:, :, 0].flatten().tolist()
g = image[:, :, 1].flatten().tolist()
b = image[:, :, 2].flatten().tolist()
return r, g, b
split_rgb_udf = F.udf(lambda data: split_rgb(data), ArrayType(ArrayType(IntegerType())))
# Apply the UDF to the image DataFrame
rgb_df = image_df.withColumn("rgb", split_rgb_udf(F.col("image.data")))
# Select the RGB column and explode it into separate R, G, B columns
r_df = rgb_df.select(F.col("rgb")[0].alias("r"))
g_df = rgb_df.select(F.col("rgb")[1].alias("g"))
b_df = rgb_df.select(F.col("rgb")[2].alias("b"))
# this show section is commented out because i checked the outputs.
# r_df.show(10)
# g_df.show(10)
# b_df.show(10)
# rdd.map: take each df and perform the lambda function inside and return.
# later for computeSVD, the input must be RowMatrix. Thus need to take the df and put in as RowMatrix
r_rdd = RowMatrix(r_df.select('r').rdd.map(lambda row: DenseVector(row)))
g_rdd = RowMatrix(g_df.select('g').rdd.map(lambda row: DenseVector(row)))
b_rdd = RowMatrix(b_df.select('b').rdd.map(lambda row: DenseVector(row)))
# check if the image is imported correctly.
print(r_rdd.rows.map(lambda row: sum(row.toArray())).sum())
print(g_rdd.rows.map(lambda row: sum(row.toArray())).sum())
print(b_rdd.rows.map(lambda row: sum(row.toArray())).sum())
Basically, it imports a png image file and then creates rdd of RowMatrix (I am going to do a SVD on them later). But the issue is, when I check if the values are imported correctly, in the print statements, I get 0 for the r_rdd sum, meaning that there is no R value. But the picture does have a lot of red in it.
I am guessing it went wrong in the np.frombuffer section where I convert image bytes to the numpy array, but I cannot seem to be able to fix it.