Spark Streaming : Custom Receiver : Data source : Websphere Message Queue

I am trying to implement a custom receiver for a WebSphere MQ (WSMQ) data source in Spark Streaming. I followed the example provided here.

Later I followed the example in this GitHub repository.

I am running into three issues:

  1. An error that appears after the program has been running for some time:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)

  2. The program doesn't remove the messages from WSMQ, even though I used this code when creating the session:

    MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

  3. I need to implement a reliable receiver as explained in the Custom Receiver guide of the Spark API. It says:

    To implement a reliable receiver, you have to use store(multiple-records) to store data. This flavour of store is a blocking call which returns only after all the given records have been stored inside Spark. If the receiver’s configured storage level uses replication (enabled by default), then this call returns after replication has completed. Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the source appropriately. This ensures that no data is lost when the receiver fails in the middle of replicating data – the buffered data will not be acknowledged and hence will be later resent by the source.

I don't understand how to use store(multiple-records).
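The order of operations in the quoted passage (collect a batch, store it with one blocking call, acknowledge only afterwards) can be sketched without Spark or MQ on the classpath. Everything named here is a hypothetical stand-in: `FakeSource` plays the role of a JMS session opened with `Session.CLIENT_ACKNOWLEDGE`, and the `blockingStore` callback plays the role of Spark's `store(Iterator<T>)`. This is a minimal sketch of the pattern, not the Spark or MQ implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a message source that requires explicit acknowledgement. */
class FakeSource {
    private final Deque<String> pending = new ArrayDeque<>(); // not yet delivered
    private final List<String> unacked = new ArrayList<>();   // delivered, not acknowledged

    FakeSource(List<String> messages) { pending.addAll(messages); }

    /** Delivers the next message, or null when nothing is left. */
    String receive() {
        String m = pending.poll();
        if (m != null) unacked.add(m);
        return m;
    }

    /** Confirms everything delivered so far, so the source can discard it. */
    void acknowledge() { unacked.clear(); }

    /** Puts unacknowledged messages back in order, as a source would for redelivery. */
    void recover() {
        for (int i = unacked.size() - 1; i >= 0; i--) pending.addFirst(unacked.get(i));
        unacked.clear();
    }

    int remaining() { return pending.size() + unacked.size(); }
}

class ReliableReceiveSketch {
    /**
     * Reads up to batchSize messages, hands them to blockingStore in one call,
     * and acknowledges only if that call returned normally.
     * Returns the number of messages stored and acknowledged.
     */
    static int receiveBatch(FakeSource source, int batchSize,
                            Consumer<Iterator<String>> blockingStore) {
        List<String> batch = new ArrayList<>();
        String m;
        while (batch.size() < batchSize && (m = source.receive()) != null) {
            batch.add(m);
        }
        if (batch.isEmpty()) return 0;
        try {
            blockingStore.accept(batch.iterator()); // stands in for store(Iterator<T>)
            source.acknowledge();                   // safe now: the batch has been stored
            return batch.size();
        } catch (RuntimeException e) {
            source.recover();                       // nothing acknowledged, so it is redelivered
            return 0;
        }
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource(Arrays.asList("a", "b", "c"));
        List<String> stored = new ArrayList<>();
        int n = receiveBatch(source, 2, it -> it.forEachRemaining(stored::add));
        System.out.println(n + " stored, " + source.remaining() + " left on the source");
    }
}
```

In a real receiver the same ordering would presumably mean calling `acknowledge()` on the JMS message only after `store(...)` returns, rather than before it.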

I can't figure out why these errors are happening and how I can implement a reliable Receiver.

Here is the code:

public class JavaConnector extends Receiver<String> {

    String host = null;
    int port = -1;
    String qm=null;
    String qn=null;
    String channel=null;
    transient Gson gson=new Gson();
    transient MQQueueConnection qCon= null;
    String topic=null;

    Enumeration enumeration =null;
    private static MQQueueReceiver receiver = null;


    public JavaConnector(String host , int port, String qm, String channel, String qn) {
        super(StorageLevel.MEMORY_ONLY_2());
        this.host = host;
        this.port = port;
        this.qm=qm;
        this.qn=qn;
        this.channel=channel;


    }

    public void onStart()  {
        // Start the thread that receives data over a connection
        new Thread()  {
            @Override public void run() {
                try {
                    initConnection();
                    receive();
                }
                catch (JMSException ex)
                {
                    ex.printStackTrace();
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    public void onStop() {

        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true

    }

    /** Create a MQ connection and receive data until receiver is stopped */
    private void receive() throws InterruptedException {
        System.out.print("Started receiving messages from MQ");


        try {

            JMSTextMessage receivedMessage= null;
            int cnt =0;

            //JMSTextMessage receivedMessage = (JMSTextMessage) receiver.receive(10000);

            boolean flag=false;
            while (!isStopped() && enumeration.hasMoreElements()&&cnt<50 )
            {

                receivedMessage= (JMSTextMessage) enumeration.nextElement();
                receivedMessage.acknowledge();
                String userInput = receivedMessage.getText();

                ArrayList<String> list = new ArrayList<String>();
                list.add(userInput);
                Iterator<String> itr = list.iterator();
                store(itr);
                cnt++;

            }
            /*while (!isStopped() && receivedMessage !=null)
            {

               // receivedMessage= (JMSTextMessage) enumeration.nextElement();
                String userInput = receivedMessage.getText();

                store(userInput);
        receivedMessage.acknowledge();

            }*/

            // Restart in an attempt to connect again when server is active again
            //restart("Trying to connect again");

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");

        }
        catch(Exception e)
        {      Thread.sleep(100);
            System.out.println("WRONG"+e.toString());
            e.printStackTrace();
            restart("Trying to connect again");
        }
        catch(Throwable t) {
            Thread.sleep(100);
            System.out.println("WRONG-1"+t.toString());
            // restart if there is any other error
            restart("Error receiving data", t);
        }



    }

    public void initConnection() throws JMSException,InterruptedException {
        try {
            MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
            conFactory.setHostName(host);
            conFactory.setPort(port);
            conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
            conFactory.setQueueManager(qm);
            conFactory.setChannel(channel);
            conFactory.setMsgBatchSize(100);


            qCon = (MQQueueConnection) conFactory.createQueueConnection();
            MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MQQueue queue = (MQQueue) qSession.createQueue(qn);
            MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
            qCon.start();
            //receiver = (MQQueueReceiver) qSession.createReceiver(queue);
            enumeration= browser.getEnumeration();


        } catch (Exception e) {
            Thread.sleep(1000);
        }
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }
}

There is 1 answer

Hadoop-worker (best answer)

Finally I was able to solve this.

Solution 1: The streaming context was trying to write to Kafka. Kafka was down, which is why I was getting the I/O error. That was foolish of me. :)

Solution 2: I was supposed to use a MessageListener. A QueueBrowser only reads the messages; it doesn't actually consume them.
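The difference between browsing and consuming can be illustrated with a small in-memory stand-in, so it runs without a queue manager. `InMemoryQueue` and its methods are hypothetical names for this sketch only. In JMS terms, `browse()` corresponds roughly to `QueueBrowser.getEnumeration()`, and `consume()` corresponds roughly to a `MessageConsumer` with a `MessageListener` attached.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Enumeration;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory queue that supports both access styles. */
class InMemoryQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    InMemoryQueue(List<String> initial) { messages.addAll(initial); }

    /** Browsing: enumerates a snapshot of the messages and removes nothing. */
    Enumeration<String> browse() {
        return Collections.enumeration(new ArrayList<>(messages));
    }

    /** Consuming: removes each message and passes it to the listener callback. */
    int consume(Consumer<String> listener) {
        int count = 0;
        String m;
        while ((m = messages.poll()) != null) {
            listener.accept(m);
            count++;
        }
        return count;
    }

    int depth() { return messages.size(); }
}

class BrowseVsConsume {
    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue(Arrays.asList("m1", "m2"));

        int seen = 0;
        for (Enumeration<String> e = queue.browse(); e.hasMoreElements(); e.nextElement()) {
            seen++;
        }
        System.out.println("browsed " + seen + ", depth still " + queue.depth());

        int consumed = queue.consume(m -> System.out.println("got " + m));
        System.out.println("consumed " + consumed + ", depth now " + queue.depth());
    }
}
```

This matches the behaviour in the question: iterating the browser's enumeration leaves every message on the queue, whatever acknowledge mode the session was created with.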