I read here that EntityUtils.consume(httpEntity) releases the connection back to the connection pool, but when I looked at the source code I couldn't see how that happens. Can someone please point me to the part of the code where EntityUtils.consume(httpEntity) or EntityUtils.toString(httpEntity) releases the connection when using the low-level Elasticsearch REST client?

What happens to the connection if there is a SocketTimeoutException and I don't consume the HttpEntity?

aran On

Client-side close and connection release to the Pool (steps)

  1. EntityUtils.consume & EntityUtils.toString > The first one calls close() on the instream when the entity is streaming and has content (see the consume source further down). The second one always calls instream.close() in its finally clause. instream is the name given to the InputStream variable.

  2. instream.close() > For this example, the implementation of the InputStream is a ContentInputStream. Its close() method forces the ContentInputStream to be read to its end, using the loop shown in the code snippet.

    Subsequent calls to this stream will lead to an EOF exception.

    @Override
    public void close() throws IOException 
    {
      final byte[] tmp = new byte[1024];
      /* loop until read() returns -1, i.e. advance the buffer to its end */
      while (this.buffer.read(tmp, 0, tmp.length) >= 0) {}
      super.close();
    }
    
  3. Pool > Checks the status of all pooled resources. This check may be triggered by some action (such as a new request), or it may be run by background threads. If a resource/stream was closed by the other end, the check gets an EOF exception (because the buffer was forced to advance to the end). The spot is marked as invalid.

  4. Pool > All invalid spots are recycled. The pool removes the closed streams and creates new ones, or restores the existing ones without an erase+create cycle (depending on the resource type). This means the spot that was holding the stream is available again, with a new stream ready to be used:

    The connection is released back to the pool. The other end is not using it anymore, so the pool has total control of it. The pool is now allowed to erase it, restore it, and assign it to another requester.
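The four steps above can be pictured with a small, self-contained sketch. It is only an illustration under stated assumptions: DrainingStream and SpotSweeper are hypothetical names, not HttpClient or Elasticsearch classes, and a real pool may detect a finished stream differently.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a pooled response stream; not a library class.
class DrainingStream extends InputStream {
    private final InputStream buffer;
    private boolean atEof;

    DrainingStream(byte[] body) {
        this.buffer = new ByteArrayInputStream(body);
    }

    @Override
    public int read() throws IOException {
        int b = buffer.read();
        if (b < 0) {
            atEof = true; // remember that the end of the body was reached
        }
        return b;
    }

    // Step 2: close() reads to the end, so the stream is left at EOF.
    @Override
    public void close() throws IOException {
        while (read() >= 0) { }
        buffer.close();
    }

    boolean isAtEof() {
        return atEof;
    }
}

// Hypothetical one-spot pool that recycles a spot whose stream reached EOF.
class SpotSweeper {
    DrainingStream spot = new DrainingStream(new byte[2048]);
    int recycled;

    // Steps 3 and 4: check the spot and replace the stream if it is finished.
    void sweep() {
        if (spot.isAtEof()) {
            spot = new DrainingStream(new byte[2048]);
            recycled++;
        }
    }

    // Runs the four steps once and returns how many spots were recycled.
    static int demo() {
        SpotSweeper pool = new SpotSweeper();
        DrainingStream inUse = pool.spot;
        pool.sweep();         // stream still in use: nothing is recycled
        try {
            inUse.read();     // the caller reads only part of the body
            inUse.close();    // step 1: consume/toString end up here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        pool.sweep();         // the spot is now detected and recycled
        return pool.recycled;
    }

    public static void main(String[] args) {
        System.out.println("recycled spots: " + demo());
    }
}
```

The point of the sketch is that the caller never talks to the pool directly: closing the stream is enough for the next sweep to notice that the spot can be reused.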


Example

Let's imagine a Pool that manages 3 resources, such as HttpConnections. You already have 3 threads using this pool, so the spots are all occupied.

Meanwhile ThreadZ waits for a connection to be released back to the Pool

 (spot1) [HttpConn1] -- ThreadA
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

ThreadA finishes its job and closes its connection. The Pool notices this when the status of the PoolEntry is closed. Different PoolEntry implementations check this in different ways, one of them being an EOF exception while trying to read from a stream. Other implementations could have different mechanisms to check whether the resource is closed. If the PoolEntry reports that its resource is closed/invalid, the Pool recycles this spot. There are two options here:

a) Erase and create.

 (spot1) [HttpConn4] // the pool removes the old one and creates a new connection
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

b) Restore.

 (spot1) [HttpConn1] // the pool is able to "reset" the existing resource
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

"Releasing the connection back" could be translated to "now there's an available spot/resource again". The pool can now give a connection to ThreadZ:

 (spot1) [HttpConn1] -- ThreadZ
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC
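
The three-spot example can be mimicked with a toy pool. This is a minimal sketch, assuming a hypothetical TinyPool class backed by a queue of connection names; it uses a non-blocking lease so the run is deterministic, whereas a real pool would typically make ThreadZ wait.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical three-spot pool; connections are represented by their names.
class TinyPool {
    private final BlockingQueue<String> free = new ArrayBlockingQueue<>(3);

    TinyPool() {
        free.add("HttpConn1");
        free.add("HttpConn2");
        free.add("HttpConn3");
    }

    // Returns a free connection, or null when every spot is occupied.
    String lease() {
        return free.poll();
    }

    // "Releasing the connection back": the spot becomes available again.
    void release(String conn) {
        free.add(conn);
    }

    // Replays the example and returns the connection ThreadZ ends up with.
    static String demo() {
        TinyPool pool = new TinyPool();
        String a = pool.lease();        // ThreadA takes spot1
        pool.lease();                   // ThreadB takes spot2
        pool.lease();                   // ThreadC takes spot3
        String z = pool.lease();        // ThreadZ: nothing is free yet
        if (z != null) {
            throw new IllegalStateException("pool should be exhausted");
        }
        pool.release(a);                // ThreadA is done with its connection
        return pool.lease();            // ThreadZ gets the released spot
    }

    public static void main(String[] args) {
        System.out.println("ThreadZ got " + demo());
    }
}
```

This corresponds to option b) above, where the existing resource is handed out again instead of being replaced.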

consume/toString - connection release

Everything explained above means that calling close() on the InputStream triggers the connection release.

This happens in both consume (when the entity is streaming) and toString:

public static void consume(final HttpEntity entity) throws IOException 
{ 
    if (entity == null) 
        return; 
   
    if (entity.isStreaming()) 
    { 
        InputStream instream = entity.getContent(); 
        if (instream != null) 
            instream.close();   // <-- connection release
    } 
} 

public static String toString(final HttpEntity entity, final Charset defaultCharset) 
                              throws IOException, ParseException 
{ 
    Args.notNull(entity, "Entity"); 
    InputStream instream = entity.getContent(); 
    if (instream == null) { 
        return null; 
    } 
    try { 
        Args.check(entity.getContentLength() <= Integer.MAX_VALUE,  
                "HTTP entity too large to be buffered in memory"); 
        int i = (int)entity.getContentLength(); 
        if (i < 0) { 
            i = 4096; 
        } 
        Charset charset = null; 
        try { 
            ContentType contentType = ContentType.getOrDefault(entity); 
            charset = contentType.getCharset(); 
        } catch (UnsupportedCharsetException ex) { 
            throw new UnsupportedEncodingException(ex.getMessage()); 
        } 
        if (charset == null) { 
            charset = defaultCharset; 
        } 
        if (charset == null) { 
            charset = HTTP.DEF_CONTENT_CHARSET; 
        } 
        Reader reader = new InputStreamReader(instream, charset); 
        CharArrayBuffer buffer = new CharArrayBuffer(i); 
        char[] tmp = new char[1024]; 
        int l; 
        while((l = reader.read(tmp)) != -1) { 
            buffer.append(tmp, 0, l); 
        } 
        return buffer.toString(); 
    } finally { 
        instream.close();     // <--- connection release
    } 
} 

What happens to the connection if there is a SocketTimeoutException and I don't consume the HttpEntity?

As you can see, both methods throw an IOException, and SocketTimeoutException is a subclass of it. It is the responsibility of the caller to catch this exception and close all resources if that happens. For example:

void tryConsume() throws IOException
{
   try 
   {
     //...
      EntityUtils.consume(httpEntity);
     //...
   }
   catch (IOException e)
   {
     // An I/O error such as SocketTimeoutException happened. Log the error, etc.
     // (Close resources here...)
   }
   finally
   {
     //...Or include a finally clause and close them here, if you want 
     // them to be closed regardless of success/failure.
     if (httpEntity != null)
     {
        // getContent() and close() can themselves throw IOException,
        // hence the throws clause on this method.
        InputStream instream = httpEntity.getContent(); 
        if (instream != null) 
            instream.close();   /* <-- connection release. When checking this 
                                 spot, the pool will get (e.g.) an EOF 
                                 exception. This will lead to replacing this 
                                 resource with a fresh new connection and 
                                 setting the spot status to available. */
      }
   }
}
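
The same idea can be tried without any HTTP library. The sketch below assumes a hypothetical StallingStream that times out on its first read, and uses try-with-resources as an alternative to the explicit finally block; it only demonstrates that close() still runs when the read fails.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;

class TimeoutDemo {
    // Hypothetical stream that always times out when read.
    static class StallingStream extends InputStream {
        boolean closed;

        @Override
        public int read() throws IOException {
            throw new SocketTimeoutException("Read timed out");
        }

        @Override
        public void close() {
            closed = true; // record that the caller released the stream
        }
    }

    // Returns true if the stream was closed even though reading failed.
    static boolean closedAfterTimeout() {
        StallingStream body = new StallingStream();
        try (InputStream in = body) {
            in.read(); // throws SocketTimeoutException
        } catch (IOException e) {
            // SocketTimeoutException is an IOException, so it lands here.
            // try-with-resources has already called close() at this point.
        }
        return body.closed;
    }

    public static void main(String[] args) {
        System.out.println("closed after timeout: " + closedAfterTimeout());
    }
}
```

Whether you use finally or try-with-resources is a matter of style; both guarantee the close() call on the failure path.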

Notice that if a SocketTimeoutException is thrown, specific PoolEntry implementations could also detect that the resource is invalid without a close() call. Calling close() guarantees that the Pool will recycle the spot after it has been used correctly, and it can also serve as an "invalid marker" when you are able to catch a thrown exception.

Specific Pool implementations can also detect an invalid resource even if an uncaught exception prevented you from calling close(), because they can check the status through other mechanisms. One example is checking how long a connection has been idle. If this time exceeds a threshold set by the Pool, the spot is recycled without a previous close() call from the client.

In that case the Pool is the end that calls close(), which avoids a possible deadlock on the client side when the client does not handle the maximum connection time or certain exceptions.
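
The idle-time check can be sketched as follows. This is only an illustration of the idea: IdleEvictor, its Entry class, and the 30-second threshold are assumptions made for the example, not the behaviour of any particular pool implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pool-side bookkeeping that evicts connections by idle time.
class IdleEvictor {
    static final class Entry {
        final String name;
        final long lastUsedMillis;

        Entry(String name, long lastUsedMillis) {
            this.name = name;
            this.lastUsedMillis = lastUsedMillis;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long maxIdleMillis;

    IdleEvictor(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    void track(String name, long lastUsedMillis) {
        entries.add(new Entry(name, lastUsedMillis));
    }

    // Removes every entry idle for longer than the threshold and
    // returns how many were evicted. No client close() call is needed.
    int evictIdle(long nowMillis) {
        int before = entries.size();
        entries.removeIf(e -> nowMillis - e.lastUsedMillis > maxIdleMillis);
        return before - entries.size();
    }

    // One connection idle for 60 s, one for 10 s, threshold 30 s.
    static int demo() {
        IdleEvictor pool = new IdleEvictor(30_000);
        pool.track("HttpConn1", 0);
        pool.track("HttpConn2", 50_000);
        return pool.evictIdle(60_000);
    }

    public static void main(String[] args) {
        System.out.println("evicted: " + demo());
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real pool would read a clock and run the check from a background thread or on each new request.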