I use the `net` server in Node.js and the `socket.on('data')` event to receive data. To parse the TCP messages I use the `parseBuffer` method below. It reads the first 4 bytes of each message as a big-endian length header (`readUInt32BE`), so that I can read from the TCP stream and split it into individual commands. In summary, what happens at high load is that some garbage data is returned as part of the TCP stream, which causes problems.
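For reference, a frame in the format described above (a 4-byte big-endian length header followed by the JSON payload, which is what `parseBuffer()` expects) could be built on the sending side roughly like this. `encodeFrame` is a hypothetical helper name, and the sketch assumes the header counts only the payload bytes, as the parser's `readUInt32BE`/`copy` offsets imply. Note that the header must hold the byte length of the payload, not the string length, because multi-byte UTF-8 characters make the two differ.

```javascript
// Sketch (hypothetical helper): one frame = 4-byte big-endian length header + UTF-8 JSON payload.
function encodeFrame(obj) {
  const payload = Buffer.from(JSON.stringify(obj), "utf8");
  const header = Buffer.alloc(4);
  // The header holds the payload's BYTE length, excluding the 4 header bytes.
  header.writeUInt32BE(payload.length, 0);
  return Buffer.concat([header, payload]);
}

const frame = encodeFrame({ cmd: "ping" });
console.log(frame.readUInt32BE(0)); // 14
```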
```javascript
function onConnect(client) {
    var accumulatingBuffer = Buffer.alloc(0);
    var totalPacketLen = -1;
    var accumulatingLen = 0;
    var recvedThisTimeLen = 0;
    client.on('data', function (data) {
        parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen);
    });
}
```
And here is the `parseBuffer` method:
```javascript
var PACKETHEADERLEN = 4; // size of the big-endian length header

function parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen) {
    recvedThisTimeLen = Buffer.byteLength(data);
    var tmpBuffer = Buffer.alloc(accumulatingLen + recvedThisTimeLen);
    accumulatingBuffer.copy(tmpBuffer);
    data.copy(tmpBuffer, accumulatingLen); // offset for accumulating
    accumulatingBuffer = tmpBuffer;
    tmpBuffer = null;
    accumulatingLen = accumulatingLen + recvedThisTimeLen;
    if (accumulatingLen < PACKETHEADERLEN) {
        return;
    } else if (accumulatingLen === PACKETHEADERLEN) {
        return;
    } else {
        // a packet header is available
        if (totalPacketLen < 0) {
            totalPacketLen = accumulatingBuffer.readUInt32BE(0);
        }
    }
    while (accumulatingLen >= totalPacketLen + PACKETHEADERLEN) {
        // a whole packet is available
        var aPacketBufExceptHeader = Buffer.alloc(totalPacketLen);
        accumulatingBuffer.copy(aPacketBufExceptHeader, 0, PACKETHEADERLEN, PACKETHEADERLEN + totalPacketLen);
        // process packet data
        var stringData = aPacketBufExceptHeader.toString();
        try {
            var JSONObject = JSON.parse(stringData);
            handler(client, JSONObject);
            // we can reduce the size of the allocation
            var newBufRebuild = Buffer.alloc(accumulatingBuffer.length - (totalPacketLen + PACKETHEADERLEN));
            accumulatingBuffer.copy(newBufRebuild, 0, totalPacketLen + PACKETHEADERLEN, accumulatingBuffer.length);
            // init
            accumulatingLen = accumulatingLen - (totalPacketLen + PACKETHEADERLEN); // totalPacketLen + 4
            accumulatingBuffer = newBufRebuild;
            newBufRebuild = null;
            totalPacketLen = -1;
            // for the case in which multiple packets are transmitted at once
            if (accumulatingLen <= PACKETHEADERLEN) {
                // need to get more data -> wait
                return;
            } else {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0);
            }
        } catch (ex) {
            console.log(ex + ' unable to process data');
            return;
        }
    }
}
```
All is well until there is high simulated load, with a bunch of clients sending messages fast. At that point, inside the `parseBuffer` method, the length of `data` measured on the first line is more than the length of the TCP message that was sent. This leads the code to read garbage as a UInt32BE, which produces a very high value in `totalPacketLen` (which tells the length of the next packet). This leads to lost messages. Am I missing something? Please help.
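When you do this in your `parseBuffer()` function:

`accumulatingBuffer = tmpBuffer;`

this is just assigning `tmpBuffer` to the function argument named `accumulatingBuffer`. It is NOT changing the `accumulatingBuffer` variable in your `onConnect()` method. As such, when you get a partial buffer, you lose the accumulated part. The same issue is true of the other arguments you are passing to `parseBuffer()`: assigning to them inside `parseBuffer()` does not change the variables of the same name in `onConnect()`, so every `data` event starts again from an empty buffer with `totalPacketLen` at -1.

That is also why it only shows up under load. TCP is a byte stream, so a single `data` event can contain part of a message or several messages. Under heavy traffic, chunks that end in the middle of a message are more likely; the leftover bytes are discarded, the next chunk starts mid-message, and its first 4 bytes are read as a length header. That is the garbage `totalPacketLen` you are seeing.

There are probably simpler ways to write this, but the easiest way to keep your same structure is to not pass individual variables, but to pass a single `args` object that has those variables as properties. Then, when you assign to the properties inside `parseBuffer()`, the new values are still there in `onConnect()` for the next `data` event. The general structure is to create that object once per connection in `onConnect()` and pass it to `parseBuffer()` on every `data` event.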
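A minimal sketch of that structure, under a few assumptions: the state object is called `args`, `Buffer.concat()` stands in for the manual allocate-and-copy, only the accumulation step of `parseBuffer()` is converted, and a plain `EventEmitter` stands in for the socket so the example runs without a network connection. It shows a header split across two `data` events surviving between calls, and, for contrast, that reassigning a plain parameter never reaches the caller.

```javascript
const { EventEmitter } = require("events");

// Per-connection parser state lives in one object; property names mirror the question's variables.
function onConnect(client) {
  const args = {
    accumulatingBuffer: Buffer.alloc(0),
    totalPacketLen: -1,
    accumulatingLen: 0,
    recvedThisTimeLen: 0,
  };
  client.on("data", function (data) {
    parseBuffer(client, data, args);
  });
  return args; // returned only so the demo below can inspect the state
}

// Only the accumulation step of parseBuffer() is shown, rewritten to use args.* properties;
// the framing logic would be converted the same way.
function parseBuffer(client, data, args) {
  args.recvedThisTimeLen = data.length;
  args.accumulatingBuffer = Buffer.concat([args.accumulatingBuffer, data]);
  args.accumulatingLen += args.recvedThisTimeLen;
}

// Demo: a fake client (net.Socket is also an EventEmitter) delivers a 4-byte header in two chunks.
const client = new EventEmitter();
const state = onConnect(client);
client.emit("data", Buffer.from([0, 0]));
client.emit("data", Buffer.from([0, 5]));
console.log(state.accumulatingLen, state.accumulatingBuffer.readUInt32BE(0)); // 4 5

// For contrast: reassigning a plain parameter does not change the caller's variable.
function reassign(len) {
  len = 99;
}
let callerLen = 0;
reassign(callerLen);
console.log(callerLen); // 0
```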
Then make the corresponding changes in `parseBuffer()` to access the values as properties on that single `args` object. Because `parseBuffer()` receives a reference to the same object, when you assign to properties on `args` from within `parseBuffer()`, those changes will be visible in the `args` object in the `onConnect()` method.

FYI, I did not follow the entire logic elsewhere in the function, so there could be other errors too. This code seems quite complex, with a lot of buffer copies, for the fairly common task it is trying to do. It's also the kind of code that has probably been written many times before and probably even exists in some pre-built libraries.
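As an illustration of a more compact approach, here is a minimal sketch that keeps the parser state in a closure instead of passing it around, appends each chunk with `Buffer.concat()`, and slices complete frames off the front in a loop. It assumes the same format as the question (a 4-byte big-endian header holding the payload length, excluding the header, followed by a UTF-8 JSON payload); `createFrameParser` and `frame` are hypothetical names.

```javascript
// Sketch: a compact framer for 4-byte big-endian length-prefixed JSON messages.
const HEADER_LEN = 4;

// Returns a 'data' handler that keeps its own pending bytes in a closure.
function createFrameParser(onMessage) {
  let pending = Buffer.alloc(0);
  return function onData(chunk) {
    pending = Buffer.concat([pending, chunk]);
    // One chunk may hold part of a frame, a whole frame, or several frames.
    while (pending.length >= HEADER_LEN) {
      const bodyLen = pending.readUInt32BE(0);
      if (pending.length < HEADER_LEN + bodyLen) break; // wait for more data
      const body = pending.subarray(HEADER_LEN, HEADER_LEN + bodyLen);
      pending = pending.subarray(HEADER_LEN + bodyLen); // drop the consumed frame
      let message;
      try {
        message = JSON.parse(body.toString("utf8"));
      } catch (err) {
        // The bad frame is already removed, so the following frames stay aligned.
        console.error("dropping malformed frame:", err.message);
        continue;
      }
      onMessage(message);
    }
  };
}

// Hypothetical helper used only by the demo: builds one frame.
function frame(obj) {
  const body = Buffer.from(JSON.stringify(obj), "utf8");
  const header = Buffer.alloc(HEADER_LEN);
  header.writeUInt32BE(body.length, 0);
  return Buffer.concat([header, body]);
}

// Demo: two 12-byte frames delivered as three arbitrary chunks.
const received = [];
const onData = createFrameParser((msg) => received.push(msg));
const stream = Buffer.concat([frame({ id: 1 }), frame({ id: 2 })]);
onData(stream.subarray(0, 3)); // part of the first header
onData(stream.subarray(3, 15)); // rest of frame 1 plus part of frame 2's header
onData(stream.subarray(15)); // rest of frame 2
console.log(received); // [ { id: 1 }, { id: 2 } ]
```

On a real connection you would create one parser per socket inside `onConnect()`, for example `client.on('data', createFrameParser(function (msg) { handler(client, msg); }));`, where `handler` is the question's own message handler. For untrusted input, rejecting implausibly large `bodyLen` values would be a sensible addition.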