I ran into an issue while writing a custom UDAF in Flink 1.20. I wanted to implement a UDAF that computes the median, and I tried the following two implementations:
```java
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

public class MedianUDAF2 extends AggregateFunction<Double, MedianUDAF2.State> {

    public static class State {
        public int scale = 2;
        public ListView<Double> numbers;

        public State() {}
    }

    @Override
    public State createAccumulator() {
        State state = new State();
        state.numbers = new ListView<>();
        return state;
    }

    public void accumulate(State acc, Double val, Integer scale) throws Exception {
        acc.numbers.add(val);
        if (scale != null && scale > 0) acc.scale = scale;
    }

    public void merge(State acc, Iterable<State> it) throws Exception {
        for (State a : it) {
            acc.numbers.addAll(a.numbers.getList());
        }
    }

    @Override
    public Double getValue(State acc) {
        try {
            List<Double> numbers = acc.numbers.getList();
            numbers.sort(Double::compareTo);
            double index = (numbers.size() - 1) * 0.5;
            int low = (int) Math.floor(index);
            int high = (int) Math.ceil(index);
            // Odd count: low == high is the middle element; even count: average the two middle elements
            double value = low == high ? numbers.get(low) : (numbers.get(low) + numbers.get(high)) * 0.5;
            return BigDecimal.valueOf(value).setScale(acc.scale, RoundingMode.HALF_UP).doubleValue();
        } catch (Exception ignored) {
        }
        return 0.0;
    }
}
```
```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.AggregateFunction;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;

public class MedianUDAF extends AggregateFunction<Double, MedianUDAF.State> {

    public static class State {
        public int scale = 2;
        @DataTypeHint(value = "ARRAY<DOUBLE>")
        public ArrayList<Double> numbers;

        public State() {}
    }

    @Override
    public State createAccumulator() {
        State state = new State();
        state.numbers = new ArrayList<>();
        return state;
    }

    public void accumulate(State acc, Double val, Integer scale) throws Exception {
        acc.numbers.add(val);
        if (scale != null && scale > 0) acc.scale = scale;
    }

    public void merge(State acc, Iterable<State> it) throws Exception {
        for (State a : it) {
            acc.numbers.addAll(a.numbers);
        }
    }

    @Override
    public Double getValue(State acc) {
        try {
            List<Double> numbers = acc.numbers;
            numbers.sort(Double::compareTo);
            double index = (numbers.size() - 1) * 0.5;
            int low = (int) Math.floor(index);
            int high = (int) Math.ceil(index);
            // Odd count: low == high is the middle element; even count: average the two middle elements
            double value = low == high ? numbers.get(low) : (numbers.get(low) + numbers.get(high)) * 0.5;
            return BigDecimal.valueOf(value).setScale(acc.scale, RoundingMode.HALF_UP).doubleValue();
        } catch (Exception ignored) {
        }
        return 0.0;
    }
}
```
```java
tableEnvironment.createTemporarySystemFunction("median", new MedianUDAF()); // or new MedianUDAF2()
Table table = tableEnvironment.sqlQuery("select median(l_linenumber, 2) from lineitem");
```
The only difference between them is that one stores the values in an `ArrayList` and the other in a `ListView` inside the accumulator `State`, yet their performance differs enormously. Why?
According to the Javadoc of `ListView`, the view can be backed by a state backend so that it can handle large amounts of data. In flink-table-planner versions before 1.14, this conversion could be seen in `addAccumulatorDataViews` in `AggregationCodeGenerator`, but I can no longer find it in 1.20. I also tried to debug the conversion in the `AggsHandlerCodeGenerator` class, without success.

Where does this conversion happen now, and how can I observe it? Thank you very much!
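For reference, the median step shared by both `getValue` methods can be checked on its own, outside Flink. This is a minimal sketch assuming plain `java.util` lists; `MedianSketch` and `median` are hypothetical names, and unlike the UDAFs above it returns `null` for an empty input instead of `0.0`. It uses `BigDecimal.valueOf` rather than `new BigDecimal(double)`, so a value such as 1.005 is rounded from its decimal form and becomes 1.01 instead of 1.0.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MedianSketch {

    // Hypothetical helper mirroring the median rule used in getValue.
    // Returns null for an empty input (the UDAFs above return 0.0 instead).
    static Double median(List<Double> values, int scale) {
        if (values.isEmpty()) {
            return null;
        }
        // Sort a copy so the caller's list is not modified.
        List<Double> sorted = new ArrayList<>(values);
        sorted.sort(Double::compareTo);

        double index = (sorted.size() - 1) * 0.5;
        int low = (int) Math.floor(index);
        int high = (int) Math.ceil(index);
        // Odd size: low == high points at the middle element.
        // Even size: average the two middle elements.
        double value = low == high ? sorted.get(low) : (sorted.get(low) + sorted.get(high)) * 0.5;

        // valueOf uses the double's shortest decimal form (e.g. "1.005"),
        // so HALF_UP rounds that decimal value rather than its binary expansion.
        return BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_UP).doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(median(Arrays.asList(3.0, 1.0, 2.0), 2));      // 2.0
        System.out.println(median(Arrays.asList(4.0, 1.0, 3.0, 2.0), 2)); // 2.5
        System.out.println(median(Arrays.asList(1.005), 2));              // 1.01
    }
}
```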