Extracting multiline content from flow file content


I am importing data from a MySQL table (selected columns only) and putting it into HDFS. Once this is done, I want to create a table in Hive.

For this I have a schema.sql file which contains the CREATE TABLE statement for the entire table, and I want to generate a new CREATE TABLE statement only for the columns I imported.

Something similar to what I am doing with grep in the example below.

(screenshot: grep example)
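The screenshot is no longer available; as a rough sketch of the grep approach, with an illustrative schema.sql and made-up column names:

```shell
# Illustrative only: the file contents and column names are examples.
cat > /tmp/schema.sql <<'EOF'
CREATE TABLE mydb.mytable (
  id INT,
  name STRING,
  email STRING,
  created_at TIMESTAMP
);
EOF

# Keep only the lines for the columns that were actually imported
grep -E '^  (id|name) ' /tmp/schema.sql
```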

I used FetchFile along with ExtractText but couldn't make it work. How can I achieve this using NiFi processors, or even Expression Language if I get the overall schema into an attribute?

Or is there a better way to create the table on the imported data?

Accepted answer (by notNull):

NiFi can generate CREATE TABLE statements based on the flowfile content.

1. Creating ORC tables using the ConvertAvroToORC processor:

  • If you convert the Avro data into ORC format and then store it in HDFS, the ConvertAvroToORC processor adds a hive.ddl attribute to the flowfile.

  • The PutHDFS processor adds an absolute.hdfs.path attribute to the flowfile.

  • We can use the hive.ddl and absolute.hdfs.path attributes to create the ORC table on top of the HDFS directory dynamically.

Flow:

 Pull data from source (ExecuteSQL...etc)
  -> ConvertAvroToORC //add Hive DbName,TableName in the HiveTableName property value
  -> PutHDFS //store the orc file into the HDFS location
  -> ReplaceText //replace the flowfile content with ${hive.ddl} LOCATION '${absolute.hdfs.path}'
  -> PutHiveQL //execute the create table statement

Refer to this link for more details regarding the above flow.
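The ReplaceText step in the flow above can be sketched outside NiFi in plain shell; the attribute values below are made-up examples, not what NiFi would actually emit for your data:

```shell
# hive_ddl and absolute_hdfs_path stand in for the flowfile attributes that
# ConvertAvroToORC and PutHDFS would have set; both values are illustrative.
hive_ddl="CREATE EXTERNAL TABLE IF NOT EXISTS default.my_table (id INT, name STRING) STORED AS ORC"
absolute_hdfs_path="/user/nifi/tables/my_table"

# ReplaceText rewrites the flowfile content to: ${hive.ddl} LOCATION '${absolute.hdfs.path}'
echo "${hive_ddl} LOCATION '${absolute_hdfs_path}'"
```

The resulting statement is then a complete DDL that PutHiveQL can execute as-is.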

2. Creating Avro tables using the ExtractAvroMetadata processor:

  • In NiFi, once we pull data using the QueryDatabaseTable or ExecuteSQL processors, the data is in Avro format.

  • We can create Avro tables based on an Avro schema (.avsc file). Using the ExtractAvroMetadata processor we can extract the schema and keep it as a flowfile attribute, then use that schema to create Avro tables dynamically.

Flow:

ExecuteSQL (success)|-> PutHDFS //store data into HDFS
           (success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema 
                     -> ReplaceText //replace flowfile content with avro.schema
                     -> PutHDFS //store the avsc file into schema directory
                     -> ReplaceText //create avro table on top of schema directory
                     -> PutHiveQL //execute the hive.ddl

Example Avro CREATE TABLE statement:

CREATE TABLE as_avro
  ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  TBLPROPERTIES (
    'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');

The ReplaceText processor in the flow above is what changes the path in the schema URL.
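That substitution can be sketched as a simple sed over the TBLPROPERTIES line; the target HDFS schema URL below is an assumed example:

```shell
# Swap the placeholder path from the example DDL for the directory where
# the .avsc file was stored; the replacement URL here is made up.
ddl="TBLPROPERTIES ('avro.schema.url'='/path/to/the/schema/test_serializer.avsc');"
schema_url="hdfs://namenode:8020/user/nifi/schemas/test_serializer.avsc"
echo "$ddl" | sed "s|/path/to/the/schema/test_serializer.avsc|$schema_url|"
```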

Another way: use the ExecuteSQL processor to get all the CREATE TABLE statements (or column info) from the source's metadata tables (sys.tables, INFORMATION_SCHEMA.COLUMNS, etc.), if the source system permits, then write a script that maps the data types to the appropriate Hive types and store them in your desired format in Hive.
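The type-mapping script could be sketched like this; the mapping table is partial and illustrative, not a complete MySQL-to-Hive mapping:

```shell
# Map a few common MySQL column types to Hive types; extend as needed.
map_type() {
  case "$1" in
    varchar*|char*|text)          echo "STRING" ;;
    tinyint|smallint|int|integer) echo "INT" ;;
    bigint)                       echo "BIGINT" ;;
    float)                        echo "FLOAT" ;;
    double|decimal*)              echo "DOUBLE" ;;
    datetime|timestamp)           echo "TIMESTAMP" ;;
    *)                            echo "STRING" ;;  # safe default
  esac
}

map_type "varchar(255)"   # STRING
map_type "bigint"         # BIGINT
```

Feed it the type column from INFORMATION_SCHEMA.COLUMNS and assemble the Hive DDL from the mapped output.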

EDIT:

To run a grep command on the flowfile content, we need to use the ExecuteStreamCommand processor.

ESC Configs:

(screenshot: ExecuteStreamCommand configuration)

Then feed the output stream relationship to an ExtractText processor.

ET Configs:

Add a new property:

    content = (?s)(.*)

(screenshot: ExtractText configuration)

A content attribute is then added to the flowfile; you can use that attribute to prepare the CREATE TABLE statement.
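Outside NiFi, the ExecuteStreamCommand + ExtractText pair behaves roughly like the sketch below (the schema content and grep pattern are illustrative). The (?s) flag is DOTALL mode, so (.*) captures the entire multiline grep output:

```shell
# Simulating ExecuteStreamCommand: run grep over the flowfile content
flowfile_content='CREATE TABLE t (
  id INT,
  name STRING,
  email STRING
);'
filtered=$(printf '%s\n' "$flowfile_content" | grep -E 'id INT|name STRING')

# Simulating ExtractText with (?s)(.*): the whole filtered output,
# newlines included, becomes the value of the "content" attribute.
content="$filtered"
printf '%s\n' "$content"
```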