I'm trying to implement an extremely simple GenStage pipeline so I can make sure I understand it and get it working before I introduce my application-specific logic. I'm getting an error about the value returned from the Producer's handle_demand/2. The data looks right to me: I've read that the second element of the returned tuple has to be a list, and it appears to be one.
Here is my GenStage implementation:
defmodule CoachActivity.Event.Producer do
  use GenStage

  def start_link(page) do
    GenStage.start_link(CoachActivity.Event.Producer, page)
  end

  def init(page) do
    {:producer, page}
  end

  def handle_demand(demand, page) when demand > 0 do
    # in here we need to run the ecto query for the next X items?
    events = get_fake(page)
    IO.puts "PRODUCER: preparing to return events"
    IO.puts "PRODUCER: page will be: #{page + 1}"
    {:no_reply, events, page + 1}
  end

  defp get_fake(page) do
    IO.puts "PRODUCER: generating events for page #{page}"
    [
      "Event One",
      "Event Two",
      "Event Three",
      "Event Four",
      "Event Five"
    ]
  end
end

defmodule CoachActivity.Event.ProducerConsumer do
  use GenStage

  def start_link(page) do
    GenStage.start_link(CoachActivity.Event.ProducerConsumer, page)
  end

  def init(number) do
    {:producer_consumer, number}
  end

  def handle_events(events, _from, number) do
    # in here we will do the transformation of events
    IO.puts "PRODUCER_CONSUMER: importing a bunch of events"
    events = Enum.map(events, fn(event) ->
      CoachActivity.Event.ImportEvent.import(event)
    end)
    {:noreply, events, number}
  end
end

defmodule CoachActivity.Event.Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(CoachActivity.Event.Consumer, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    # Wait for a second.
    Process.sleep(1000)
    # Inspect the events.
    IO.inspect(events)
    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end

defmodule CoachActivity.Event.ImportEvent do
  def import(event) do
    IO.puts "Converting an event: #{event}"
  end
end

defmodule KickStart do
  def start do
    {:ok, producer} = CoachActivity.Event.Producer.start_link(0)
    {:ok, producer_consumer} = CoachActivity.Event.ProducerConsumer.start_link(2)
    {:ok, consumer} = CoachActivity.Event.Consumer.start_link()
    GenStage.sync_subscribe(consumer, to: producer_consumer)
    GenStage.sync_subscribe(producer_consumer, to: producer)
  end
end
When I run
KickStart.start
I get this error output:
iex(4)> KickStart.start
PRODUCER: generating events for page 0
{:ok, #Reference<0.2077982655.942931969.179180>}
PRODUCER: preparing to return events
PRODUCER: page will be: 1
iex(5)> [error] GenServer #PID<0.1066.0> terminating
** (stop) bad return value: {:no_reply, ["Event One", "Event Two", "Event Three", "Event Four", "Event Five"], 1}
Last message: {:"$gen_producer", {#PID<0.1067.0>, #Reference<0.2077982655.942931969.179180>}, {:ask, 1000}}
State: 0
** (EXIT from #PID<0.1002.0>) evaluator process exited with reason: bad return value: {:no_reply, ["Event One", "Event Two", "Event Three", "Event Four", "Event Five"], 1}
What I'm struggling with is (a) understanding what that error message means and (b) why it's occurring.
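One thing worth checking against the GenStage docs: the documented return values for handle_demand/2 are `{:noreply, events, new_state}`, `{:noreply, events, new_state, :hibernate}` and `{:stop, reason, new_state}`. The tuple in the "bad return value" message starts with `:no_reply` (with an underscore), which is none of these. For comparison, here is a minimal sketch of a producer that returns the documented shape. The module name `SketchProducer` is hypothetical, and the script assumes Elixir 1.12+ (for `Mix.install`) and that a 1.x release of `gen_stage` can be fetched.

```elixir
# Assumption: run as a standalone script; any gen_stage 1.x release should do.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule SketchProducer do
  # Hypothetical stand-in for CoachActivity.Event.Producer.
  use GenStage

  def start_link(page), do: GenStage.start_link(__MODULE__, page)

  @impl true
  def init(page), do: {:producer, page}

  @impl true
  def handle_demand(demand, page) when demand > 0 do
    events = for n <- 1..5, do: "Event #{n} (page #{page})"
    # First element is :noreply (one word), second is a list of events,
    # third is the new state.
    {:noreply, events, page + 1}
  end
end

{:ok, producer} = SketchProducer.start_link(0)

# GenStage.stream/1 subscribes the calling process as a consumer.
# The producer emits five events for the first demand, so take exactly five.
events = GenStage.stream([producer]) |> Enum.take(5)
IO.inspect(events)
```

If that is the cause here, changing `:no_reply` to `:noreply` on the last line of the Producer's handle_demand/2 should be enough; the list in the second position already looks fine.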