How can I wait for the completion of http_request in AnyEvent::HTTP?

392 views Asked by At

The program takes jobs from the database and runs AnyEvent::HTTP::http_request for each one. How can I exit the program correctly, after waiting for all requests to complete?

#!/usr/bin/perl
use strict;
use AnyEvent;
use AnyEvent::DBI;
use AnyEvent::HTTP;


# In-memory backlog of job rows fetched from the database but not yet started.
my @queue;
# Number of HTTP requests started so far.
my $added = 0;
# Number of HTTP requests whose completion callback has run.
my $finished = 0;
# Random value in 100000..999999, written into the `lock` column to mark the
# rows claimed by this run.
my $uid = int(rand(900000)) + 100000;
# Timer watchers; each is assigned below and held in these variables.
my ($run, $jobs, $log, $quit);

# Asynchronous database handle (the DSN is left blank in this example).
my $db = AnyEvent::DBI->new('DBI:mysql:dbname=;host=');

# Condition variable the main program blocks on until shutdown is signalled.
my $cv = AnyEvent->condvar;

# Dispatcher: first fires 5 seconds after start-up, then every 0.1 seconds.
# Each tick starts at most one queued job, and only while fewer than 300
# requests are in flight ($added - $finished).
$run = AnyEvent->timer(
    after    => 5,
    interval => 0.1,
    cb       => sub {
        return unless @queue;                      # backlog is empty
        return unless $added - $finished < 300;    # in-flight limit reached

        my $job = shift @queue;

        # The guard returned by http_request is referenced from inside the
        # completion callback, so it stays alive after this sub returns and
        # is released once the response has arrived.
        my $guard;
        $guard = http_request(
            GET => $job->{url},
            sub {
                my ($body, $header) = @_;
                undef $guard;
                ...
                $finished++;
            }
        );
        $added++;
    }
);

# Fetcher: first fires 0.1 seconds after start-up, then every 5 seconds.
# While the last index of @queue is below 1000 it claims up to 1000 unlocked
# rows for this run, reads them back into @queue, and then sets their
# `lock` to 1.
$jobs = AnyEvent->timer(
    after    => 0.1,
    interval => 5,
    cb       => sub {
        return if $#queue >= 1000;

        # Step 2 callback: append the selected rows to the backlog, then
        # change their `lock` from this run's uid to 1.
        my $on_selected = sub {
            my ($dbh, $rows) = @_;
            push @queue, @$rows;
            $dbh->exec(q{UPDATE `jobs` SET `lock` = 1 WHERE `lock` = ?}, $uid, sub { });
        };

        # Step 1 callback: read back every row carrying this run's uid.
        my $on_claimed = sub {
            $db->exec(q{SELECT * FROM `jobs` WHERE `lock` = ?}, $uid, $on_selected);
        };

        # Step 1: stamp the oldest unlocked rows with this run's uid.
        $db->exec(q{UPDATE `jobs` SET `lock` = ? WHERE `lock` = 0 ORDER BY `time` ASC LIMIT 1000}, $uid, $on_claimed);
    }
);

# Progress report: first fires 5 seconds after start-up, then every 3 seconds.
# "Queue" is the last index of @queue (so -1 when it is empty), "During" is
# the number of requests started but not yet finished.
$log = AnyEvent->timer(
    after    => 5,
    interval => 3,
    cb       => sub {
        my $in_flight = $added - $finished;
        printf "Queue: %-6d Added: %-6d During: %-6d Total: %-6d\n",
            $#queue, $added, $in_flight, $finished;
    }
);

# Shutdown check: first fires 1 second after start-up, then every 10 seconds.
# When a file named 'stop' exists it prints "Exit" and signals $cv.
# The signal is sent as soon as the file is seen, without looking at
# $added/$finished, so requests that are still in flight are not waited for.
$quit = AnyEvent->timer(
    after    => 1,
    interval => 10,
    cb       => sub {
        return unless -f 'stop';

        print "Exit\n";
        $cv->send;
    }
);

# Run the event loop until the shutdown check signals $cv.
my $result = $cv->recv;

Also, if you know a better way to queue jobs and execute them, please show the patterns you use with AnyEvent + AnyEvent::HTTP. I use AnyEvent timers; which approach is better and faster?

New version :

#!/usr/bin/perl
use strict;
use AnyEvent;
use AnyEvent::DBI;
use AnyEvent::HTTP;

# Random value in 100000..999999 used as this run's `lock` marker
# (despite the name, it is not the operating-system process id).
my $pid = int(rand(900000)) + 100000;
# 1 while the main loop is still accepting work; set to 0 after the loop ends.
my $run = 1;
# Batch size used as the SQL LIMIT, and the threshold compared against
# $AnyEvent::HTTP::ACTIVE in the request callback.
my $capacity = 300;
# Counters of requests started / requests whose callback has run.
my $added = 0;
my $finished = 0;

# Final condition variable: signalled by the $log timer once $run is 0 and
# no HTTP requests are active.
my $cv = AnyEvent->condvar;

# Asynchronous database handle (the DSN is left blank in this example).
my $dbh = AnyEvent::DBI->new('DBI:mysql:dbname=;host=');

# Progress report and exit detector: first fires 1 second after start-up,
# then every 3 seconds. Nothing is printed until at least one request has
# finished. Once the main loop has cleared $run and AnyEvent::HTTP reports
# no active requests, the final condition variable is signalled.
my $log = AnyEvent->timer(
    after    => 1,
    interval => 3,
    cb       => sub {
        if ($finished) {
            printf "Added: %-6d Finished: %-6d Active: %-6d\n",
                $added, $finished, $AnyEvent::HTTP::ACTIVE;
        }

        $cv->send if $run == 0 && $AnyEvent::HTTP::ACTIVE == 0;
    }
);

# Main loop: keep fetching and running batches until a file named 'stop'
# exists. The file is only tested between batches.
while (! -f 'stop') {
        # Phase 1: claim up to $capacity unlocked rows for this run, read them
        # back, set their `lock` to 1, and hand the rows to $done.
        my $done = AnyEvent->condvar;
        $dbh->exec(q{UPDATE `jobs` SET `lock` = ? WHERE `lock` = 0 ORDER BY `time` ASC LIMIT ?}, $pid, $capacity, sub {
            $dbh->exec(q{SELECT * FROM `jobs` WHERE `lock` = ?}, $pid, sub {
                my ($dbh, $rows, $rv) = @_;
                $dbh->exec(q{UPDATE `jobs` SET `lock` = 1 WHERE `lock` = ?}, $pid, sub {});
                $done->send($rows);
            });
        });
        # Blocks (running the event loop) until the SELECT callback sends the rows.
        my $jobs = $done->recv;

        # Phase 2: start one HTTP request per fetched row.
        # NOTE(review): this second `my $done` redeclares the variable in the
        # same scope; it is a fresh condvar, unrelated to the one above.
        my $done = AnyEvent->condvar;
        foreach my $job (@$jobs) {
            my $content;    # not used in the code shown
            # $r holds the request guard; the callback references it so it
            # stays alive until the response arrives.
            my $r; $r = http_request(
                GET =>  $job->{url},
                sub {
                    undef $r;
                    my ($body, $header) = @_;
                    ...
                    # Release the row: reset `lock` to 0 and refresh `time`.
                    $dbh->exec(q{UPDATE `jobs` SET `lock` = 0, `time` = CURRENT_TIMESTAMP WHERE `id` = ?}, $job->{id}, sub {});
                    # Wake the outer loop when the active count is below $capacity.
                    $done->send if $AnyEvent::HTTP::ACTIVE < $capacity;
                    $finished++;
                }
            );
            $added++;
        }
        # NOTE(review): $done is only signalled from the request callbacks
        # above, so when @$jobs is empty nothing visible here wakes this recv.
        $done->recv;
}
# Stop accepting work; the $log timer signals $cv once $run is 0 and
# $AnyEvent::HTTP::ACTIVE is 0.
$run = 0;
$cv->recv;
1

There is 1 answer

1
ikegami On BEST ANSWER

You basically need a three-state system:

  1. Running (new jobs accepted).
  2. Exiting (no new jobs accepted; waiting for existing jobs to complete).
  3. Exit (no jobs running).

Both of your versions fail because they only have two states:

  1. Running (new jobs accepted)
  2. Exit (jobs could still be running!!!)

Here's a solution:

#!/usr/bin/perl
use strict;
use warnings;

use AE             qw( );
use AnyEvent       qw( );
use AnyEvent::DBI  qw( );
use AnyEvent::HTTP qw( );
use Scalar::Util   qw( weaken );
use Sys::Hostname  qw( hostname );

# Upper bound on the number of jobs running at once; manager() subtracts the
# active count from it to size each grab.
my $capacity = 300;

# Signalled by the exit-check timer when a file named 'stop' exists.
my $exit_request = AE::cv();
# Signalled by manager() when it is called in the exiting state with no
# active jobs.
my $done         = AE::cv();
# 0 while new jobs are accepted; set to 1 once the exit request is received.
my $exiting      = 0;
# 1 while a job-grabbing SQL sequence is in progress, so that manager()
# does not start a second one concurrently.
my $grabbing     = 0;
# Counters of jobs started / jobs whose completion callback has run.
my $added        = 0;
my $finished     = 0;

# Lock value identifying this process: "<hostname>.<process id>.<random 0..65535>".
my $uid = join('.', hostname(), $$, int(rand(65536)));

# Asynchronous database handle (the DSN is left blank in this example).
my $db = AnyEvent::DBI->new('DBI:mysql:dbname=;host=');

# Runs one job: issues an HTTP GET for the job's URL and calls $cb (with no
# arguments) from the response callback once the response has been handled.
#
#   $job - hash reference; only $job->{url} is read here.
#   $cb  - code reference invoked when the job is complete.
#
# Returns nothing.
sub worker {
    my ($job, $cb) = @_;

    # AnyEvent::HTTP is loaded with an empty import list, so http_request is
    # not available as a bare name and must be called fully qualified.
    AnyEvent::HTTP::http_request(
        GET => $job->{url},
        sub {
            my ($body, $header) = @_;
            ...
            $cb->();
        },
    );

    # Keep the call above in void context no matter how worker() is called:
    # in any other context http_request returns a guard object, and the
    # request is cancelled when that guard is destroyed.
    return;
}

# Job scheduler. Called with no arguments, both periodically from a timer
# and each time a worker finishes.
#
#   * Exiting state: starts nothing new; signals $done once no job is active
#     and no grab is in progress.
#   * Running state: if there are free slots and no grab is already under
#     way, claims up to that many queued, unlocked rows for this process,
#     marks them 'wip', and starts a worker for each.
#
# Returns nothing meaningful.
sub manager {
    my $active = $added - $finished;

    if ($exiting) {
        # A grab that was started before the exit request may still hand us
        # jobs, so it has to complete before we can report being done.
        $done->send() if !$active && !$grabbing;
        return;
    }

    my $avail_slots = $capacity - $active;
    return if $avail_slots <= 0;

    # Only one grab at a time; overlapping grabs share the same lock value
    # and would select each other's rows.
    return if $grabbing;

    $grabbing = 1;
    # $avail_slots is an integer computed above, so interpolating it into
    # the LIMIT clause is safe.
    $db->exec(qq{UPDATE `jobs` SET `lock` = ? WHERE `status` = 'queued' AND `lock` = 0 ORDER BY `time` ASC LIMIT $avail_slots}, $uid, sub {
        $db->exec(q{SELECT * FROM `jobs` WHERE `status` = 'queued' AND `lock` = ?}, $uid, sub {
            my (undef, $jobs, undef) = @_;
            $db->exec(q{UPDATE `jobs` SET `status` = 'wip' WHERE `lock` = ?}, $uid, sub {
                $grabbing = 0;
                # $jobs is undefined when the SELECT failed; treat that as
                # an empty batch instead of dying on the dereference.
                for my $job (@{ $jobs || [] }) {
                    ++$added;
                    worker($job, sub {
                        ++$finished;
                        $db->exec(q{UPDATE `jobs` SET `status` = 'done', `lock` = 0, `time` = CURRENT_TIMESTAMP WHERE `id` = ?}, $job->{id}, sub { });
                        # A slot was just freed: refill it, or finish exiting.
                        manager();
                    });
                }
            });
        });
    });
}

# Call manager() 5 seconds after start-up and every 0.5 seconds thereafter.
my $db_poll_timer = AE::timer(5, 0.5, \&manager);

# Immediately, and then every 2 seconds, request an exit if a file named
# 'stop' exists.
my $exit_check_timer = AE::timer(0, 2, sub {
    if (-f 'stop') {
        $exit_request->send();
    }
});

# Progress report: first after 1 second, then every 3 seconds.
my $log_timer = AE::timer(1, 3, sub {
    my $active = $added-$finished;
    printf("Added: %-6d Finished: %-6d Active: %-6d\n", $added, $finished, $active);
});

# State 1 (running): block until an exit is requested.
$exit_request->recv();
print "Exiting...\n";
$exit_check_timer = undef;
$exiting          = 1;

# State 2 (exiting): block until manager() signals completion.
$done->recv();
print "Finished.\n";

# State 3 (exit): release the remaining timers.
undef $_ for $db_poll_timer, $log_timer;

Features:

  • Removed the @queue in order to avoid stealing work from other tasks.
  • I separated the worker code from the worker management code.
  • I removed any dependency of the worker being an http_request.
  • Support for long-running workers (like your first version, but unlike your second version) through timer-based polling.
  • Fixed a race-condition in the job-grabbing code (that existed in your first version, but not in your second version) using $grabbing.

I went beyond the scope:

  • Used the shortcuts from AE.
  • I used a more reliable lock value. It's now a string which even identifies the machine and process that holds the lock.
  • I added a field to the job table (status) indicating the state of the job rather than reusing the lock field.

Future work: