At https://hexdocs.pm/gen_stage/GenStage.html#module-init-and-subscribe_to I define the GenStage modules with subscribe_to
option
defmodule A do
use GenStage
def start_link(number) do
GenStage.start_link(A, number)
end
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
defmodule B do
use GenStage
def start_link(number) do
GenStage.start_link(B, number)
end
def init(number) do
{:producer_consumer, number, subscribe_to: [{A, max_demand: 10}]}
end
def handle_events(events, _from, number) do
events = Enum.map(events, & &1 * number)
{:noreply, events, number}
end
end
defmodule C do
use GenStage
def start_link() do
GenStage.start_link(C, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [B]}
end
def handle_events(events, _from, state) do
# Wait for a second.
Process.sleep(1000)
# Inspect the events.
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
If I run them manually it works
iex(1)> GenStage.start_link(A, 0, name: A)
{:ok, #PID<0.195.0>}
iex(2)> GenStage.start_link(B, 2, name: B)
{:ok, #PID<0.197.0>}
iex(3)> GenStage.start_link(C, :ok)
{:ok, #PID<0.199.0>}
[0, 2, 4, 6, 8]
[10, 12, 14, 16, 18]
[20, 22, 24, 26, 28]
[30, 32, 34, 36, 38]
‘(*,.0’
Then it suggest it can be added in a supervisor tree:
In a supervision tree, this is often done by starting multiple workers:
defmodule TestDep.Application do
@moduledoc false
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
worker(A, [0]),
worker(B, [2]),
worker(C, []),
]
opts = [strategy: :rest_for_one]
Supervisor.start_link(children, opts)
end
end
That’s my supervisor tree but when running the app with iex -S mix
I receive a:
** (Mix) Could not start application testdep: TestDep.Application.start(:normal, []) returned an error: shutdown: failed to start child: B ** (EXIT) no process: the process is not alive or there’s no process currently associated with the given name, possibly because its application isn’t started
My app is defined on mix.ex
as
def application do
[
extra_applications: [:logger],
mod: {TestDep.Application, []}
]
end
Is there something I am missing?
It does not :( What works is not what you wanted to test and is not the equivalent of what you are starting in the supervision tree, this is:
That said, you likely want to pass names to wrapped
GenStage.start_link
:And the same for the rest.