Converting a Storm 1 Kafka Topology to Heron, have a few questions

135 views Asked by At

Been experimenting with switching a Storm 1.0.6 topology to Heron. Taking a baby step by removing all but the Kafka spout to see how things go. Have a main method as follows (modified from the original Flux version):

import org.apache.heron.eco.Eco;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KafkaTopology {

    public static void main(String[] args) throws Exception {

        List<String> argList = new ArrayList<String>(Arrays.asList(args));
        String file = KafkaTopology.class.getClassLoader().getResource("topology.yaml").getFile();
        argList.add("local");
        argList.add("--eco-config-file");
        argList.add(file);

        file = KafkaTopology.class.getClassLoader().getResource("dev.properties").getFile();
        argList.add("--props");
        argList.add(file);

        argList.add("--sleep");
        argList.add("36000000");

        String[] ecoArgs = argList.toArray(new String[argList.size()]);

        Eco.main(ecoArgs);

    }
}

YAML is this:

name: "kafkaTopology-XXX_topologyVersion_XXX"
type: "storm"

config:
  topology.workers: ${workers.config}
  topology.max.spout.pending: ${max.spout.pending}
  topology.message.timeout.secs: 120
  topology.testing.always.try.serialize: true
  storm.zookeeper.session.timeout: 30000
  storm.zookeeper.connection.timeout: 30000
  storm.zookeeper.retry.times: 5
  storm.zookeeper.retry.interval: 2000
  properties:
    kafka.mapper.zkServers: ${kafka.mapper.zkServers}
    kafka.mapper.zkPort: ${kafka.mapper.zkPort}
    bootstrap.servers: ${bootstrap.servers}
    kafka.mapper.brokerZkStr: ${kafka.mapper.brokerZkStr}
    kafka.topic.name: ${kafka.topic.name}

components:

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - ${kafka.mapper.brokerZkStr}

  - id: "rawMessageAndMetadataScheme"
    className: "org.acme.storm.spout.RawMessageAndMetadataScheme"

  - id: "messageMetadataSchemeAsMultiScheme"
    className: "org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme"
    constructorArgs:
      - ref: "rawMessageAndMetadataScheme"

  - id: "kafkaSpoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - ${kafka.topic.name}
      # zkRoot
      - "/zkRootKafka.kafkaSpout.builder"
      # id
      - ${kafka.topic.name}
    properties:
      - name: "scheme"
        ref: "messageMetadataSchemeAsMultiScheme"
      - name: zkServers
        value: ${kafka.mapper.zkServers}
      - name: zkPort
        value: ${kafka.mapper.zkPort}
      # Retry Properties
      - name: "retryInitialDelayMs"
        value: 60000
      - name: "retryDelayMultiplier"
        value: 1.5
      - name: "retryDelayMaxMs"
        value: 14400000
      - name: "retryLimit"
        value: 0

# spout definitions
spouts:

  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    parallelism: ${kafka.spout.parallelism}
    constructorArgs:
      - ref: "kafkaSpoutConfig"

Relevant POM entries:

        <dependency>
            <groupId>org.apache.heron</groupId>
            <artifactId>heron-api</artifactId>
            <version>0.20.3-incubating</version>
        </dependency>

        <dependency>
            <groupId>org.apache.heron</groupId>
            <artifactId>heron-storm</artifactId>
            <version>0.20.3-incubating</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.0.6</version>
        </dependency>

Main method seems to run fine:

Apr 30, 2021 4:38:49 PM org.apache.heron.eco.parser.EcoParser loadTopologyFromYaml
INFO: Parsing eco config file
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.parser.EcoParser loadTopologyFromYaml
INFO: Performing property substitution.
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.parser.EcoParser loadTopologyFromYaml
INFO: Performing environment variable substitution.
topology type is Storm
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.builder.storm.EcoBuilder buildConfig
INFO: Building topology config
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: ---------- TOPOLOGY DETAILS ----------
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: Topology Name: kafkaTopology-XXX_topologyVersion_XXX
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: --------------- SPOUTS ---------------
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: kafka-spout [1] (org.apache.storm.kafka.KafkaSpout)
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: ---------------- BOLTS ---------------
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: --------------- STREAMS ---------------
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.Eco printTopologyInfo
INFO: --------------------------------------
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.builder.storm.EcoBuilder buildTopologyBuilder
INFO: Building components
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.builder.storm.EcoBuilder buildTopologyBuilder
INFO: Building spouts
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.builder.storm.EcoBuilder buildTopologyBuilder
INFO: Building bolts
Apr 30, 2021 4:38:49 PM org.apache.heron.eco.builder.storm.EcoBuilder buildTopologyBuilder
INFO: Building streams

Process finished with exit code 0

Question 1: The topology exits immediately, is there an Eco flag equivalent to Flux '--sleep' to keep it running for a while (to debug, etc.)?

Question 2: Was a little surprised that I needed to pull storm-kafka in (thought there would be a Heron equivalent) - is this correct (or some other artifact?) and if so, is 1.0.6 an OK version to use or does Heron work better with another version?

Question 3: The above was with type: "storm" in the YAML, trying type: "heron" gives the following error:

INFO: Building spouts
Exception in thread "main" java.lang.ClassCastException: class org.apache.storm.kafka.KafkaSpout cannot be cast to class org.apache.heron.api.spout.IRichSpout (org.apache.storm.kafka.KafkaSpout and org.apache.heron.api.spout.IRichSpout are in unnamed module of loader 'app')
    at org.apache.heron.eco.builder.heron.SpoutBuilder.buildSpouts(SpoutBuilder.java:42)
    at org.apache.heron.eco.builder.heron.EcoBuilder.buildTopologyBuilder(EcoBuilder.java:70)
    at org.apache.heron.eco.Eco.submit(Eco.java:125)
    at org.apache.heron.eco.Eco.main(Eco.java:161)
    at KafkaTopology.main(KafkaTopology.java:26)

Process finished with exit code 1

Is this just the way it is using Kafka, type needs to be storm and not heron, or is there some workaround here?

2

There are 2 answers

0
Josh Fischer On BEST ANSWER

Question 1: I'm not sure why the topology would shut down on you. Try to run your submit with the --verbose flag. At this time the functionality of the --sleep argument does not exist. It could be a feature added if you need.

Question 2: There is a Heron equivalent. After Heron was donated to Apache quite a lot of work had to be done to get binary releases out. Most of that work has been done. With the next release I would hope that all binary artifacts will be distributed appropriately.

Question 3:This issue occurs because based on the type specified it looks for bolts/spouts in a certain package. When "storm" is input it expects the classes it implements or extends to be of "org.apache.storm". When "heron" is input it expects the classes it implements or extends to be of "org.apache.heron". If you use the dependency storm-kafka the type will need to be "storm". The heron equivalents can be found here. https://search.maven.org/search?q=heron-kafka

0
thinker0 On
  1. https://search.maven.org/search?q=heron-kafka