I was asked to look into using Apache Spark's MemoryStream to simulate a Kafka stream in a Java Spring Boot service. Documentation and community coverage of this topic are sparse, so I'm looking for help.
This is my implementation code:
final DataStreamReader kafkaDataStreamReader = kafkaLoader.getStream(sparkSession, options);
final Dataset<Row> df = kafkaDataStreamReader.load();
return df.writeStream().foreachBatch((batch, batchId) -> {
    // Process a batch of data received from Kafka
    updateData(name, customizerFunction, avroSchema, batch);
}).start();
- KafkaLoader is a class which, depending on the active profile (it/prod), configures the Kafka stream differently. It returns a DataStreamReader, which might be the reason why I'm struggling to create a MemoryStream.
- Next, in the writeStream I'm writing to my destinations.
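For context, the base class contract that the profiles implement looks roughly like this. This is only a sketch of what I assume the prod path does (the real KafkaLoader isn't shown here, so the options handling below is illustrative):

```java
import java.util.Map;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;

// Sketch of the assumed base class: the prod profile builds a real Kafka
// reader from the supplied options, while the it profile overrides this.
public abstract class KafkaLoader {

    // Returns a configured-but-not-yet-loaded reader; callers invoke load().
    public DataStreamReader getStream(SparkSession sparkSession, Map<String, Object> options) {
        DataStreamReader reader = sparkSession.readStream().format("kafka");
        options.forEach((key, value) -> reader.option(key, String.valueOf(value)));
        return reader;
    }
}
```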
@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {

    @Autowired
    SparkSession sparkSession;

    @SneakyThrows
    @Override
    public DataStreamReader getStream(SparkSession sparkSession, Map<String, Object> options) {
        options = Map.of();
        MemoryStream<String> stream = null;
        try {
            stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());
            String jsonString = "{..}";
            Seq<String> seq = JavaConverters
                    .collectionAsScalaIterableConverter(List.of(jsonString))
                    .asScala()
                    .toSeq();
            Offset currentOffset = stream.addData(seq);
            stream.commit(currentOffset);
        } catch (Exception e) {
            log.warn("Error creating MemoryStream: ", e);
            return new DataStreamReader(sparkSession);
        }
        Dataset<Row> data = stream.toDF();
        log.debug("Stream enabled [t/f]: {}", data.isStreaming());
        return data
                .sqlContext()
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "test-servers")
                .option("subscribe", "test-data");
    }
}
ItKafkaLoader is used when I run integration tests (hence ActiveProfiles is set to it here), and it is where I'm struggling to create a MemoryStream. Because my implementation code expects a DataStreamReader to be returned, I believe I need to call readStream(), since that returns a DataStreamReader. However, when I call readStream() on its own, Spark throws an exception about my path not being defined:
java.lang.IllegalArgumentException: 'path' is not specified
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataPathNotSpecifiedError
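As I understand it, readStream() with no format() falls back to the default file-based source, which requires a path option; that seems to be where this error comes from, since a DataStreamReader always has to point at some concrete source. For example, the built-in rate source works without a path (shown here only to illustrate that readStream() needs a format, not as a fix):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RateSourceExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("rate-source-demo")
                .getOrCreate();

        // The rate source generates rows with (timestamp, value) columns
        // and needs no path or broker, unlike the file and kafka sources.
        Dataset<Row> df = spark.readStream()
                .format("rate")
                .option("rowsPerSecond", "1")
                .load();

        // isStreaming() is true here, just like for a MemoryStream-backed Dataset.
        System.out.println(df.isStreaming());
    }
}
```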
When searching for this error, I mostly see advice to set the format to kafka. Once I do that, Spark expects a topic and a broker. I was hoping that, since I was using MemoryStream, Spark would recognize this as a dummy Kafka cluster and topic and go about kicking off my simulated Kafka stream through the MemoryStream. That doesn't happen, and when I run my integration test I get these errors:
- Query [id = 4ebacd71-d..., runId = 1a2c4...] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- Invalid url in bootstrap.servers: test-servers
Ideally, I would like to figure out how to fix getStream() in ItKafkaLoader; however, I have a slight feeling that I don't understand what MemoryStream is really for and might need to do something different.
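To clarify what I mean by "something different": MemoryStream already produces a streaming Dataset via toDF(), so one option I'm weighing is changing the contract so the it profile returns the Dataset itself rather than a DataStreamReader. A rough sketch (the getStreamingDataset name and the contract change are hypothetical, not existing API):

```java
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;

import scala.collection.JavaConverters;
import scala.collection.Seq;

public class ItStreamFactory {

    // Hypothetical: returns a streaming Dataset fed from memory; callers
    // would go straight to writeStream() on it instead of calling load()
    // on a DataStreamReader.
    public static Dataset<Row> getStreamingDataset(SparkSession sparkSession) {
        MemoryStream<String> stream =
                new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());

        Seq<String> seq = JavaConverters
                .collectionAsScalaIterableConverter(List.of("{..}"))
                .asScala()
                .toSeq();
        stream.addData(seq);

        // toDF() yields a Dataset<Row> with isStreaming() == true; no
        // format("kafka"), brokers, or topics are involved at any point.
        return stream.toDF();
    }
}
```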
Update: I have seen that in newer versions of Spark you can just set the format to memory; however, it appears that my Spark version (v2.12) does not support that. I also do not have the green light to upgrade my Spark version.