Reconnect using AnyEvent::Handle and tcp_connect

1k views Asked by At

I have a simple TCP server and client written using AnyEvent::Handle leveraging tcp_connect and tcp_server. The client connects to the server and sends the string Test Message every 5 seconds.

This works without issue while the server is reachable. However, if the server is unavailable when the client launches, or becomes unavailable later, the client script never tries to reconnect.

I'd like it to try to reconnect whenever the connection handle is not available (destroyed?). While it is unavailable, the client could do other things (print a status message, perhaps), but ideally it would retry the connection every 5 seconds.

I'm not sure how to do that. I've pared down my client and server code to the following.

Client

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

# Events collected since the last flush.
my @bulk;

# Every five seconds: queue one test event, hand the queue to flush(),
# then empty the queue again -- regardless of the state of the connection.
my $timer = AnyEvent->timer(
    after    => 5,
    interval => 5,
    cb       => sub {
        push @bulk, "Test message";
        flush( \@bulk );
        @bulk = ();
    },
);

# Endpoint of the server that receives the events.
my $host = '127.0.0.1';
my $port = 9999;

# Nothing below ever calls send() on this condvar, so the recv() at the
# bottom keeps the event loop running until the process is killed.
my $conn_cv = AnyEvent->condvar;

# Declared ahead of the constructor call so the callbacks can close over it.
my $conn_hdl;

# The connection attempt failed: report the reason and discard the handle.
# No further attempt is scheduled.
my $report_connect_failure = sub {
    my ( undef, $reason ) = @_;
    print "Could not connect: $reason\n";
    $conn_hdl->destroy;
};

# The handle reported an error: log the message and discard the handle.
# The "fatal" flag (second argument) is not looked at.
my $report_handle_error = sub {
    my ( undef, undef, $text ) = @_;
    AE::log( error => $text );
    $conn_hdl->destroy;
};

# Data arrived: queue a reader for one complete line and echo that line.
my $echo_next_line = sub {
    my ($handle) = @_;
    $handle->unshift_read(
        line => sub {
            my ( undef, $line ) = @_;
            print "$line\n";
        },
    );
};

$conn_hdl = AnyEvent::Handle->new(
    connect          => [ $host, $port ],
    keepalive        => 1,
    on_connect_error => $report_connect_failure,
    on_error         => $report_handle_error,
    on_read          => $echo_next_line,
);

$conn_cv->recv;

# Send the queued events to the server as a single frame: a 4-byte
# big-endian length header followed by the compress()-ed, comma-joined
# event strings.
#
#   $bulk - array reference holding the event strings
#
# Returns 0 without writing anything when the array is empty; otherwise
# returns whatever push_write() on the file-level $conn_hdl returns.
sub flush {
    my ($bulk) = @_;

    # Nothing queued, nothing to send.
    return 0 unless @{$bulk};

    my $payload = compress( join( ",", @{$bulk} ) );
    my $frame   = pack( "N", length($payload) ) . $payload;

    return $conn_hdl->push_write($frame);
}

Server

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

# Address the server listens on.
my $host = '127.0.0.1';
my $port = 9999;

# Client handles keyed by "peer_host:peer_port".  Storing the handle here
# is what keeps it alive after the accept callback returns.
my %connections;

# Every five seconds: drop handles that were destroyed since the last tick,
# then report how many clients remain.  Pruning happens first so that
# connections which already closed are not included in the reported number.
my $timer = AnyEvent->timer(
    after    => 5,
    interval => 5,
    cb       => sub {
        foreach my $k ( keys %connections ) {
            delete $connections{$k} if $connections{$k}->destroyed;
        }
        print "Number of connected hosts: ";
        print scalar keys %connections;
        print "\n";
    } );

# Never signalled in this script; recv() at the bottom just keeps the
# event loop running.
my $server_cv = AnyEvent->condvar;
my $server    = tcp_server(
    $host, $port,
    sub {
        # Accept callback: $fh is the client socket, $h/$p the peer address.
        my ( $fh, $h, $p ) = @_;

        my $handle = AnyEvent::Handle->new(
            fh        => $fh,
            # NOTE(review): "poll" does not appear among the constructor
            # options in the AnyEvent::Handle documentation -- confirm it
            # is needed.
            poll      => 'r',
            keepalive => 1,
            on_read   => sub {
                my ( $self ) = @_;

                # Each frame is a 4-byte big-endian length header followed
                # by that many bytes of compressed payload.
                $self->unshift_read(
                    chunk => 4,
                    sub {
                        # NOTE(review): the length is taken from the peer
                        # as-is; nothing here limits how much a client can
                        # make the server buffer (see rbuf_max).
                        my $len = unpack( "N", $_[1] );

                        # Get Data
                        $self->unshift_read(
                            chunk => $len,
                            sub {
                                my $data = uncompress( $_[1] );

                                # uncompress() yields undef when the payload
                                # is not valid compressed data; report that
                                # instead of printing an undefined value.
                                if ( !defined $data ) {
                                    warn "Discarding frame from $h:$p: "
                                        . "payload could not be uncompressed\n";
                                    return;
                                }

                                print $data . "\n";
                            } );
                    } );

            },
            # Peer closed the connection or an error occurred: destroy the
            # handle; the timer above removes it from %connections.
            on_eof => sub {
                my ( $hdl ) = @_;
                $hdl->destroy();
            },
            on_error => sub {
                my ( $hdl ) = @_;
                $hdl->destroy();
            },
        );

        $connections{ $h . ':' . $p } = $handle;    # keep it alive.
    } );

$server_cv->recv;
2

There are 2 answers

4
ikegami On BEST ANSWER

You can use the following:

package MyConnector;

# Creates an AnyEvent::Handle in "connect" mode and, when the connection
# attempt fails, tries again after a pause instead of giving up at once.
#
# Options understood by new() (everything else is passed unchanged to
# AnyEvent::Handle->new):
#
#   tries            - how many times a failed attempt is retried before
#                      giving up; the initial attempt is not counted, so
#                      0 means "one attempt, no retries" (default: 5)
#   cooldown         - seconds to wait before each retry; 0 retries
#                      without a pause (default: 15)
#   on_connect       - called as ($handle, $host, $port, $retry) once a
#                      connection has been established
#   on_connect_error - called as ($handle, $message), but only after the
#                      last retry has failed as well
#
# Only the initial connection is retried; nothing here reconnects after an
# established connection is lost.

use strict;
use warnings;

use AE               qw( );
use AnyEvent::Handle qw( );
use Scalar::Util     qw( );

# Constructor.  Starts the first connection attempt immediately and returns
# the connector object; the outcome is reported through the callbacks.
sub new {
   my $class = shift;
   my %opts = @_;

   my $self = bless({}, $class);

   {
      # The closures below are stored inside the object (directly or via
      # the handle/timer), so they must only hold a weak reference to it.
      Scalar::Util::weaken(my $self = $self);

      my $on_connect       = delete($opts{on_connect});
      my $on_connect_error = delete($opts{on_connect_error});

      # Apply the defaults only when the option is absent: an explicit 0
      # is a meaningful value for both and must not be replaced.
      my $tries    = delete($opts{tries});
      my $cooldown = delete($opts{cooldown});
      $tries    =  5 if !defined($tries);
      $cooldown = 15 if !defined($cooldown);

      # One connection attempt.  Invoked once right away and then again by
      # the cooldown timer after each failure.
      $self->{_connect} = sub {
         $self->{_timer} = undef;

         $self->{_handle} = AnyEvent::Handle->new(
            %opts,

            on_connect => sub {
               my ($handle, $host, $port, $retry) = @_;

               # Publish the handle and drop the retry machinery.
               $self->{handle} = $handle;
               delete @{$self}{qw( _connect _handle _timer )};

               $on_connect->($handle, $host, $port, $retry)
                  if $on_connect;
            },

            on_connect_error => sub {
               my ($handle, $message) = @_;

               # Retries used up: report the last error and stop.
               if (!$tries--) {
                  $on_connect_error->($handle, $message)
                     if $on_connect_error;

                  delete @{$self}{qw( _connect _handle _timer )};

                  return;
               }

               # This will happen when this callback returns,
               # but that might not be for a while, so let's
               # do it now in case it saves resources.
               $handle->destroy();

               # One-shot timer that starts the next attempt.
               $self->{_timer} = AE::timer($cooldown, 0, $self->{_connect});
            },
         );
      };

      $self->{_connect}->();
   }

   return $self;
}

# Returns the connected AnyEvent::Handle, or undef as long as no connection
# has been established.
sub handle {
   my ($self) = @_;
   return $self->{handle};
}

1;

I'm pretty sure it's free of memory leaks (unlike your code). You'd use it as follows:

use strict;
use warnings;

use AE          qw( );
use MyConnector qw( );

# Endpoint: the first two command-line arguments, each falling back to a
# default when it is missing or false.
my ($host, $port) = @ARGV;
$host ||= 'www.stackoverflow.com';
$port ||= 80;

# Signalled by either callback below; the recv() at the bottom runs the
# event loop until that happens.
my $conn_cv = AE::cv();

# A connection was established.
my $on_connected = sub {
   print("Connected successfully\n");
   $conn_cv->send();
};

# The connector gave up; the second argument is the error message.
my $on_failed = sub {
   my (undef, $message) = @_;
   warn("Could not connect: $message\n");
   $conn_cv->send();
};

my $connector = MyConnector->new(
   connect          => [ $host, $port ],
   keepalive        => 1,
   on_connect       => $on_connected,
   on_connect_error => $on_failed,

   # ...
);

$conn_cv->recv();
0
Sgt B On

Thanks to ikegami, I figured I might want to track the connection status and use that in conjunction with another AnyEvent timer watcher to reconnect if the connection isn't established. This results in connection attempts every second if the connection status ($isConnected) is zero. Meanwhile, the events are queued up for when the connection is reestablished.

If there is a simpler way to accomplish this, I'm all ears, but for now I think this will solve the issue.

# Events waiting to be sent.
my @bulk;
my $host = '127.0.0.1';
my $port = 9999;

# Connection state.  $isConnected is 1 from on_connect until the next
# error or EOF; $isConnecting is 1 while a connection attempt is in flight.
my $isConnected  = 0;
my $isConnecting = 0;

my $conn_cv = AnyEvent->condvar;
my $conn_hdl;

# Flush Timer
# Every five seconds queue one event.  The queue is only sent (and emptied)
# while connected, so events pile up during an outage and go out with the
# first flush after the connection is back.
# NOTE(review): nothing caps the size of @bulk during a long outage.
my $timer = AnyEvent->timer(
    after => 5,
    interval => 5,
    cb => sub {
        push(@bulk,"Test message");
        if ($isConnected == 1) {
            flush(\@bulk);
            undef @bulk;
        }
    }
);

# Discard a handle and mark the connection as down so that the reconnect
# timer starts a new attempt on its next tick.  Works on the handle passed
# to the callback rather than on $conn_hdl, which may not have been
# assigned yet when a callback runs.
my $disconnected = sub {
    my ($hdl) = @_;
    $hdl->destroy;
    $isConnected  = 0;
    $isConnecting = 0;
};

# Reconnect Timer
# Checks once per second and starts a connection attempt when there is
# neither a connection nor an attempt in progress.  Without the second
# condition an attempt that needs more than a second would be replaced
# by a fresh one on every tick and could never complete.
my $reconn = AnyEvent->timer(
    after => 1,
    interval => 1,
    cb => sub {
        return if $isConnected || $isConnecting;

        # Set before calling new() so that a failure callback that runs
        # early is not overwritten afterwards.
        $isConnecting = 1;

        $conn_hdl = AnyEvent::Handle->new(
            connect => [$host, $port],
            keepalive => 1,
            on_connect => sub {
                $isConnected  = 1;
                $isConnecting = 0;
            },
            on_connect_error => sub {
                my ($hdl, $msg) = @_;
                warn "Could not connect: $msg\n";
                $disconnected->($hdl);
            },
            on_error => sub {
                my ($hdl, $fatal, $msg) = @_;
                AE::log error => $msg;
                $disconnected->($hdl);
            },
            on_eof => sub {
                my ($hdl) = @_;
                warn "EOF\n";
                $disconnected->($hdl);
            },
            # Echo every complete line received from the server.
            on_read => sub {
                my ($self) = @_;
                $self->unshift_read(line => sub {
                    my ($hdl,$data) = @_;
                    print $data."\n";
                });
            }
        );
    }
);

$conn_cv->recv;