The scenario: a GenStage producer handles a Twitter stream (using the Stream API and ExTwitter) and provides a set of tweets (at most the demand the consumer is asking for) to a GenStage consumer. The consumer then just prints them.
The problem: I am looking for specific tweets, so new tweets aren't always available. If the GenStage producer returns an empty list of events, the consumer stops asking. See this issue and José Valim's reply for more.
I am not sure how to address this issue. Any help is greatly appreciated. This is what I have so far:
defmodule MyApp.TwitterProducer do
  use GenStage
  alias MyApp.TwitterStream
  def start_link(:ok) do
    GenStage.start_link(__MODULE__, :ok)
  end
  def init(:ok) do
    # This creates a regular Elixir Stream
    # I use this as the state so that not every
    # time the consumer asks for new data
    # a new stream is initiated
    stream = TwitterStream.get_stream
    {:producer, stream}
  end
  def handle_demand(demand, stream) do
    # Take tweets from the stream and 
    # turn them into a list. Then return 
    # them to the consumer
    chunk = Stream.take(stream, demand)
    events = Enum.to_list(chunk)
    {:noreply, events, stream}
  end
  def handle_info(_msg, state) do
    # I was getting a "wrong message" error
    # before I implemented this function myself.
    # It does nothing special in my case.
    {:noreply, [], state}
  end
end
defmodule MyApp.TwitterConsumer do
  use GenStage
  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end
  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end
  def handle_events(events, _from, state) do
    Process.sleep(3000)
    IO.inspect(events)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end
# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)
What happens: this runs for a while, then stops. As I understand it, that happens as soon as the producer returns an empty event list.
Edit: Interestingly, if I set the demand to 1, it keeps running. But it is much, much slower than querying the Twitter Stream API directly: I receive ten times fewer tweets. My theory is that this is due to the repeated Stream.take calls, as opposed to just calling Enum.to_list on the whole stream. But I still find it very confusing. Any ideas what I am missing?
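The theory about repeated Stream.take calls can be checked in isolation. An Elixir Stream is lazy and keeps no cursor, so each Stream.take/2 followed by Enum.to_list/1 enumerates it again from the start. The sketch below uses a counter built with Stream.resource/3 as a hypothetical stand-in for TwitterStream.get_stream (an assumption about its shape; the real stream may behave differently) and counts how often the start function runs.

```elixir
# Hypothetical stand-in for TwitterStream.get_stream: a lazy counter that
# notifies the calling process every time a new enumeration starts.
parent = self()

stream =
  Stream.resource(
    fn ->
      send(parent, :stream_started)
      1
    end,
    fn n -> {[n], n + 1} end,
    fn _n -> :ok end
  )

# Same pattern as handle_demand/2 above: the stream kept in the state
# is never replaced by "the rest of the stream".
first = stream |> Stream.take(3) |> Enum.to_list()
second = stream |> Stream.take(3) |> Enum.to_list()

# Count how many times the start function ran.
{:messages, inbox} = Process.info(self(), :messages)
starts = Enum.count(inbox, &(&1 == :stream_started))

IO.inspect({first, second, starts})
```

Both lists come out as [1, 2, 3] and the start function runs twice. If get_stream does its setup work (for example opening a connection) in that start function, every handle_demand call would repeat it, which might account for part of the slowdown.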
There is a significant (but unfortunately not expressed in bold) sentence in the documentation on GenStage.handle_demand/2:
That said, instead of blocking on Stream.take, one might be explicitly aware that the task might block and handle that case, collecting the demand using Task.await/2 with a reasonable timeout (maybe Task.yield/2 could be of use in more complicated checks, but here it seems to be overkill).
From the documentation:
The documentation lacks examples, though. OTOH, here it would probably be easier to just return an empty list and forget about collecting demand:
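A minimal sketch of the timeout idea, not code taken from the GenStage documentation. TimeoutTake and its take/3 function are hypothetical names, and the 1_000 ms default is an arbitrary choice. It runs the blocking Stream.take in a task and combines Task.yield/2 with Task.shutdown/2, so a quiet stream produces an empty list instead of blocking the caller.

```elixir
defmodule TimeoutTake do
  # Hypothetical helper: tries to take `demand` items from `stream`,
  # waiting at most `timeout` milliseconds. Returns the items, or []
  # if the stream did not deliver enough of them in time.
  def take(stream, demand, timeout \\ 1_000) do
    task = Task.async(fn -> stream |> Stream.take(demand) |> Enum.to_list() end)

    # Task.yield/2 returns nil on timeout; Task.shutdown/2 then stops the
    # task and picks up a reply that may have arrived in the meantime.
    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, events} -> events
      _timeout_or_exit -> []
    end
  end
end

# A stream that always has data and one that never delivers anything.
counter = Stream.iterate(1, &(&1 + 1))
quiet = Stream.repeatedly(fn -> Process.sleep(:infinity) end)

IO.inspect(TimeoutTake.take(counter, 3, 100))
IO.inspect(TimeoutTake.take(quiet, 3, 100))
```

Inside the producer, handle_demand/2 could then return {:noreply, TimeoutTake.take(stream, demand), stream}. Two caveats apply: items the task had already pulled when the timeout fired are discarded, and after an empty reply the consumer's demand is still outstanding, so the producer would have to try again later (for example from handle_info/2) for events to keep flowing.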