How to upload a binary filestream to an S3 bucket using Apache NiFi?


I have a filename and a filestream saved in a SQL Server database. I want to upload this filestream as a file to an AWS S3 bucket using Apache NiFi.

Currently I use the following processors, in this order:

  1. ExecuteSQL (with the query: select filename, filestream from table)
  2. ConvertAvroToJson (because ExecuteSQL returns data in Avro format)
  3. EvaluateJsonPath (to read the filestream column)
  4. Base64EncodeContent
  5. PutS3Object


The problem is that this approach does not turn the filestream into a file in the S3 bucket. It just uploads a file containing the filestream column data. The upload should preserve the file type: if the filestream is a PNG image, a PNG image should be uploaded to the S3 bucket, and if it is an XLSX file, an XLSX file should be uploaded.

This is the sample database: [screenshot]
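
The symptom described above can be reproduced outside NiFi. The following is only a minimal Python sketch, not part of the flow: the 8-byte PNG signature stands in for a real filestream value, and all variable names are illustrative. It shows that Base64-encoded content is plain ASCII text, so an object uploaded with that content is not a readable image; only the raw (or decoded) bytes are.

```python
import base64

# The 8-byte PNG signature stands in for a real filestream value (illustrative only).
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
raw = PNG_SIGNATURE + b"rest-of-image-bytes"

# Base64-encoding turns the binary data into plain ASCII text.
encoded = base64.b64encode(raw)

# The encoded form no longer starts with the PNG signature,
# so a file with this content cannot be opened as an image.
looks_like_png_encoded = encoded.startswith(PNG_SIGNATURE)

# Decoding restores the original bytes exactly.
decoded = base64.b64decode(encoded)
looks_like_png_decoded = decoded.startswith(PNG_SIGNATURE)
```

In other words, whatever reaches PutS3Object has to be the original bytes of the column, not a textual representation of them.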

2

There are 2 answers

5
daggett On

There may be a native NiFi way to read a BLOB column, but you could use the ExecuteGroovyScript processor instead.

Add a SQL.mydb property at the processor level and link it to the required DBCP connection pool.

Use the following script body (I have had no chance to test it):

def ff=session.get()
if(!ff)return

// Just an assumption: I don't know your table structure.
def query = '''
  select file_name, bin_content from myTable where update_time > :p_timestamp
'''
def params = [
  p_timestamp: ff.last_timestamp // last_timestamp attribute of the incoming flow file
]

def outFiles = [] // list of flow files for output

// SQL.mydb is a reference to a groovy.sql.Sql instance
SQL.mydb.eachRow(query, params) { row ->
  def binFile = ff.clone(false) // clone the incoming flow file, but without its content
  binFile.filename = row.file_name
  // getBinaryStream() assumes the driver returns a java.sql.Blob;
  // if it returns a byte[], use: stream << row.bin_content
  binFile.write { stream -> stream << row.bin_content.getBinaryStream() }
  outFiles << binFile
}

REL_SUCCESS << outFiles   // transfer the list of new flow files to success
session.remove(ff)        // drop the incoming flow file

The script above executes the SQL select and, for each record returned, produces a new flow file whose name and content come from the database.
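
The one-file-per-row idea can be tried without NiFi. The following is only a rough Python sketch under stated assumptions: an in-memory SQLite table (the hypothetical `my_table`, with made-up rows) stands in for the SQL Server table, and a temporary directory stands in for the flow files sent on to PutS3Object. The point is that the name comes from one column and the raw bytes from the other, with no text or Base64 conversion in between.

```python
import sqlite3
import tempfile
from pathlib import Path

# Hypothetical stand-in for the real table: a file name plus a binary column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (file_name TEXT, bin_content BLOB)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [("logo.png", b"\x89PNG\r\n\x1a\n"), ("report.xlsx", b"PK\x03\x04")],
)

out_dir = Path(tempfile.mkdtemp())
written = []

# One output file per row, named after file_name and filled with the raw bytes.
rows = conn.execute(
    "SELECT file_name, bin_content FROM my_table ORDER BY rowid"
)
for file_name, bin_content in rows:
    target = out_dir / file_name
    target.write_bytes(bin_content)  # bytes are written unchanged
    written.append(target)

conn.close()
```

Each file on disk is then byte-for-byte what was stored in the column, which is the property the uploaded S3 objects need to have.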

Details about the features of the ExecuteGroovyScript processor:

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-groovyx-nar/1.20.0/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html

0
code_hr On

This script worked!

import groovy.sql.Sql

def ff=session.create()

def sqlIns = Sql.newInstance('jdbc:sqlserver://servername:port;databaseName=dbname;encrypt=true;trustServerCertificate=true', 'username', 'password', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')

// Query the database to fetch the data
def query = 'SELECT FileName, FileStream FROM table'

def outFiles = [] //list of files for output

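try {
  sqlIns.eachRow(query) { row ->
    log.info "${row.FileName}"
    def binFile = ff.clone(false)
    binFile.filename = row.FileName
    // write the raw column bytes as the flow file content
    binFile.write { stream -> stream << row.FileStream }
    outFiles << binFile
  }
} finally {
  sqlIns.close() // release the JDBC connection
}

REL_SUCCESS << outFiles
session.remove(ff)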
