I have a use case where I need to retrieve metadata from a data lake using Flink SQL. Along the way I create some simple SELECT statements like the following:
CREATE TABLE metadata1 AS SELECT table_name, column_name, column_type FROM metadata_table;
-- Additional statements here
I wrap these statements in a STATEMENT SET block and execute them with executeSql on the TableEnvironment. The results are stored in tables in the default catalog.
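For context, this execution step looks roughly like the simplified sketch below (Java Table API on a recent Flink version; catalog and connector options are omitted, and metadata2/metadata3 are placeholder names). As far as I know a statement set only accepts INSERT INTO, so the CTAS form above runs as individual executeSql calls:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// Variant 1: one CTAS per executeSql call, matching the statements above.
tEnv.executeSql(
        "CREATE TABLE metadata1 AS "
      + "SELECT table_name, column_name, column_type FROM metadata_table");

// Variant 2: a statement set bundles several INSERTs into a single job,
// but the target tables must already exist.
tEnv.executeSql(
        "EXECUTE STATEMENT SET BEGIN "
      + "INSERT INTO metadata2 SELECT table_name, column_name, column_type FROM metadata_table; "
      + "INSERT INTO metadata3 SELECT table_name, column_name, column_type FROM metadata_table; "
      + "END");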
Next, I need to combine the retrieved data to dynamically create a new table with a statement like this, where the column names and data types come from the results of the previous queries:
CREATE TABLE NewTable (`COLUMN1` VARCHAR, `COLUMN2` VARCHAR, `COLUMN3` VARCHAR)
To achieve this, I need to switch from Flink SQL to the DataStream API, retrieve the data from the previous step, and map it according to some basic filter rules to build the final statement. After that I need to switch back to Flink SQL to perform some joins and produce my desired results.
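In code, that middle step currently looks roughly like this (a simplified sketch: the BINARY filter and the VARCHAR mapping are placeholders for my real rules, and I assume the metadata tables are bounded so the collect finishes):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;

// (running inside a main() that throws Exception)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);  // metadata queries are bounded
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Bridge the metadata table from SQL into the DataStream API.
Table metadata = tEnv.sqlQuery("SELECT column_name, column_type FROM metadata1");
DataStream<Row> rows = tEnv.toDataStream(metadata);

// Placeholder filter/mapping rules; field 0 = column_name, field 1 = column_type.
DataStream<String> columnDefs = rows
        .filter(row -> !"BINARY".equals(row.getField(1)))
        .map(row -> "`" + row.getField(0) + "` VARCHAR")
        .returns(Types.STRING);

// Bringing the mapped rows back to the client triggers its own job ...
List<String> defs = new ArrayList<>();
try (CloseableIterator<String> it = columnDefs.executeAndCollect()) {
    it.forEachRemaining(defs::add);
}

// ... and only then can I switch back to SQL for the dynamic DDL and the joins.
tEnv.executeSql("CREATE TABLE NewTable (" + String.join(", ", defs) + ")");
// ... further executeSql calls with the joins follow here ...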
My problem is that each call to executeSql (or execute in the case of the DataStream API) triggers a new job in Flink. This means I can't run the entire process as a single Flink application; it seems I would need to deploy several Flink applications and use a workflow orchestrator to step through the process and achieve the desired outcome.
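To make the job boundaries concrete, a stripped-down Table-API-only version of the flow looks like this (final_result is a placeholder sink, and the DataStream executeAndCollect() step behaves the same way as the collect() shown here); every marked call submits a separate job:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// Job 1: materialize the metadata (CTAS / statement set).
tEnv.executeSql(
        "CREATE TABLE metadata1 AS "
      + "SELECT table_name, column_name, column_type FROM metadata_table");

// Job 2: the SELECT ships its rows back to the client through collect().
tEnv.executeSql("SELECT column_name, column_type FROM metadata1").collect();

// No job: plain DDL only touches the catalog.
tEnv.executeSql("CREATE TABLE NewTable (`COLUMN1` VARCHAR, `COLUMN2` VARCHAR, `COLUMN3` VARCHAR)");

// Job 3: the final joins writing the result (simplified).
tEnv.executeSql("INSERT INTO final_result SELECT * FROM NewTable");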
Is there a way to streamline this process within a single Flink application (preferably using a more declarative approach like Flink SQL), or is my understanding correct that I need multiple applications and a workflow orchestrator?