Insane throughput slowdown in Kafka Streams with a KStream-KTable join, measured using JMH and TopologyTestDriver


I'm trying to measure the relative throughput of my Kafka Streams app using JMH and TopologyTestDriver. This is on an M1 Mac, but all the numbers are relative anyway. When piping directly from my input topic to my output topic:

public class EventTopologyMulti {
  public static StreamsBuilder topology() {
    StreamsBuilder builder = new StreamsBuilder();

    // Main topic
    KStream<String, Event> mainEvents = builder.<String, Event>stream("event_ingestion");

    mainEvents.to("enriched_events");
    return builder;
  }
}

and then running JMH, I get:

TopologyBenchMulti.benchmarkTopology  thrpt    4  34548.895 ± 6484.374  ops/s

Spectacular - love it - all good.

BUT as soon as I join with one table it all goes horribly wrong:

public class EventTopologyMulti {
  public static StreamsBuilder topology() {
    StreamsBuilder builder = new StreamsBuilder();

    // Main topic
    KStream<String, Event> mainEvents = builder.<String, Event>stream("event_ingestion");
     
    // Topic to table-ize & join
    KStream<String, Event> identifyStream = builder.<String, Event>stream("identify_event");

    // Create the table
    KTable<String, Event> identifyTable = identifyStream.toTable();

    // Create the ValueJoiner
    final EventWithIdentify identifyJoiner = new EventWithIdentify();

    // Do the join & send to final topic
    mainEvents.leftJoin(identifyTable, identifyJoiner).to("enriched_events");

    return builder;
  }
}

And when I run JMH again, wow:

TopologyBenchMulti.benchmarkTopology  thrpt    4  22.934 ± 2.394  ops/s

It went from ~35k to ~23 ops/s?!
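To put those two JMH scores in per-call terms, here is a small arithmetic sketch. The class and method names are made up for illustration; the only inputs are the two throughput figures reported above. It works out to roughly 29 µs per `pipeInput` call without the join versus roughly 43.6 ms with it, a slowdown of about 1500x.

```java
import java.util.Locale;

public class ThroughputMath {
  // Average time per operation, in microseconds, for a throughput given in ops/s.
  public static double microsPerOp(double opsPerSecond) {
    return 1_000_000.0 / opsPerSecond;
  }

  // How many times slower the second measurement is than the first.
  public static double slowdownFactor(double baselineOps, double slowOps) {
    return baselineOps / slowOps;
  }

  public static void main(String[] args) {
    double passThrough = 34548.895; // ops/s, pass-through topology (JMH score above)
    double withJoin = 22.934;       // ops/s, topology with the leftJoin (JMH score above)

    System.out.printf(Locale.ROOT, "pass-through: %.1f us/op%n", microsPerOp(passThrough));
    System.out.printf(Locale.ROOT, "with join:    %.1f us/op%n", microsPerOp(withJoin));
    System.out.printf(Locale.ROOT, "slowdown:     %.0fx%n", slowdownFactor(passThrough, withJoin));
  }
}
```

Tens of milliseconds per record looks more like some fixed per-call cost than like the cost of a key lookup plus a no-op joiner, though that is only a guess from the numbers.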

Yes, both topics use the same key for each record. Yes, the topics are co-partitioned (although when running via TopologyTestDriver that doesn't matter). Both topics carry values of the same shape and are keyed with the same value, so the join should be straightforward and not require any extra processing.

Here is my value joiner, which does nothing:

public class EventWithIdentify implements ValueJoiner<Event, Event, Event> {
  public Event apply(Event event, Event identifyEvent) {
    return event;
  }
}

Note that I get this kind of massive slowdown whether it's join or leftJoin. I also get the same slowdown if I send all Events to the same topic, split them with .split().branch(...), convert one branch to a KTable, and then do the leftJoin.

Funnily enough, I get an even larger slowdown if I name the store with Materialized.as("identify-table") when converting the KStream to a KTable (.toTable(Materialized.as("identify-table")))!

What the heck is going on? Sure, I'd expect some slowdown, but not this much!

For completeness here is the jmh test code:

public class TopologyBenchMulti {

  private static final String SCHEMA_REGISTRY_SCOPE = TopologyBenchMulti.class.getName();
  private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE;
  private static TopologyTestDriver testDriver;

  @State(org.openjdk.jmh.annotations.Scope.Thread)
  public static class MyState {
    public TestInputTopic<String, Event> mainEventsTopic;
    public TestInputTopic<String, Event> identifyTopic;
    public Event regStartedEvent;

    @Setup(Level.Trial)
    public void setupState() {
      Properties props = new Props().props;
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
      props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);

      // build the topology
      Topology topology = EventTopologyMulti.topology().build();

      // create a test driver. we will use this to pipe data to our topology
      testDriver = new TopologyTestDriver(topology, props);

      // create the avro value serde
      Serde<Event> avroEventSerde = new SpecificAvroSerde<>();
      Map<String, String> config = Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
      avroEventSerde.configure(config, false);

      // create the test topics
      mainEventsTopic =
          testDriver.createInputTopic(
              "event_ingestion", Serdes.String().serializer(), avroEventSerde.serializer());

      identifyTopic =
        testDriver.createInputTopic(
          "identify_event", Serdes.String().serializer(), avroEventSerde.serializer());

      Event identifyEvent =
          Event.newBuilder()
            .setId("foofie")
            .setTimestamp(Instant.now())
            .setEventName(EventName.Identify)
            .setEvent(Identify.newBuilder().setUserId("userid-99").build())
            .build();
      identifyTopic.pipeInput("anonymous id", identifyEvent);

      regStartedEvent =
        Event.newBuilder()
          .setId("goofie")
          .setTimestamp(Instant.now())
          .setEventName(EventName.RegistrationStarted)
          .setEvent(
              RegistrationStarted.newBuilder()
                  .setAuthenticationMethod(RegistrationMethod.email)
                  .build())
          .build();
    }

    @TearDown(Level.Trial)
    public void tearDown() {
      testDriver.close();
    }
  }

  @Benchmark
  @BenchmarkMode(Mode.Throughput)
  @OutputTimeUnit(TimeUnit.SECONDS)
  public void benchmarkTopology(MyState state) {
    state.mainEventsTopic.pipeInput("anonymous id", state.regStartedEvent);
  }
}

And finally in my build.gradle:

jmh {
  iterations = 4
  benchmarkMode = ['thrpt']
  threads = 1
  fork = 1
  timeOnIteration = '3s'
  resultFormat = 'TEXT'
  profilers = []
  warmupIterations = 3
  warmup = '1s'
}
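One thing that might help narrow this down: JMH ships a built-in sampling stack profiler, and the config above already has an empty profilers list. A minimal sketch, assuming this is the me.champeau.jmh Gradle plugin (which is what the property names suggest, but that is a guess) and leaving every other setting as it is:

```groovy
jmh {
  // ...same settings as in the block above...
  profilers = ['stack']  // JMH's built-in sampling stack profiler (same as -prof stack)
}
```

The stack profiler prints the hottest thread states and stack frames after the run, which should at least show whether the time is going into the join itself, the state store, serialization, or something TopologyTestDriver does on each pipeInput call.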

It can't actually be this slow just from adding one join on one table, can it? Is it TopologyTestDriver, or is it me?

Losing hair over this. Thanks for any input!
