Server alternates between users instead of broadcasting

110 views Asked by At

I have been working on a message system where users type in a server IP/Port and that server then takes in messages and relays them to all other users on the server. The whole program was based of a echo server i rewrote from scratch, for every server.accept() socket it creates two Threads, one to receive messages and one to send them back. The two Threads are connected by a DatagramPacket system so if the server receives a message from one socket it sends it back to all other users because their Threads are listening for the same thing, this is where i am encountering problems; everything work fine except the fact that the user who receives the message alternates in order to time of log on.

Example of problem when two clients are connected:

Client #1 sends 10 messages:

0
1
2
3
4
5
6
7
8
9

The Server receives all of them.

Client #1 receives:

1
3
5
7
9

Client #2 receives:

0
2
4
6
8

Here is the code for the Client:

import java.io.*;
import java.util.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
import javax.swing.*;

public class MessageClient {
    public static void main(String[] args) {
        System.out.println("Starting Message System...");
        Scanner in = new Scanner(System.in);
        MessageClient mc = new MessageClient();
        String input;
        System.out.println(":System Started, type help for help.");
        System.out.print(":");
        while (true) {
            input = in.nextLine();
            if (input.equalsIgnoreCase("HELP")) {
                mc.printHelp();
                System.out.print(":");
            } else if (input.equalsIgnoreCase("QUIT")) {
                System.exit(0);
            } else if (input.equalsIgnoreCase("CONNECT")) {
                mc.connect(in);
                in.nextLine();
                System.out.print(":");
            } else {
                System.out.print("No command found.\n:");
            }
        }
    }
    public static void printHelp() {
        System.out.println("help\tShow this prompt\nconnect\tStarts a new connection\nquit\tQuit the program\nexit\tExit a connection");
    }
    public void connect(Scanner in) {
        Socket soc = null;
        InetAddress addr = null;
        System.out.print("IP_ADDRESS/HOST:");
        String ip = in.nextLine();
        System.out.print("PORT:");
        int port = in.nextInt();
        try {
            System.out.println("Attempting to connect to HOST:\'" + ip + "\' on PORT:\'" + port + "\'");
            addr = InetAddress.getByName(ip);
            soc = new Socket(addr, port);
        } catch(Exception e) {
            System.out.println("Error connecting to server: " + e.getLocalizedMessage());
            return;
        }
        SwingUtilities.invokeLater(new MessageGUI(ip + ":" + port, soc));
    }
}

class MessageGUI implements Runnable {
    public MessageGUI(String windowName, Socket server) {
        JFrame window = new JFrame(windowName);
        window.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        window.setSize(500, 300);
        window.setLayout(new BorderLayout());
        window.setVisible(true);

        MessageReceive mr = new MessageReceive(server);
        mr.setEditable(false);
        mr.setBackground(new Color(0, 0, 0));
        mr.setForeground(new Color(0, 255, 0));
        mr.setVisible(true);
        new Thread(mr).start();
        window.add(mr, BorderLayout.CENTER);

        DataOutputStream dos = null;
        try {
            dos = new DataOutputStream(server.getOutputStream());
        } catch(Exception e) {
            System.out.println("Error creating output stream to server: " + e.getLocalizedMessage());
        }

        JTextField input = new JTextField();
        input.addActionListener(new MessageSend(server, input, dos));
        input.setBackground(new Color(0, 0, 0));
        input.setForeground(new Color(0, 255, 0));
        window.add(input, BorderLayout.PAGE_END);

        System.out.println("Displaying connection.");
    }
    public void run() {}
}

class MessageReceive extends JTextArea implements Runnable {
    protected Socket server;
    public MessageReceive(Socket server) {
        this.server = server;
    }
    public void run() {
        DataInputStream dis = null;
        int bytes;
        try {
            dis = new DataInputStream(server.getInputStream());
        } catch(Exception e) {
            System.out.println("Error connecting server: " + e.getLocalizedMessage());
        }
        this.append("Connected.\n");
        while (true) {
            try {
                while ((bytes = dis.read()) != -1) this.append(String.valueOf((char) bytes));
            } catch(Exception e) {
                System.out.println("Error reading from server: " + e.getLocalizedMessage());
                return;
            }
        }
    }
}

class MessageSend implements ActionListener {
    protected Socket server;
    protected JTextField input;
    protected DataOutputStream dos = null;
    public MessageSend(Socket server, JTextField input, DataOutputStream dos) {
        this.server = server;
        this.input = input;
        this.dos = dos;
    }
    public void actionPerformed(ActionEvent ae) {
        try {
            dos.writeBytes(input.getText() + "\n");
            input.setText("");
        } catch(Exception e) {
            System.out.println("Error writing to server output stream: " + e.getLocalizedMessage());
        }
    }
}

Here is the code for the Server:

import java.io.*;
import java.net.*;
import java.util.*;

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer ms = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        DatagramSocket ds = null;
        try {
            ds = new DatagramSocket(4);
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ds).start();
            new MessageConnectionOut(client, ds).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionOut(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ds.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionIn(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ds.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

UPDATE:

I tried replacing all the DatagramSockets with MulticastSockets and adding it to a group when I declared it, MessageServer.main(). The same problem occurred.

Multicast code:

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer msgsrv = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        MulticastSocket ms = null;
        try {
            ms = new MulticastSocket(4);
            ms.joinGroup(InetAddress.getByName("225.65.65.65"));
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ms).start();
            new MessageConnectionOut(client, ms).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionOut(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ms.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
                System.out.println("SENT_TO:" + this.getName());
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionIn(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ms.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}
1

There are 1 answers

0
bobs_007 On BEST ANSWER

This sample may be help you out.

There is 2 threads for Server.

  1. One for reading UDP messages. I used 2 different ports since I just want to avoid messages read by same process. I don't have 2 machines to test it. Tested on my local host.
  2. Another thread will broadcast UDP messages received by reader thread.

There is a thread safe list which acts between threads as data sync. Received data added to the list. Broadcaster thread polling the list for the data, if there is any broadcast and else sleep for 500 microseconds. Threads are created using executor.

private final static String INET_ADDR = "224.0.0.3";
private final static int PORT1 = 8888;
private final static int PORT2 = 8889;
private static List<String> threadSafeList = null;

public static void main(String[] args) throws UnknownHostException, InterruptedException {
    threadSafeList = new CopyOnWriteArrayList<String>();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(new Sender(InetAddress.getByName(INET_ADDR), PORT1));
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR), PORT2));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            //Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf, buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                threadSafeList.add(msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address " + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        while (true) {
            try (DatagramSocket serverSocket = new DatagramSocket()) {
                for (Iterator<String> it = threadSafeList.iterator(); !threadSafeList.isEmpty() && it.hasNext(); ) {

                    String i = it.next();
                    String msg = "Sent message no " + i;

                    // Create a packet that will contain the data
                    // (in the form of bytes) and send it.
                    DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(), msg.getBytes().length, this.addr, this.port);
                    serverSocket.send(msgPacket);

                    threadSafeList.remove(i);
                    System.out.println("Server sent packet with msg: " + msg);
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            try {
                System.out.println("going for sleep"); 
                Thread.currentThread().sleep(500);
                System.out.println("going for sleeping"); 
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

Design can be modified by changing the creation of sender thread. Whenever receiver thread gets a message, create a sender thread and do the broadcast and shutdown that thread. You can use reusable thread pool instead of fixed Thread pool what used in this example. And you can pass message as argument while you create sender thread (so list may not be needed at all) and do the submit. I do have code.

    public static void main(String[] args) throws UnknownHostException,
        InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR),
            PORT2, executorService));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

And inner classes,

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;
    private ExecutorService executorService;

    public Receiver(InetAddress inetAddress, int port,
            ExecutorService executorService) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.executorService = executorService;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                executorService.submit(new Sender(InetAddress
                        .getByName(INET_ADDR), PORT1, msg));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

And

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;
    private String message;

    public Sender(InetAddress inetAddress, int port, String message)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.message = message;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        try {
            DatagramSocket serverSocket = new DatagramSocket();
            String msg = "Sent message no " + message;

            // Create a packet that will contain the data
            // (in the form of bytes) and send it.
            DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(),
                    msg.getBytes().length, this.addr, this.port);
            serverSocket.send(msgPacket);

            System.out.println("Server sent packet with msg: " + msg);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

Client has got 2 threads,

  1. One for reading broadcaster messages.
  2. Another for sending 5 messages in loop. Once it is finished, thread will shut-down.

No data exchange here, so no thread safe list.

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver port " + this.port);
        byte[] buf = new byte[256];

        try (MulticastSocket clientSocket = new MulticastSocket(this.port)) {
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);
            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

And

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        try {
            DatagramSocket serverSocket = new DatagramSocket();

            for (int i = 0; i < 5; i++) {

                System.out.println("inside loop");
                String msg = "Sent message no 2" + i;

                // Create a packet that will contain the data
                // (in the form of bytes) and send it.
                DatagramPacket msgPacket = new DatagramPacket(
                        msg.getBytes(), msg.getBytes().length, this.addr,
                        this.port);
                System.out.println("Before sending to socket");
                serverSocket.send(msgPacket);

                System.out.println("Server sent packet with msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

This article sample code is extended further.

Code to be fine tuned.