I am trying to write a Flink user-defined aggregate function (UDAF) that aggregates click events:
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.flink.table.functions.AggregateFunction;

/**
 * A custom aggregate function (UDAF) for aggregating click events.
 * This UDAF aggregates click actions and click tags, and keeps track of the earliest timestamp and the click type of that event.
 * The results are stored in a map with the keys event1 and event2.
 */
public class ClickEventAggregator extends AggregateFunction<ClickEventAggregator.AggregateResult, ClickEventAggregator.AggregateAccumulator> {

    @Override
    public AggregateAccumulator createAccumulator() {
        return new AggregateAccumulator();
    }

    @Override
    public AggregateResult getValue(AggregateAccumulator accumulator) {
        Map<String, String> result = new HashMap<>();
        if (!accumulator.actionSet.isEmpty()) {
            result.put("event1", String.join(",", accumulator.actionSet));
        }
        if (!accumulator.tagSet.isEmpty()) {
            result.put("event2", String.join(",", accumulator.tagSet));
        }
        return new AggregateResult(result, accumulator.firstTimestamp, accumulator.type);
    }

    public void accumulate(AggregateAccumulator accumulator, String action, String clickTag, Long timestamp, Integer type) {
        // Keep the earliest timestamp and its click type; skip null timestamps to avoid an unboxing NPE.
        if (timestamp != null && (accumulator.firstTimestamp == null || timestamp < accumulator.firstTimestamp)) {
            accumulator.firstTimestamp = timestamp;
            accumulator.type = type;
        }
        if (action != null && !action.isEmpty()) {
            accumulator.actionSet.add(action);
        }
        if (clickTag != null && !clickTag.isEmpty()) {
            accumulator.tagSet.add(clickTag);
        }
    }

    public void merge(AggregateAccumulator accumulator, Iterable<AggregateAccumulator> others) {
        for (AggregateAccumulator other : others) {
            // Take the other accumulator's timestamp/type if it saw an earlier event.
            if (other.firstTimestamp != null && (accumulator.firstTimestamp == null || other.firstTimestamp < accumulator.firstTimestamp)) {
                accumulator.firstTimestamp = other.firstTimestamp;
                accumulator.type = other.type;
            }
            accumulator.actionSet.addAll(other.actionSet);
            accumulator.tagSet.addAll(other.tagSet);
        }
    }

    public static class AggregateAccumulator {
        public Long firstTimestamp;
        public HashSet<String> actionSet = new HashSet<>();
        public HashSet<String> tagSet = new HashSet<>();
        public Integer type;
    }

    public static class AggregateResult {
        public Map<String, String> eventMap;
        public Long firstTimestamp;
        public Integer type;

        public AggregateResult(Map<String, String> eventMap, Long firstTimestamp, Integer type) {
            this.eventMap = eventMap;
            this.firstTimestamp = firstTimestamp;
            this.type = type;
        }
    }
}
The exception stack trace is:
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a structured type was also not successful.
….
Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
If I use an ArrayList instead, it works fine. I am not sure how Flink decides that a HashSet has no fields: it fails to read it as a structured type, whereas ArrayList works without any problem. Can someone suggest the right way to implement this UDAF using HashSet and HashMap?
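One workaround that may be worth trying, assuming Flink's reflective type extraction accepts `java.util.Map` fields (which it maps to a MAP type) while it rejects the abstract `java.util.Set`, is to store each set as a `Map<String, Boolean>` whose keys are the distinct values. The sketch below shows only that data layout in plain Java with no Flink dependency, so it does not demonstrate registration or type extraction; the class name `SetAsMapAccumulator`, the helper `joinKeys`, and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical accumulator: each "set" is a Map<String, Boolean> whose keys are the distinct
// values (the Boolean is ignored). Mirrors ClickEventAggregator's logic without Flink.
public class SetAsMapAccumulator {
    public Long firstTimestamp;
    public Integer type;
    public Map<String, Boolean> actions = new HashMap<>();
    public Map<String, Boolean> tags = new HashMap<>();

    public void accumulate(String action, String clickTag, Long timestamp, Integer type) {
        // Keep the earliest timestamp and the type that came with it.
        if (timestamp != null && (firstTimestamp == null || timestamp < firstTimestamp)) {
            firstTimestamp = timestamp;
            this.type = type;
        }
        if (action != null && !action.isEmpty()) {
            actions.put(action, Boolean.TRUE); // put() on an existing key acts like Set.add()
        }
        if (clickTag != null && !clickTag.isEmpty()) {
            tags.put(clickTag, Boolean.TRUE);
        }
    }

    public void merge(SetAsMapAccumulator other) {
        if (other.firstTimestamp != null
                && (firstTimestamp == null || other.firstTimestamp < firstTimestamp)) {
            firstTimestamp = other.firstTimestamp;
            type = other.type;
        }
        actions.putAll(other.actions); // union of the two key sets
        tags.putAll(other.tags);
    }

    // Joins the distinct keys in sorted order so the output is deterministic.
    public static String joinKeys(Map<String, Boolean> set) {
        List<String> keys = new ArrayList<>(set.keySet());
        Collections.sort(keys);
        return String.join(",", keys);
    }

    public static void main(String[] args) {
        SetAsMapAccumulator a = new SetAsMapAccumulator();
        a.accumulate("click", "banner", 200L, 2);
        a.accumulate("click", "footer", 300L, 3);
        SetAsMapAccumulator b = new SetAsMapAccumulator();
        b.accumulate("scroll", "banner", 100L, 1);
        a.merge(b);
        // prints "click,scroll | banner,footer | 100 | 1"
        System.out.println(joinKeys(a.actions) + " | " + joinKeys(a.tags)
                + " | " + a.firstTimestamp + " | " + a.type);
    }
}
```

In the Flink UDAF the same idea would mean declaring the accumulator fields as `Map<String, Boolean>` and building the event1/event2 strings from the map keys in `getValue`; whether that avoids the ValidationException on 1.16.1 is an assumption to verify.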