This is my first attempt at the Perl Object Environment (POE). I am attempting to create a program that will run as a service that obtains a list of devices via couchdb, spawns child processes to ping them in 60 second intervals, while restricting the maximum number of concurrent child processes to 3.
I am able to successfully re-queue child processes after a delay (1 min), however, I am not sure how to manage multiple alarms/delays that call the same event. I am attempting to store them in $_[HEAP]->{timers}->{$_}
where $_
is a given host.
#!/usr/bin/perl
use warnings;
use strict;
use POE qw(Wheel::Run Filter::Reference);
use CouchDB::Client;
use Data::Dumper;
use constant MAX_CONCURRENT_TASKS => 3;
our $couch = CouchDB::Client->new(uri => 'http://192.168.1.100:5984/')->newDB('devices');
POE::Session->create(
inline_states => {
_start => sub {
push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
$_[KERNEL]->delay_set('refresh', 60);
$_[HEAP]->{timers}->{$_} = $_[KERNEL]->delay_set('spawn', 1, $_) for @{$_[HEAP]->{devices}};
},
refresh => sub {
undef @{$_[HEAP]->{devices}};
$_[KERNEL]->delay_set('refresh', 60);
push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
print "\nRefreshing device list.\n\n";
},
spawn => sub {
if (keys(%{$_[HEAP]->{task}}) < MAX_CONCURRENT_TASKS) {
print "Starting $_[ARG0].\t # of tasks running: ". keys(%{$_[HEAP]->{task}}),$/;
my $host = $_[ARG0];
my $task = POE::Wheel::Run->new(
Program => sub { \&do_stuff($host) },
StdoutFilter => POE::Filter::Reference->new(),
StdoutEvent => "task_result",
StderrEvent => "task_debug",
CloseEvent => "task_done"
);
$_[HEAP]->{task}->{$task->ID} = $task;
$_[KERNEL]->sig_child($task->PID, "sig_child", $_[ARG0])
} else {
$_[KERNEL]->delay_adjust($_[HEAP]->{timers}->{$_[ARG0]}, 5);
}
print Dumper 'spawn', sort $_[HEAP]->{timers};
},
task_result => sub {
print "Result for $_[ARG0]->{task}: $_[ARG0]->{status}\n";
},
task_done => sub {
delete $_[HEAP]->{task}->{$_[ARG0]};
},
task_debug => sub {
print "Debug: $_[ARG0]\n";
},
sig_child => sub {
delete $_[HEAP]->{$_[ARG1]};
$_[HEAP]->{timers}->{$_[ARG3]} = $_[KERNEL]->delay_set('spawn', 60, $_[ARG3]) if $_[ARG3] ~~ $_[HEAP]->{devices};
$_[KERNEL]->alarm_remove($_[HEAP]->{timers}->{$_[ARG0]});
}
}
);
sub do_stuff {
binmode(STDOUT);
my $filter = POE::Filter::Reference->new();
sleep(rand 5);
my %result = (
task => shift,
status => "complete.",
);
my $output = $filter->put([\%result]);
print @$output;
}
POE::Kernel->run();
exit 0;
Any advice/strategies would be welcome.
Edit 1: I found out that $_[KERNEL]->delay
wasn't setting timers on a per child basis. I am able to get this to work by using $_[KERNEL]->delay_set
instead. What I am unable to piece together now, is how to restrict the program from running more than 3 processes at a given time.
I am creating the initial timers in _start
. As spawn
is being called $_[KERNEL]->delay_adjust
should extend the delay by 5 seconds if the child process count is 3 or higher.
Apologies for the time it took to respond to questions, this lives on my work PC and this edit is on Monday, my first day back.