Sending to Apache Camel Mina2 using Socket

I am trying to use Apache Camel Mina as my socket server to receive byte streams. I am using Apache Camel 2.12.1 and this is my simple route:

<route id="retriever">
  <from uri="mina2:tcp://127.0.0.1:5555?sync=false" />
  <convertBodyTo type="java.lang.String" />
  <to uri="file:temp/out" />
</route>

The route starts without problems and I can send data to it using telnet. My problem appears when I use a simple Java test client to send data:

byte[] myData = {0x34, 0x12, 0x25, 0x34};
Socket socket = new Socket("127.0.0.1", 5555);
OutputStream os = socket.getOutputStream();
os.write(myData, 0, myData.length);
os.flush();
socket.close(); 

With this client I get no exceptions anywhere, but the data never enters the Camel route. I have tried implementing my own codec and verified that MINA is receiving the data, but I am not sure whether this simple case needs a special codec at all. I just want to retrieve the byte array and save it.

So my question is: what am I doing wrong? Why doesn't the default mina2 codec work for my scenario? Am I missing a special option on the mina endpoint to allow this?

Thanks!

Answer by Alkis Kalogeris (best answer)

If you mean TextLineCodecFactory, then you should check its source code. The decoder of this codec splits the input on a delimiter (the newline of the OS that the framework is running on).

Relevant classes: TextLineCodecFactory, TextLineDecoder, LineDelimiter

Check the decoder. In particular, check this part:

    private void decodeAuto(Context ctx, IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws CharacterCodingException, ProtocolDecoderException {
        int matchCount = ctx.getMatchCount();

        // Try to find a match
        int oldPos = in.position();
        int oldLimit = in.limit();

        while (in.hasRemaining()) {
            byte b = in.get();
            boolean matched = false;

            switch (b) {
            case '\r':
                // Might be Mac, but we don't auto-detect Mac EOL
                // to avoid confusion.
                matchCount++;
                break;

            case '\n':
                // UNIX
                matchCount++;
                matched = true;
                break;

            default:
                matchCount = 0;
            }

            if (matched) {
                // Found a match.
                int pos = in.position();
                in.limit(pos);
                in.position(oldPos);

                ctx.append(in);

                in.limit(oldLimit);
                in.position(pos);

                if (ctx.getOverflowPosition() == 0) {
                    IoBuffer buf = ctx.getBuffer();
                    buf.flip();
                    buf.limit(buf.limit() - matchCount);

                    try {
                        byte[] data = new byte[buf.limit()];
                        buf.get(data);
                        CharsetDecoder decoder = ctx.getDecoder();

                        CharBuffer buffer = decoder.decode(ByteBuffer.wrap(data));
                        String str = new String(buffer.array());
                        writeText(session, str, out);
                    } finally {
                        buf.clear();
                    }
                } else {
                    int overflowPosition = ctx.getOverflowPosition();
                    ctx.reset();
                    throw new RecoverableProtocolDecoderException("Line is too long: " + overflowPosition);
                }

                oldPos = pos;
                matchCount = 0;
            }
        }

        // Put remainder to buf.
        in.position(oldPos);
        ctx.append(in);

        ctx.setMatchCount(matchCount);
    }

and this part:

    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        Context ctx = getContext(session);

        if (LineDelimiter.AUTO.equals(delimiter)) {
            decodeAuto(ctx, session, in, out);
        } else {
            decodeNormal(ctx, session, in, out);
        }
    }
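
To make the effect of that loop easier to see, here is a minimal standalone sketch of the same idea. It is not MINA code: `LineAccumulator`, `feed` and `pendingBytes` are hypothetical names made up for illustration, and ASCII decoding is an assumption. Bytes are buffered until a `'\n'` arrives, and only then is a line handed out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a line-delimited decoder (not MINA code). */
public class LineAccumulator {
    // Bytes received so far that have not yet been terminated by '\n'.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds a chunk of received bytes and returns the lines that this chunk completed. */
    public List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] raw = pending.toByteArray();
                int len = raw.length;
                // Drop a trailing '\r' so that "\r\n" and "\n" are treated alike.
                if (len > 0 && raw[len - 1] == '\r') {
                    len--;
                }
                lines.add(new String(raw, 0, len, StandardCharsets.US_ASCII));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    /** Number of buffered bytes still waiting for a delimiter. */
    public int pendingBytes() {
        return pending.size();
    }
}
```

Feeding the four bytes from the question returns an empty list and leaves four bytes pending; a line is only produced once a chunk containing `'\n'` is fed in.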

While filling the IoBuffer, the decoder treats a newline as the delimiter, so if you never send one it keeps waiting for it and emits nothing. I haven't tested this, but I'm confident that this is the problem. Try sending a string with a newline at the end: convert it into bytes, write it to the socket, and see what happens.
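
A minimal sketch of such a test client, assuming the endpoint from the question (127.0.0.1:5555) and that a line-based codec is active on it. The class and method names are hypothetical, and like the suggestion above this is untested against a running route.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical test client that terminates each message with a newline. */
public class NewlineTerminatedClient {

    /** Encodes the text as UTF-8 and appends the '\n' delimiter a line decoder waits for. */
    public static byte[] lineBytes(String text) {
        return (text + "\n").getBytes(StandardCharsets.UTF_8);
    }

    /** Opens a connection, writes one newline-terminated message, and closes the socket. */
    public static void send(String host, int port, String text) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream os = socket.getOutputStream();
            os.write(lineBytes(text));
            os.flush();
        }
    }
}
```

With the route running, calling `NewlineTerminatedClient.send("127.0.0.1", 5555, "hello")` should show whether the missing delimiter was the cause.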

If you want to transfer arbitrary data, you will have to use some kind of protocol that sets the rules the transmitter and the receiver both follow to mark and detect the end of a message.
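
One common choice for binary payloads is a length prefix. The sketch below only illustrates the framing rule, using plain `java.nio` and no MINA or Camel classes; `LengthPrefixedFraming`, `encode` and `decode` are hypothetical names, and the 4-byte big-endian prefix is an assumption. On the server side the same rule would have to be implemented in a custom codec.

```java
import java.nio.ByteBuffer;

/** Hypothetical framing helper: each message is a 4-byte big-endian length followed by the payload. */
public class LengthPrefixedFraming {

    /** Builds one frame: the payload length, then the payload itself. */
    public static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Reads the next complete payload from the buffer.
     * Returns null and leaves the buffer position unchanged when more bytes are needed.
     */
    public static byte[] decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // length prefix not fully received yet
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalStateException("Negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // payload incomplete, rewind to the start of the frame
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }
}
```

The sender would write `LengthPrefixedFraming.encode(myData)` to the socket, and the receiver would call `decode` on its accumulated bytes until it returns a non-null payload, so no delimiter byte has to be reserved inside the data.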