Return a paginated Multi<> with reactive-pg-client

I have to implement a Quarkus service that consumes REST requests containing some filter settings, which are then used to request a large dataset (possibly more than 10,000,000 entries) from TimescaleDB/PostgreSQL.

The REST request just has three fields (short class_id, OffsetDateTime time_min, OffsetDateTime time_max).

My Resource class looks like this:

@Path( "/export" )
public class DataExporterResource {

  @Inject
  io.vertx.mutiny.pgclient.PgPool client;

  @Inject
  RequestValidator helper;

  private static final Logger LOGGER = Logger.getLogger( DataExporterResource.class );

  @Path( "/test-data" )
  @POST
  @Consumes( MediaType.APPLICATION_JSON )
  @Produces( MediaType.APPLICATION_JSON )
  @APIResponses( value = { @APIResponse( responseCode = "200", description = "Success", content = @Content( mediaType = "application/json" ) ),
                           @APIResponse( responseCode = "400", description = "Bad Request", content = @Content( mediaType = "application/json" ) ) } )
  public Multi< TestData > exportTestData( TestRequest request ) {
    LOGGER.info( "Test Request received" );
    LOGGER.debug( request.toString() );
    try {
      LOGGER.debug( "Validating request" );
      helper.validateRequest( request );
    } catch( ConstraintViolationException e ) {
      LOGGER.warn( e.getConstraintViolations() );
      // Reject the request instead of continuing with invalid filter settings
      throw new BadRequestException( e.getMessage(), e );
    }
    TestExporter exporter = new TestExporter( client, request );
    Multi< TestData > result = exporter.process();
    return result;
  }
}

My Exporter class, which runs the database query, looks like this:

public class TestExporter {

  private static final Logger LOGGER = Logger.getLogger( TestExporter.class );

  private final TestRequest request;

  private PreparedQuery< RowSet< Row > > preparedQueryStatement;

  private List< Object > tupleList;

  public TestExporter( PgPool client, TestRequest request ) {
    this.request = request;
    this.tupleList = new ArrayList<>();
    this.preparedQueryStatement = client.preparedQuery( preparePreparedStatement() );
  }

  public Multi< TestData > process() {
    LOGGER.info( "Processing Test export request" );
    Multi< TestData > result =
        this.preparedQueryStatement.execute( Tuple.tuple( tupleList ) ).onItem().transformToMulti( set -> Multi.createFrom().iterable( set ) )
                                   .onItem().transform( TestExporter::from );
    return result;
  }

  private String preparePreparedStatement() {
    String selectSql = "SELECT * FROM test WHERE class_id = $1 AND timestamp >= $2 and timestamp <= $3 ORDER BY timestamp ASC;";
    tupleList.add( request.class_id );
    tupleList.add( request.time_min );
    tupleList.add( request.time_max );
    return selectSql;
  }

  private static TestData from( Row row ) {
    TestData test = new TestData();
    // set values
    return test;
  }
}

My problem is that when I query too much data, memory usage keeps rising until a heap space error occurs. This warning also appears: "(vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-6,5,main] has been blocked for 3533 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked".

I'm looking for a way to implement some kind of pagination. My preferred approach would be to take the time_min and time_max values and split this time frame into smaller windows, e.g. hour by hour. Each hour would then be one "page" to be processed.
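The hour-by-hour split described above could be computed up front from time_min and time_max. This is a minimal sketch in plain Java; the class HourlyWindows and its Window type are hypothetical names and not part of the service shown earlier. It assumes half-open windows [from, to), so a row that sits exactly on an hour boundary is not returned twice. Each page query would then use timestamp < $3 instead of timestamp <= $3, except for the last window if time_max has to stay inclusive.

```java
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a time range into consecutive one-hour windows.
final class HourlyWindows {

  // Half-open window [from, to); "to" of one window is "from" of the next.
  static final class Window {
    final OffsetDateTime from;
    final OffsetDateTime to;

    Window(OffsetDateTime from, OffsetDateTime to) {
      this.from = from;
      this.to = to;
    }
  }

  // Returns the windows in ascending order; the last one is cut at timeMax.
  // An empty list is returned when timeMin is not before timeMax.
  static List<Window> split(OffsetDateTime timeMin, OffsetDateTime timeMax) {
    List<Window> windows = new ArrayList<>();
    OffsetDateTime from = timeMin;
    while (from.isBefore(timeMax)) {
      OffsetDateTime to = from.plusHours(1);
      if (to.isAfter(timeMax)) {
        to = timeMax; // shorten the final window
      }
      windows.add(new Window(from, to));
      from = to;
    }
    return windows;
  }
}
```

Each window would supply the $2 and $3 parameters for one run of the prepared query, so only about one hour of rows needs to be held at a time.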

The reactive-pg-client gives back the Multi, which I return directly to the client. My goal is to rework everything so that each page is processed, the Multi (or something similar) is sent back to the client, resources are released, and then the next page (hour) is processed. I don't know how to achieve this behavior, and I haven't found anything about it on the internet.

My biggest question is how to rework the application so that it uses the REST request information to query e.g. one hour of data, streams this data back to the client, ideally releases resources, and then goes on with the next hour (page). That way I would not blow up the heap space, and the reply would be more of a stream than one huge one-shot reply. I think this should mainly affect the process method in the TestExporter class.
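To make the intended control flow concrete (fetch one page, hand its rows on, drop the page, fetch the next), here is a plain-Java illustration. It is a blocking analogue only, not Mutiny code and not something to run on the event loop. PagedIterator and its fetchPage function are hypothetical names; in the service, fetchPage would stand for one execution of the prepared query for one hour. In Mutiny the same shape would presumably be built with a concatenating operator such as onItem().transformToMultiAndConcatenate(...), which is not shown or tested here.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical helper: iterates over the items of many pages while holding
// only the current page in memory. Pages are fetched lazily, one at a time.
final class PagedIterator<P, T> implements Iterator<T> {

  private final Iterator<P> pages;
  private final Function<P, List<T>> fetchPage;
  private Iterator<T> current = Collections.emptyIterator();

  PagedIterator(Iterable<P> pages, Function<P, List<T>> fetchPage) {
    this.pages = pages.iterator();
    this.fetchPage = fetchPage;
  }

  @Override
  public boolean hasNext() {
    // Move on to the next page only when the current one is exhausted;
    // the previous page becomes unreachable and can be garbage collected.
    while (!current.hasNext() && pages.hasNext()) {
      current = fetchPage.apply(pages.next()).iterator();
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```

The point of the sketch is that peak memory is bounded by the largest single page rather than by the whole result set, which is the property I am after for the export.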

Of course I could easily add LIMIT and OFFSET to the REST API and let the client handle paging, but that is slow, especially when the dataset is huge, and it is also not really convenient.

Would be nice if somebody could help me out, or at least propose a better solution. Thank you :-)
