I use FileSink from the org.apache.flink.connector.file.sink package.
This is my code:
FileSink
.forRowFormat(new Path(somePath), new SimpleStringEncoder[String]("UTF-8"))
.withOutputFileConfig(FileConfigs.rawFileConfig)
.withRollingPolicy(DefaultRollingPolicy.builder() ... )
.withBucketAssigner( ... )
.build()
This is how I construct the source:
KafkaSource.builder[String]
.setBootstrapServers(someBootstrapServers)
.setTopics(someTopic)
.setGroupId(someGroupId)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperties(someKafkaProps)
.build
And this is the stream configuration:
streamEnv.enableCheckpointing(someCheckpointingInterval, CheckpointingMode.EXACTLY_ONCE)
streamEnv.setRestartStrategy(RestartStrategies.exponentialDelayRestart( ... ))
streamEnv.setParallelism(someParallelism)
When the job reads messages, it writes files. When I stop the job by canceling it in the Flink Web UI and then start it again, the files in the destination are duplicated.
When I use a compactor and stop the job, the unfinished files remain in the same place.
When writing to the bucket at the end of the hour, files with the ‘inprogress’ status remain in that state and are never completed.
Why do I get duplicates after restarting the task?
Why do files remain in the in-progress state at the end of the hour?
Why doesn’t FileSink just append to the existing file or create a new one?
I have searched for a solution, but I have not found answers to these questions.