I have a PHP script which accepts a long-running HTTP/2 connection and processes pieces of data as they are streamed from the client. The amount of data streamed from client to server until the connection is closed is expected to be in realm of 5 GB.
After each piece of data, the server responds with a JSON, containing the related information.
The requirements are as follows:
- The PHP script cannot be a standalone application (i .e. a separate PHP server with WebSocket support is not an option hence the HTTP/2 attempt)
- This must happen on a single connection and can't be split up into multiple requests (for reasons I am not going to disclose now)
- The data being streamed is binary.
I have tried using Ktor, OkHttp and the Java 11 HTTP Client but the problem I get is that the response body can't be used until the request has finished. The important aspect here is that it is bidirectional and interleaved. The RFC for HTTP/2 says that this behavior is supported.
The idea is that some thread writes data into an OutputStream, which is fed into the HTTP client using a pipe. Same thread uses the InputStream from the connection to receive the response. From what I understood, the second part never happens because the HTTP client is waiting for the OutputStream to be closed.
Here is my current attempt at OkHttp:
val request = Request.Builder()
.url("/media".customApiUrl)
.post(object : RequestBody() {
override fun contentType(): MediaType = "application/octet-stream".toMediaType()
override fun writeTo(sink: BufferedSink) {
sink.writeAll(pipeIn.source())
}
})
.build()
OkHttpClient().newBuilder()
.protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE))
.build()
.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
TODO("Not yet implemented")
}
override fun onResponse(call: Call, response: Response) {
// this is never called
logger.debug("Media session executing")
}
})
This was my attempt at Ktor:
logger.debug("Making HTTP call")
httpClient.preparePut("/media".customApiUrl) {
contentType(ContentType("application", "octet-stream"))
setBody(pipeIn)
logger.debug("Media session ready to execute")
}.execute {
// This is never called (checked using breakpoint)
logger.debug("Media session executing")
}
// this is also never called but that's fine
logger.info("Media session closed")
This is how the streams are created:
val pipeOut = PipedOutputStream()
val pipeIn = PipedInputStream().also { it.connect(pipeOut) }
How pipeOut is used and where it is stored has been omitted for brevity as it is irrelevant. The debug log statement should be called, even if no byte has been transmitted yet in either direction so that the InputStream from the response can be used.
I would accept any Java or Kotlin response that runs on the JVM and supports HTTP/2. The InputStream and OutputStream of the connection should be readable and writable, respectively, at any time, even if not all of the bytes have been transmitted to the server yet.
Edit: I am also debugging the PHP script at the moment. A suggestion for a solution of above mentioned scenario or correction of my code would be appreciated.
[Disclaimer, I work on the Jetty Project.]
This is possible with Jetty's
HttpClient, exactly like you want: