Perl: Recommendations on how to forward messages received from blocking calls to multiple network sockets

236 views Asked by At

I have the following Perl problem: I am trying to build a simple network-enabled forwarder/consumer script.

  • the forwarder part loops and calls a local blocking function to retrieve a new string (i.e., from an existing library)

  • the consumer part consists of a server where multiple clients can connect to the server (TCP socket)

  • the idea is that every message retrieved by the forwarder part is passed on to the clients, i.e., forwarded to all connected client sockets.

I looked into threads/shared variables, but sockets cannot be shared this way. I also looked into the POE TCP Forwarding example (http://poe.perl.org/?POE_Cookbook/TCP_Forwarding), but there, I did not find out how to start a thread that can still use send to the %clients or the poe_kernel in this specific example (maybe there is a way ...).

In a nutshell:

  1. I need to fork or use some other threading mechanism to first start the forwarder and initiate the loop on the retrieval function
  2. I need to hand the retrieved data over to all connected clients/consumers.

Here is what I intend to do (BEWARE: abstract code):

$pid = fork();

if ($pid == 0)
{
  # forwarder/producer thread
  while(1)
  {
    $string = blocking_receive_function();
    foreach(@clients)
    {
      print($_ "$string");
    }
  }
}
else
{
  # start server and add clients to consumer list
  $server = IO::Socket::INET->new( ... );
  while ($client = $server->accept()) {
    push(@clients, $client);
    # fork for this client (to allow others) and 
    # wait for specific client it closes connection
  }
}

Any suggestions/recommendations for a good and efficient way to implement such a application is highly appreciated!

2

There are 2 answers

0
Oleg G On BEST ANSWER

Ok, there is more than one way to do it. But IMHO the simplest is to use Coro.

Here is quick example

use strict;
use Coro;
use Coro::Socket;
use Coro::Handle;
use Coro::PatchSet;

my @clients;
pipe(my $reader, my $writer);

defined( my $child = fork )
    or die 'fork: ', $!;

if ($child == 0) {
    close $reader;

    sub blocking_receive_function {
        sleep 1;
        return "test\n";
    }

    while (1) {
        my $str = blocking_receive_function();
        syswrite($writer, $str);
    }

    exit;
}

close $writer;
$reader = unblock $reader; # make it Coro aware

async_pool {
    # thread for reading blocking_receive_function results
    # and write it to clients
    while (1) {
        my $line = $reader->readline();

        for (my $i=$#clients; $i>=0; $i--) {
            unless (defined $clients[$i]->syswrite($line)) {
                # some error
                # we need to do smth
                # let's remove this client
                warn "Error on $clients[$i]: $!";
                splice @clients, $i, 1;
            }
        }
    }
}

my $server = Coro::Socket->new(Listen => 1024, LocalPort => 8000)
    or die "Can't create server", $@;

while (1) {
    my $client = $server->accept()
        or next;
    push @clients, $client;
}
0
Cahu On

Sharing file descriptors among processes might be too much pain for your task. I came across this problem some time ago and I figured it was better to rethink my approach by using a select() call instead.

However, if you really want to achieve this without dependencies, it is possible to pass file descriptors between processes using Unix sockets configured with the right SOL_SOCKET options via the setsockopt() call.