Need to implement incremental load from S3 to Snowflake only using snowflake objects.
Accessing S3 location thru WINSCP :
CHECKITEMS_1
CHECKITEMS_2
CHECKITEMS_3
Project_1
Project_2
Project_3
-
Created a external table without columns.
-
Created a view to load data based on filename.
-
Created a tracking table to track which files got loaded.
-
Created Stored procedure to load files one by one incrementally.
CREATE OR REPLACE PROCEDURE DEV_GR_RDH_DB.NPI.LOAD_NPI_DATA()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
file_name VARCHAR;
file_count INTEGER;
BEGIN
— Process NPICheckItems files
FOR file_record IN (
SELECT METADATA$FILENAME AS file_name
FROM DEV_GR_RDH_DB.EXTERNAL.EXT_NPI_CSV_COMMA
WHERE METADATA$FILENAME ILIKE ‘CHECKITEMS%.csv’;
)
DO
file_name := file_record.file_name;-- Check if the file has already been processed SELECT COUNT(*) INTO :file_count FROM DEV_GR_RDH_DB.NPI.TRACK_LOADED_FILES WHERE file_name = :file_name; IF file_count = 0 THEN -- Merge data into the target table MERGE INTO DEV_GR_RDH_DB.NPI.TRG_NPI_CHECKITEMS trg USING DEV_GR_RDH_DB.EXTERNAL.VW_LATEST_NPI_CHECKITEMS stg ON trg.CHECKITEMID = stg.CHECKITEMID WHEN MATCHED THEN UPDATE SET CHECKITEMCODE = stg.CHECKITEMCODE, CHECKITEMNAME = stg.CHECKITEMNAME, CHECKITEMDESCRIPTION = stg.CHECKITEMDESCRIPTION, CHECKITEMTYPEID = stg.CHECKITEMTYPEID WHEN NOT MATCHED THEN INSERT ( CHECKITEMID, CHECKITEMCODE, CHECKITEMNAME, CHECKITEMDESCRIPTION, CHECKITEMTYPEID ) VALUES ( stg.CHECKITEMID, stg.CHECKITEMCODE, stg.CHECKITEMNAME, stg.CHECKITEMDESCRIPTION, stg.CHECKITEMTYPEID ); -- Track the processed file INSERT INTO DEV_GR_RDH_DB.NPI.TRACK_LOADED_FILES (file_name, success_flag, records_affected) SELECT :file_name, TRUE, COUNT(*) FROM DEV_GR_RDH_DB.NPI.VW_NPI_CHECKITEMS WHERE file_name = :file_name GROUP BY file_name; ELSE -- File already processed, skip processing CONTINUE; END IF; END FOR; RETURN 'Incremental load process completed successfully.';
END;
$$;
Similarly we need to include code for Project% in the same code.
But S.P is throwing errors.