We have an application that moves CSV files from HDFS to Hive, and we use a Storm topology for that process.
We are using 8 machines, each with 22 cores and 512 GB of RAM. However, our code runs really slowly: it takes 10 minutes to transfer 6 million records.
60 files of 10 MB each are transferred to HDFS per second. We are trying to optimize our code, but it is obvious that we are doing something very wrong.
The Hive table has 64 buckets.
Our topology has 1 spout and 2 bolts. The spout reads the CSV files and emits lines to the first bolt, which parses the data and emits to the second bolt, which is responsible for the HDFS process.
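As a rough sanity check on these numbers, here is a minimal sketch of the throughput arithmetic. It assumes the 6 million figure counts CSV rows and that rows are spread evenly over the 8 hiveBolt executors; the class and method names are made up for illustration.

```java
class ThroughputEstimate {
    // Rows per second for a run that moved `rows` rows in `seconds` seconds.
    static double rowsPerSecond(long rows, long seconds) {
        return (double) rows / seconds;
    }

    public static void main(String[] args) {
        long rows = 6_000_000L;     // figure from the question; assumed to be CSV rows
        long seconds = 10 * 60;     // the run took 10 minutes
        int hiveBoltExecutors = 8;  // parallelism hint given to hiveBolt

        double total = rowsPerSecond(rows, seconds);
        // Assumes an even spread of rows over the hiveBolt executors.
        double perExecutor = total / hiveBoltExecutors;

        System.out.printf("total: %.0f rows/s%n", total);
        System.out.printf("per hiveBolt executor: %.0f rows/s%n", perExecutor);
    }
}
```

Under those assumptions this works out to about 10,000 rows per second overall, or about 1,250 rows per second per hiveBolt executor.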
HDFS Spout:
HdfsSpout hdfsSpout = new HdfsSpout()
.withOutputFields(TextFileReader.defaultFields)
.setReaderType("text")
.setHdfsUri(hdfsUri)
.setSourceDir("/data/in")
.setArchiveDir("/data/done")
.setBadFilesDir("/data/bad")
.setClocksInSync(true) // NTP installed on all hosts
.setIgnoreSuffix("_COPYING_")
// do not begin reading file until it is completely copied to HDFS
.setMaxOutstanding(50_000);
Mapper:
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
.withColumnFields(new Fields(TTDPIRecord.fieldsList))
.withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));
Hive Options:
HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
.withAutoCreatePartitions(true)
.withHeartBeatInterval(3)
.withCallTimeout(10_000) // default = 10.000
.withTxnsPerBatch(2)
.withBatchSize(50_000)
// set below because it is most likely affecting the Storm metrics
.withTickTupleInterval(1);
Config:
Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
Topology Builder:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");
We are not sure about the following parameters:
in the HDFS spout: .setMaxOutstanding(50_000);
in the Hive options: .withTxnsPerBatch(2) .withBatchSize(50_000) .withTickTupleInterval(1);
in the config: .setNumWorkers(6); .setNumAckers(6);
parallelism of the spout and bolts: we gave 8 to each.
What should the values of those parameters be? Thanks in advance.
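To put the batch size in perspective, here is a back-of-the-envelope sketch of how long one hiveBolt executor would need to collect a full batch of 50,000 at the rate we observe. It assumes 6 million rows in 10 minutes, spread evenly over 8 executors; the class and method names are hypothetical.

```java
class BatchFillTime {
    // Seconds one executor needs to collect `batchSize` rows at a given rate.
    static double secondsToFillBatch(int batchSize, double rowsPerSecondPerExecutor) {
        return batchSize / rowsPerSecondPerExecutor;
    }

    public static void main(String[] args) {
        int batchSize = 50_000;                        // value passed to withBatchSize
        double totalRate = 6_000_000.0 / (10 * 60);    // observed rows per second overall
        double perExecutorRate = totalRate / 8;        // assumes an even spread over 8 executors

        System.out.printf("seconds to fill one batch: %.1f%n",
                secondsToFillBatch(batchSize, perExecutorRate));
    }
}
```

At this rate a full batch takes about 40 seconds per executor. With a tick tuple interval of 1 second, a batch would rarely get that large if the bolt flushes on every tick, which is our assumption about HiveBolt and not something we have verified.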
Edit: here is our test result for 100 CSV files of 10 MB each:
hdfsSpout Executors: 8 Complete Latency: 1834.209 ms
recordParserBolt Executors: 8 Complete Latency: 0.019 ms
hiveBolt Executors: 8 Complete Latency: 1092.624 ms
You are setting
conf.setNumWorkers(6);
which means you are using at most 6 of your 8 machines. You can set it to 8 to utilise all the hardware you have. Another parameter you can change is the parallelism hint of your bolts, which is the initial number of executors (threads) of a component. You have set the parallelism to only 8; you can increase it to 100 or 200 and see how performance varies.
You can go through this to understand how parallelism works in Storm.
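To make the worker and parallelism suggestion concrete, here is a small sketch of how many executor threads each worker process ends up hosting. It assumes Storm spreads executors evenly across workers and it ignores acker and metrics executors; the helper name is hypothetical.

```java
class ExecutorSpread {
    // Executors (threads) per worker, rounded up, assuming an even spread.
    static int executorsPerWorker(int totalExecutors, int workers) {
        return (totalExecutors + workers - 1) / workers;
    }

    public static void main(String[] args) {
        int current = 8 + 8 + 8;       // spout + two bolts, parallelism 8 each
        int raised = 8 + 100 + 100;    // spout unchanged, both bolts raised to 100

        System.out.println("6 workers, current hints: " + executorsPerWorker(current, 6));
        System.out.println("8 workers, current hints: " + executorsPerWorker(current, 8));
        System.out.println("8 workers, bolts at 100:  " + executorsPerWorker(raised, 8));
    }
}
```

With the current hints each worker runs only 3 or 4 component threads, so most of the 22 cores on a machine have nothing to do; with both bolts at 100 it is 26 threads per worker.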
Can you also tell us what your config for max-spout-pending is?