Creating a Thread for Socket Communication with Command Processing

I'm working on a Java application that involves socket communication, and I'm trying to create a Thread subclass, called CommandThread, which reads commands from a Socket. The valid commands are "DELAY n" and "QUIT".

I have a few questions and concerns:

  • How can I improve the error handling in the processCommand method?
  • Is there a better way to structure the code for reading and processing commands from the socket?
  • Any general advice on best practices for handling sockets in a multi-threaded environment?

I appreciate any insights, suggestions, or corrections you can provide! Thank you.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class CommandThread extends Thread {
    private final Socket clientSocket;
    private boolean isRunning;

    public CommandThread(Socket clientSocket) {
        this.clientSocket = clientSocket;
        this.isRunning = true;
    }

    @Override
    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);

            while (isRunning) {
                String command = reader.readLine();
                if (command != null) {
                    processCommand(command, writer);
                }
            }

            // Clean up resources
            reader.close();
            writer.close();
            clientSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void processCommand(String command, PrintWriter writer) {
        if (command.startsWith("DELAY ")) {
            try {
                int delayTime = Integer.parseInt(command.substring(6));
                Thread.sleep(delayTime);
                writer.println("OK");
            } catch (InterruptedException | NumberFormatException e) {
                writer.println("ERROR");
            }
        } else if (command.equals("QUIT")) {
            writer.println("BYE");
            isRunning = false;
        } else {
            writer.println("INVALID COMMAND");
        }
    }

    public static void main(String[] args) {
        // Usage example:
        // In your server code, when a new client connects, create a CommandThread for that client.
        // CommandThread commandThread = new CommandThread(clientSocket);
        // commandThread.start();
    }
}

There is 1 answer

Tianhong Yu
  • How can I improve the error handling?

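    Use try-with-resources in your run method, so that the reader, the writer, and the socket are closed even when an exception is thrown. In your version the cleanup code is skipped whenever an IOException occurs.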

  • Is there a better way to structure the code for reading and processing commands from the socket? Any general advice on best practices for handling sockets in a multi-threaded environment?

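    Use a thread pool instead of creating a new thread for every connection. The pool reuses its threads and puts an upper bound on the server resources that clients can consume. The handler then implements Runnable rather than extending Thread: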

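import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class CommandHandler implements Runnable {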
    private final Socket clientSocket;
    private boolean isRunning;

    public CommandHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
        this.isRunning = true;
    }

    @Override
    public void run() {
        try (Socket socket = clientSocket;
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {

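            while (isRunning) {
                String command = reader.readLine();
                if (command == null) {
                    // readLine() returns null once the client has closed the connection;
                    // stop here instead of spinning in a busy loop
                    break;
                }
                writer.println(processCommand(command));
            }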

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

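    private String processCommand(String command) {
        if (command.startsWith("DELAY ")) {
            try {
                int delayTime = Integer.parseInt(command.substring(6).trim());
                if (delayTime < 0) {
                    // Thread.sleep throws IllegalArgumentException for negative values
                    return "ERROR";
                }
                Thread.sleep(delayTime);
                return "OK";
            } catch (NumberFormatException e) {
                // the argument after DELAY is not a valid integer
                return "ERROR";
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop, so a pool shutdown can end this handler
                Thread.currentThread().interrupt();
                isRunning = false;
                return "ERROR";
            }
        } else if (command.equals("QUIT")) {
            isRunning = false;
            return "BYE";
        } else {
            return "INVALID COMMAND";
        }
    }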

    public static void main(String[] args) {
        // Usage example:
        /*
        // global thread pool
        ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 40,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(500));

        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            Socket clientSocket = serverSocket.accept();
            CommandHandler commandHandler = new CommandHandler(clientSocket);
            executor.execute(commandHandler);
        }
        */
    }
}
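
On the error-handling question specifically, one further option is to separate parsing a command from executing it, so that every malformed input is turned into a clear result before anything runs. The following is only a minimal sketch under assumptions: the ParsedCommand class and its field names are hypothetical and not part of the code above, and rejecting negative delays is my own choice. Thread.sleep throws IllegalArgumentException for a negative argument, so the parsed value is checked before it is used.

```java
/** Hypothetical helper (not in the original code): the result of parsing one command line. */
final class ParsedCommand {
    enum Type { DELAY, QUIT, INVALID }

    final Type type;
    final long delayMillis; // meaningful only when type == DELAY
    final String error;     // meaningful only when type == INVALID

    private ParsedCommand(Type type, long delayMillis, String error) {
        this.type = type;
        this.delayMillis = delayMillis;
        this.error = error;
    }

    private static ParsedCommand invalid(String reason) {
        return new ParsedCommand(Type.INVALID, 0, reason);
    }

    /** Parses a single line; never throws, every problem becomes an INVALID result. */
    static ParsedCommand parse(String line) {
        if (line == null) {
            return invalid("no input");
        }
        String trimmed = line.trim();
        if (trimmed.equals("QUIT")) {
            return new ParsedCommand(Type.QUIT, 0, null);
        }
        if (trimmed.startsWith("DELAY ")) {
            String argument = trimmed.substring("DELAY ".length()).trim();
            try {
                long millis = Long.parseLong(argument);
                if (millis < 0) {
                    // assumption: a negative delay is treated as a client error
                    return invalid("delay must not be negative");
                }
                return new ParsedCommand(Type.DELAY, millis, null);
            } catch (NumberFormatException e) {
                return invalid("delay is not a number: " + argument);
            }
        }
        return invalid("unknown command");
    }
}
```

The handler would then switch on the type field, sleep only for DELAY, and could send the error text back to the client instead of a bare "ERROR". Because parse has no side effects, it can be unit tested without opening a socket.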

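Your code uses blocking I/O (BIO): every connection occupies a thread while it waits for input. If the server has to handle many concurrent connections, non-blocking I/O (NIO) or asynchronous I/O (AIO) usually scales better.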

Consider structuring the server around the Reactor pattern; you can learn more from Netty (https://netty.io/index.html).

Here is a simple example written with Java NIO; you can do better if you use Netty.

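// JUnit 4 is assumed here for @Test and Assert
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class ServerSocketTest {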

    /**
     * server ip
     */
    private final String HOST = "127.0.0.1";

    /**
     * server port
     */
    private final int PORT = 8080;

    @Test
    public void testServer() throws IOException {

        // create Selector
        Selector selector = Selector.open();

        // create the ServerSocketChannel and listen on the port
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);

        // register Channel
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // create buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        Charset charset = StandardCharsets.UTF_8;
        CharsetDecoder charsetDecoder = charset.newDecoder();
        CharBuffer charBuffer = CharBuffer.allocate(1024);

        while (true) {
            // block and wait
            if (selector.select() == 0) {
                continue;
            }

            // get ready Channel set
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // remove current key from set
                iterator.remove();


                // if accept event
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // accept connection from client
                    SocketChannel clientChannel = server.accept();
                    clientChannel.configureBlocking(false);
                    // register the client with the Selector and wait for readable data
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("Accepted connection from " + clientChannel);
                }

                // if read event
                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    buffer.clear();
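                    // read data; on a non-blocking channel read() returns 0 when nothing is
                    // available, so this loop keeps polling until the client shuts down its
                    // output (read() returns -1). That keeps the demo short, but it stalls
                    // the selector thread, so do not use it as-is in a real server.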
                    StringBuilder stringBuilder = new StringBuilder();
                    while (clientChannel.read(buffer) != -1 || buffer.position() != 0) {
                        buffer.flip();
                        charsetDecoder.decode(buffer, charBuffer, true);
                        charBuffer.flip();
                        stringBuilder.append(charBuffer);
                        charBuffer.clear();
                        buffer.compact();
                    }

                    String message = stringBuilder.toString();
                    System.out.println("Received message from " + clientChannel + ": " + message);
                    // write data back
                    clientChannel.write(charset.encode("Echo: " + message));

                    // close the connection to the client
                    clientChannel.close();
                }
            }
        }
    }


    @Test
    public void testClient() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
        Charset charset = StandardCharsets.UTF_8;
        CharsetDecoder charsetDecoder = charset.newDecoder();

        // connect server
        try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT))) {
            // send message to server
            byteBuffer.put(charset.encode("hello server"));
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            socketChannel.shutdownOutput();
            byteBuffer.clear();

            // receive response from server
            StringBuilder stringBuilder = new StringBuilder();
            while (socketChannel.read(byteBuffer) != -1 || byteBuffer.position() != 0) {
                byteBuffer.flip();
                // decode and store
                CharBuffer decode = charsetDecoder.decode(byteBuffer);
                stringBuilder.append(decode);
                byteBuffer.compact();
            }
            System.out.println("Received message from server " + socketChannel + ": " + stringBuilder);
        } catch (IOException e) {
            Assert.fail();
        }
    }

}
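
The NIO example above echoes whatever the client sends before it closes its output; it does not yet process line-based commands such as "DELAY n" and "QUIT". With non-blocking reads a command can arrive split across several reads, so each connection needs somewhere to keep a partial line. Below is a minimal sketch of that idea, assuming UTF-8 text and lines terminated by "\n" or "\r\n". The LineAssembler class is hypothetical and not part of the code above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: collects bytes from successive reads and returns complete lines. */
final class LineAssembler {
    // bytes of the line that has not been terminated yet
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /**
     * Consumes all remaining bytes of a buffer that is ready for reading
     * (already flipped) and returns every line completed by this chunk.
     */
    List<String> feed(ByteBuffer chunk) {
        List<String> lines = new ArrayList<>();
        while (chunk.hasRemaining()) {
            byte b = chunk.get();
            if (b == '\n') {
                // In UTF-8 the byte 0x0A never occurs inside a multi-byte character,
                // so splitting on it at the byte level is safe.
                String line = new String(pending.toByteArray(), StandardCharsets.UTF_8);
                pending.reset();
                if (line.endsWith("\r")) {
                    line = line.substring(0, line.length() - 1);
                }
                lines.add(line);
            } else {
                pending.write(b);
            }
        }
        return lines;
    }
}
```

One way to use it would be to attach one assembler per client when registering, for example clientChannel.register(selector, SelectionKey.OP_READ, new LineAssembler()), and fetch it with key.attachment() in the read branch. On each read event you would read once, flip the buffer, pass it to feed, and handle the returned commands. Note that a DELAY command should not call Thread.sleep on the selector thread, because that would pause every connection; handing the delay to a scheduler or a worker pool is one alternative.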