We received the following error while creating tests for an Apache Beam pipeline that runs on the DirectRunner (via TestPipeline).
To fix it, we will have to clone the original object and edit the clone. The exception is an IllegalMutationException thrown from a Beam PTransform:
org.apache.beam.sdk.util.IllegalMutationException: PTransform
org.apache.beam.sdk.values.KV. Input values must not be mutated in any way.
The problem does not occur in our real-time Flink pipeline. It occurs only in the test pipeline code.
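As far as we understand it, the failure shows up only under TestPipeline because the DirectRunner deliberately enforces Beam's immutability rule. It encodes each input element with the element's coder before the transform runs, encodes it again afterwards, and raises IllegalMutationException if the bytes differ. (This behavior is controlled by the `enforceImmutability` setting on `DirectOptions`, which is on by default.) Production runners such as Flink generally skip this check, so the same mutation goes unnoticed there. That does not make the mutation safe. The plain-Java sketch below imitates the idea without any Beam dependency. `Reading`, `encode` and `checkNotMutated` are hypothetical names, and a `DataOutputStream` encoding stands in for a real Beam `Coder`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical mutable element type, standing in for the value inside a KV.
final class Reading {
  int celsius;

  Reading(int celsius) { this.celsius = celsius; }
}

final class MutationCheckSketch {
  // Stand-in for a Beam Coder: a deterministic byte encoding of the element.
  static byte[] encode(Reading r) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(r.celsius);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Encode before processing, run the "transform", then encode again and compare.
  // This is roughly (not exactly) what the DirectRunner's enforcement does per element.
  static void checkNotMutated(Reading input, Consumer<Reading> transform) {
    byte[] before = encode(input);
    transform.accept(input);
    if (!Arrays.equals(before, encode(input))) {
      throw new IllegalStateException("Input values must not be mutated in any way.");
    }
  }

  public static void main(String[] args) {
    // Reads the input and builds a new object: passes the check.
    checkNotMutated(new Reading(21), r -> new Reading(r.celsius * 2));
    // Doubles the input in place: fails the check.
    try {
      checkNotMutated(new Reading(21), r -> r.celsius *= 2);
      System.out.println("not detected");
    } catch (IllegalStateException e) {
      System.out.println("detected: " + e.getMessage());
    }
  }
}
```

Because the check compares encoded bytes, it catches in-place edits even when the object reference itself never changes. This explains why cloning before editing makes the exception disappear.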
What problems can occur if we modify the original object? The Apache Beam documentation says that input elements are supposed to be immutable.
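One concrete problem is shared references. A runner may hand the same element object to more than one consumer, for example when two transforms are applied to the same PCollection and fused into one stage. It may also keep a reference to the element for retries or buffering. If one consumer edits the object in place, the others silently see the edited value. Results can then depend on execution order, or differ between runners and between object-reuse settings. The plain-Java sketch below simulates two consumers sharing one element. `TempReading`, `doubleInPlace`, `doubledCopy` and `fanOut` are hypothetical, and the fan-out is an assumed model of runner behavior rather than Beam code. It also shows the clone-then-edit fix from the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable element, e.g. the value held inside a KV.
final class TempReading {
  int celsius;

  TempReading(int celsius) { this.celsius = celsius; }

  TempReading(TempReading other) { this.celsius = other.celsius; } // copy constructor
}

final class SharedElementSketch {
  // Unsafe: edits the element it was given.
  static TempReading doubleInPlace(TempReading r) {
    r.celsius *= 2;
    return r;
  }

  // Safe: clones first, then edits only the clone.
  static TempReading doubledCopy(TempReading r) {
    TempReading copy = new TempReading(r);
    copy.celsius *= 2;
    return copy;
  }

  // Simulates a runner handing one object to two consumers in sequence:
  // consumer 1 doubles it, consumer 2 just records the input it sees.
  static List<Integer> fanOut(boolean mutate) {
    TempReading shared = new TempReading(21);
    List<Integer> seen = new ArrayList<>();
    TempReading first = mutate ? doubleInPlace(shared) : doubledCopy(shared);
    seen.add(first.celsius);  // consumer 1 output
    seen.add(shared.celsius); // consumer 2 reads the "same" input element
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(fanOut(true));  // [42, 42]: consumer 2 sees corrupted input
    System.out.println(fanOut(false)); // [42, 21]: the original is left intact
  }
}
```

With a `KV` element, the same idea means building a new value and emitting `KV.of(key, newValue)` rather than calling setters on the value you received.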
Resource:
https://cloud.google.com/dataflow/docs/guides/develop-and-test-pipelines
Example Code:
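import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// Static nested class, so it holds no hidden reference to the enclosing test instance.
private static class WeatherStatsPipeline
    extends PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // modify the original input * 2, etc.
    // (implementation elided; must return a PCollection<WeatherSummary>)
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // WeatherStatsPipeline calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

  // Assert correct output from WeatherStatsPipeline
  PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
      .withAverageTemp(21)
      .withMaxTemp(24)
      .withMinTemp(20)
      .build());

  p.run();
}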