Question

Recently I ran into a great Perl module, "AnyEvent", which allows users to do asynchronous/event-driven programming.

I created the following snippet, which works fine. The problem is that after it opens and closes lots of sockets, it quickly exhausts all the client ports ("netstat -ant" shows 20,000+ sockets in the TIME_WAIT state).

my $hdl = AnyEvent::Handle->new(
  connect => [$ip, $port],
  on_connect=> sub {
      my ($handle, $host, $port, $tmp) = @_;
      #print "connect routine for $handle->{ue}\r\n";
      #update states.
  },
  on_read => sub {
      my $hdl = $_[0];
      #read data
      #send response.
  });

I wonder if it's possible to create a TCP socket with IO::Socket::INET and then use the newly created socket in AnyEvent::Handle:

my $sock = IO::Socket::INET->new( Proto    => 'tcp',
                             PeerAddr => $ue->{vars}->{ip},
                             PeerPort => $ue->{vars}->{dstPort},
                             ReusePort => 1,
            KeepAlive => 1
) || die "failed to setup outsock $@\n";
my $hdl = AnyEvent::Handle->new(
  fh => $sock,
  on_connect=> sub {
      my ($handle, $host, $port, $tmp) = @_;
      #print "connect routine for $handle->{ue}\r\n";
      #update states.
  },
  on_read => sub {
      my $hdl = $_[0];
      #read data
      #send response.
  });

I tried it, but it doesn't work. I'd appreciate any suggestions/comments.

Thanks to ikegami, who looked into it and gave a suggestion. However, it seems that SO_REUSEADDR doesn't take effect. Here is the code I used (based on his suggestion):

use strict;
use warnings;

use AnyEvent           qw( );
use AnyEvent::Handle   qw( );
use AnyEvent::Impl::EV qw( );
use AnyEvent::Socket   qw( tcp_connect );
use Socket             qw( SOL_SOCKET SO_REUSEPORT SO_REUSEADDR);

my $ts = 0;
my $trans = 0;
my $currentTS;

sub transaction {
   my ($host, $port) = @_;
   tcp_connect($host, $port, sub {
      my ($sock) = @_
         or die "Can't connect: $!";

      my $handle;
      $handle = AnyEvent::Handle->new(
         fh => $sock,
         on_eof => sub {
            $handle->destroy();
         },
         on_read => sub {
            my ($handle) = @_;
            #print $handle->rbuf();
            $trans ++;
            $currentTS = time();
            if ($currentTS > $ts) {
                $ts = $currentTS;
                print "$trans\n";
            }
            #printf "recved %d bytes of data\n", length($handle->rbuf);
            # This should continue to read until header +
            # Content-Length bytes have been read instead
            # of stopping after one read.
            if (length($handle->rbuf) > 0) {
                $handle->destroy();
            }
         },
      );
      $handle->push_write("GET /s HTTP/1.1\r\nHost: $host\r\n\r\n");
      #$handle->push_shutdown();  # Done writing.
   }, sub {
      my ($sock) = @_;

      #setsockopt($sock, SOL_SOCKET, SO_REUSEPORT, 1) or die $!;
      setsockopt($sock, SOL_SOCKET, SO_REUSEADDR, 1)  or die $!;
        #   die "failed to set linger $!\n";
      return undef;
   });
}
{
   my $cv = AnyEvent->condvar();

   my $t = AnyEvent->timer(after=>0.001, interval=>1, cb=> sub {
      transaction("10.3.0.6", 80 );
   });

   $cv->recv();
}

My system is Ubuntu 11.04. In the directory /proc/sys/net/ipv4, here are the contents of two files:

% more tcp_tw_recycle

1

% more tcp_tw_reuse

1

Solution

I can't run the code below because Windows doesn't provide SO_REUSEPORT, but I'm extremely confident that it does what you requested.

That said, I'm not sure it will help. From what I've read, SO_REUSEPORT allows you to bind to an already active port, but you're not binding to any port.

use strict;
use warnings;

use AnyEvent           qw( );
use AnyEvent::Handle   qw( );
use AnyEvent::Impl::EV qw( );
use AnyEvent::Socket   qw( tcp_connect );
use Socket             qw( SOL_SOCKET SO_REUSEPORT );

sub transaction {
   my ($host, $port) = @_;
   tcp_connect($host, $port, sub {
      my ($sock) = @_
         or die "Can't connect: $!";

      my $handle;
      $handle = AnyEvent::Handle->new(
         fh => $sock,
         on_eof => sub {
            $handle->destroy();
         },
         on_read => sub {
            my ($handle) = @_;
            print $handle->rbuf();

            # This should continue to read until header +
            # Content-Length bytes have been read instead
            # of stopping after one read.
            $handle->destroy();
         },
      );

      $handle->push_write("GET / HTTP/1.1\r\nHost: $host\r\n\r\n");
      $handle->push_shutdown();  # Done writing.
   }, sub {
      my ($sock) = @_;

      setsockopt($sock, SOL_SOCKET, SO_REUSEPORT, 1)
         or die $!;

      return undef;
   });
}

{
   my $cv = AnyEvent->condvar();

   my $t = AnyEvent->timer(after=>0.001, interval=>0.001, cb=> sub {
      transaction("localhost", $ARGV[0] // die("usage"));
   });

   $cv->recv();
}

Server used for testing:

use strict;
use warnings;
use 5.010;

use IO::Socket::INET qw( );
use Socket           qw( inet_ntoa );

my $serv = IO::Socket::INET->new(
   Listen => 1,
);

say inet_ntoa($serv->sockaddr) . ":" . $serv->sockport;

while (my $client = $serv->accept()) {
   say "Connection from ".inet_ntoa($client->peeraddr).":".$client->peerport;
   while (<$client>) {
      last if /^(?:\r?\n)?\z/;
   }

   say $client "HTTP/1.1 200 OK\r\n"
      .        "Content-Type: text/plain\r\n"
      .        "\r\n"
      .        "Hello\n";

   say "   done.";
}

Other tips

What you are doing cannot be done with TCP/IP. You are running into a protection mechanism, the TIME_WAIT state, which keeps a (srcip, srcport, dstip, dstport) tuple from being reused until segments from the old connection can no longer be in flight. Neither SO_REUSEADDR nor SO_REUSEPORT nor IO::Socket can help here; it's a protocol limitation.

The only legal way out is to not create identical tuples too quickly, for example by using different source IP addresses or by reusing existing TCP connections for more transactions. You could even use multiple destination ports or IP addresses; anything that makes the tuples more "unique" will help.
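
To make the connection-reuse option concrete, here is a minimal sketch that runs several requests over one TCP connection, so only one tuple ever ends up in TIME_WAIT. It assumes the server honours HTTP/1.1 keep-alive and always sends a Content-Length header (the test server in the accepted answer does neither). The names run_transactions, $count and $done are made up for illustration.

```perl
use strict;
use warnings;

use AnyEvent         qw( );
use AnyEvent::Handle qw( );

# Hypothetical helper: runs $count GET requests over ONE TCP connection,
# then calls $done->($error), where $error is undef on success.
sub run_transactions {
   my ($host, $port, $count, $done) = @_;

   my ($handle, $next);

   my $finish = sub {
      my ($error) = @_;
      $handle->destroy();
      undef $next;   # Break the closure's reference to itself.
      $done->($error);
   };

   $handle = AnyEvent::Handle->new(
      connect  => [$host, $port],
      # Connect failures and an early close by the server both end up here.
      on_error => sub {
         my ($h, $fatal, $message) = @_;
         $finish->($message);
      },
   );

   $next = sub {
      return $finish->() if $count-- <= 0;

      $handle->push_write("GET /s HTTP/1.1\r\nHost: $host\r\n\r\n");

      # Read up to the blank line that ends the response header...
      $handle->push_read(regex => qr/\015\012\015\012/, sub {
         my ($h, $header) = @_;
         my ($length) = $header =~ /^Content-Length:\s*(\d+)/mi
            or return $finish->("no Content-Length in response");

         # ...then exactly Content-Length bytes of body, then go again.
         $h->push_read(chunk => $length, sub { $next->() });
      });
   };
   $next->();

   return;
}
```

A caller would create a condvar, pass something like sub { $cv->send(@_) } as $done, and wait on $cv->recv(). Chunked responses and "Connection: close" are not handled here; a real client would need both.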

If you are desperate, you could enable unsafe port reuse in your OS, but that brings the real danger of data corruption, so don't do this when other people's data is involved :)

And as far as I can see, ikegami's example snippets are quite good and show nicely how to "import" sockets from IO::Socket, and how to use tcp_connect to bind a socket. If you choose to use multiple source IP addresses, then his code shows how to bind to them.
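
As a rough sketch of the multiple-source-address idea: the prepare callback of tcp_connect receives the socket before it connects, so it can bind() it to a chosen local address. The helper name make_connector and the round-robin choice are assumptions for illustration. The addresses passed in must already be configured on the host, and the sketch is IPv4-only.

```perl
use strict;
use warnings;

use AnyEvent::Socket qw( tcp_connect );
use Socket           qw( inet_aton pack_sockaddr_in );

# Hypothetical helper: returns a sub that connects like tcp_connect,
# but rotates through the given local (source) IPv4 addresses.
sub make_connector {
   my @source_ips = @_;
   die "need at least one source address" unless @source_ips;
   my $i = 0;

   return sub {
      my ($host, $port, $on_connect) = @_;
      my $source = $source_ips[ $i++ % @source_ips ];

      tcp_connect($host, $port, $on_connect, sub {
         my ($sock) = @_;

         # Port 0 lets the OS pick the source port; only the address is pinned.
         bind($sock, pack_sockaddr_in(0, inet_aton($source)))
            or die "Can't bind to $source: $!";

         return undef;   # Use the default connect timeout.
      });

      return;
   };
}
```

With two source addresses, twice as many distinct tuples are available towards the same destination. The $on_connect callback takes the place of the first callback passed to tcp_connect in the snippets above.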

License: CC-BY-SA with attribution
Not affiliated with StackOverflow