KafkaTridentSpoutOpaque repeatedly consumes the last message

I use Storm, Kafka, and protobuf to build my stream-processing system.

The problem is that KafkaTridentSpoutOpaque repeatedly consumes the last message. I want each message in Kafka to be consumed only once.

Here are some details:

Java dependencies

storm-kafka-client 1.2.2

storm-core 1.2.2

kafka_2.10 0.10.2.0

Components

kafka_2.12-2.0.0

apache-storm-1.2.2

Code that builds the KafkaTridentSpoutOpaque instance

// Builds the spout configuration: String keys, raw byte[] values (protobuf payloads).
protected static KafkaSpoutConfig<String, byte[]> newKafkaSpoutConfig(String bootstrapServers, String topic) {
    KafkaSpoutConfig.Builder<String, byte[]> builder = new KafkaSpoutConfig.Builder<>(bootstrapServers, topic);
    return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "stormKafkaSpoutGroup")
            .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
            .setRecordTranslator(new JustValueFunc(), new Fields("str"))
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .setProcessingGuarantee(AT_MOST_ONCE)
            .build();
}

private static KafkaTridentSpoutOpaque<String, byte[]> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
    return new KafkaTridentSpoutOpaque<>(spoutConfig);
}

// Translates each Kafka record into a one-field tuple holding the parsed protobuf message.
private static class JustValueFunc implements Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
    @Override
    public List<Object> apply(ConsumerRecord<String, byte[]> record) {
        Values res = null;
        try {
            res = new Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        // Note: res is still null here if the payload could not be parsed.
        return res;
    }
}

Here is my topology code

public static void main(String[] args) throws Exception {
    StormTopology topology = getTridentTopology();
    Config conf = new Config();
    conf.setNumWorkers(20);
    conf.setMaxSpoutPending(5000);
    StormSubmitter.submitTopology("storm-kafka-client-spout-test", conf, topology);
}

public static StormTopology getTridentTopology() {
    final TridentTopology tridentTopology = new TridentTopology();

    KafkaSpoutConfig<String, byte[]> spoutConfig = newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
    ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);

    final Stream spoutStream = tridentTopology.newStream("spout", spout).parallelismHint(1);

    spoutStream.each(spoutStream.getOutputFields(), new Debug("##### fastest driver"));

    return tridentTopology.build();
}

Output Log

./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1

I produced only one message to Kafka, so there should be a single line of output, but there are many. The repetition recurs about every 45 minutes.

Any help is appreciated.

Thanks.

There is 1 answer

ChenBo

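Setting the max spout pending value very high causes this. Try a low value, say 1.

setMaxSpoutPending sets the maximum number of unfinished tuples a spout may have pending at once; in a Trident topology the limit applies to batches rather than individual tuples. There is some published advice on how to tune this option.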
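
As a rough illustration of the suggestion above, here is a minimal sketch of the two topology settings from the question with the pending limit lowered to 1. It uses a plain map so that it runs without Storm on the classpath; this relies on Storm's Config being a HashMap whose setMaxSpoutPending and setNumWorkers methods write the keys "topology.max.spout.pending" and "topology.workers". The class name MaxSpoutPendingSketch and the helper buildConf are hypothetical names made up for this example, not part of Storm.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxSpoutPendingSketch {
    // Keys written by Config.setMaxSpoutPending and Config.setNumWorkers in Storm.
    static final String MAX_SPOUT_PENDING = "topology.max.spout.pending";
    static final String WORKERS = "topology.workers";

    // Hypothetical helper: builds the topology settings as a plain map.
    static Map<String, Object> buildConf(int workers, int maxSpoutPending) {
        if (maxSpoutPending < 1) {
            throw new IllegalArgumentException("maxSpoutPending must be at least 1");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKERS, workers);
        conf.put(MAX_SPOUT_PENDING, maxSpoutPending);
        return conf;
    }

    public static void main(String[] args) {
        // 20 workers as in the question, but at most 1 pending batch instead of 5000.
        Map<String, Object> conf = buildConf(20, 1);
        System.out.println(conf);
    }
}
```

In the question's main method the equivalent change would be conf.setMaxSpoutPending(1) in place of conf.setMaxSpoutPending(5000). Whether this alone stops the repeated emission is not confirmed here, so it is worth checking the worker log again after redeploying.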