Perl POE: Alarm/Delay queueing

220 views Asked by At

This is my first attempt at the Perl Object Environment (POE). I am attempting to create a program that will run as a service that obtains a list of devices via couchdb, spawns child processes to ping them in 60 second intervals, while restricting the maximum number of concurrent child processes to 3.

I am able to successfully re-queue child processes after a delay (1 min), however, I am not sure how to manage multiple alarms/delays that call the same event. I am attempting to store them in $_[HEAP]->{timers}->{$_} where $_ is a given host.

#!/usr/bin/perl

use warnings;
use strict;
use POE qw(Wheel::Run Filter::Reference);
use CouchDB::Client;
use Data::Dumper;

use constant MAX_CONCURRENT_TASKS => 3;

our $couch      = CouchDB::Client->new(uri => 'http://192.168.1.100:5984/')->newDB('devices');

POE::Session->create(
        inline_states => {
                _start      => sub {
                        push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
                        $_[KERNEL]->delay_set('refresh', 60);
                        $_[HEAP]->{timers}->{$_} = $_[KERNEL]->delay_set('spawn', 1, $_) for @{$_[HEAP]->{devices}};
                }, 
                refresh => sub { 
                        undef @{$_[HEAP]->{devices}};
                        $_[KERNEL]->delay_set('refresh', 60); 
                        push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
                        print "\nRefreshing device list.\n\n";
                }, 
                spawn   => sub { 
                        if (keys(%{$_[HEAP]->{task}}) < MAX_CONCURRENT_TASKS) { 
                                print "Starting $_[ARG0].\t # of tasks running: ". keys(%{$_[HEAP]->{task}}),$/;
                                my $host = $_[ARG0];
                                my $task = POE::Wheel::Run->new(        
                                        Program      => sub { \&do_stuff($host) },
                                        StdoutFilter => POE::Filter::Reference->new(),
                                        StdoutEvent  => "task_result",
                                        StderrEvent  => "task_debug",
                                        CloseEvent   => "task_done"
                                );
                                $_[HEAP]->{task}->{$task->ID} = $task;
                                $_[KERNEL]->sig_child($task->PID, "sig_child", $_[ARG0]) 
                        } else { 
                                $_[KERNEL]->delay_adjust($_[HEAP]->{timers}->{$_[ARG0]}, 5);
                        }
                        print Dumper 'spawn', sort $_[HEAP]->{timers};
                },
                task_result => sub { 
                        print "Result for $_[ARG0]->{task}: $_[ARG0]->{status}\n";
                },
                task_done   => sub { 
                        delete $_[HEAP]->{task}->{$_[ARG0]};
                },
                task_debug  => sub { 
                        print "Debug: $_[ARG0]\n";
                },
                sig_child   => sub { 
                        delete $_[HEAP]->{$_[ARG1]};
                        $_[HEAP]->{timers}->{$_[ARG3]} = $_[KERNEL]->delay_set('spawn', 60, $_[ARG3]) if $_[ARG3] ~~ $_[HEAP]->{devices};  
                        $_[KERNEL]->alarm_remove($_[HEAP]->{timers}->{$_[ARG0]});
                }
        }
);
sub do_stuff {
        binmode(STDOUT);
        my $filter = POE::Filter::Reference->new();

        sleep(rand 5);

        my %result = (
                task   => shift,
                status => "complete.",
        );

        my $output = $filter->put([\%result]);
        print @$output;
}
POE::Kernel->run();
exit 0;

Any advice/strategies would be welcome.

Edit 1: I found out that $_[KERNEL]->delay wasn't setting timers on a per child basis. I am able to get this to work by using $_[KERNEL]->delay_set instead. What I am unable to piece together now, is how to restrict the program from running more than 3 processes at a given time.

I am creating the initial timers in _start. As spawn is being called $_[KERNEL]->delay_adjust should extend the delay by 5 seconds if the child process count is 3 or higher.

Apologies for the time it took to respond to questions, this lives on my work PC and this edit is on Monday, my first day back.

0

There are 0 answers