Problems with sink on flink

244 views Asked by At

[enter image description here][1] I could do this to try first, this works well this just print the data on another topic I trie to follow the same logic but don get the result

package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Properties;

public class DataStreamJob {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<TradeEventBase> source = KafkaSource.<TradeEventBase>builder()
                    .setBootstrapServers("redpanda-0:9092")
                    .setTopics("data")
                    .setGroupId("")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new TradeEventDeserializationSchema())
                    .build();

            DataStream<TradeEventBase> trades = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

            // Keying by symbol and then applying a 2-second time window
            DataStream<Tuple2<String, Long>> counts = trades
                    .keyBy(event -> event.getSymbol())  // Assuming you have a getSymbol() method in TradeEventBase
                    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                    .aggregate(new CountAggregator());

            // Define Kafka producer properties
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "redpanda-0:9092");

            KafkaSink<Tuple2<String, Long>> sink = KafkaSink.<Tuple2<String, Long>>builder()
                    .setBootstrapServers("redpanda-0:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("resultado")
                            .setValueSerializationSchema(new Tuple2SerializationSchema())
                            .build()
                    )
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();

            counts.sinkTo(sink);

            env.execute("Kafka Trade Consumer-Producer");
        }
    }

this is an example of what it consume from kafka

{
"e":"aggTrade"
"E":1692050397253
"a":1824780414
"s":"BTCUSDT"
"p":"29390.30"
"q":"0.001"
"f":4007152464
"l":4007152464
"T":1692050397097
"m":false
} 

this is the deserialization class

public class TradeEventDeserializationSchema implements DeserializationSchema<TradeEventBase> {
    private transient ObjectMapper objectMapper;

    @Override
    public TradeEventBase deserialize(byte[] message) throws IOException {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }

        String msg = new String(message);

        // Check the event type in the message and deserialize accordingly
        if (msg.contains("\"e\":\"aggTrade\"")) {
            return objectMapper.readValue(msg, AggTradeEvent.class);
        } else if (msg.contains("\"e\":\"trade\"")) {
            return objectMapper.readValue(msg, TradeEvent.class);
        } else {
            throw new RuntimeException("Unknown event type in message: " + msg);
        }
    }

    @Override
    public boolean isEndOfStream(TradeEventBase nextElement) {
        return false;
    }

    @Override
    public TypeInformation<TradeEventBase> getProducedType() {
        return TypeInformation.of(TradeEventBase.class);
    }
}

what Ive learned I tried to find the problem but dont see it I try with chatgpt 4, and also couldnt I dont see nothing wrong on the logs but the job dosent work even create the topic.

package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<Object> source = KafkaSource.<Object>builder()
                .setBootstrapServers("redpanda-0:9092")
                .setTopics("data")
                .setGroupId("")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new TradeEventDeserializationSchema())
                .build();

        DataStream<Object> trades = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Define Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "redpanda-0:9092");

        KafkaSink<Object> sink = KafkaSink.<Object>builder()
                .setBootstrapServers("redpanda-0:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("result1")
                        .setValueSerializationSchema(new TradeEventSerializationSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        trades.sinkTo(sink);

        env.execute("Kafka Trade Consumer-Producer");
    }
}


  [1]: https://i.stack.imgur.com/LWykl.png
0

There are 0 answers