The apache camel route works partialy for large insertion of data from csv file from aws s3. It inserts data in batches for few times and then stops with error. If I give the csv record with less rows it does not even executes the insert statement rather the application gets stopped with no error. Note that the number of columns is not known before hand for the csv file and csv file does not have column name as header.
Below is the code I tried.
public void configure() {
from("direct:startinsert")
.transacted()
.process(new ReadFromS3Processor()
})
.unmarshal(csvDataFormat)
.process(exchange->{
List<List<String>> csvData = exchange.getIn().getBody(List.class);
int columnCount = csvData.get(0).size();
StringBuilder sqlBuilder = new StringBuilder();
String tableName = exchange.getProperty("tableName", String.class);
sqlBuilder.append("INSERT INTO ").append(tableName).append(" VALUES (");
for (int i = 0; i < columnCount; i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("#");
}
sqlBuilder.append(")");
exchange.setProperty("sqlinsert", simple(sqlBuilder.toString()).evaluate(exchange, String.class));
})
.split(body()).stopOnException().streaming()
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionPredicate(exchange->{
if (exchange != null) {
ArrayList list = exchange.getIn().getBody(ArrayList.class);
if (CollectionUtils.isNotEmpty(list) && list.size() == size) {
return true;
}
}
return false;
})
.completionTimeout(3000L)
.toD("sql:${exchangeProperty.sqlinsert}?batch=true")
.end();
}
public class ArrayListAggregationStrategy implements AggregationStrategy {
public ArrayListAggregationStrategy() {
super();
}
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
Object newBody = newIn.getBody();
ArrayList list = null;
if (oldExchange == null) {
list = new ArrayList();
list.add(newBody);
newIn.setBody(list);
return newExchange;
} else {
Message in = oldExchange.getIn();
list = in.getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}