If my GenStage's handle_demand/2 callback looks like this:

  def handle_demand(demand, _state) when demand > 0 do
    case Queue.dequeue do
      nil ->
        Logger.debug("Queue empty.")
        {:noreply, [], []}

      {job, updated_queue} ->
        {:noreply, job, updated_queue}
    end
  end

How do I get it to "rerun" when my Queue (a GenServer) is changed/updated?

My queue module just looks like this:

  defmodule Queue do
    use GenServer

    ### client

    def start_link(state \\ []) do
      GenServer.start_link(__MODULE__, state, name: __MODULE__)
    end

    def queue, do: GenServer.call(__MODULE__, :queue)
    def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})
    def dequeue, do: GenServer.call(__MODULE__, :dequeue)

    ### server

    def init(state), do: {:ok, state}

    def handle_call(:dequeue, _from, [value | state]) do
      {:reply, value, state}
    end

    def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
    def handle_call(:queue, _from, state), do: {:reply, state, state}

    def handle_cast({:enqueue, value}, state) do
      {:noreply, state ++ [value]}
    end
  end

Why would you want to “rerun” it when Queue changes? This is a drastic misuse of GenStage. It was invented to handle the back-pressure that comes from the Queue, not the other way around. In real life, you either don’t need a GenStage at all, or you don’t want to “rerun” demand whenever Queue gets updated, because sooner or later that will kill it through timeouts or an overflowing mailbox.

You probably have some kind of “consumer” whose demand triggers handle_demand once it has handled the previous load from the queue. GenStage’s repo has four incredibly clear examples using different patterns to work with GenStage. Besides that, there is a great intro to GenStage on the Elixir blog.

Just pick the pattern you need and adapt it from the sources linked above.
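
For illustration, here is a minimal sketch of one such pattern: a producer that remembers demand it could not satisfy and emits jobs as soon as they are enqueued, so nothing ever has to be “rerun”. It is similar in spirit to the demand-buffering producer described in the GenStage documentation, but it is not taken from the linked examples. The module name QueueProducer and the {queue, pending_demand} state shape are assumptions made for this example, and the queue lives inside the producer rather than in a separate GenServer.

```elixir
# Assumes the :gen_stage dependency is available, e.g. in a standalone
# script: Mix.install([{:gen_stage, "~> 1.0"}])
defmodule QueueProducer do
  use GenStage

  ### client

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Enqueue asynchronously; the producer decides when the job is emitted.
  def enqueue(job), do: GenStage.cast(__MODULE__, {:enqueue, job})

  ### server

  # State is {queue, pending_demand}: buffered jobs plus unmet demand.
  @impl true
  def init(:ok), do: {:producer, {:queue.new(), 0}}

  # A new job arrived: emit it right away if a consumer is still waiting.
  @impl true
  def handle_cast({:enqueue, job}, {queue, pending}) do
    dispatch(:queue.in(job, queue), pending, [])
  end

  # A consumer asked for more: add to the unmet demand, emit what we have.
  @impl true
  def handle_demand(demand, {queue, pending}) when demand > 0 do
    dispatch(queue, pending + demand, [])
  end

  # No demand left: emit the collected events, keep the rest buffered.
  defp dispatch(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  # Demand left: pop jobs until the queue or the demand runs out.
  defp dispatch(queue, pending, events) do
    case :queue.out(queue) do
      {{:value, job}, rest} -> dispatch(rest, pending - 1, [job | events])
      {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, pending}}
    end
  end
end
```

With this shape, an empty queue simply leaves the demand recorded in the state, and the next QueueProducer.enqueue/1 call satisfies it. Note that events are always returned as a list, which GenStage requires.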