We have an Apache Beam streaming job that reads records from a Spanner change stream. We read the change stream as follows:
Step 1.
PCollection<DataChangeRecord> inputChangeStreams = pipeline.apply(
    "readChangeStream1",
    SpannerIO.readChangeStream()
        .withSpannerConfig(spannerConfig)
        .withChangeStreamName(options.getLoansDecisionResponseChangeStream())
        .withMetadataInstance(options.getInputInstanceId())
        .withMetadataDatabase(options.getMetaDataDatabaseId())
        .withInclusiveStartAt(Timestamp.now()));
return inputChangeStreams;
Step 2.
PCollection<...> crdRqsIds = inputChangeStreams.apply(
    "Extract Ids from the Changestreams",
    ParDo.of(new ParseChangestreamData()));
How can we add a delay between Step 1 and Step 2?
We tried the following code, but it didn't work:
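PCollection<DataChangeRecord> windowedChangeStreams = inputChangeStreams.apply(
    "Apply Fixed Length Windows",
    // The explicit type witness lets the chained Window<T> match the input element type.
    Window.<DataChangeRecord>into(FixedWindows.of(Duration.standardSeconds(30)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardSeconds(30))
        .discardingFiredPanes());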
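Windowing alone is unlikely to produce a delay here. `Window.into` assigns each element to a window and configures when grouped results are emitted, but an element-wise `ParDo` such as `ParseChangestreamData` still processes each record as soon as it arrives. Records are only held back when the windowing is followed by a grouping step such as `GroupByKey` or `Combine`. One common way to add an explicit delay in Beam Java is a stateful `DoFn` on keyed input that buffers records in state (`@StateId` with `StateSpecs.bag()`) and sets a processing-time timer (`@TimerId` with `TimerSpecs.timer(TimeDomain.PROCESSING_TIME)`) to emit them later. The following is a minimal, Beam-free sketch of that buffer-and-release logic, assuming a fixed delay measured from each record's arrival time. The `DelayBuffer` class and its methods are hypothetical and are not part of the Beam API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical, Beam-free model of "hold each record for a fixed delay, then release it".
 * In a Beam stateful DoFn the pending records would live in per-key state and
 * releaseDue(now) would run when a processing-time timer fires.
 */
class DelayBuffer<T> {
    // A buffered record plus the time at which it becomes eligible for release.
    private static final class Pending<T> {
        final T value;
        final Instant releaseAt;

        Pending(T value, Instant releaseAt) {
            this.value = value;
            this.releaseAt = releaseAt;
        }
    }

    private final Duration delay;
    // Ordered by release time so the earliest-due record is always at the head.
    private final PriorityQueue<Pending<T>> queue =
            new PriorityQueue<>(Comparator.comparing((Pending<T> p) -> p.releaseAt));

    DelayBuffer(Duration delay) {
        this.delay = delay;
    }

    /** Buffers a record that arrived at {@code arrivedAt}; it becomes due at arrivedAt + delay. */
    void add(T value, Instant arrivedAt) {
        queue.add(new Pending<>(value, arrivedAt.plus(delay)));
    }

    /** Earliest pending release time (where a timer would be set next), or null if nothing is buffered. */
    Instant nextReleaseTime() {
        Pending<T> head = queue.peek();
        return head == null ? null : head.releaseAt;
    }

    /** Removes and returns, in release order, every record whose delay has elapsed by {@code now}. */
    List<T> releaseDue(Instant now) {
        List<T> out = new ArrayList<>();
        while (!queue.isEmpty() && !queue.peek().releaseAt.isAfter(now)) {
            out.add(queue.poll().value);
        }
        return out;
    }
}
```

In a stateful `DoFn`, `add` would roughly correspond to the `@ProcessElement` method appending to state, `nextReleaseTime` to the time the processing-time timer is set for, and `releaseDue` to the `@OnTimer` callback that outputs the records to Step 2.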