Turn a TCP socket into an Observable of Array[Byte]

In my Android app I need to use a Socket to send and receive arrays of bytes. For convenience's sake I want to work with an Observable connected to the Socket.

Looking on the internet I've found this code:

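import java.net.Socket
import scala.io.Source
import scala.util.Try
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

// Opens a socket per subscription, emits each character read, closes the socket on termination
val s = Observable.using[Char,Socket](new Socket("10.0.2.2", 9002))(
  socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
  socket => Try(socket.close))
  .subscribeOn(IOScheduler())

val a = s.subscribe(println, println)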

It works, but it outputs one character at a time. For example, when a "hello there" string is sent, the output is:

I/System.out: h
I/System.out: e
I/System.out: l
I/System.out: l
I/System.out: o
I/System.out:  
I/System.out: t
I/System.out: h
I/System.out: e
I/System.out: r
I/System.out: e

But I want to receive buffered arrays of bytes in my subscription. How can I achieve that?

There is 1 answer

wtho On

As @SeanVieira already said, you first have to decide how to aggregate the stream elements, the characters. If you know the stream will be closed after each message, you can wait for the whole message to be received and then emit the sequence on onCompleted().
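For the case where the stream is closed after each message, a minimal sketch could collect everything until completion with toSeq. The names WholeMessage and wholeMessage are hypothetical, and chars stands in for the socket-backed Observable[Char] from the question (not tested against a live socket):

```scala
import rx.lang.scala.Observable

object WholeMessage {
  // Hypothetical helper: collects every character until the source completes,
  // then emits the whole message once as a single String.
  def wholeMessage(chars: Observable[Char]): Observable[String] =
    chars.toSeq.map(_.mkString)
}
```

Nothing is emitted until the source completes, so this only fits if the sender really closes the connection after each message.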

I think what you have implemented so far is fine, since it is up to the observer to decide what to do with the characters and how to process them.

You can then, depending on your needs, add a stream transformation, e.g.:

  • a buffer (have a look at tumblingBuffer(boundary) in particular)
  • debounce with a buffer, as it is done in this SO answer

A solution using tumblingBuffer on your already created Observable could look like this (not tested):

 source.tumblingBuffer(source.filter(_ == '\n'))

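Here everything coming in from the source is buffered, and the whole buffer is emitted each time the boundary Observable source.filter(...) emits an element. Note that the Observable from the question is cold (using opens a new socket for every subscription), so passing source in twice like this would open two connections; sharing it first, for example with publish and a selector function, should avoid that. You can then turn each sequence of characters into a String with mkString and subscribe to the resulting Observable:

source.tumblingBuffer(source.filter(_ == '\n')).map(_.mkString)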
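Since the question asks for byte arrays rather than characters, another option is to read the socket's InputStream in chunks instead of going through Source. The following is only a sketch using the standard library: ByteChunks, byteChunks and the chunk size are hypothetical names and values, and a ByteArrayInputStream stands in for socket.getInputStream so that it runs without a server.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.Arrays

object ByteChunks {
  // Hypothetical helper: lazily reads `in` in chunks of at most `size` bytes.
  // Each element is a fresh array trimmed to the number of bytes actually read.
  def byteChunks(in: InputStream, size: Int = 1024): Iterator[Array[Byte]] =
    Iterator
      .continually {
        val buf = new Array[Byte](size)
        (buf, in.read(buf)) // read blocks until data arrives; returns -1 at end of stream
      }
      .takeWhile { case (_, n) => n != -1 }
      .map { case (buf, n) => Arrays.copyOf(buf, n) }

  def main(args: Array[String]): Unit = {
    // Stand-in for socket.getInputStream (assumption for the demo)
    val in = new ByteArrayInputStream("hello there".getBytes("UTF-8"))
    byteChunks(in, 4).foreach(chunk => println(new String(chunk, "UTF-8")))
  }
}
```

In the question's code, the resulting iterator could presumably take the place of Source.fromInputStream(...), e.g. Observable.from(ByteChunks.byteChunks(socket.getInputStream).toIterable) with the type parameters changed to Array[Byte]. Keep in mind that chunk boundaries follow whatever read returns, not message boundaries, so a framing step such as the buffer above may still be needed.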