GenStage: How to handle situations where producer cannot provide events?

Scenario: A GenStage producer handles a Twitter stream (using the Stream API and ExTwitter) and provides a batch of tweets (at most the demand the consumer asks for) to a GenStage consumer. The consumer then just prints them.

Problem: I am looking for specific tweets, so there aren't always new tweets available. If the GenStage producer returns an empty list of events, the consumer stops asking. See this issue and José Valim's reply for more.

I am not sure how to address this issue. Any help is greatly appreciated. This is what I have so far:

defmodule MyApp.TwitterProducer do
  use GenStage
  alias MyApp.TwitterStream

  def start_link(:ok) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # This creates a regular Elixir Stream
    # I use this as the state so that not every
    # time the consumer asks for new data
    # a new stream is initiated
    stream = TwitterStream.get_stream
    {:producer, stream}
  end

  def handle_demand(demand, stream) do
    # Take tweets from the stream and 
    # turn them into a list. Then return 
    # them to the consumer
    chunk = Stream.take(stream, demand)
    events = Enum.to_list(chunk)
    {:noreply, events, stream}
  end


  def handle_info(_msg, state) do
    # I was getting a "wrong message" error
    # before I implemented this function myself.
    # It does nothing special in my case.
    {:noreply, [], state}
  end

end

defmodule MyApp.TwitterConsumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do

    Process.sleep(3000)
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end

end

# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)

What happens: this runs for a while, then stops. As far as I understand, it stops as soon as the producer returns an empty event list.

Edit: Interestingly enough, if I set max_demand to 1 it keeps running. But it is much, much slower than querying the Twitter Stream API directly: I receive about ten times fewer tweets. My theory is that this is due to the repeated Stream.take calls, as opposed to calling Enum.to_list once for the whole stream. I still find it very confusing. Any ideas what I am missing?
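On the theory about repeated Stream.take calls: Elixir streams are lazy and keep no position between enumerations, so every Stream.take plus Enum.to_list pass starts the stream over from the beginning. The sketch below illustrates this with Stream.resource and a hypothetical Agent counter. Whether TwitterStream.get_stream behaves the same way is an assumption, since its implementation is not shown.

```elixir
# Hypothetical counter that records how often the stream is (re)started.
{:ok, counter} = Agent.start_link(fn -> 0 end)

stream =
  Stream.resource(
    # start_fun: runs once per enumeration, not once per stream value
    fn ->
      Agent.update(counter, &(&1 + 1))
      1
    end,
    # next_fun: emit the current number and move on
    fn n -> {[n], n + 1} end,
    # after_fun: nothing to clean up in this toy example
    fn _n -> :ok end
  )

# Two separate "take" passes over the same stream value.
first = stream |> Stream.take(3) |> Enum.to_list()
second = stream |> Stream.take(3) |> Enum.to_list()
starts = Agent.get(counter, & &1)

IO.inspect({first, second, starts})
# prints {[1, 2, 3], [1, 2, 3], 2}
```

Both passes return the same first three elements and the start function runs twice. If the Twitter stream is built the same way, keeping it in the producer state would not stop it from being re-initiated on every handle_demand/2 call.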

Answer by Aleksei Matiushkin

There is a significant (but unfortunately not expressed in bold) sentence in the documentation on GenStage.handle_demand/2:

The producer must either store the demand or return the events requested.

That said, instead of blocking on Stream.take, one should be explicitly aware that the call might block and handle that case, storing the demand when it does, for example by using Task.await/2 with a reasonable timeout. (Task.yield/2 could be of use for more complicated checks, but here it seems to be overkill.)

From the documentation:

If you don’t want the task to fail then you must change the heavy_fun/0 code in the same way you would achieve it if you didn’t have the async call. For example, to either return {:ok, val} | :error results or, in more extreme cases, by using try/rescue.

The documentation lacks examples, though. On the other hand, here it would probably be easier to just return an empty list and forget about collecting the demand:

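def handle_demand(demand, stream) do
  try do
    # Run the potentially blocking take in a task so that
    # handle_demand/2 cannot hang forever
    task = Task.async(fn ->
      stream
      |> Stream.take(demand)
      |> Enum.to_list()
    end)
    Task.await(task, 1000) # one sec
  catch
    # Timed out: emit nothing. The task is not stopped here, so a
    # late reply may still arrive as a message in handle_info/2
    :exit, {:timeout, {Task, :await, [_, 1000]}} ->
      {:noreply, [], stream}
  else
    events when is_list(events) ->
      {:noreply, events, stream}
  end
end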
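
For the other option the documentation names, storing the demand, a minimal sketch could look like the following. DemandBuffer is a hypothetical helper, not part of GenStage. It assumes tweets reach the producer as messages (for instance from a separate process that enumerates the stream) rather than being pulled inside handle_demand/2.

```elixir
defmodule DemandBuffer do
  @moduledoc """
  Hypothetical helper (not part of GenStage): keeps buffered events and
  unmet demand, and decides which events can be emitted right now.
  """

  # State is {queue_of_events, pending_demand}.
  def new, do: {:queue.new(), 0}

  # Meant to be called from handle_demand/2: remember the new demand
  # and emit whatever is already buffered.
  def add_demand({queue, pending}, demand) when demand > 0 do
    take(queue, pending + demand, [])
  end

  # Meant to be called from handle_info/2 when an event (e.g. a tweet)
  # arrives: buffer it and emit it if there is unmet demand.
  def add_event({queue, pending}, event) do
    take(:queue.in(event, queue), pending, [])
  end

  # Pop events while there is both unmet demand and buffered data.
  # Returns {events_to_emit, new_state}.
  defp take(queue, 0, acc), do: {Enum.reverse(acc), {queue, 0}}

  defp take(queue, pending, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> take(rest, pending - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), {queue, pending}}
    end
  end
end

# Usage outside of GenStage, to show the bookkeeping:
state = DemandBuffer.new()
{events, state} = DemandBuffer.add_demand(state, 2)
IO.inspect(events) # no tweets yet, demand of 2 is remembered
{events, _state} = DemandBuffer.add_event(state, :tweet_1)
IO.inspect(events) # the tweet is emitted against the stored demand
```

In the producer, handle_demand/2 would call add_demand/2 and handle_info/2 would call add_event/2, each returning {:noreply, events, new_state}. An empty list is then only a pause: the remembered demand is served as soon as a tweet arrives, so the consumer does not have to ask again.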