I have the following class:
import java.util.HashMap;
public class Foo extends HashMap<Bar, Baz> {
}
Flink cannot serialize this class without further help or resorting to Kryo, this test fails:
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;
class FooTest {
@Test
void test() {
PojoTestUtils.assertSerializedAsPojoWithoutKryo(Foo.class);
}
}
I tried the following to no avail:
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
@TypeInfo(Foo.FooTypeInfoFactory.class)
public class Foo extends HashMap<Bar, Baz> {
public static class FooTypeInfoFactory extends
TypeInfoFactory<Foo> {
@Override
public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
var type = (TypeInformation<Foo>)
Types.MAP(TypeInformation.of(Bar.class), TypeInformation.of(Baz.class));
return type;
}
}
}
But this does not work because
Inconvertible types; cannot cast 'org.apache.flink.api.common.typeinfo.TypeInformation<java.util.Map<Bar,Baz>>' to 'org.apache.flink.api.common.typeinfo.TypeInformation<Foo>'
Is there some way to make this idea work? Foo
implements Map<Bar, Baz>
so it seems to me that I should be able to tell Flink somehow, that it should be serialized like a Map
.