Elixir GenStage ConsumerSupervisor Restart Child

202 views Asked by At

From what I am reading here: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html

all the implementations of the ConsumerSupervisor only start a child (the printer module in the link above) for each unit of work. Is there a way to restart the child if it dies?

To me, with the name 'ConsumerSupervisor' it would have the ability to restart children if the child doesn't normally shutdown. Has anyone done this before?

In My implemntation, I have the consumersupervisor starting a child that is actually a GenServer to perform work, then shut its self down..If it crashes abnormally, I want it to be restarted.

Thoughts.. I thought about just implementing a consumer that calls out to a dynamicsupervisor then starts children but that doesn't account for back preasure..

Here is how i have it implemented, but want the children to be restarted if they crash:

 defmodule Client.Strategy.EventPushing.SendingConsumer do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # Callbacks

  def init(:ok) do
    children = [
      worker(Client.DynamicClient, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
  end
end

And the children that spin up look like this:

defmodule Client.DynamicClient do
  use GenServer
  require Logger

  # Public API's
  def start_link(event) do
    Client.Statix.timing("sending", 1)
    GenServer.start_link(
      __MODULE__,
      event,
      name: String.to_atom(event.sequence_number)
    )
  end

  def schedule_work() do
    send(self(), :start_work)
  end

  # Call Backs
  def init(event) do
    schedule_work()
    {:ok, event}
  end

  def handle_info(
        :start_work,
        %{
          :filter => filter,
          :sequence_number => sequence_number,
          :message => message,
          :ip_address => ip,
          :port => port
        } = state
      ) do

    opts = [:binary, active: true]

    case :gen_tcp.connect(ip, port, opts) do
      {:ok, socket} ->
        :gen_tcp.send(socket, filter)
        :gen_tcp.send(socket, message)

      {:error, reason} ->
        Logger.error("Error")
        Process.exit(self(), :kill)
    end

    {:noreply,state}
  end

  def handle_info({:tcp, socket, msg}, state) do
    # :inet.setopts(socket, active: :once)

    :gen_tcp.close(socket)
     Process.exit(self(), :kill)
    {:noreply, state}
  end

  def handle_info({:tcp_closed, _socket,}, state) do
    Client.SendingSupervisor.stop_child(self())
  end
end

Thanks

1

There are 1 answers

0
Gabriel Rufino On

The problem is here: worker(Client.DynamicClient, [], restart: :temporary) If you want your "DynamicClient" to be restarted upon crashes, you should use restart: :permanent