I wrote my own ProcessWindowFunction for the following method in Scala/Flink, but for some reason I am getting this error in my IDE:
Type mismatch.
Required: ProcessWindowFunction[(String, String, Int), NotInferredR, String, TimeWindow]
Found: MyProcessWindowFunction
Here is the relevant part of my implementation method:
def question_seven(
    commitStream: DataStream[Commit]): DataStream[CommitSummary] = {
  val windowedStream = commitStream
    .map { x =>
      (x.url.split("/")(4) + "/" + x.url.split("/")(5), x.commit.committer.name, x.stats.map(_.total).getOrElse(0))
    }
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .process(new MyProcessWindowFunction) // the error occurs here
}
Below is my custom window process class:
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.util.Collector
import util.Protocol.CommitSummary
import java.lang
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._

class MyProcessWindowFunction extends ProcessWindowFunction[
  (String, String, Int),
  CommitSummary,
  String,
  TimeWindow
] {
  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param context The context in which the window is being evaluated.
   * @param elements The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  override def process(
      key: String,
      context: ProcessWindowFunction[(String, String, Int), CommitSummary, String, TimeWindow]#Context,
      iterable: lang.Iterable[(String, String, Int)],
      collector: Collector[CommitSummary]): Unit = {
    val elements = iterable.asScala
    val simpleDateFormat = new SimpleDateFormat("dd-MM-yyyy")
    val windowStart = context.window.getStart
    val date = simpleDateFormat.format(windowStart)
    val amountOfCommits = elements.size
    val committerCounts = elements.groupBy(_._2).mapValues(_.size)
    val amountOfCommitters = committerCounts.size
    val totalChanges = elements.map(_._3).sum
    val maxCommits = committerCounts.values.max
    val topCommitters = committerCounts.filter(_._2 == maxCommits).keys.toList.sorted.mkString(",")
    val commitSummary = CommitSummary(key, date, amountOfCommits, amountOfCommitters, totalChanges, topCommitters)
    collector.collect(commitSummary)
  }
}
When I try to change something in this class, I get other errors complaining that I don't implement the original class correctly. I can provide more info if needed. As far as I know, process should return a DataStream or a windowed DataStream.
I found out why the error was occurring: I had imported the wrong ProcessWindowFunction.
I imported it from: org.apache.flink.streaming.api.functions.windowing
But I should have imported it from: org.apache.flink.streaming.api.scala.function
The first one is the Java class and the second one is the Scala class. The Scala DataStream API's process expects the Scala class, and the two declare process with different parameter types (the Java version takes a java.lang.Iterable, the Scala version a Scala Iterable).
Type the IDE required: [(String, String, Int), NotInferredR, String, TimeWindow]
Type my class declares: [(String, String, Int), CommitSummary, String, TimeWindow]
NotInferredR is only the IDE's placeholder for the output type it could not infer. CommitSummary can be any class.
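For reference, here is a minimal sketch of what the class might look like with the Scala import. It assumes the Flink Scala DataStream API is on the classpath. The CommitSummary case class is a hypothetical stand-in for util.Protocol.CommitSummary (its field names are guesses), and the CommitSummaries.summarize helper is introduced only to keep the aggregation separate from Flink. With the Scala class, the context parameter is simply Context and elements is already a Scala Iterable, so the JavaConverters import is no longer needed.

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Note the package: api.scala.function, not api.functions.windowing.
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical stand-in for util.Protocol.CommitSummary; field names are assumptions.
case class CommitSummary(
    repo: String,
    date: String,
    amountOfCommits: Int,
    amountOfCommitters: Int,
    totalChanges: Int,
    topCommitters: String)

object CommitSummaries {
  // Pure aggregation over (repo, committer, changes) tuples.
  // Assumes `elements` is non-empty, which holds for a fired window.
  def summarize(
      key: String,
      date: String,
      elements: Iterable[(String, String, Int)]): CommitSummary = {
    val committerCounts =
      elements.groupBy(_._2).map { case (name, commits) => name -> commits.size }
    val maxCommits = committerCounts.values.max
    val topCommitters =
      committerCounts.filter(_._2 == maxCommits).keys.toList.sorted.mkString(",")
    CommitSummary(
      key, date, elements.size, committerCounts.size, elements.map(_._3).sum, topCommitters)
  }
}

class MyProcessWindowFunction
    extends ProcessWindowFunction[(String, String, Int), CommitSummary, String, TimeWindow] {

  // Scala signature: `Context` is the inner class and `elements` is a Scala Iterable.
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, String, Int)],
      out: Collector[CommitSummary]): Unit = {
    val date = new SimpleDateFormat("dd-MM-yyyy").format(new Date(context.window.getStart))
    out.collect(CommitSummaries.summarize(key, date, elements))
  }
}
```

With this version, .process(new MyProcessWindowFunction) on the Scala windowed stream should type-check, because the argument is now a subclass of the Scala ProcessWindowFunction that the method expects.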