In Spark I am using MinHashLSH to perform approximate similarity joins on N sets of columns.
The rough code looks like this:
// assumes: import static org.apache.spark.sql.functions.col;
List<Dataset<Row>> outputs = new ArrayList<>();

// One MinHash LSH model per feature column set
MinHashLSHModel m1 = new MinHashLSH().setNumHashTables(5)
        .setInputCol("features1").setOutputCol("hashes1").fit(data1);
MinHashLSHModel m2 = new MinHashLSH().setNumHashTables(5)
        .setInputCol("features2").setOutputCol("hashes2").fit(data2);
MinHashLSHModel m3 = new MinHashLSH().setNumHashTables(5)
        .setInputCol("features3").setOutputCol("hashes3").fit(data3);

// Approximate similarity join per column set, keeping all columns of
// datasetA plus the matched id and the Jaccard distance
Dataset<?> jds1 = m1.approxSimilarityJoin(sdata1, data1, 0.5, "JaccardDistance");
Dataset<Row> sfds1 = jds1.select(col("datasetA.*"),
        col("datasetB." + idColName).as(simIdColName),
        col("JaccardDistance").as(disColName));
outputs.add(sfds1);

Dataset<?> jds2 = m2.approxSimilarityJoin(sdata2, data2, 0.5, "JaccardDistance");
Dataset<Row> sfds2 = jds2.select(col("datasetA.*"),
        col("datasetB." + idColName).as(simIdColName),
        col("JaccardDistance").as(disColName));
outputs.add(sfds2);

Dataset<?> jds3 = m3.approxSimilarityJoin(sdata3, data3, 0.5, "JaccardDistance");
Dataset<Row> sfds3 = jds3.select(col("datasetA.*"),
        col("datasetB." + idColName).as(simIdColName),
        col("JaccardDistance").as(disColName));
outputs.add(sfds3);
// Finally I want to full-outer join the three datasets sfds1, sfds2 and sfds3
// on (idColName, simIdColName)
List<String> keyLst = Arrays.asList(idColName, simIdColName);
Seq<String> keySeq = JavaConverters.collectionAsScalaIterableConverter(keyLst).asScala().toSeq();

Dataset<Row> outputDs = outputs.get(0);
for (int i = 1; i < outputs.size(); i++) {
    outputDs = outputDs.join(outputs.get(i), keySeq, "fullouter");
}
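For reference, the three per-column-set blocks above differ only in the index, so the same logic can be written as a loop over the N sets. A sketch, assuming hypothetical arrays dataArr and sdataArr that hold data1..dataN and sdata1..sdataN:

// Sketch only: the pipeline above generalized to N column sets.
// dataArr and sdataArr are hypothetical arrays holding the per-set datasets.
for (int i = 0; i < dataArr.length; i++) {
    MinHashLSHModel model = new MinHashLSH().setNumHashTables(5)
            .setInputCol("features" + (i + 1))
            .setOutputCol("hashes" + (i + 1))
            .fit(dataArr[i]);
    Dataset<?> joined = model.approxSimilarityJoin(sdataArr[i], dataArr[i], 0.5, "JaccardDistance");
    outputs.add(joined.select(col("datasetA.*"),
            col("datasetB." + idColName).as(simIdColName),
            col("JaccardDistance").as(disColName)));
}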
Because Spark evaluates lazily, the joins do not actually run in each iteration; they all execute at the end, when an action is called, and this creates a performance bottleneck: each of the last few stages takes longer to execute than the one before it.
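One mitigation I have considered is materializing each intermediate join so the lineage of the chained joins stops growing. A sketch of what I mean, using Dataset.localCheckpoint() (persist() followed by an action, or checkpoint() with a configured checkpoint directory, would be alternatives):

// Sketch only: localCheckpoint() is eager by default, so it computes the
// join at each iteration and truncates its lineage; later joins then start
// from a materialized intermediate instead of replaying earlier stages.
Dataset<Row> outputDs = outputs.get(0);
for (int i = 1; i < outputs.size(); i++) {
    outputDs = outputDs.join(outputs.get(i), keySeq, "fullouter").localCheckpoint();
}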
How can I improve this code? Do I need to change anything in the spark-submit configuration?