Snowflake Kafka connector -> Landing table -> Destination table: how to clean up the landing table
I'm investigating moving data from Kafka -> Snowflake Kafka connector -> Snowflake. Unfortunately, the connector writes to just two columns (it puts the entire JSON payload into a single column). So I created a stream and a task that periodically copy data from the landing table to the destination table (using INSERT). Everything works beautifully except deleting data from the landing table once it has landed in the destination table. Using streams, I know what has landed, but how do I delete those rows from the landing table? TRUNCATE seems so much faster. Should I just periodically run a task that deletes these entries? I am also concerned about the warehouse time needed to perform these deletes. Thanks
Asked by Surendar Chandra
1 Answer
For a use case where multiple statements (such as INSERT, DELETE, etc.) need to access the same change records, surround them in an explicit transaction (BEGIN ... COMMIT), which locks the stream.
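A minimal sketch of that pattern applied to the cleanup question, written as Python that only builds and runs the SQL. It assumes the landing table written by the Kafka connector (VARIANT columns RECORD_METADATA and RECORD_CONTENT) and an append-only stream on it (`CREATE STREAM ... ON TABLE ... APPEND_ONLY = TRUE`), which records inserts only, so the cleanup DELETE does not itself produce change rows for the next run. Landing rows are matched to stream rows by the Kafka topic, partition and offset stored in RECORD_METADATA. The table, stream and destination column names are hypothetical; `run` accepts any DB-API cursor, for example one from snowflake-connector-python.

```python
# Sketch: copy new rows from an append-only stream into the destination table
# and delete the same rows from the landing table in ONE transaction.
# Hypothetical names: the landing/stream/destination names and the destination
# column "payload". RECORD_METADATA and RECORD_CONTENT are the VARIANT columns
# written by the Snowflake Kafka connector.

# Kafka coordinates that identify one record, with the type to cast them to.
KEY_FIELDS = {"topic": "STRING", "partition": "NUMBER", "offset": "NUMBER"}


def key_match(target: str, source: str) -> str:
    """SQL condition matching two rows by Kafka topic, partition and offset."""
    return " AND ".join(
        f'{target}.RECORD_METADATA:"{k}"::{t} = {source}.RECORD_METADATA:"{k}"::{t}'
        for k, t in KEY_FIELDS.items()
    )


def consume_and_delete(landing: str, stream: str, destination: str) -> list[str]:
    """Return the SQL statements for one cleanup run, in execution order."""
    consumed_sql = (
        f"SELECT RECORD_METADATA FROM {stream} WHERE METADATA$ACTION = 'INSERT'"
    )
    return [
        # BEGIN locks the stream: both statements below see the same change rows.
        "BEGIN",
        # 1. Copy the new records (replace with your real column mapping).
        f"INSERT INTO {destination} (payload) "
        f"SELECT RECORD_CONTENT FROM {stream} WHERE METADATA$ACTION = 'INSERT'",
        # 2. Delete exactly those records from the landing table.
        f"DELETE FROM {landing} USING ({consumed_sql}) AS consumed "
        f"WHERE {key_match(landing, 'consumed')}",
        # COMMIT consumes the stream, so the next run only sees newer rows.
        "COMMIT",
    ]


def run(cursor, statements: list[str]) -> None:
    """Execute the statements on a DB-API cursor (e.g. snowflake-connector-python)."""
    try:
        for sql in statements:
            cursor.execute(sql)
    except Exception:
        # Undo a half-finished run; without a COMMIT the stream is not consumed.
        cursor.execute("ROLLBACK")
        raise
```

Because the INSERT and the DELETE read the stream inside the same transaction, they should see the same set of rows even if the connector keeps inserting while the run is in progress; rows that arrive later stay in the stream for the next run.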
You can also add an extra column, such as a flag: lock the stream with BEGIN, use the stream to insert into the target table from the staging table, use the stream again in a second statement that MERGEs into the staging table to set the flag column, and then COMMIT.
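A sketch of that flag variant under the same assumptions (append-only stream, connector-written RECORD_METADATA and RECORD_CONTENT columns, hypothetical table, stream and column names). It additionally assumes a BOOLEAN column, here called LOADED, was added to the landing table with `ALTER TABLE ... ADD COLUMN` and that the connector tolerates that extra column. The transaction only marks the copied rows; the actual DELETE is a separate statement that can run on its own, less frequent schedule.

```python
# Sketch of the flag variant: inside one transaction, copy the new rows to the
# target and MERGE into the landing (staging) table to mark them as loaded.
# A separate job later deletes the flagged rows.
# Hypothetical names: table/stream names, the LOADED flag column and the
# destination column "payload".

# Kafka coordinates that identify one record, with the type to cast them to.
KEY_FIELDS = {"topic": "STRING", "partition": "NUMBER", "offset": "NUMBER"}


def key_match(target: str, source: str) -> str:
    """SQL condition matching two rows by Kafka topic, partition and offset."""
    return " AND ".join(
        f'{target}.RECORD_METADATA:"{k}"::{t} = {source}.RECORD_METADATA:"{k}"::{t}'
        for k, t in KEY_FIELDS.items()
    )


def consume_and_flag(
    landing: str, stream: str, destination: str, flag: str = "LOADED"
) -> list[str]:
    """Return the statements that copy new rows and flag them as loaded."""
    consumed_sql = (
        f"SELECT RECORD_METADATA FROM {stream} WHERE METADATA$ACTION = 'INSERT'"
    )
    return [
        # BEGIN locks the stream for both statements.
        "BEGIN",
        # First use of the stream: copy the new records to the target.
        f"INSERT INTO {destination} (payload) "
        f"SELECT RECORD_CONTENT FROM {stream} WHERE METADATA$ACTION = 'INSERT'",
        # Second use of the same locked stream: mark the copied rows.
        f"MERGE INTO {landing} USING ({consumed_sql}) AS consumed "
        f"ON {key_match(landing, 'consumed')} "
        f"WHEN MATCHED THEN UPDATE SET {flag} = TRUE",
        "COMMIT",
    ]


def purge_flagged(landing: str, flag: str = "LOADED") -> str:
    """DELETE for the flagged rows, meant to run on its own schedule."""
    return f"DELETE FROM {landing} WHERE {flag} = TRUE"
```

Deferring the DELETE lets the cleanup work on larger batches at the cost of an extra UPDATE per run; whether that uses less warehouse time than deleting inside the same transaction is worth measuring on your own data volumes.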
https://docs.snowflake.com/en/user-guide/streams.html#examples