More concurrent users of Netty and BoneCP / Basic Socket Server


Disclaimer- I am not a Java programmer. Odds are I'll need to do my homework on any advice given, but I will gladly do so :)

That said, I wrote a complete database-backed socket server which is working just fine for my small tests, and now I'm getting ready for initial release. Since I do not know Java/Netty/BoneCP well, I have no idea if I made a fundamental mistake somewhere that will hurt my server before it even gets out the door.

For example, I have no idea what an executor group does exactly or what number I should use, whether it's okay to implement BoneCP as a singleton, or whether it's really necessary to have all those try/catch blocks around each database query (see the try-with-resources sketch I've pasted after the code below), etc.

I've tried to reduce my entire server to a basic example which operates the same way as the real thing (I did the stripping down by hand in a text editor and did not compile it, so excuse any syntax errors that crept in).

The basic idea is that clients can connect, exchange messages with the server, disconnect other clients, and stay connected indefinitely until they choose or are forced to disconnect. (the client will send ping messages every minute to keep the connection alive)

The only major differences, besides this example being untested, are how the clientID is set (safely assume it is truly unique per connected client) and that there is some more business logic checking values, etc.

Bottom line: can anything be done to improve this so it can handle as many concurrent users as possible? Thanks!


//MAIN
public class MainServer {
    public static void main(String[] args) throws Exception {
        EdgeController edgeController = new EdgeController();
        edgeController.connect();
    }
}


//EdgeController
public class EdgeController {

    public void connect() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        ChannelFuture f;


        try {
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .localAddress(9100)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new EdgeInitializer(new DefaultEventExecutorGroup(10)));


            // Start the server.
            f = b.bind().sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally { //Not quite sure how to get here yet... but no matter
            // Shut down all event loops to terminate all threads.
            b.shutdown();

        }
    }
}

//EdgeInitializer
public class EdgeInitializer  extends ChannelInitializer<SocketChannel> {
    private EventExecutorGroup executorGroup;

    public EdgeInitializer(EventExecutorGroup _executorGroup) {
        executorGroup = _executorGroup;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("idleStateHandler", new IdleStateHandler(200,0,0));
        pipeline.addLast("idleStateEventHandler", new EdgeIdleHandler());
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.nulDelimiter()));
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(this.executorGroup, "handler", new EdgeHandler());
    }    
}

//EdgeIdleHandler
public class EdgeIdleHandler extends ChannelHandlerAdapter {
    private static final Logger logger = Logger.getLogger( EdgeIdleHandler.class.getName());


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        if(evt instanceof IdleStateEvent) {
            ctx.close();
        }
    }

     private void trace(String msg) {
        logger.log(Level.INFO, msg);
    }

}

//DBController
public enum DBController {
    INSTANCE;

    private BoneCP connectionPool = null;
    private BoneCPConfig connectionPoolConfig = null;

    public boolean setupPool() {
        boolean ret = true;

        try {
            Class.forName("com.mysql.jdbc.Driver");

            connectionPoolConfig = new BoneCPConfig();
            connectionPoolConfig.setJdbcUrl("jdbc:mysql://" + DB_HOST + ":" + DB_PORT + "/" + DB_NAME);
            connectionPoolConfig.setUsername(DB_USER);
            connectionPoolConfig.setPassword(DB_PASS);

            try {
                connectionPool = new BoneCP(connectionPoolConfig);
            } catch(SQLException ex) {
                ret = false;
            }

        } catch(ClassNotFoundException ex) {
            ret = false;
        }

        return(ret);
    }

    public Connection getConnection() {
        Connection ret;

        try {
            ret = connectionPool.getConnection();
        } catch(SQLException ex) {
            ret = null;
        }

        return(ret);
    }
}
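
(Side note: I left the pool at BoneCP's defaults above. I assume I would eventually size it explicitly inside setupPool() with something like the lines below, but the numbers are guesses on my part and untested:)

//Untested guess at explicit pool sizing - the numbers are placeholders, not recommendations
connectionPoolConfig.setPartitionCount(2);              //BoneCP splits the pool into partitions to reduce lock contention
connectionPoolConfig.setMinConnectionsPerPartition(5);  //connections opened up front per partition
connectionPoolConfig.setMaxConnectionsPerPartition(20); //cap per partition (2 x 20 = 40 connections total)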

//EdgeHandler
public class EdgeHandler extends ChannelInboundMessageHandlerAdapter<String> {

    private final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
    private Long clientID;   //boxed so receiveClose() can check whether it has been set yet
    static final ChannelGroup channels = new DefaultChannelGroup();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Connection dbConnection = null;
        Statement statement = null;
        ResultSet resultSet = null;
        String query;
        Boolean okToPlay = false;


        //Check if status for ID #1 is true
        try {
            query = "SELECT `Status` FROM `ServerTable` WHERE `ID` = 1";

            dbConnection = DBController.INSTANCE.getConnection();
            statement = dbConnection.createStatement();
            resultSet = statement.executeQuery(query);

            if (resultSet.first()) {
                if (resultSet.getInt("Status") > 0) {
                    okToPlay = true;
                }
            }
        } catch (SQLException ex) {
            okToPlay = false;
        } finally {
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (SQLException logOrIgnore) {
                }
            }
        }

        if (okToPlay) {
            //clientID = setClientID();
            channels.add(ctx.channel()); //register the connection so closePeer() can find it later
            sendCommand(ctx, "HELLO", "WORLD");
        } else {
            sendErrorAndClose(ctx, "CLOSED");
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channels.remove(ctx.channel());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, String request) throws Exception {
        // Generate and write a response.
        String[] segments_whitespace;
        String command, command_args;

        if (request.length() > 0) {

            segments_whitespace = request.split("\\s+");
            if (segments_whitespace.length > 1) {
                command = segments_whitespace[0];
                command_args = segments_whitespace[1];

                if (command.length() > 0 && command_args.length() > 0) {
                    switch (command) {
                        case "HOWDY":  processHowdy(ctx, command_args); break;
                        default:    break;
                    }
                }
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        TraceUtils.severe("Unexpected exception from downstream - " + cause.toString());
        ctx.close();
    }

    /*                                      */
    /*       STATES  - / CLIENT SETUP       */
    /*                                      */
    private void processHowdy(ChannelHandlerContext ctx, String howdyTo) {
        Connection dbConnection = null;
        Statement statement = null;
        ResultSet resultSet = null;
        String replyBack = null;

        try {
            dbConnection = DBController.INSTANCE.getConnection();
            statement = dbConnection.createStatement();
            resultSet = statement.executeQuery("SELECT `To` FROM `ServerTable` WHERE `To`='" + howdyTo + "'"); //the real code should use a PreparedStatement here so howdyTo can't inject SQL

            if (resultSet.first()) {
                replyBack = "you!";
            }
        } catch (SQLException ex) {
        } finally {
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (SQLException logOrIgnore) {
                }
            }
        }

        if (replyBack != null) {
            sendCommand(ctx, "HOWDY", replyBack);
        } else {
            sendErrorAndClose(ctx, "ERROR");
        }
    }

    private boolean closePeer(ChannelHandlerContext ctx, long peerClientID) {
        boolean success = false;
        ChannelFuture future;

        for (Channel c : channels) {
            if (c != ctx.channel()) {
                if (c.pipeline().get(EdgeHandler.class).receiveClose(c, peerClientID)) {
                    success = true;
                    break;
                }
            }
        }

        return (success);

    }

    public boolean receiveClose(Channel thisChannel, long remoteClientID) {
        ChannelFuture future;
        boolean didclose = false;
        long thisClientID = (clientID == null ? 0 : clientID);

        if (remoteClientID == thisClientID) {
            future = thisChannel.write("CLOSED BY PEER" + '\n');
            future.addListener(ChannelFutureListener.CLOSE);

            didclose = true;
        }

        return (didclose);
    }


    private ChannelFuture sendCommand(ChannelHandlerContext ctx, String cmd, String outgoingCommandArgs) {
        return (ctx.write(cmd + " " + outgoingCommandArgs + '\n'));
    }

    private ChannelFuture sendErrorAndClose(ChannelHandlerContext ctx, String error_args) {

        ChannelFuture future = sendCommand(ctx, "ERROR", error_args);

        future.addListener(ChannelFutureListener.CLOSE);

        return (future);
    }
}
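
One thing I wondered about specifically: whether try-with-resources (Java 7+, which I'm assuming here) could replace all of that finally/close boilerplate around the queries. Something like this untested sketch of the status check from channelActive, using a hypothetical isOkToPlay() helper:

//Untested sketch only - the same status check as channelActive, but letting
//try-with-resources close the ResultSet/Statement/Connection automatically
//(assumes DBController.getConnection() did not return null)
private boolean isOkToPlay() {
    String query = "SELECT `Status` FROM `ServerTable` WHERE `ID` = 1";

    try (Connection dbConnection = DBController.INSTANCE.getConnection();
         Statement statement = dbConnection.createStatement();
         ResultSet resultSet = statement.executeQuery(query)) {

        return resultSet.first() && resultSet.getInt("Status") > 0;

    } catch (SQLException ex) {
        return false;
    }
}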

1 Answer

Answered by meka:

When a network message arrives at the server, it is decoded and then fires a messageReceived event.

If you look at your pipeline, the last thing added to it is the executor group. Because of that, the executor is what receives the decoded message and fires the messageReceived event.

Executors are the processors of events; the server dispatches events to them. So how the executors are used matters a great deal. If there is only one executor, all clients share it, and a queue forms waiting for that single executor.

With more executors, events spend less time waiting, because there is less contention for a free executor.

In your code

new DefaultEventExecutorGroup(10)

means this ServerBootstrap will use only 10 executor threads for its entire lifetime.

When new channels are initialized, the same executor group is used:

pipeline.addLast(this.executorGroup, "handler", new EdgeHandler());

So each new client channel uses the same executor group (10 executor threads).

That is efficient and sufficient as long as 10 threads can keep up with the incoming events. But if you see messages being decoded/encoded yet not handled promptly, that is a sign the number of threads needs to be increased.

You can increase the number of executors from 10 to 100 like this:

new DefaultEventExecutorGroup(100)

That will drain the event queue faster, provided there is enough CPU power.

What should not be done is creating a new executor group for each new channel:

pipeline.addLast(new DefaultEventExecutorGroup(10), "handler", new EdgeHandler());

The line above creates a new executor group for every new channel, which will slow things down greatly; for example, with 3000 clients there would be 3000 executor groups (threads). That throws away the main advantage of NIO: handling many connections with a small number of threads.

Instead of creating one executor group per channel, you can create 3000 executors once at startup; at least then they are not created and destroyed every time a client connects or disconnects.

.childHandler(new EdgeInitializer(new DefaultEventExecutorGroup(3000)));

The line above is more acceptable than creating one executor per client, because all clients share the same executor group, and when a client disconnects the executors remain even though that client's data is removed.

As for database requests: some queries can take a long time to complete, so with 10 executors and 10 jobs already in progress, an 11th job has to wait until one of the others finishes. That is a bottleneck whenever the server receives more than 10 slow database jobs at the same time. Increasing the number of executors relieves it to some degree.
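
One rough, untested sketch of that idea (not the only way to do it) is to hand the blocking JDBC work to a dedicated pool of plain java.util.concurrent threads, so the handler executors are only occupied for the fast parts. The dbWorkers pool and the lookupHowdy() helper below are hypothetical stand-ins for the existing processHowdy() logic:

//Sketch: a dedicated pool for slow JDBC calls; the size (50) is a guess to be tuned
private static final ExecutorService dbWorkers = Executors.newFixedThreadPool(50);

private void processHowdyAsync(final ChannelHandlerContext ctx, final String howdyTo) {
    dbWorkers.submit(new Runnable() {
        @Override
        public void run() {
            //lookupHowdy() stands for the blocking JDBC code currently inside processHowdy()
            String replyBack = lookupHowdy(howdyTo);

            //Netty allows writes from other threads; it schedules them onto the channel's event loop
            if (replyBack != null) {
                ctx.write("HOWDY " + replyBack + '\n');
            } else {
                ctx.write("ERROR" + '\n').addListener(ChannelFutureListener.CLOSE);
            }
        }
    });
}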