How to fix a NullPointerException thrown in KafkaSpout when running on Heron?


When I run a Storm topology with a KafkaSpout in Heron, the following exception occurs:

[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: 
Starting instance container_2_ads_2 for topology AdvertisingTopology and topologyId AdvertisingTopologyf7b4acbe-bdbc-4772-aaa4-9dd2f113f405 for component ads with taskId 2 and componentIndex 0 and stmgrId stmgr-2 and stmgrPort 31162 and metricsManagerPort 31067  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: System Config: {heron.streammgr.network.backpressure.lowwatermark.mb=50, heron.streammgr.connection.write.batch.size.mb=1, heron.streammgr.stateful.buffer.size.mb=100, heron.instance.internal.bolt.write.queue.capacity=128, heron.instance.tuning.expected.spout.read.queue.size=512, heron.metricsmgr.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.reconnect.streammgr.interval.sec=PT5S, heron.instance.tuning.interval.ms=PT0.1S, heron.instance.emit.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.directory=log-files, heron.check.tmaster.location.interval.sec=120, heron.instance.reconnect.metricsmgr.interval.sec=PT5S, heron.streammgr.client.reconnect.tmaster.max.attempts=30, heron.streammgr.network.backpressure.highwatermark.mb=100, heron.instance.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.tuning.expected.metrics.write.queue.size=8, heron.instance.internal.spout.write.queue.capacity=128, heron.instance.force.exit.timeout.ms=PT2S, heron.tmaster.network.stats.options.maximum.packet.mb=1, heron.streammgr.xormgr.rotatingmap.nbuckets=3, heron.instance.set.control.tuple.capacity=1024, heron.metricsmgr.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.streammgr.client.reconnect.tmaster.interval.sec=10, heron.instance.execute.batch.time.ms=PT0.016S, heron.metrics.export.interval.sec=PT1M, heron.streammgr.connection.read.batch.size.mb=1, heron.streammgr.cache.drain.size.mb=100, heron.tmaster.network.master.options.maximum.packet.mb=16, heron.tmaster.establish.retry.interval.sec=1, heron.metrics.max.exceptions.per.message.count=1024, heron.tmaster.stmgr.state.timeout.sec=60, heron.instance.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.err.threshold=3, heron.tmaster.network.controller.options.maximum.packet.mb=1, heron.tmaster.metrics.collector.maximum.exception=256, heron.instance.network.write.batch.time.ms=PT0.016S, heron.instance.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.streammgr.mempool.max.message.number=512, heron.logging.maximum.size.mb=100, heron.streammgr.tmaster.heartbeat.interval.sec=10, heron.instance.network.read.batch.time.ms=PT0.016S, heron.tmaster.metrics.network.bindallinterfaces=false, heron.streammgr.network.options.maximum.packet.mb=10, heron.instance.tuning.expected.bolt.write.queue.size=8, heron.metricsmgr.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.logging.maximum.files=5, heron.instance.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.instance.execute.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.acknowledgement.nbuckets=10, heron.metricsmgr.network.read.batch.time.ms=PT0.016S, heron.metricsmgr.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.metricsmgr.network.options.maximum.packetsize.bytes=ByteAmount{1 MB (1048576 bytes)}, heron.instance.tuning.expected.bolt.read.queue.size=8, heron.logging.flush.interval.sec=10, heron.streammgr.cache.drain.frequency.ms=10, heron.tmaster.establish.retry.times=30, heron.instance.network.options.maximum.packetsize.bytes=ByteAmount{10 MB (10485760 bytes)}, heron.instance.tuning.current.sample.weight=0.8, heron.instance.reconnect.streammgr.times=60, heron.logging.prune.interval.sec=300, heron.instance.reconnect.metricsmgr.times=60, heron.tmaster.metrics.collector.maximum.interval.min=PT3H, 
heron.tmaster.metrics.collector.purge.interval.sec=PT1M, heron.streammgr.client.reconnect.interval.sec=1, heron.instance.internal.spout.read.queue.capacity=1024, heron.instance.ack.batch.time.ms=PT0.128S, heron.instance.set.data.tuple.size.bytes=ByteAmount{8 MB (8388608 bytes)}, heron.instance.tuning.expected.spout.write.queue.size=8, heron.instance.internal.bolt.read.queue.capacity=128, heron.instance.set.data.tuple.capacity=1024, heron.instance.metrics.system.sample.interval.sec=PT10S, heron.streammgr.network.backpressure.threshold=3, heron.instance.emit.batch.time.ms=PT0.016S, heron.metricsmgr.network.write.batch.time.ms=PT0.016S, heron.instance.internal.metrics.write.queue.capacity=128}  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31162  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31067  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Connected to Stream Manager. Ready to send register request  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Connected to Metrics Manager. Ready to send register request  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We registered ourselves to the Stream Manager  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Handling assignment message from response  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We received a new Physical Plan.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Push to Slave  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: We registered ourselves to the Metrics Manager  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Building configs for component: ads  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added topology-level configs: {topology.acker.executors=2, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30}  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added component-specific configs: {topology.acker.executors=2, config.zkRoot=/ad-events/6647e83d-6bd8-454e-ad91-d3ec0a012e62, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, config.topics=ad-events, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30, config.zkNodeBrokers=/brokers}  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Incarnating ourselves as ads with task id 2  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Is this topology stateful: false  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Enable Ack: true  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: EnableMessageTimeouts: true  
[2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Exception caught in thread: SlaveThread with id: 12 
java.lang.NullPointerException
    at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:80)
    at org.apache.storm.topology.IRichSpoutDelegate.open(IRichSpoutDelegate.java:53)
    at com.twitter.heron.instance.spout.SpoutInstance.init(SpoutInstance.java:173)
    at com.twitter.heron.instance.Slave.startInstanceIfNeeded(Slave.java:222)
    at com.twitter.heron.instance.Slave.handleNewAssignment(Slave.java:173)
    at com.twitter.heron.instance.Slave.handleNewPhysicalPlan(Slave.java:349)
    at com.twitter.heron.instance.Slave.access$300(Slave.java:49)
    at com.twitter.heron.instance.Slave$1.run(Slave.java:118)
    at com.twitter.heron.common.basics.WakeableLooper.executeTasksOnWakeup(WakeableLooper.java:160)
    at com.twitter.heron.common.basics.WakeableLooper.runOnce(WakeableLooper.java:89)
    at com.twitter.heron.common.basics.WakeableLooper.loop(WakeableLooper.java:79)
    at com.twitter.heron.instance.Slave.run(Slave.java:180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: Waiting for process exit in PT2S  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Closing the Slave Thread  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Shutting down the instance  
[2018-11-01 22:43:49 +0800] [WARNING] com.twitter.heron.common.basics.SysUtils: Failed to close com.twitter.heron.instance.Slave@4bef4d93 
java.lang.NullPointerException
    at org.apache.storm.kafka.KafkaSpout.close(KafkaSpout.java:136)
    at org.apache.storm.topology.IRichSpoutDelegate.close(IRichSpoutDelegate.java:58)
    at com.twitter.heron.instance.spout.SpoutInstance.clean(SpoutInstance.java:195)
    at com.twitter.heron.instance.spout.SpoutInstance.shutdown(SpoutInstance.java:204)
    at com.twitter.heron.instance.Slave.close(Slave.java:238)
    at com.twitter.heron.common.basics.SysUtils.closeIgnoringExceptions(SysUtils.java:66)
    at com.twitter.heron.instance.HeronInstance$SlaveExitTask.run(HeronInstance.java:428)
    at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.handleException(HeronInstance.java:396)
    at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.uncaughtException(HeronInstance.java:360)
    at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
    at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
    at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)

[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Gateway: Closing the Gateway thread  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Flushing all pending data in MetricsManagerClient  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Flushing all pending data in StreamManagerClient  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: MetricsManagerClient exits  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.  
[2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: StreamManagerClient exits.  
[2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Instance Process exiting.

The topology code is as follows:

String zkServerHosts = "MY_ZK_IP:2181";
ZkHosts hosts = new ZkHosts(zkServerHosts);

// zkRoot is "/" + topic (where the spout stores consumer offsets in ZooKeeper),
// and the random UUID is the consumer id under that root.
SpoutConfig spoutConfig = new SpoutConfig(hosts, kafkaTopic, "/" + kafkaTopic, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // deserialize Kafka messages as strings
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

The location of the NPE is line 80 of the open method in the KafkaSpout class:

public Object getValueAndReset() {
    List<PartitionManager> pms = KafkaSpout.this.coordinator.getMyManagedPartitions();
    Set<Partition> latestPartitions = new HashSet();
    Iterator var3 = pms.iterator();

    PartitionManager pm;
    while(var3.hasNext()) { // the line of NPE happened
         pm = (PartitionManager)var3.next();
         latestPartitions.add(pm.getPartition());
    }

    this.kafkaOffsetMetric.refreshPartitions(latestPartitions);
    var3 = pms.iterator();

    while(var3.hasNext()) {
         pm = (PartitionManager)var3.next();      
         this.kafkaOffsetMetric.setOffsetData(pm.getPartition(), pm.getOffsetData());
    }

    return this.kafkaOffsetMetric.getValueAndReset();
 }

I don't know what caused this problem or how to fix it. Any help is appreciated.

EDIT: All imports have been pointed at the heron-storm classes, but the NPE still occurs.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

There are 2 answers

Tom Cooper (7 votes)

The Storm-based Kafka spout does not work with the native Heron topology API. You will need to use the heron-storm API in compatibility mode (add this dependency to your pom file) to build your topology and interface with the Storm-Kafka spout. It should just be a case of swapping the Heron imports for heron-storm imports in your bolts.
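
For illustration, a bolt compiled against the heron-storm compatibility classes might look like the minimal sketch below. The PassThroughBolt class itself is hypothetical; the point is that every import resolves to the org.apache.storm.* classes shipped by heron-storm rather than to the native Heron API:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt that simply forwards the Kafka message string downstream.
// All imports come from the heron-storm compatibility jar, not from the
// native Heron topology API.
public class PassThroughBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Anchor the emit to the input tuple and ack it, matching the
        // ATLEAST_ONCE reliability mode visible in the log above.
        collector.emit(tuple, new Values(tuple.getString(0)));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}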

Some examples of using the heron-storm API are shown here.

Storm and Heron activate their bolts/spouts in different ways, which can cause issues with Storm-only code in native Heron topologies.
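
If you want to see how the lifecycle calls differ in your own environment, a throwaway diagnostic spout along the lines of this sketch (entirely hypothetical, written against the heron-storm classes) will show whether activate() is ever invoked after open():

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;

// Hypothetical diagnostic spout: it emits nothing and only logs the lifecycle
// callbacks it receives, so you can compare Heron's behaviour with Storm's.
public class LifecycleLoggingSpout extends BaseRichSpout {

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        System.out.println("open() called for task " + context.getThisTaskId());
    }

    @Override
    public void activate() {
        System.out.println("activate() called");
    }

    @Override
    public void nextTuple() {
        // Intentionally empty; this spout only observes lifecycle calls.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output streams are declared for this diagnostic.
    }
}

If open() fires but activate() never does, that matches the behaviour the KafkaSpout workaround in the other answer compensates for.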

thinker0 (0 votes)

This is resolved by calling activate() explicitly after open():

final KafkaSpout<byte[], byte[]> spout =
    new KafkaSpout<byte[], byte[]>(kafkaSpoutConfig) {

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            super.open(conf, context, collector);
            // Storm and Heron activate spouts differently (see the other answer),
            // so trigger activate() explicitly once open() has completed.
            super.activate();
        }
    };