We are using an Apache Beam pipeline with `ElasticsearchIO` in Java. How can we make a field be mapped as a `keyword`, or alternatively set `fielddata=true` on it? We would prefer a `keyword`. Thanks.
We tried the following code, and we want the `type` field to be a `keyword`:
```java
import com.google.gson.GsonBuilder;
import java.sql.ResultSet;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// ProgramProduct, getProductConfig(...) and the config/sql/batch-size/host/index/key
// variables are defined elsewhere in our code.
pipeline.apply("Read MySQL data", JdbcIO.<ProgramProduct>read()
        .withDataSourceConfiguration(config)
        .withQuery(sql)
        .withOutputParallelization(false)
        .withFetchSize(readBatchSize)
        .withRowMapper(new JdbcIO.RowMapper<ProgramProduct>() {
            private static final long serialVersionUID = 1L;

            @Override
            public ProgramProduct mapRow(ResultSet resultSet) throws Exception {
                return new ProgramProduct(resultSet.getInt("product_fk"), resultSet.getInt("program_fk"),
                        resultSet.getString("type"), // the field we want mapped as a keyword
                        getProductConfig(resultSet.getDouble("price"),
                                resultSet.getDouble("variable_min_points"),
                                resultSet.getDouble("variable_max_points")),
                        resultSet.getString("date_created"));
            }
        }))
    .apply("Apply transform on data", ParDo.of(new DoFn<ProgramProduct, String>() {
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(ProcessContext processContext) {
            ProgramProduct product = processContext.element();
            // Skip nulls: String elements use StringUtf8Coder, which cannot encode null.
            if (product != null) {
                processContext.output(new GsonBuilder().create().toJson(product));
            }
        }
    }))
    .apply("Write to Elasticsearch", ElasticsearchIO.write()
        .withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration
                .create(new String[] { elasticHost }, elasticIndexName)
                .withApiKey(elasticApiKey))
        .withMaxBatchSize(elasticBatchSize));
```
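One approach worth trying, offered as a minimal sketch under assumptions: as far as we can tell, `ElasticsearchIO.write()` only sends documents and does not define mappings. If no mapping exists, Elasticsearch's dynamic mapping indexes a JSON string such as `type` as `text` with a `type.keyword` sub-field. Creating the index with an explicit mapping before `pipeline.run()` should make `type` a real `keyword` field. `KeywordMapping`, `mappingBody` and `createIndex` are hypothetical helpers, not part of Beam or any Elasticsearch client. The sketch uses only the JDK's `java.net.http` client (Java 11+). It assumes Elasticsearch 7.x or later, which uses typeless mappings, and that `elasticApiKey` is the base64-encoded `id:api_key` value used by the `ApiKey` authorization scheme.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Hypothetical helper (not part of Beam): creates the target index with an explicit
 * mapping before the pipeline runs, so the listed fields are indexed as "keyword"
 * instead of the dynamically mapped "text". Assumes Elasticsearch 7.x+ (typeless mappings).
 */
final class KeywordMapping {

    private KeywordMapping() {}

    /** Builds a create-index body such as {"mappings":{"properties":{"type":{"type":"keyword"}}}}. */
    static String mappingBody(String... keywordFields) {
        StringBuilder properties = new StringBuilder();
        for (int i = 0; i < keywordFields.length; i++) {
            if (i > 0) {
                properties.append(',');
            }
            // Field names are assumed to be plain identifiers, so no JSON escaping is done.
            properties.append('"').append(keywordFields[i]).append("\":{\"type\":\"keyword\"}");
        }
        return "{\"mappings\":{\"properties\":{" + properties + "}}}";
    }

    /**
     * Sends PUT {host}/{index} with the mapping and returns the HTTP status code.
     * Elasticsearch answers 400 (resource_already_exists_exception) if the index already exists.
     */
    static int createIndex(String host, String index, String apiKey, String... keywordFields)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(host + "/" + index))
                .header("Content-Type", "application/json")
                // Assumption: apiKey is the base64-encoded "id:api_key" value.
                .header("Authorization", "ApiKey " + apiKey)
                .PUT(HttpRequest.BodyPublishers.ofString(mappingBody(keywordFields)))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Usage, run once before the pipeline: `KeywordMapping.createIndex(elasticHost, elasticIndexName, elasticApiKey, "type");`. An existing field's mapping cannot be changed from `text` to `keyword` in place, so an index that was already created dynamically would need to be recreated or reindexed. Until then, the `type.keyword` sub-field added by dynamic mapping may be enough for sorting and aggregations.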