[enter image description here][1]

This is what I tried first, and it works well: it just forwards the data to another topic (that version is at the end of the question). I then tried to follow the same logic with a keyed, windowed count, but I don't get any result:
package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<TradeEventBase> source = KafkaSource.<TradeEventBase>builder()
                .setBootstrapServers("redpanda-0:9092")
                .setTopics("data")
                .setGroupId("")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TradeEventDeserializationSchema())
                .build();

        DataStream<TradeEventBase> trades =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Key by symbol, then count trades in 2-second tumbling event-time windows
        DataStream<Tuple2<String, Long>> counts = trades
                .keyBy(TradeEventBase::getSymbol)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .aggregate(new CountAggregator());

        KafkaSink<Tuple2<String, Long>> sink = KafkaSink.<Tuple2<String, Long>>builder()
                .setBootstrapServers("redpanda-0:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("resultado")
                        .setValueSerializationSchema(new Tuple2SerializationSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        counts.sinkTo(sink);
        env.execute("Kafka Trade Consumer-Producer");
    }
}
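CountAggregator is not shown above; for reference, a minimal sketch that fits the types used in the job, assuming it just counts trades per symbol (the accumulator shape is my assumption, only the AggregateFunction interface is fixed):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Minimal sketch: counts trades per symbol. The accumulator carries the
// symbol along so the window result is a (symbol, count) pair.
public class CountAggregator
        implements AggregateFunction<TradeEventBase, Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public Tuple2<String, Long> createAccumulator() {
        return Tuple2.of("", 0L);
    }

    @Override
    public Tuple2<String, Long> add(TradeEventBase value, Tuple2<String, Long> accumulator) {
        return Tuple2.of(value.getSymbol(), accumulator.f1 + 1);
    }

    @Override
    public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
        return accumulator;
    }

    @Override
    public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
        return Tuple2.of(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
    }
}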
This is an example of what the job consumes from Kafka:
{
  "e": "aggTrade",
  "E": 1692050397253,
  "a": 1824780414,
  "s": "BTCUSDT",
  "p": "29390.30",
  "q": "0.001",
  "f": 4007152464,
  "l": 4007152464,
  "T": 1692050397097,
  "m": false
}
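The event classes themselves are not shown; a minimal sketch of how TradeEventBase could map those single-letter JSON keys with Jackson (the exact field set is an assumption on my part; unknown properties are ignored):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

// Sketch of the base event type; AggTradeEvent and TradeEvent would extend it
// and add their own fields.
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class TradeEventBase {

    @JsonProperty("e")
    private String eventType;

    @JsonProperty("s")
    private String symbol;

    @JsonProperty("T")
    private long tradeTime; // trade timestamp in epoch milliseconds

    public String getEventType() { return eventType; }

    public String getSymbol() { return symbol; }

    public long getTradeTime() { return tradeTime; }
}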
This is the deserialization class:
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TradeEventDeserializationSchema implements DeserializationSchema<TradeEventBase> {

    // ObjectMapper is not serializable, so it is created lazily on the task side
    private transient ObjectMapper objectMapper;

    @Override
    public TradeEventBase deserialize(byte[] message) throws IOException {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        String msg = new String(message, StandardCharsets.UTF_8);
        // Check the event type in the message and deserialize accordingly
        if (msg.contains("\"e\":\"aggTrade\"")) {
            return objectMapper.readValue(msg, AggTradeEvent.class);
        } else if (msg.contains("\"e\":\"trade\"")) {
            return objectMapper.readValue(msg, TradeEvent.class);
        } else {
            throw new RuntimeException("Unknown event type in message: " + msg);
        }
    }

    @Override
    public boolean isEndOfStream(TradeEventBase nextElement) {
        return false;
    }

    @Override
    public TypeInformation<TradeEventBase> getProducedType() {
        return TypeInformation.of(TradeEventBase.class);
    }
}
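Tuple2SerializationSchema (used by the sink in the windowed job) is not shown either; a minimal sketch, assuming the (symbol, count) pair is written out as UTF-8 text:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch: writes each windowed result as "symbol,count" in UTF-8.
public class Tuple2SerializationSchema implements SerializationSchema<Tuple2<String, Long>> {

    @Override
    public byte[] serialize(Tuple2<String, Long> element) {
        return (element.f0 + "," + element.f1).getBytes(StandardCharsets.UTF_8);
    }
}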
What I've tried: I looked for the problem but I don't see it, and I also tried ChatGPT-4 without luck. Nothing looks wrong in the logs, but the job produces no output; it doesn't even create the output topic. For reference, this is the first version, the one that works and just forwards the data to another topic:
package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The generic type has to match the deserializer's output (TradeEventBase);
        // with KafkaSource<Object> the setValueOnlyDeserializer call does not compile
        KafkaSource<TradeEventBase> source = KafkaSource.<TradeEventBase>builder()
                .setBootstrapServers("redpanda-0:9092")
                .setTopics("data")
                .setGroupId("")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new TradeEventDeserializationSchema())
                .build();

        DataStream<TradeEventBase> trades =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        KafkaSink<TradeEventBase> sink = KafkaSink.<TradeEventBase>builder()
                .setBootstrapServers("redpanda-0:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("result1")
                        .setValueSerializationSchema(new TradeEventSerializationSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        trades.sinkTo(sink);
        env.execute("Kafka Trade Consumer-Producer");
    }
}
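The only difference I can see between the two jobs is the keyed window. From what I've read, TumblingEventTimeWindows only fire when watermarks advance, and the aggregation job uses WatermarkStrategy.noWatermarks(), so maybe the windows never close. This is a sketch of what I think the source would need instead, taking the event timestamp from the "T" field (untested; the 2-second out-of-orderness bound is a guess):

// In the aggregation job, instead of WatermarkStrategy.noWatermarks()
// (needs import java.time.Duration):
WatermarkStrategy<TradeEventBase> watermarks = WatermarkStrategy
        .<TradeEventBase>forBoundedOutOfOrderness(Duration.ofSeconds(2))
        .withTimestampAssigner((event, recordTimestamp) -> event.getTradeTime());

DataStream<TradeEventBase> trades = env.fromSource(source, watermarks, "Kafka Source");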
[1]: https://i.stack.imgur.com/LWykl.png