From what I am reading here: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html
every example of ConsumerSupervisor only starts a child (the Printer module in the link above) for each unit of work. Is there a way to restart the child if it dies?
To me, the name 'ConsumerSupervisor' suggests it should be able to restart children that do not shut down normally. Has anyone done this before?
In my implementation, the ConsumerSupervisor starts a child that is actually a GenServer, which performs the work and then shuts itself down. If it crashes abnormally, I want it to be restarted.
I thought about just implementing a consumer that calls out to a DynamicSupervisor, which then starts the children, but that doesn't account for back-pressure.
Here is how I have it implemented, but I want the children to be restarted if they crash:
defmodule Client.Strategy.EventPushing.SendingConsumer do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # Callbacks

  def init(:ok) do
    children = [
      worker(Client.DynamicClient, [], restart: :temporary)
    ]

    {:ok, children,
     strategy: :one_for_one,
     subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
  end
end
And the children that spin up look like this:
defmodule Client.DynamicClient do
  use GenServer
  require Logger

  # Public API

  def start_link(event) do
    Client.Statix.timing("sending", 1)

    GenServer.start_link(
      __MODULE__,
      event,
      name: String.to_atom(event.sequence_number)
    )
  end

  def schedule_work() do
    send(self(), :start_work)
  end

  # Callbacks

  def init(event) do
    schedule_work()
    {:ok, event}
  end

  def handle_info(
        :start_work,
        %{
          :filter => filter,
          :sequence_number => _sequence_number,
          :message => message,
          :ip_address => ip,
          :port => port
        } = state
      ) do
    opts = [:binary, active: true]

    case :gen_tcp.connect(ip, port, opts) do
      {:ok, socket} ->
        :gen_tcp.send(socket, filter)
        :gen_tcp.send(socket, message)

      {:error, reason} ->
        Logger.error("Error: #{inspect(reason)}")
        # :kill is an abnormal exit from the supervisor's point of view.
        Process.exit(self(), :kill)
    end

    {:noreply, state}
  end

  def handle_info({:tcp, socket, _msg}, state) do
    # :inet.setopts(socket, active: :once)
    :gen_tcp.close(socket)
    Process.exit(self(), :kill)
    {:noreply, state}
  end

  def handle_info({:tcp_closed, _socket}, state) do
    Client.SendingSupervisor.stop_child(self())
    # handle_info/2 must return a valid GenServer return value.
    {:noreply, state}
  end
end
Thanks
The problem is here:
worker(Client.DynamicClient, [], restart: :temporary)
If you want your "DynamicClient" to be restarted upon crashes, you should use restart: :permanent
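One caveat, as far as I know: ConsumerSupervisor accepts only :temporary and :transient children, so :permanent may be rejected in init. Check the docs for your GenStage version. Since the worker is meant to shut itself down once its work is finished, restart: :transient is probably the closer fit, because it restarts a child only when it exits abnormally. Here is a minimal sketch of that behaviour. It uses the standard library's DynamicSupervisor instead of ConsumerSupervisor so that it runs without GenStage, and TransientDemo and TransientDemo.Worker are made-up names for illustration, not part of your code.

```elixir
defmodule TransientDemo.Worker do
  # restart: :transient ends up in the child spec generated by `use GenServer`.
  use GenServer, restart: :transient

  # `outcome` is :ok (finish normally) or :crash (stop with an abnormal reason).
  def start_link(outcome), do: GenServer.start_link(__MODULE__, outcome)

  @impl true
  def init(outcome), do: {:ok, outcome}

  @impl true
  # Normal stop: a :transient child is not restarted.
  def handle_cast(:run, :ok), do: {:stop, :normal, :ok}
  # Abnormal stop: a :transient child is restarted.
  def handle_cast(:run, :crash), do: {:stop, :boom, :crash}
end

defmodule TransientDemo do
  # Returns true if the supervisor started a replacement for the worker.
  def restarted?(outcome) do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
    {:ok, pid} = DynamicSupervisor.start_child(sup, {TransientDemo.Worker, outcome})

    # Wait until the original worker has gone down.
    ref = Process.monitor(pid)
    GenServer.cast(pid, :run)

    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    end

    # Give the supervisor a moment to handle the exit and restart if needed.
    Process.sleep(100)

    children =
      for {_id, child, _type, _mods} <- DynamicSupervisor.which_children(sup),
          is_pid(child),
          do: child

    DynamicSupervisor.stop(sup)

    # A restart shows up as a live child with a different pid.
    children != [] and pid not in children
  end
end
```

Applied to your supervisor, that would mean worker(Client.DynamicClient, [], restart: :transient), with the worker returning {:stop, :normal, state} when its work is done. Process.exit(self(), :kill) counts as an abnormal exit, so a :transient child that finishes that way would be restarted every time.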