Start fetch, queue intermediate requests during fetch, then serve data for all


I have a problem implementing the following flow using Elixir and Phoenix:

  1. Request from User A, 3rd party API cache is empty
  2. Initiate 3rd party API fetch via HTTP
    1. Fetch not finished yet, Request from User B comes in
    2. User B waits for fetch to complete
  3. Fetch finishes, write fetched data to cache (e.g. Redis)
  4. Serve all waiting users with the cached data

Different routes or route parameters should use different queues. Requests that arrive while the 3rd party API data is still being fetched must never trigger additional fetches with the same parameters. The waiting part (2.2) is crucial to me.

From what I have read so far, this problem seems to be solvable using standard Elixir / Erlang / OTP features.

Answer by Dogbert (best answer)

Yes, this is quite easy to do in Elixir/Erlang compared to most other languages. Here's one way to do it with in-memory caching. If you've used GenServer before but not GenServer.reply/2, the thing to note is that we store the from parameter of each incoming handle_call request and, once the fetch completes, reply to every stored caller. This POC code doesn't handle errors well, but it handles the most interesting part, 2.2, correctly:

defmodule CachedParallelHTTP do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok)
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:fetch, arg}, from, state) do
    case state[arg] do
      %{status: :fetched, response: response} ->
        # We've already made this request; just return the cached response.
        {:reply, response, state}
      %{status: :fetching} ->
        # We're currently running this request. Store the `from` and reply to the caller later.
        state = update_in(state, [arg, :froms], fn froms -> [from | froms] end)
        {:noreply, state}
      nil ->
        # This is a brand new request. Let's create the new state and start the request.
        pid = self()
        state = Map.put(state, arg, %{status: :fetching, froms: [from]})
        Task.start(fn ->
          IO.inspect {:making_request, arg}
          # Simulate a long synchronous piece of code. The actual HTTP call should be made here.
          Process.sleep(2000)
          # dummy response
          response = arg <> arg <> arg
          # Let the server know that this request is done so it can reply to all the `froms`,
          # including the ones that were added while this request was being executed.
          GenServer.call(pid, {:fetched, arg, response})
        end)
        {:noreply, state}
    end
  end

  def handle_call({:fetched, arg, response}, _from, state) do
    # A request was completed.
    case state[arg] do
      %{status: :fetching, froms: froms} ->
        IO.inspect "notifying #{length(froms)} clients waiting for #{arg}"
        # Reply to all the callers who've been waiting for this request.
        for from <- froms do
          GenServer.reply(from, response)
        end
        # Cache the response in the state, for future callers.
        state = Map.put(state, arg, %{status: :fetched, response: response})
        {:reply, :ok, state}
    end
  end
end
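The module above leaves every waiting caller hanging until its call times out if the fetch crashes. A minimal sketch of one way to close that gap follows. It is a variation, not the answer's code: the module name SafeCachedFetch, the fetch_fun argument, and the {:ok, _} / {:error, _} reply shape are all assumptions made for illustration, and the task reports back with send/2 and handle_info/2 rather than GenServer.call/2. Failed fetches are not cached, so a later request retries.

```elixir
defmodule SafeCachedFetch do
  use GenServer

  # `fetch_fun` is a hypothetical one-argument function that performs the
  # real request (e.g. the HTTP call) and may raise.
  def start_link(fetch_fun) when is_function(fetch_fun, 1) do
    GenServer.start_link(__MODULE__, fetch_fun)
  end

  def fetch(server, arg, timeout \\ 5_000) do
    GenServer.call(server, {:fetch, arg}, timeout)
  end

  @impl true
  def init(fetch_fun) do
    {:ok, %{fetch_fun: fetch_fun, entries: %{}}}
  end

  @impl true
  def handle_call({:fetch, arg}, from, state) do
    case state.entries[arg] do
      %{status: :fetched, response: response} ->
        # Cached: reply immediately.
        {:reply, {:ok, response}, state}

      %{status: :fetching} ->
        # Fetch in flight: remember the caller and reply later.
        state = update_in(state, [:entries, arg, :froms], fn froms -> [from | froms] end)
        {:noreply, state}

      nil ->
        # First request for this argument: start exactly one fetch.
        server = self()
        fetch_fun = state.fetch_fun

        Task.start(fn ->
          result =
            try do
              {:ok, fetch_fun.(arg)}
            rescue
              error -> {:error, error}
            catch
              kind, reason -> {:error, {kind, reason}}
            end

          # Report success or failure so waiters are always answered.
          send(server, {:fetched, arg, result})
        end)

        state = put_in(state, [:entries, arg], %{status: :fetching, froms: [from]})
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:fetched, arg, result}, state) do
    %{froms: froms} = Map.fetch!(state.entries, arg)
    Enum.each(froms, fn from -> GenServer.reply(from, result) end)

    entries =
      case result do
        # Cache successes for future callers.
        {:ok, response} -> Map.put(state.entries, arg, %{status: :fetched, response: response})
        # Drop failures so the next request triggers a fresh fetch.
        {:error, _reason} -> Map.delete(state.entries, arg)
      end

    {:noreply, %{state | entries: entries}}
  end
end
```

Callers would then use SafeCachedFetch.fetch(pid, arg) and pattern match on {:ok, response} or {:error, reason}. Whether errors should be cached briefly to avoid hammering a failing upstream is a separate design decision that this sketch does not address.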

Here's a short piece of code to test this:

now = fn -> DateTime.utc_now() |> DateTime.to_iso8601() end

{:ok, s} = CachedParallelHTTP.start_link()
IO.inspect {:before_request, now.()}
for i <- 1..3 do
  Task.start(fn ->
    # Record when the request was sent, before blocking on the call.
    requested_at = now.()
    response = GenServer.call(s, {:fetch, "123"})
    IO.inspect {:response, "123", i, requested_at, response}
  end)
end
:timer.sleep(1000)
for i <- 1..5 do
  Task.start(fn ->
    # Record when the request was sent, before blocking on the call.
    requested_at = now.()
    response = GenServer.call(s, {:fetch, "456"})
    IO.inspect {:response, "456", i, requested_at, response}
  end)
end
IO.inspect {:after_request, now.()}
:timer.sleep(10000)

Output:

{:before_request, "2017-01-06T10:30:07.852986Z"}
{:making_request, "123"}
{:after_request, "2017-01-06T10:30:08.862425Z"}
{:making_request, "456"}
"notifying 3 clients waiting for 123"
{:response, "123", 3, "2017-01-06T10:30:07.860758Z", "123123123"}
{:response, "123", 2, "2017-01-06T10:30:07.860747Z", "123123123"}
{:response, "123", 1, "2017-01-06T10:30:07.860721Z", "123123123"}
"notifying 5 clients waiting for 456"
{:response, "456", 5, "2017-01-06T10:30:08.862556Z", "456456456"}
{:response, "456", 4, "2017-01-06T10:30:08.862540Z", "456456456"}
{:response, "456", 3, "2017-01-06T10:30:08.862524Z", "456456456"}
{:response, "456", 2, "2017-01-06T10:30:08.862504Z", "456456456"}
{:response, "456", 1, "2017-01-06T10:30:08.862472Z", "456456456"}
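One thing this test does not exercise: GenServer.call/3 has a default timeout of 5000 ms, which comfortably covers the simulated 2 second fetch but may not cover a slow real HTTP call. When the timeout fires, the waiting caller exits. The sketch below shows one way a caller could pass an explicit timeout and turn that exit into a return value. SlowReply and its get/2 function are hypothetical names used only to make the example self-contained; the server defers its reply the same way the :fetching branch does.

```elixir
defmodule SlowReply do
  use GenServer

  def start_link(delay_ms), do: GenServer.start_link(__MODULE__, delay_ms)

  # Returns {:ok, reply}, or {:error, :timeout} instead of exiting the caller.
  def get(server, timeout) do
    try do
      {:ok, GenServer.call(server, :get, timeout)}
    catch
      :exit, {:timeout, _details} -> {:error, :timeout}
    end
  end

  @impl true
  def init(delay_ms), do: {:ok, delay_ms}

  @impl true
  def handle_call(:get, from, delay_ms) do
    # Defer the reply: keep `from` and answer after `delay_ms`.
    Process.send_after(self(), {:reply_to, from}, delay_ms)
    {:noreply, delay_ms}
  end

  @impl true
  def handle_info({:reply_to, from}, delay_ms) do
    GenServer.reply(from, :done)
    {:noreply, delay_ms}
  end
end
```

With a server started via SlowReply.start_link(200), a call to SlowReply.get(pid, 50) gives up before the reply is sent, while SlowReply.get(pid, 1_000) waits long enough to receive it. Depending on the OTP version, a reply sent after the timeout may still show up as a stray message in the caller's mailbox, so long-lived callers may want to account for that.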

Notice that by using GenServer.reply and Task.start, a single GenServer can handle more than one request in parallel while keeping the user-facing API fully synchronous. Depending on how much load you want to handle, you might want to look into using a pool of GenServers.
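If a pool is used, requests with the same argument still need to land on the same GenServer, otherwise two servers could each start a fetch for that argument. A minimal sketch of key-based routing, assuming the pool is a tuple of pids started elsewhere (for example under a supervisor); FetchRouter and its functions are hypothetical names, not part of any library:

```elixir
defmodule FetchRouter do
  # Deterministically maps a request key to a slot in 0..pool_size-1.
  def slot_for(key, pool_size) when is_integer(pool_size) and pool_size > 0 do
    :erlang.phash2(key, pool_size)
  end

  # `workers` is assumed to be a tuple of GenServer pids, indexed by slot.
  def worker_for(key, workers) when is_tuple(workers) and tuple_size(workers) > 0 do
    elem(workers, slot_for(key, tuple_size(workers)))
  end
end
```

A caller would then send its request with GenServer.call(FetchRouter.worker_for(arg, workers), {:fetch, arg}). Because the hash depends only on the key and the pool size, the no-duplicate-fetch guarantee holds per key as long as the pool size does not change; random or round-robin dispatch would not preserve it.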