In my Android app I need to use a Socket to send and receive arrays of bytes. For convenience's sake I want to work with an Observable connected to a Socket.
Looking on the internet I've found this code:
import java.net.Socket
import scala.io.Source
import scala.util.Try
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

val s = Observable.using[Char, Socket](new Socket("10.0.2.2", 9002))(
  socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
  socket => Try(socket.close()))
  .subscribeOn(IOScheduler())
val a = s.subscribe(println, println)
It works, but it outputs one character at a time. For example, when a "hello there" string is sent, the output is:
I/System.out: h
I/System.out: e
I/System.out: l
I/System.out: l
I/System.out: o
I/System.out:
I/System.out: t
I/System.out: h
I/System.out: e
I/System.out: r
I/System.out: e
But I want to receive buffered arrays of bytes in my subscription. How can I achieve that?
As @SeanVieira already said, you first have to decide how to aggregate the stream elements, i.e. the characters. If you know the stream will be closed after each message, you can wait for the whole message to be received and then emit the sequence on onCompleted().
I think what you have implemented so far is quite good, since it is up to the observer to decide what it wants to process and how.
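The wait-for-the-whole-message idea could be sketched roughly as follows. This is a minimal, untested-against-a-real-socket sketch: `chars` is a hypothetical in-memory stand-in for the socket-backed Observable from the question, and `toSeq` is one way (not necessarily the only one) to collect everything and emit it once the source completes.

```scala
import rx.lang.scala.Observable

object WholeMessageSketch {
  // Hypothetical stand-in for the socket-backed Observable[Char] from the question.
  val chars: Observable[Char] = Observable.from("hello there".toList)

  // toSeq emits a single Seq[Char] when the source completes,
  // i.e. once the sender has closed the connection.
  val message: Observable[String] = chars.toSeq.map(_.mkString)

  def main(args: Array[String]): Unit =
    message.subscribe(s => println(s), e => println(e))
}
```

This only helps if the connection really is closed after each message; on a long-lived socket nothing would be emitted until the stream ends.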
You can then, depending on your needs, add a stream transformation, e.g. tumblingBuffer(boundary).
A solution using tumblingBuffer on your already created Observable (not tested) would buffer anything incoming from the source and emit the whole buffer once the boundary Observable source.filter(...) emits an element. Then you can transform the sequence of characters to a string using mkString and subscribe to that Observable.
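As a rough illustration of the tumblingBuffer approach, here is a sketch under a few assumptions that are not in the original post: messages are taken to be separated by '\n' (the predicate in source.filter(...) is left open above), the object and method names are made up, and the in-memory `source` stands in for the socket-backed Observable. It also wraps the pipeline in `publish(selector)`, assuming your RxScala version provides that overload, so that the buffered stream and the boundary stream share one subscription instead of opening the socket twice.

```scala
import rx.lang.scala.Observable

object DelimitedMessagesSketch {
  // Assumption: messages are separated by '\n'; adjust the predicate to your protocol.
  def isBoundary(c: Char): Boolean = c == '\n'

  // Splits a character stream into strings at each boundary character.
  // publish(...) shares a single subscription between the buffered stream
  // and the boundary stream.
  def messages(source: Observable[Char]): Observable[String] =
    source.publish { (shared: Observable[Char]) =>
      shared
        .tumblingBuffer(shared.filter(isBoundary))
        .map(_.mkString.trim)  // strips the delimiter (and any other surrounding whitespace)
        .filter(_.nonEmpty)    // skips empty buffers, e.g. the one after the last delimiter
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical in-memory stand-in for the socket-backed Observable.
    val source = Observable.from("hello there\nsecond message\n".toList)
    messages(source).subscribe(s => println(s), e => println(e))
  }
}
```

Note that `trim` also removes leading and trailing spaces that belong to the message; if those matter, drop only the delimiter character instead. For raw bytes rather than characters, the same shape should apply to an Observable[Byte] with `_.toArray` in place of `mkString`.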