I’m seeing odd behavior when using ValueState objects. I have a streaming Dataflow job that processes records throughout the day, and I window them to emit a single file.
Since order preservation isn’t necessary, and my client wants a header record on the output file, I use a BagState to first collect all of the records into a single view, and a simple “start of day/end of day” ValueState (“timeToMakeTheDonuts”) to control my window.
Both the BagState and ValueState variables persist correctly as the Dataflow job runs throughout the day. The expiry is always set accurately, and I don’t lose any data.
The problem is my second ValueState variable, sequenceNumber. It should increment by one each day, when the window expires (see onExpiry). However, regardless of what my window is, this ValueState is always null when I read it.
Why would my BagState buffer and ValueState timeToMakeTheDonuts persist and work correctly, while ValueState sequenceNumber does not persist?
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
public class BufferedProcessing extends DoFn<KV<Integer, String>, String> {
public static class FakeKvPair extends DoFn<String, KV<Integer, String>> {
public void processElement(ProcessContext c) {
c.output(KV.of(0, c.element()));
private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();
@StateId("timeToMakeTheDonuts")
private final StateSpec<ValueState<Integer>> setTheTimerState = StateSpecs.value(VarIntCoder.of());
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("sequenceNumber")
private final StateSpec<ValueState<Integer>> sequenceNumberSpec = StateSpecs.value(VarIntCoder.of());
public void process(ProcessContext context, @StateId("buffer") BagState<String> bufferState,
@StateId("timeToMakeTheDonuts") ValueState<Integer> isItTimeState, @TimerId("expiry") Timer expiryTimer,
if (isItTimeState.read() == null || isItTimeState.read() == 0) {
expiryTimer.set(window.maxTimestamp());
bufferState.add(context.element().getValue());
public void onExpiry(OnTimerContext context, @StateId("buffer") BagState<String> bufferState,
@StateId("timeToMakeTheDonuts") ValueState<Integer> isItTimeState,
@StateId("sequenceNumber") ValueState<Integer> sequenceNumber, @TimerId("expiry") Timer expiryTimer) {
List<String> records = new ArrayList<>();
for (String record : bufferState.read()) {
// Calculate & write header
DateTimeFormatter dateTimeFormat =
DateTimeFormatter.ofPattern("MMddyyyyHHmmss").withZone(ZoneId.of("America/New_York"));
DateTimeFormatter sequenceDateFormatter = DateTimeFormatter.ofPattern("dd MM yyyy");
//ISSUE HERE: sequenceNumber.read() is always null
Integer headerSequenceValue = sequenceNumber.read();
if (headerSequenceValue == null)
sequenceNumber.write(++headerSequenceValue);
String headerOutput = String.format("%-20s%07d%s%07d", "FILEHEADER", headerSequenceValue,
dateTimeFormat.format(LocalDateTime.now()), records.size() + 1); // Include header record in the count
context.output(headerOutput);
for (String record : records) {
<code>
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
/**
 * Buffers every element of a key/window in {@link BagState} and, when the event-time timer fires
 * at the end of the window, emits one header record followed by all buffered records.
 *
 * <p><b>State scoping:</b> Beam user state is partitioned per key <em>and per window</em>, and it
 * is garbage-collected once the window expires. {@code sequenceNumber} therefore starts out
 * {@code null} in every new window and cannot carry a counter from one day's window into the
 * next. A counter that survives across days needs a different design (for example, state held
 * by a DoFn applied in the global window, or a counter persisted outside Beam state).
 */
public class BufferedProcessing extends DoFn<KV<Integer, String>, String> {

    /** Assigns every input string to the single key {@code 0} so that all records share state. */
    public static class FakeKvPair extends DoFn<String, KV<Integer, String>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(0, c.element()));
        }
    }

    /** Zone in which the header's "generated at" timestamp is rendered. */
    private static final ZoneId HEADER_ZONE = ZoneId.of("America/New_York");

    /**
     * Layout of the header timestamp. Formatters are immutable and thread-safe, so a single
     * static instance is shared (it is not a serialized DoFn field).
     */
    private static final DateTimeFormatter HEADER_TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("MMddyyyyHHmmss");

    @StateId("buffer")
    private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();

    /** 1 while the flush timer is armed for the current window, 0 or null otherwise. */
    @StateId("timeToMakeTheDonuts")
    private final StateSpec<ValueState<Integer>> setTheTimerState = StateSpecs.value(VarIntCoder.of());

    @TimerId("expiry")
    private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    /** Last header sequence number emitted for this key/window; null before the first flush. */
    @StateId("sequenceNumber")
    private final StateSpec<ValueState<Integer>> sequenceNumberSpec = StateSpecs.value(VarIntCoder.of());

    /**
     * Buffers the element's value and, on the first element since the last flush, arms the
     * event-time timer for the end of the element's window.
     */
    @ProcessElement
    public void process(ProcessContext context, @StateId("buffer") BagState<String> bufferState,
            @StateId("timeToMakeTheDonuts") ValueState<Integer> isItTimeState, @TimerId("expiry") Timer expiryTimer,
            IntervalWindow window) {
        Integer timerArmed = isItTimeState.read();
        if (timerArmed == null || timerArmed == 0) {
            // Flush once the watermark passes the last instant of this window.
            expiryTimer.set(window.maxTimestamp());
            isItTimeState.write(1);
        }
        bufferState.add(context.element().getValue());
    }

    /**
     * Emits a header record ("FILEHEADER", sequence number, New York timestamp, record count
     * including the header) followed by every buffered record, then resets the buffer and the
     * timer flag.
     */
    @OnTimer("expiry")
    public void onExpiry(OnTimerContext context, @StateId("buffer") BagState<String> bufferState,
            @StateId("timeToMakeTheDonuts") ValueState<Integer> isItTimeState,
            @StateId("sequenceNumber") ValueState<Integer> sequenceNumber, @TimerId("expiry") Timer expiryTimer) {
        List<String> records = new ArrayList<>();
        for (String record : bufferState.read()) {
            records.add(record);
        }

        // sequenceNumber stores the last number emitted; none yet means this header is number 1.
        // NOTE: because state is per window, this is null again in every new window (see class doc).
        Integer lastSequence = sequenceNumber.read();
        int headerSequenceValue = (lastSequence == null ? 0 : lastSequence) + 1;
        sequenceNumber.write(headerSequenceValue);

        // LocalDateTime.now(zone) reads the wall clock in HEADER_ZONE; the no-arg now() would use
        // the worker's default zone, and a formatter override zone does not convert a LocalDateTime.
        String headerOutput = String.format("%-20s%07d%s%07d", "FILEHEADER", headerSequenceValue,
                HEADER_TIMESTAMP_FORMAT.format(LocalDateTime.now(HEADER_ZONE)),
                records.size() + 1); // Include header record in the count
        context.output(headerOutput);
        for (String record : records) {
            context.output(record);
        }

        // Drop the flushed records so a re-armed timer (e.g. for late data) does not emit them again.
        bufferState.clear();
        isItTimeState.write(0);
    }
}
</code>
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
/**
 * Buffers every element of a key/window in {@link BagState} and, when the event-time timer fires
 * at the end of the window, emits one header record followed by all buffered records.
 *
 * <p><b>State scoping:</b> Beam user state is partitioned per key <em>and per window</em>, and it
 * is garbage-collected once the window expires. {@code sequenceNumber} therefore starts out
 * {@code null} in every new window and cannot carry a counter from one day's window into the
 * next. A counter that survives across days needs a different design (for example, state held
 * by a DoFn applied in the global window, or a counter persisted outside Beam state).
 */
public class BufferedProcessing extends DoFn<KV<Integer, String>, String> {

    /** Assigns every input string to the single key {@code 0} so that all records share state. */
    public static class FakeKvPair extends DoFn<String, KV<Integer, String>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(0, c.element()));
        }
    }

    /** Zone in which the header's "generated at" timestamp is rendered. */
    private static final ZoneId HEADER_ZONE = ZoneId.of("America/New_York");

    /**
     * Layout of the header timestamp. Formatters are immutable and thread-safe, so a single
     * static instance is shared (it is not a serialized DoFn field).
     */
    private static final DateTimeFormatter HEADER_TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("MMddyyyyHHmmss");

    @StateId("buffer")
    private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();

    /** 1 while the flush timer is armed for the current window, 0 or null otherwise. */
    @StateId("timeToMakeTheDonuts")
    private final StateSpec<ValueState<Integer>> setTheTimerState = StateSpecs.value(VarIntCoder.of());

    @TimerId("expiry")
    private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    /** Last header sequence number emitted for this key/window; null before the first flush. */
    @StateId("sequenceNumber")
    private final StateSpec<ValueState<Integer>> sequenceNumberSpec = StateSpecs.value(VarIntCoder.of());

    /**
     * Buffers the element's value and, on the first element since the last flush, arms the
     * event-time timer for the end of the element's window.
     */
    @ProcessElement
    public void process(ProcessContext context, @StateId("buffer") BagState<String> bufferState,
            @StateId("timeToMakeTheDonuts") ValueState<Integer> isItTimeState, @TimerId("expiry") Timer expiryTimer,
            IntervalWindow window) {
        Integer timerArmed = isItTimeState.read();
        if (timerArmed == null || timerArmed == 0) {
            // Flush once the watermark passes the last instant of this window.
            expiryTimer.set(window.maxTimestamp());
            isItTimeState.write(1);
        }
        bufferState.add(context.element().getValue());
    }

    /**
     * Emits a header record ("FILEHEADER", sequence number, New York timestamp, record count
     * including the header) followed by every buffered record, then resets the buffer and the
     * timer flag.
     */
    @OnTimer("expiry")
    public void onExpiry(OnTimerContext context, @StateId("buffer") BagState<String> bufferState,
            @StateId("timeToMakeTheDonuts") ValueState<Integer> isItTimeState,
            @StateId("sequenceNumber") ValueState<Integer> sequenceNumber, @TimerId("expiry") Timer expiryTimer) {
        List<String> records = new ArrayList<>();
        for (String record : bufferState.read()) {
            records.add(record);
        }

        // sequenceNumber stores the last number emitted; none yet means this header is number 1.
        // NOTE: because state is per window, this is null again in every new window (see class doc).
        Integer lastSequence = sequenceNumber.read();
        int headerSequenceValue = (lastSequence == null ? 0 : lastSequence) + 1;
        sequenceNumber.write(headerSequenceValue);

        // LocalDateTime.now(zone) reads the wall clock in HEADER_ZONE; the no-arg now() would use
        // the worker's default zone, and a formatter override zone does not convert a LocalDateTime.
        String headerOutput = String.format("%-20s%07d%s%07d", "FILEHEADER", headerSequenceValue,
                HEADER_TIMESTAMP_FORMAT.format(LocalDateTime.now(HEADER_ZONE)),
                records.size() + 1); // Include header record in the count
        context.output(headerOutput);
        for (String record : records) {
            context.output(record);
        }

        // Drop the flushed records so a re-armed timer (e.g. for late data) does not emit them again.
        bufferState.clear();
        isItTimeState.write(0);
    }
}