AsyncRetryStrategy asyncRetryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();

// apply the async I/O transformation with retry
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWaitWithRetry(
        stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
This is an example from the official Flink docs.
Say we make 5 async API calls inside AsyncDatabaseRequest's asyncInvoke method. Does Flink test the predicates in the AsyncRetryStrategy against the output stream element (which is of type Tuple2<String, String> in this example) and retry the whole asyncInvoke method, or does it test the predicates against the output of each of the 5 async API calls and retry only the ones that failed?
The retry is on a per-record basis. See Flink's AsyncWaitOperator.processElement(), which is called once per element. If retry is enabled, it uses the RetryableResultHandlerDelegator, which implements the ResultFuture API; in particular, its completeExceptionally() call triggers a retry of the single record that failed. In other words, the predicates only see what asyncInvoke hands to the ResultFuture, and a retry re-runs the whole asyncInvoke for that record, not the individual API calls made inside it.
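If you only want to retry the individual calls that failed, one option is to handle that yourself inside asyncInvoke and hand Flink a single combined result. Below is a minimal plain-Java sketch of that idea, with no Flink dependency so it can run on its own. The names withRetry, attempt, callAll and flaky are hypothetical helpers made up for this illustration, and the immediate retry without any delay is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PerCallRetrySketch {

    // Hypothetical helper: retries ONE async call, up to maxAttempts attempts in total.
    public static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call, int remaining, CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (remaining > 1) {
                attempt(call, remaining - 1, result); // retry only this call
            } else {
                result.completeExceptionally(error);  // attempts exhausted
            }
        });
    }

    // Runs every call with its own retry budget and combines the results in order.
    public static CompletableFuture<List<String>> callAll(
            List<Supplier<CompletableFuture<String>>> calls, int maxAttempts) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<CompletableFuture<String>> call : calls) {
            futures.add(withRetry(call, maxAttempts));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<String> results = new ArrayList<>();
                    for (CompletableFuture<String> f : futures) {
                        results.add(f.join()); // already completed at this point
                    }
                    return results;
                });
    }

    // Hypothetical stand-in for a remote API call that fails the first `failures` times.
    public static Supplier<CompletableFuture<String>> flaky(
            String name, int failures, AtomicInteger attempts) {
        return () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (attempts.incrementAndGet() <= failures) {
                f.completeExceptionally(new RuntimeException(name + " failed"));
            } else {
                f.complete(name + "-ok");
            }
            return f;
        };
    }

    public static void main(String[] args) {
        List<Supplier<CompletableFuture<String>>> calls = new ArrayList<>();
        List<AtomicInteger> counters = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            AtomicInteger counter = new AtomicInteger();
            counters.add(counter);
            // Only call-2 is flaky: it fails twice before succeeding.
            calls.add(flaky("call-" + i, i == 2 ? 2 : 0, counter));
        }
        System.out.println(callAll(calls, 3).join());
        System.out.println("attempts per call: " + counters);
    }
}
```

Inside a real asyncInvoke you would then forward the combined outcome once, roughly callAll(...).whenComplete(...) calling resultFuture.complete(...) on success or resultFuture.completeExceptionally(...) on failure. Flink's AsyncRetryStrategy would still apply on top of this, but only to the record as a whole.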