I need to enrich my data using reference tables and would like to use Flink SQL for this purpose. My reference tables are stored
in object storage using the Iceberg table format. To start, I am trying to create temporary tables in Flink SQL that project and
filter the original tables, so I have written SQL statements along these lines:
CREATE TEMPORARY TABLE my_temporary_table (a INT, b STRING, c INT);
INSERT INTO my_temporary_table SELECT a, b, c FROM stored_table_in_s3 WHERE ...;
However, I do not want to materialize these temporary tables in storage: they will only
contain a few thousand entries and can easily be held in memory. When I attempt to execute these statements
using the TableEnvironment in a Flink job:
tableEnv.executeSql(sqlCreateQuery);
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql(insertQuery);
...
statementSet.execute();
I encounter the following error:
Table options do not contain an option key 'connector' for discovering
a connector. Therefore, Flink assumes a managed table. However, a
managed table factory that implements org.apache.flink.table.factories.ManagedTableFactory is not in the
classpath.
which indicates that a connector is required, i.e. that the data must be stored somewhere for the execution to proceed.
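For what it's worth, my understanding is that the error goes away if I declare an explicit connector on the table, along these lines (the connector, path, and format options here are assumptions on my part, not something I have verified):

```sql
-- Hypothetical workaround: declare an explicit connector, which forces materialization.
CREATE TEMPORARY TABLE my_temporary_table (a INT, b STRING, c INT) WITH (
  'connector' = 'filesystem',      -- assumed connector choice
  'path' = 's3://my-bucket/tmp/',  -- hypothetical path
  'format' = 'parquet'
);
```

But that writes the rows out to storage, which is exactly what I am trying to avoid.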
Am I missing something? How can I efficiently enrich events by performing multiple lookups on reference tables using Flink SQL / the Table API, without incurring unnecessary storage costs from materializing the intermediate (temporary) tables?
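To make the goal concrete, here is a plain-Java sketch of the lookup semantics I would like Flink to handle for me (the record types, field names, and values are invented purely for illustration):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnrichmentSketch {
    // Hypothetical record types, invented for illustration.
    record Event(int refKey, String payload) {}
    record RefRow(int a, String b) {}

    // Enrich one event via an in-memory lookup against the reference map.
    static String enrich(Map<Integer, RefRow> referenceByKey, Event e) {
        RefRow ref = referenceByKey.get(e.refKey());
        return e.payload() + " -> " + (ref != null ? ref.b() : "unknown");
    }

    public static void main(String[] args) {
        // The "temporary table": a few thousand projected/filtered rows held in memory.
        Map<Integer, RefRow> referenceByKey = new HashMap<>();
        referenceByKey.put(1, new RefRow(1, "gold"));
        referenceByKey.put(2, new RefRow(2, "silver"));

        // Enrich each event with a lookup -- no materialization to storage.
        for (Event e : List.of(new Event(1, "order-42"), new Event(3, "order-43"))) {
            System.out.println(enrich(referenceByKey, e));
        }
    }
}
```

This is of course trivial outside Flink; the question is how to express the same in-memory lookup in Flink SQL / the Table API.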