I am trying to connect to a TCP server, whose code I can't control, via a simple asynchronous client (the socket needs to remain open).
The communication is via byte arrays, and I have some custom functions to create and parse the various byte arrays sent to this server, so I don't need any encoding or decoding via Gloss. What I do need is a non-blocking request-response mechanism over a single persistent connection.
Aleph looks great, but I'm stuck trying to implement the TCP client for my use case. So far I have the following:
    (:require [manifold.deferred :as d]
              [manifold.stream :as s]
              [aleph.tcp :as tcp])

    (def aleph-client (atom nil))

    ;; store my initial connection into the atom
    (reset! aleph-client @(tcp/client {:host (:HOST CONFIG) :port (:PORT CONFIG)}))

    (defn msg!
      "A very simple request response method"
      [bytebuf]
      (d/let-flow [status (s/put! @aleph-client (.array bytebuf))
                   reply  @(s/take! @aleph-client)]
        reply))
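
For comparison, here is a minimal sketch of a put-then-take that does not block on `@`. In the `let-flow` above, the take is issued without waiting for the put to complete, and the `@` blocks the calling thread. The function name `request!` and its arguments are hypothetical, and the sketch assumes exactly one reply chunk per request, which may not hold for this server.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn request!
  "Puts `request` onto the duplex stream `conn`, then takes one message.
  Returns a deferred yielding that message, or nil if the put was not
  accepted or the stream closed. Assumes one reply chunk per request."
  [conn request]
  (d/chain (s/put! conn request)
           (fn [accepted?]
             ;; only issue the take once the put has been accepted
             (when accepted?
               (s/take! conn)))))
```

It would be called as `(request! @aleph-client (.array bytebuf))` and returns a deferred rather than the reply itself. Two overlapping calls on the same connection could still receive each other's replies, so this sketch is only safe for one request at a time.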
With the above code I can send the expected byte array to the server, and I do receive a response. However, I'm unable to take and format the response correctly. I always see the following:
    (msg! my-bytes)
    => #object["[B" 0x7ad08644 "[B@7ad08644"]

    ;; I seem to be getting a byte-buffer somehow, but the byte array length
    ;; is simply wrong for longer messages.
    (alength (byte-buffer (msg! my-bytes)))
    => 39
I have no idea what to do next here. In particular:
- How do I receive the bytebuffer response?
- Is there an elegant way to use wrap-duplex-stream with my own custom bytebuffer creation and decoding?
- How do I connect the put and the take properly?
- What is this mysterious response I get when I do the put and the take separately?
- The bytebuffer I receive at the end is indeed the response I expected, or at least part of it. The rest of the message is missing, and I only see it on a subsequent take! call. Why is this?
- How do I use buffers to put and take larger messages?
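
On the partial replies: TCP is a byte stream with no message boundaries, so a single `take!` may yield only the bytes that have arrived so far. Below is a rough sketch of accumulating chunks until a known number of bytes has been read. The names `concat-bytes` and `take-bytes!` are hypothetical, and the sketch assumes the expected reply length `n` is known up front, which depends on the server's protocol.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn concat-bytes
  "Returns a new byte array holding the bytes of a followed by those of b."
  ^bytes [^bytes a ^bytes b]
  (let [out (byte-array (+ (alength a) (alength b)))]
    (System/arraycopy a 0 out 0 (alength a))
    (System/arraycopy b 0 out (alength a) (alength b))
    out))

(defn take-bytes!
  "Returns a deferred yielding a byte array of at least n bytes taken
  from stream, or nil if the stream is drained before n bytes arrive."
  [stream n]
  (d/loop [acc (byte-array 0)]
    (if (>= (alength ^bytes acc) n)
      acc
      (d/chain (s/take! stream)
               (fn [chunk]
                 ;; a nil chunk means the stream closed; stop with nil
                 (when chunk
                   (d/recur (concat-bytes acc chunk))))))))
```

The result can be longer than `n` if the last chunk also carries the start of the next message, so real code would need to keep the surplus bytes for the following read.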
I would appreciate any help from Aleph and Manifold gurus.