I want to use Guava's BloomFilter in a Spark application, so I need to make BloomFilter serializable by defining a class named SerializableStringBloomFilter which implements Serializable. But according to the log, the readObject and writeObject methods do not work properly: the value inside the mapPartitions stage is normal, but the bloomFilter property is null after the reduce stage.
The source code of SerializableStringBloomFilter is here:
package com.nobody.foo.bloom;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.io.*;
import java.nio.charset.StandardCharsets;
/**
 * Serializable wrapper around Guava's {@link BloomFilter} of strings, intended
 * for shipping a bloom filter across Spark stages via Java serialization.
 * NOTE(review): no serialVersionUID is declared — consider adding one so the
 * stream format is stable across recompiles.
 */
public class SerializableStringBloomFilter implements Serializable {
// Transient because Guava's BloomFilter does not implement java.io.Serializable;
// the custom writeObject/readObject hooks in this class round-trip it manually.
// NOTE(review): public mutable field — consider making it private with accessors.
public transient BloomFilter<String> bloomFilter;
/**
 * Creates a fresh filter sized for the given capacity and error rate.
 * Strings are funneled as ISO-8859-1 bytes; non-Latin-1 characters will be
 * lossily encoded, which callers should be aware of.
 *
 * @param expectedInsertions       number of elements the filter is sized for
 * @param falsePositiveProbability desired false-positive rate (0 &lt; p &lt; 1)
 */
public SerializableStringBloomFilter(int expectedInsertions, double falsePositiveProbability) {
    bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(StandardCharsets.ISO_8859_1),
            expectedInsertions,
            falsePositiveProbability);
}
/**
 * Merges every element of another wrapper's filter into this one.
 * Both filters must have been created with compatible parameters,
 * or Guava's putAll will reject the merge.
 *
 * @param bloom the wrapper whose filter contents are absorbed
 */
public void putAll(SerializableStringBloomFilter bloom) {
    BloomFilter<String> other = bloom.bloomFilter;
    bloomFilter.putAll(other);
}
/**
 * Records a string in the underlying filter.
 *
 * @param object the value to add
 */
public void put(String object) {
    bloomFilter.put(object);
}
/**
 * Streams the raw filter bytes to {@code out} in Guava's wire format
 * (readable back with {@code BloomFilter.readFrom}). The stream is not
 * closed by this method.
 *
 * @param out destination stream
 * @throws IOException if writing to the stream fails
 */
public void writeTo(OutputStream out) throws IOException {
    // Removed leftover debug printlns: out.toString() on a stream prints only
    // the object identity, and the prints polluted stdout on every call.
    this.bloomFilter.writeTo(out);
}
/**
 * Probabilistic membership test: {@code false} means definitely absent,
 * {@code true} means possibly present.
 *
 * @param object candidate value
 * @return whether the value might have been added to the filter
 */
public boolean mightContain(String object) {
    return bloomFilter.mightContain(object);
}
/**
 * Java-serialization hook: snapshots the transient bloom filter as a
 * length-prefixed byte array. Writing Guava's bytes straight into the
 * ObjectOutputStream (as the original code did) is fragile: the stream's
 * block-data framing means a later {@code BloomFilter.readFrom(in, ...)} can
 * consume the wrong number of bytes. Buffering into a byte[] with an explicit
 * length makes the read side consume exactly what was written.
 *
 * NOTE(review): these hooks only run under Java serialization. If Spark is
 * configured with KryoSerializer ({@code spark.serializer}), they are never
 * invoked and the transient field arrives null — which matches the reported
 * "null after the reduce stage". Verify the Spark serializer setting, or
 * register a custom Kryo serializer for this class.
 */
private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject(); // future-proof: writes any non-transient state
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    this.bloomFilter.writeTo(buffer);
    byte[] bytes = buffer.toByteArray();
    out.writeInt(bytes.length);
    out.write(bytes);
}

/**
 * Java-serialization hook: restores the transient bloom filter from the
 * length-prefixed byte array written by {@link #writeObject}. The funnel must
 * match the one used at creation time (ISO-8859-1 string funnel).
 *
 * @throws IOException            if the stream is truncated or unreadable
 * @throws ClassNotFoundException propagated from defaultReadObject
 */
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    int length = in.readInt();
    byte[] bytes = new byte[length];
    in.readFully(bytes); // read() may return short; readFully guarantees all bytes
    this.bloomFilter = BloomFilter.readFrom(
            new ByteArrayInputStream(bytes),
            Funnels.stringFunnel(StandardCharsets.ISO_8859_1));
}