Is it possible in Snowflake to automate a merge?

1.6k views Asked by At

Currently I have a script that merges between my source and target table by updating and inserting. Both of these tables update daily through a task created on Snowflake. I would like to perform this merge daily too. Is it possible to automate this merge through either a task or something else on Snowflake?

Thanks

2

There are 2 answers

0
Gokhan Atil On

If your script contains only SQL commands (or commands that can be written in JS), you can create a stored procedure to call them, and then create a task to run this procedure every day.

https://docs.snowflake.com/en/sql-reference/stored-procedures-usage.html

https://docs.snowflake.com/en/user-guide/tasks-intro.html

1
Nicolas On
--- Here are the prerequisites for running the automerge procedure that is pasted at the end ---
  1  --Create Log Table:
    --EDWH_DEV.WS_EA_DNATA_DEV.GEN_LOG definition
     
    -- Audit log written by MERGER_BUILDER_GEN after every merge attempt.
    create or replace TABLE GEN_LOG (
        LOG_ID NUMBER(38,0) autoincrement,       -- surrogate id, auto-assigned
        "number of rows inserted" NUMBER(38,0),  -- quoted name: must match the MERGE result column
        "number of rows updated" NUMBER(38,0),   -- quoted name: must match the MERGE result column
        PROC_NAME VARCHAR(100),                  -- table the merge ran against
        FINISHED TIMESTAMP_NTZ(9),               -- when the merge finished
        USER_NAME VARCHAR(100),
        USER_ROLE VARCHAR(100),
        STATUS VARCHAR(50),                      -- 'done' on success, 'Failed' otherwise
        MESSAGE VARCHAR(2000)                    -- error code/state/stack on failure
    );
     
 2   --Data is loaded based on an existing table structure, whose column count must match the source file's.
    --Example:
     
    --EDWH_DEV.WS_EA_DNATA_DEV.AIRLINES definition
     
    -- Example target table. The inline UNIQUE constraints double as the merge
    -- keys: the constraint name must equal the column name, because
    -- CONSTRAINS_VW joins constraint_name to column_name.
    create or replace TABLE AIRLINES (
        CONSOLIDATED_AIRLINE_CODE VARCHAR(80),
        POSSIBLE_CUSTOMER_NAME VARCHAR(100),
        CUSTOMER_TYPE VARCHAR(70),
        CONSOLIDATED_AIRLINE_NAME VARCHAR(90),
        constraint CONSOLIDATED_AIRLINE_CODE unique (CONSOLIDATED_AIRLINE_CODE),
        constraint CUSTOMER_TYPE unique (CUSTOMER_TYPE)
    );
     
     
     
  3  --The file in the stage, AIRLINES.CSV, has the same number of columns in the same order; it does not need the same headers, as they are aliased automatically to the created table's column names as above.
     
     
     
 4   --Make sure you have required file format set or use default ones(refer to SF documentation)
     
    --ALTER FILE FORMAT "EDWH_DEV"."WS_EA_DNATA_DEV".CSV SET COMPRESSION = 'AUTO' FIELD_DELIMITER = ',' RECORD_DELIMITER = '\n' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '\042' TRIM_SPACE = FALSE ERROR_ON_COLUMN_COUNT_MISMATCH = ----TRUE ESCAPE = 'NONE' ESCAPE_UNENCLOSED_FIELD = '\134' DATE_FORMAT = 'AUTO' TIMESTAMP_FORMAT = 'AUTO' NULL_IF = ('\\N');
     
  5  --Tables must be appended to have constraints which then will be used for MERGE ON clause in merge statement. Constraint name must match Column name.
     
     
    -- NOTE(review): the AIRLINES DDL above already declares these constraints
    -- inline, so running these ALTERs on that table will fail with
    -- "constraint already exists". They are needed only for pre-existing
    -- tables that lack the constraints. The constraint name must equal the
    -- column name (CONSTRAINS_VW joins on that equality).
    ALTER TABLE AIRLINES ADD CONSTRAINT CONSOLIDATED_AIRLINE_CODE UNIQUE (CONSOLIDATED_AIRLINE_CODE);
     
    ALTER TABLE AIRLINES ADD CONSTRAINT CUSTOMER_TYPE UNIQUE (CUSTOMER_TYPE);
     
 6   --You have a stage set up and you can view the files in it.
    list @my_stage;
     
    
  7 -- this view is used to pull unique fields for on clause in merge 
      -- Pulls the key columns (PRIMARY KEY / UNIQUE constraints) used to build
      -- the ON clause of the generated MERGE. Relies on the convention that
      -- constraint_name equals the column name.
      -- FIX: the original had a stray ';' before the WHERE clause, which ended
      -- the view definition early and left the constraint_type filter as an
      -- orphaned fragment (a syntax error, and an unfiltered view). The filter
      -- now belongs to the view.
      CREATE OR REPLACE VIEW CONSTRAINS_VW AS
        SELECT
                tbl.table_schema,
                tbl.table_name,
                con.constraint_name,
                col.data_type
         FROM   EDWH_DEV.information_schema.table_constraints con
                INNER JOIN EDWH_DEV.information_schema.tables tbl
                        ON con.table_name = tbl.table_name
                          AND con.constraint_schema = tbl.table_schema
                INNER JOIN EDWH_DEV.information_schema.columns col
                        ON tbl.table_name = col.table_name
                           AND con.constraint_name = col.column_name
                           AND con.constraint_schema = col.table_schema
         WHERE  con.constraint_type IN ('PRIMARY KEY', 'UNIQUE');

------ the general procedure code: compile once, use many times :) ---

-- MERGER_BUILDER_GEN: generates and executes a MERGE for
-- EDWH_DEV.<SCHEMA_NAME>.<TABLE_NAME>, sourcing rows from the staged file
-- @<STAGE_NAME>/<TABLE_NAME>.csv (file format 'CSV'). Join keys come from
-- CONSTRAINS_VW (constraints whose name equals the column name); the outcome
-- is logged to GEN_LOG. Arguments are case-sensitive because they are spliced
-- verbatim into quoted dynamic SQL.
-- Returns "Succeeded" + the merge SQL on success, "Failed." + the merge SQL
-- on error.
CREATE OR REPLACE PROCEDURE "MERGER_BUILDER_GEN"("TABLE_NAME" VARCHAR(200), "SCHEMA_NAME" VARCHAR(200), "STAGE_NAME" VARCHAR(200))
    RETURNS VARCHAR(32000)
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS $$
    var result;
    // all work below happens in one explicit transaction, committed at the end
    snowflake.execute( {sqlText: "begin transaction;"});
    // Capture the values for a failure log row up-front (zero counts, status
    // 'Failed', current user/role/timestamp) so the catch block can insert
    // them even when the error happens before any merge ran.
    var my_sql_command = `SELECT 
        0 AS "number of rows inserted"
        , 0 as "number of rows updated"
        ,'` + TABLE_NAME + `' AS proc_name
        ,CURRENT_TIMESTAMP() AS FINISHED
        ,CURRENT_USER() AS USER_NAME 
        ,CURRENT_ROLE() USER_ROLE
        ,'Failed' as status`;
        var statement1 = snowflake.createStatement( {sqlText: my_sql_command} );
        var result_set1 = statement1.execute();
      result_set1.next();
        var column1 = result_set1.getColumnValue(1);
        var column2 = result_set1.getColumnValue(2);
        var column3 = result_set1.getColumnValue(3);
        var column4 = result_set1.getColumnValue(4);
        var column5 = result_set1.getColumnValue(5);
        var column6 = result_set1.getColumnValue(6);
        var column7 = result_set1.getColumnValue(7);

try {
    // Build a one-row temp table holding the dynamic SQL fragments:
    //   dd               - MERGE ON clause: nvl-wrapped equality on key columns
    //   COL_NAMES_WHEN   - WHEN MATCHED change-detection predicate (non-key cols)
    //   COL_NAMES_SET    - UPDATE SET list (non-key cols only; key cols excluded
    //                      via the CM IS NULL check)
    //   COL_NAMES_INS / COL_NAMES_INS1 - INSERT column list and src VALUES list
    //   COL_NAMES_SELECT - "cast($n as type) as col" projection over the CSV
    // The nvl() defaults ('0', '1900-12-01', '-999999') make NULLs compare
    // equal on both sides; the '-999999' placeholder is later swapped for ''.
    var v_sql_stmt = `CREATE OR REPLACE temporary TABLE vars_of_merger_dyn00 AS 
                    SELECT  
                    COL_NAMES_SELECT    
                    ,REPLACE(listagg (distinct' nvl(tgt."'||cons.constraint_name||'",'
                    ||CASE  WHEN cons.data_type ='FLOAT' THEN '0' 
                            WHEN cons.data_type ='NUMBER' THEN '0'
                            WHEN cons.data_type ='DATE' THEN '''1900-12-01'''
                            WHEN cons.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00'''
                            ELSE '-999999' END||') = nvl(src."' 
                            ||cons.constraint_name ||'",'
                    ||CASE  WHEN cons.data_type ='FLOAT' THEN '0' 
                            WHEN cons.data_type ='NUMBER' THEN '0'
                            WHEN cons.data_type ='DATE' THEN '''1900-12-01'''
                            WHEN cons.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00'''
                            ELSE '-999999' END  ,') and \n') ||')','-999999','''''') AS dd
                    ,REPLACE(COL_NAMES_WHEN,'-999999','''''') AS COL_NAMES_WHEN
                    ,COL_NAMES_SET
                    ,COL_NAMES_INS
                    ,COL_NAMES_INS1
                    FROM (
                    SELECT 
                     InTab.TABLE_NAME              
                    ,listagg (' cast($'   ||InTab.ORDINAL_POSITION || ' as ' || intab.DATA_TYPE || ') as "' ||InTab.COLUMN_NAME,'", \n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||'"'  AS Col_Names_select
                    ,listagg (' nvl(tgt."'  || CASE WHEN intab.CM IS NULL THEN InTab.COLUMN_NAME ELSE NULL end  || '", '
                    ||CASE  WHEN intab.data_type ='FLOAT' THEN '0' 
                            WHEN intab.data_type ='NUMBER' THEN '0'
                            WHEN intab.data_type ='DATE' THEN '''1900-12-01'''
                            WHEN intab.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00''' ELSE '-999999' END
                    ||') != nvl(src."' ||InTab.COLUMN_NAME||'",'||
                      CASE  WHEN intab.data_type ='FLOAT' THEN '0' 
                            WHEN intab.data_type ='NUMBER' THEN '0'
                            WHEN intab.data_type ='DATE' THEN '''1900-12-01'''
                            WHEN intab.data_type ='TIMESTAMP_NTZ' THEN '''1900-12-01 00:00:00''' ELSE '-999999' END 
                    ,') OR\n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||')' AS Col_Names_when
                    ,listagg (' tgt."'  ||CASE WHEN intab.CM IS NULL THEN InTab.COLUMN_NAME ELSE NULL end || '"= src."' ||InTab.COLUMN_NAME , '",\n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||'"' AS Col_Names_set
                    ,listagg ( '"'||InTab.COLUMN_NAME,'",\n') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ||'"' AS Col_Names_ins
                    ,listagg ( ' src."'  ||InTab.COLUMN_NAME,'",\n') WITHIN GROUP ( ORDER BY InTab.ORDINAL_POSITION asc ) ||'"' AS Col_Names_ins1 
                    ,listagg (ORDINAL_POSITION,',') WITHIN GROUP ( ORDER BY ORDINAL_POSITION asc ) ORDINAL_POSITION
                    FROM (
                    SELECT 
                     InTab.TABLE_NAME              
                    ,InTab.COLUMN_NAME
                    ,InTab.ORDINAL_POSITION
                    ,intab.DATA_TYPE
                    ,cons.CONSTRAINT_NAME AS CM
                    FROM INFORMATION_SCHEMA.COLUMNS InTab 
                    LEFT JOIN constrains_vw cons ON cons.table_name = intab.table_name AND InTab.COLUMN_NAME = cons.CONSTRAINT_NAME
                    where intab.TABLE_SCHEMA = '`+ SCHEMA_NAME +`'
                    AND intab.TABLE_NAME = '`+ TABLE_NAME +`'
                    GROUP BY 
                    InTab.TABLE_NAME
                    ,InTab.COLUMN_NAME 
                    ,InTab.COLUMN_NAME
                    ,InTab.ORDINAL_POSITION
                    ,intab.DATA_TYPE
                    ,CONSTRAINT_NAME
                    ORDER BY InTab.TABLE_NAME,InTab.ORDINAL_POSITION ) InTab
                    GROUP BY TABLE_NAME
                    ORDER BY TABLE_NAME,ORDINAL_POSITION
                    ) tt
                    LEFT JOIN constrains_vw cons ON cons.table_name = tt.table_name
                    GROUP BY
                    COL_NAMES_SELECT    
                    ,COL_NAMES_WHEN
                    ,COL_NAMES_SET
                    ,COL_NAMES_INS
                    ,COL_NAMES_INS1;` ; 
    
    var rs_clip_name = snowflake.execute ({sqlText: v_sql_stmt});
   
    // fetch the generated fragments back into JS variables
    var my_sql_command1 = `SELECT Col_Names_select,dd,Col_Names_when,Col_Names_set,Col_Names_ins,Col_Names_ins1 FROM vars_of_merger_dyn00;`; 
    
    var statement2 = snowflake.createStatement( {sqlText: my_sql_command1} );
    var result_set = statement2.execute();
    result_set.next();
    var Col_Names_select = result_set.getColumnValue(1);
    var dd = result_set.getColumnValue(2);
    var Col_Names_when = result_set.getColumnValue(3);
    var Col_Names_set = result_set.getColumnValue(4);
    var Col_Names_ins = result_set.getColumnValue(5);
    var Col_Names_ins1 = result_set.getColumnValue(6);

// When every column is a key column, the SET list collapses to the single
// closing quote '"' - nothing to UPDATE, so run an insert-only MERGE.
if (Col_Names_set == '"') 
{ 
var my_sql_command2 = `MERGE INTO EDWH_DEV.`+ SCHEMA_NAME +`.`+ TABLE_NAME +` AS tgt
USING 
( select
`+ Col_Names_select +`
from 
@` + STAGE_NAME + `/` + TABLE_NAME + `.csv  (file_format => 'CSV') )
AS src

ON ( `+ dd +`
     )

WHEN NOT MATCHED
THEN INSERT ( `+ Col_Names_ins +`)
VALUES 
(`+ Col_Names_ins1 +`); `; 
    var rs_clip_name2 = snowflake.execute ({sqlText: my_sql_command2});

// read "number of rows inserted" from the MERGE just executed via RESULT_SCAN
snowflake.createStatement( { sqlText: `INSERT INTO GEN_LOG
("number of rows inserted", "number of rows updated", proc_name , FINISHED, USER_NAME, USER_ROLE, STATUS, MESSAGE)
 SELECT "number of rows inserted", 0 as "number of rows updated", '` + TABLE_NAME + `' AS proc_name  , sysdate(), CURRENT_USER() ,CURRENT_ROLE(),'done' as status ,'' AS message
        FROM TABLE (RESULT_SCAN(LAST_QUERY_ID()));`} ).execute();

} 
else 
{
// Normal case: MERGE with change-detecting UPDATE (only when a non-key column
// actually differs) plus INSERT for new keys.
var my_sql_command2 = `MERGE INTO EDWH_DEV.`+ SCHEMA_NAME +`.`+ TABLE_NAME +` AS tgt
USING 
( select
`+ Col_Names_select +`
from 
@` + STAGE_NAME + `/` + TABLE_NAME + `.csv  (file_format => 'CSV') )
AS src
ON ( `+ dd +`
     )
WHEN MATCHED
AND `+ Col_Names_when +`
THEN UPDATE SET
`+ Col_Names_set +`
WHEN NOT MATCHED
THEN INSERT ( `+ Col_Names_ins +`)
VALUES 
(`+ Col_Names_ins1 +`); `; 
    var rs_clip_name2 = snowflake.execute ({sqlText: my_sql_command2});

// read both merge counters from the MERGE result via RESULT_SCAN
snowflake.createStatement( { sqlText: `INSERT INTO GEN_LOG
("number of rows inserted", "number of rows updated", proc_name , FINISHED, USER_NAME, USER_ROLE, STATUS, MESSAGE)
 SELECT "number of rows inserted","number of rows updated", '` + TABLE_NAME + `' AS proc_name  , sysdate(), CURRENT_USER() ,CURRENT_ROLE(),'done' as status ,'' AS message
        FROM TABLE (RESULT_SCAN(LAST_QUERY_ID()));`} ).execute();   

}
     snowflake.execute( {sqlText: "commit;"} );
    result = "Succeeded" + my_sql_command2 ;
} catch (err) {
  // log the failure using the default values captured at the top, plus the
  // error code/state/message/stack in the MESSAGE column
  snowflake.execute({
      sqlText: `insert into GEN_LOG VALUES (DEFAULT,?,?,?,?,?,?,?,?)`
      ,binds: [column1, column2, column3 ,column4 , column5 , column6 ,column7 , err.code + " | State: " + err.state + "\n  Message: " + err.message + "\nStack Trace:\n" + err.stackTraceTxt ]
      });
     snowflake.execute( {sqlText: "commit;"} );
     // NOTE(review): if the error occurred before my_sql_command2 was built,
     // it is undefined here (var hoisting) and this returns 'Failed.undefined'.
     return 'Failed.' + my_sql_command2 ;
}
return result;

$$;

Now you can stop here and use the proc as, for example: CALL MERGER_BUILDER_GEN('MY_TABLE','MY_SCHEMA','MY_STAGE'); --- all arguments are case-sensitive.

So, in a nutshell, it writes a proper merge statement for any table DDL that you created in the schema and fed to the proc. It looks up the file and dynamically creates the select for the merge, then the other bits: the ON clause, the "when matched and nvl(everything)" predicate, and the "when not matched then insert" branch. It also casts to the right data types on the fly — similar to what COPY INTO does — but in my humble opinion MERGE is better for non-perfect deltas. So if you don't want a data lake with files partitioned over dates, stitched together via external tables or (god forbid) a union view, give this a shot.

Also you can use little set up to run as many tables as you like with automerge 1 by 1

-- Registry of tables to auto-merge; PROC_NAME must match the staged CSV file
-- name (case-sensitive). Processing order follows PROC_PRIORIT_ID.
create or replace TABLE PROC_LIST (
    PROC_PRIORIT_ID NUMBER(38,0) autoincrement, -- processing order
    PROC_NAME VARCHAR(150)                      -- table name = stage file name
);
-- FIX: the free-text remarks after some INSERTs below were not valid SQL and
-- broke the script when run as-is; they are now comments. Column counts can
-- vary freely per table — the dynamic SQL limit is 32000 chars, so go crazy.
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE1'); -- e.g. a table with 50 columns
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE2');
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE3');
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE4');
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE5'); -- e.g. a table with 500 columns
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE6');
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE7');
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE8');
INSERT INTO PROC_LIST (PROC_NAME) VALUES ('TABLE9');

--Created a nice list of tables to be loaded 1 by 1 using auto merge!


-- One row per staged CSV whose (path- and extension-stripped) name matches a
-- PROC_NAME in PROC_LIST; supplies the table/schema/stage arguments consumed
-- by PROJECT_REFRESH_MRG.
CREATE OR REPLACE VIEW PROC_LOAD_CONTROL AS
SELECT
    metadata$filename,
    REPLACE(REPLACE(metadata$filename, '.csv', ''), 'path/to/your_table_ifnot_inmain_stage_location/', '') AS file_name,
    pl.PROC_NAME AS table_name,
    'MY_SCHEMA' AS schema_name,
    'MY_STAGE' AS stage_name
FROM @MY_STAGE
INNER JOIN PROC_LIST pl
    ON pl.PROC_NAME = REPLACE(REPLACE(metadata$filename, '.csv', ''), 'path/to/your_table_ifnot_inmain_stage_location/', '')
GROUP BY
    metadata$filename,
    pl.proc_name
ORDER BY
    REPLACE(REPLACE(metadata$filename, '.csv', ''), 'path/to/your_table_ifnot_inmain_stage_location/', '') ASC;

--this will make sure that your table names match the actual files in your stage; please review the prerequisites above to make this work smoothly

CREATE OR REPLACE PROCEDURE "PROJECT_REFRESH_MRG"()
RETURNS VARCHAR(1000)
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS $$
// Driver procedure: calls MERGER_BUILDER_GEN once per row of
// PROC_LOAD_CONTROL (i.e. once per registered table whose CSV is in the
// stage). Returns a success summary or "Failed" + the error.
try {

    // table/schema/stage triples produced by the control view
    var v_sql_stmt = `SELECT 
    table_name
    ,schema_name
    ,stage_name
    FROM PROC_LOAD_CONTROL;`;
    var rs_proc_name = snowflake.execute ({sqlText: v_sql_stmt});

    // FIX: escape embedded single quotes so a table/schema/stage name can
    // never break (or inject into) the generated CALL statement
    function quoteLiteral(s) { return String(s).replace(/'/g, "''"); }

    var v_table_name = '';
    var v_schema_name = '';
    var v_stage_name = '';
    var v_count = 0;

    // loop through all control rows and run the auto-merge for each table
    while (rs_proc_name.next())  {
        v_table_name = rs_proc_name.getColumnValue(1);
        v_schema_name = rs_proc_name.getColumnValue(2);
        v_stage_name = rs_proc_name.getColumnValue(3);

        // build and execute the merge for this table
        v_sql_stmt = `call MERGER_BUILDER_GEN('` + quoteLiteral(v_table_name) + `','`
                   + quoteLiteral(v_schema_name) + `','` + quoteLiteral(v_stage_name) + `')`;
        snowflake.execute ({sqlText: v_sql_stmt});
        v_count++;
    }
    return "Success: " + v_count + " table(s), last statement: " + v_sql_stmt;
}
catch (err)  
    {
        // NOTE(review): failures are only returned here, not written to
        // GEN_LOG; MERGER_BUILDER_GEN logs its own per-table failures.
        return "Failed" + err;   // Return a success/error indicator
    }
$$;

--- So this will build the list of tables with their stage and schema values and pass each one, in a while loop, to the generic merger builder.