I have created 20 tables in PostgreSQL (accessed through DBeaver), all loaded from AWS Athena using StreamSets: 10 target tables populated by a full load, and 10 source tables populated by incremental loads. Now I want to merge each source table into its corresponding target table.
I used this procedure to merge the tables in PostgreSQL:
CREATE OR REPLACE PROCEDURE merge_all_tables()
LANGUAGE plpgsql
AS $$
BEGIN
-- Merge source_table_a into target_table_a1
MERGE INTO target_table_a1 AS t
USING source_table_a AS s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id, name, balance)
VALUES (s.id, s.name, s.balance);
-- Merge source_table_b into target_table_b1
MERGE INTO target_table_b1 AS t
USING source_table_b AS s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id, name, balance)
VALUES (s.id, s.name, s.balance);
-- Repeat MERGE for other source and target table pairs (D to D1, E to E1, etc.)
-- Merge source_table_d into target_table_d1
MERGE INTO target_table_d1 AS t
USING source_table_d AS s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id, name, balance)
VALUES (s.id, s.name, s.balance);
END;
$$;
This is the hard-coded approach I have come across. But I need a query that reads the column names from the information schema and inserts into the target tables dynamically.
This is what I got:
CREATE OR REPLACE PROCEDURE merge_multiple_tables()
LANGUAGE plpgsql
AS $$
DECLARE
-- Array of source-target-primary key pairs
source_tables TEXT[] := ARRAY['source_table_1', 'source_table_2', 'source_table_3', 'source_table_4', 'source_table_5'];
target_tables TEXT[] := ARRAY['target_table_1', 'target_table_2', 'target_table_3', 'target_table_4', 'target_table_5'];
primary_keys TEXT[] := ARRAY['id_1', 'id_2', 'order_id', 'emp_id', 'item_code'];
column_list TEXT;
insert_values TEXT;
i INT;
BEGIN
-- Loop through each source-target-primary key pair
FOR i IN 1..array_length(source_tables, 1)
LOOP
-- Dynamically retrieve column names for the target table
SELECT string_agg(quote_ident(column_name), ', ')
INTO column_list
FROM information_schema.columns
WHERE lower(table_name) = lower(target_tables[i]); -- Convert table names to lowercase to avoid case issues
-- Dynamically retrieve values for the source table columns for the insert clause
SELECT string_agg('source.' || quote_ident(column_name), ', ')
INTO insert_values
FROM information_schema.columns
WHERE lower(table_name) = lower(target_tables[i]); -- Same column names as the target table
-- Dynamically build and execute the MERGE statement
EXECUTE '
MERGE INTO ' || quote_ident(target_tables[i]) || ' AS target
USING ' || quote_ident(source_tables[i]) || ' AS source
ON target.' || quote_ident(primary_keys[i]) || ' = source.' || quote_ident(primary_keys[i]) || '
WHEN MATCHED THEN
UPDATE SET
' || (
SELECT string_agg('target.' || quote_ident(column_name) || ' = source.' || quote_ident(column_name), ', ')
FROM information_schema.columns
WHERE lower(table_name) = lower(target_tables[i])
) || '
WHEN NOT MATCHED THEN
INSERT (' || column_list || ')
VALUES (' || insert_values || ');
';
END LOOP;
END;
$$;
When I call the procedure, it raises an error on the target table (something like "column not found"), even though all the names match exactly.
I need a working stored procedure for this.
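One likely cause of the error, offered as a guess from the code rather than from the actual error message: in PostgreSQL's MERGE, the columns on the left-hand side of UPDATE SET must not be qualified with the target alias, so `target.col = source.col` is read as a reference to a column named "target". The sketch below drops that prefix and builds the statement with format(). It assumes PostgreSQL 15 or later, tables that live in the `public` schema with lowercase names, and matching column names on both sides. The procedure name, the `v_schema` variable, and the two example table pairs are placeholders, not names from the real database.

```sql
-- Sketch only: requires PostgreSQL 15+ (MERGE).
-- Assumed: schema 'public', lowercase table names, identical columns in source and target.
CREATE OR REPLACE PROCEDURE merge_multiple_tables_sketch()
LANGUAGE plpgsql
AS $$
DECLARE
    v_schema      TEXT   := 'public';  -- assumption: adjust to the real schema
    source_tables TEXT[] := ARRAY['source_table_1', 'source_table_2'];
    target_tables TEXT[] := ARRAY['target_table_1', 'target_table_2'];
    primary_keys  TEXT[] := ARRAY['id_1', 'id_2'];
    column_list   TEXT;  -- "col_a, col_b, ..."
    insert_values TEXT;  -- "s.col_a, s.col_b, ..."
    update_list   TEXT;  -- "col_b = s.col_b, ..." (no target alias, key column skipped)
    merge_sql     TEXT;
BEGIN
    FOR i IN 1..array_length(source_tables, 1) LOOP
        -- Build all three lists in one pass, in a fixed column order
        SELECT
            string_agg(quote_ident(column_name::text), ', ' ORDER BY ordinal_position),
            string_agg('s.' || quote_ident(column_name::text), ', ' ORDER BY ordinal_position),
            string_agg(quote_ident(column_name::text) || ' = s.' || quote_ident(column_name::text),
                       ', ' ORDER BY ordinal_position)
                FILTER (WHERE column_name::text <> primary_keys[i])
        INTO column_list, insert_values, update_list
        FROM information_schema.columns
        WHERE table_schema = v_schema
          AND table_name = target_tables[i];

        -- A NULL list means the table was not found under this schema and name
        IF column_list IS NULL THEN
            RAISE EXCEPTION 'No columns found for table %.%', v_schema, target_tables[i];
        END IF;

        merge_sql := format(
            'MERGE INTO %I.%I AS t USING %I.%I AS s ON t.%I = s.%I',
            v_schema, target_tables[i], v_schema, source_tables[i],
            primary_keys[i], primary_keys[i]);

        -- Only add the UPDATE branch when there is a non-key column to update
        IF update_list IS NOT NULL THEN
            merge_sql := merge_sql || format(' WHEN MATCHED THEN UPDATE SET %s', update_list);
        END IF;

        merge_sql := merge_sql || format(
            ' WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)',
            column_list, insert_values);

        EXECUTE merge_sql;
    END LOOP;
END;
$$;

CALL merge_multiple_tables_sketch();
```

Filtering on table_schema matters here because information_schema.columns lists every schema, so a table with the same name elsewhere would otherwise add its columns to the list. If the incremental source can contain several rows with the same key, MERGE may fail because it refuses to modify the same target row twice, so the source may need de-duplicating first.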