Optimize batch insert into Snowflake for millions of records using JdbcTemplate

How can I optimize a batch insert into a Snowflake database for millions of records while using Spring Boot's JdbcTemplate?

public Mono<Void> saveData(List<Map<String, Object>> dataMapList, String jdbcUrl, String username, String password, String targetObject) {
    return Mono.fromCallable(() -> {
        HikariDataSource dataSource = getOrCreateDataSource(jdbcUrl, username, password);
        try(Connection conn = dataSource.getConnection()){
            // Use Spring JdbcTemplate to perform the batch insert statement
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

            // If the dataMapList is empty, do nothing
            if (dataMapList == null || dataMapList.isEmpty()) {
                return Mono.empty();
            }

            // Extract the columns from the first map in the list
            Map<String, Object> firstDataMap = dataMapList.get(0);
            List<String> columnNames = new ArrayList<>(firstDataMap.keySet());

            // Build the insert SQL statement based on the column names
            StringBuilder sqlBuilder = new StringBuilder("INSERT INTO " + dbSchema(jdbcUrl,targetObject) + "(");
            for (String columnName : columnNames) {
                sqlBuilder.append(columnName).append(", ");
            }
            sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length()).append(") VALUES (");

            // Append placeholders for each column value
            for (int i = 0; i < columnNames.size(); i++) {
                sqlBuilder.append("?, ");
            }
            sqlBuilder.delete(sqlBuilder.length() - 2, sqlBuilder.length()).append(")");

            // Prepare the batch arguments
            List<Object[]> batchArgs = new ArrayList<>();
            for (Map<String, Object> data : dataMapList) {
                Object[] values = columnNames.stream().map(data::get).toArray();
                batchArgs.add(values);
            }

            // Execute the batch insert statement
            try {
                log.info("sqlBuilder.toString() : {}", sqlBuilder.toString());
                log.info("batchArgs rows: {}", batchArgs.size());
                jdbcTemplate.batchUpdate(sqlBuilder.toString(), batchArgs);
                log.info("completed insert rows size: {}", batchArgs.size());
                return Mono.empty(); // Return null to indicate success
            } catch (Exception e) {
                log.error("Error while inserting multi-row data : {}",e.getMessage());
                throw new RuntimeException(e);
            }
        }catch(Exception ex){
            log.error("Error while using JdbcTemplate: {}", ex.getMessage());
            throw new RuntimeException(ex);
        }

    }).subscribeOn(Schedulers.boundedElastic()).then();
}

This method takes a very long time to execute, or fails without throwing any exception. How can I optimize this code to improve performance? The data size here is huge.

1 Answer

Answered by Ricardo Gellman:

You can adjust your saveData method so it builds a single multi-row INSERT INTO statement and executes the whole operation asynchronously; setting the PreparedStatement parameters directly can also be more efficient. For very large volumes, staging the data and bulk-loading it with COPY INTO is the usual alternative; a sketch of that route follows at the end of this answer.

public Mono<Void> saveData(List<Map<String, Object>> dataMapList, String jdbcUrl, String username, String password, String targetObject) {
    return Mono.fromCallable(() -> {
        HikariDataSource dataSource = getOrCreateDataSource(jdbcUrl, username, password);
        try (Connection conn = dataSource.getConnection()) {
            if (dataMapList == null || dataMapList.isEmpty()) {
                return Mono.empty();
            }

            // Build one multi-row INSERT INTO statement covering every row in the list
            String insertStatement = buildInsertIntoStatement(targetObject, dataMapList);
            try (PreparedStatement pstmt = conn.prepareStatement(insertStatement)) {
                IntStream.range(0, dataMapList.size())
                        .forEach(i -> setPreparedStatementParams(pstmt, dataMapList.get(i), i));
                pstmt.executeUpdate(); // single multi-row statement, so executeUpdate rather than executeBatch
            }

            return Mono.empty(); // success
        } catch (Exception ex) {
            throw new RuntimeException("Error while saving data: " + ex.getMessage(), ex);
        }
    }).subscribeOn(Schedulers.boundedElastic()).then();
}

private String buildInsertIntoStatement(String targetObject, List<Map<String, Object>> dataMapList) {
    Map<String, Object> firstDataMap = dataMapList.get(0);
    String columnNames = firstDataMap.keySet().stream()
            .collect(Collectors.joining(", "));

    String valuesPlaceholders = IntStream.range(0, dataMapList.size())
            .mapToObj(i -> "(" + firstDataMap.keySet().stream()
                                        .map(columnName -> "?")
                                        .collect(Collectors.joining(", ")) + ")")
            .collect(Collectors.joining(", "));

    return "INSERT INTO " + targetObject + " (" + columnNames + ") VALUES " + valuesPlaceholders;
}
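
For example, with two rows and hypothetical columns ID and NAME, the generated statement looks like:

INSERT INTO MY_TABLE (ID, NAME) VALUES (?, ?), (?, ?)

With millions of rows this string and the number of bind parameters grow very large, which is another reason to chunk the input.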

private void setPreparedStatementParams(PreparedStatement pstmt, Map<String, Object> data, int rowIndex) {
    // Bind this row's values at its offset inside the multi-row statement.
    // Assumes every row map has the same columns in the same iteration order as the first map.
    int parameterIndex = rowIndex * data.size() + 1;
    for (Object value : data.values()) {
        try {
            pstmt.setObject(parameterIndex++, value);
        } catch (Exception ex) {
            throw new RuntimeException("Error while setting prepared statement parameter: " + ex.getMessage(), ex);
        }
    }
}
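
For truly large volumes, the staging route mentioned at the top of this answer is usually the fastest path: write the rows to a local file, PUT it to a table stage, and bulk-load it with COPY INTO. A minimal sketch, assuming a plain java.sql.Connection; writeCsv is a hypothetical helper that writes dataMapList to a headerless CSV in table column order, and the file-format options are illustrative only:

private void copyIntoViaStage(Connection conn, String targetTable,
                              List<Map<String, Object>> dataMapList) throws Exception {
    // 1. Write the rows to a local CSV file (writeCsv is a hypothetical helper, not shown here)
    Path csvFile = Files.createTempFile("snowflake-load-", ".csv");
    writeCsv(csvFile, dataMapList);

    try (Statement stmt = conn.createStatement()) {
        // 2. Upload the file to the table's internal stage (@%table); the JDBC driver performs the transfer
        stmt.execute("PUT file://" + csvFile.toAbsolutePath() + " @%" + targetTable + " AUTO_COMPRESS=TRUE");

        // 3. Bulk-load the staged file into the table and drop it from the stage afterwards
        stmt.execute("COPY INTO " + targetTable + " FROM @%" + targetTable
                + " FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"
                + " PURGE = TRUE");
    }
}

PUT and COPY INTO run through the normal Snowflake JDBC Statement, and bulk loading with COPY INTO generally scales to millions of rows much better than INSERT-based approaches.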