X-Git-Url: http://www.dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=c81273e29b329a5070b2bd53e34e7371564e3c68;hb=refs%2Fheads%2Fstaging;hp=d62cb744034377f54592a3a7233571f9e88f842d;hpb=06a6935d583a684869129350fed170d467ad8acc;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index d62cb744..c81273e2 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -18,16 +18,20 @@ use Mojo::IOLoop; use Mojo::IOLoop::Stream; use DXDebug; -use Timer; +use DXTimer; -use vars qw($now %conns $noconns $cnum $total_in $total_out); +use vars qw($now %conns $noconns $cnum $total_in $total_out $total_lines_in $total_lines_out $connect_timeout $disc_waittime); $total_in = $total_out = 0; +$total_lines_in = $total_lines_out = 0; $now = time; $cnum = 0; +$connect_timeout = 5; +$disc_waittime = 1.5; +our %delqueue; # #----------------------------------------------------------------- @@ -40,15 +44,19 @@ sub new my $class = $obj || $pkg; my $conn = { - rproc => $rproc, - inqueue => [], - outqueue => [], - state => 0, - lineend => "\r\n", - csort => 'telnet', - timeval => 60, - blocking => 0, - cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)), + rproc => $rproc, + inqueue => [], + outqueue => [], + state => 0, + lineend => "\r\n", + csort => 'telnet', + timeval => 60, + blocking => 0, + cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)), + linesin => 0, + linesout => 0, + datain => 0, + dataout => 0, }; $noconns++; @@ -61,14 +69,14 @@ sub set_error { my $conn = shift; my $callback = shift; - $conn->{sock}->on(error => sub {my ($stream, $err) = @_; $callback->($conn, $err);}); + $conn->{sock}->on(error => sub {$callback->($_[1]);}); } sub set_on_eof { my $conn = shift; my $callback = shift; - $conn->{sock}->on(close => sub {$callback->($conn);}); + $conn->{sock}->on(close => sub {$callback->()}); } sub set_rproc @@ -120,20 +128,70 @@ sub ax25 sub peerhost { my $conn = shift; - $conn->{peerhost} ||= 'ax25' if $conn->ax25; - $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock}; - $conn->{peerhost} ||= 'UNKNOWN'; + unless ($conn->{peerhost}) { + $conn->{peerhost} ||= 'ax25' if $conn->ax25; + $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock}; + $conn->{peerhost} ||= 'UNKNOWN'; + } + $conn->{peerhost} =~ s/^::ffff://; return $conn->{peerhost}; } +sub sockhost +{ + my $conn = shift; + unless ($conn->{sockhost}) { + $conn->{sockhost} ||= 'ax25' if $conn->ax25; + $conn->{sockhost} ||= $conn->{sock}->handle->sockhost if $conn->{sock}; + $conn->{sockhost} ||= 'UNKNOWN'; + } + $conn->{sockhost} =~ s/^::ffff://; + if (! defined $main::localhost_alias_ipv4 && $conn->{sockhost} =~ /\./ && $conn->{sockhost} !~ /^127\./) { + $main::localhost_alias_ipv4 = $conn->{sockhost}; + dbg("Msg: localhost_alias_ipv4 = '$main::localhost_alias_ipv4'"); + } elsif (! defined $main::localhost_alias_ipv6 && $conn->{sockhost} =~ /:/ && $conn->{sockhost} !~ /^::1$/) { + $main::localhost_alias_ipv6 = $conn->{sockhost}; + dbg("Msg: localhost_alias_ipv6 = '$main::localhost_alias_ipv6'"); + } + return $conn->{sockhost}; +} #----------------------------------------------------------------- # Send side routines -sub connect { - my ($pkg, $to_host, $to_port, $rproc) = @_; +sub _on_connect +{ + my $conn = shift; + my $handle = shift; + undef $conn->{sock}; + my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle); + $sock->on(read => sub {$conn->_rcv($_[1]);} ); + $sock->on(error => sub {delete $conn->{sock}; $conn->disconnect;}); + $sock->on(close => sub {delete $conn->{sock}; $conn->disconnect;}); + $sock->timeout(0); + $sock->start; + $conn->{peerhost} = eval { $handle->peerhost; }; + $conn->{sockhost} = eval { $handle->sockhost; }; + dbg((ref $conn) . " connected $conn->{cnum}:$conn->{sockhost} to $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg ('connect'); + if ($conn->{on_connect}) { + &{$conn->{on_connect}}($conn, $handle); + } +} + +sub is_connected +{ + my $conn = shift; + my $sock = $conn->{sock}; + return ref $sock && $sock->isa('Mojo::IOLoop::Stream'); +} + +sub connect { + my ($pkg, $to_host, $to_port, %args) = @_; + my $timeout = delete $args{timeout} || $connect_timeout; + # Create a connection end-point object my $conn = $pkg; unless (ref $pkg) { + my $rproc = delete $args{rproc}; $conn = $pkg->new($rproc); } $conn->{peerhost} = $to_host; @@ -144,17 +202,26 @@ sub connect { my $sock; $conn->{sock} = $sock = Mojo::IOLoop::Client->new; - $sock->on(connect => sub { dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');}, - error => {$conn->disconnect}, - close => {$conn->disconnect}); + $sock->on(connect => sub { + $conn->_on_connect($_[1]) + } ); + $sock->on(error => sub { + &{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc}; + delete $conn->{sock}; + $conn->disconnect + }); + $sock->on(close => sub { + delete $conn->{sock}; + $conn->disconnect} + ); + + # copy any args like on_connect, on_disconnect etc + while (my ($k, $v) = each %args) { + $conn->{$k} = $v; + } - $sock->connect(address => $to_host, port => $to_port); + $sock->connect(address => $to_host, port => $to_port, timeout => $timeout); - dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll'); - - if ($conn->{rproc}) { - $sock->on(read => sub {my ($stream, $msg) = @_; $conn->_rcv($msg);} ); - } return $conn; } @@ -207,16 +274,22 @@ sub start_program return $pid; } -sub disconnect +sub disconnect { - my $conn = shift; - return if exists $conn->{disconnecting}; - - $conn->{disconnecting} = 1; - my $sock = delete $conn->{sock}; - $conn->{state} = 'E'; - $conn->{timeout}->del if $conn->{timeout}; + my $conn = shift; + my $count = $conn->{disconnecting}++; + my $dbg = isdbg('connll'); + my ($pkg, $fn, $line) = caller if $dbg; + + if ($count >= 2) { + dbgtrace((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line FORCING CLOSE ") if $dbg; + _close_it($conn); + return; + } + dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ") if $dbg; + return if $count; + # remove this conn from the active queue # be careful to delete the correct one my $call; if ($call = $conn->{call}) { @@ -224,7 +297,52 @@ sub disconnect delete $conns{$call} if $ref && $ref == $conn; } $call ||= 'unallocated'; - dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll'); + + $delqueue{$conn} = $conn; # save this connection until everything is finished + my $sock = $conn->{sock}; + if ($sock) { + if ($sock->{buffer}) { + my $lth = length $sock->{buffer}; + Mojo::IOLoop->timer($disc_waittime, sub { + dbg("Buffer contained $lth characters, coordinated for $disc_waittime secs, now disconnecting $call") if $dbg; + _close_it($conn); + }); + } else { + dbg("Buffer empty, just close $call") if $dbg; + _close_it($conn); + } + } + else { + dbg((ref $conn) . " socket missing on $conn->{call}") if $dbg; + _close_it($conn); + } +} + +sub _close_it +{ + my $conn = shift; + my $sock = delete $conn->{sock}; + $conn->{state} = 'E'; + $conn->{timeout}->del if $conn->{timeout}; + + my $call = $conn->{call}; + + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line "); + } + + + dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll'); + + if ($conn->{on_disconnect}) { + &{$conn->{on_disconnect}}($conn); + } + + if ($sock) { + dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll'); + $sock->close_gracefully if $sock->can('close_gracefully'); + } # get rid of any references for (keys %$conn) { @@ -233,9 +351,7 @@ sub disconnect } } - if (defined($sock)) { - $sock->remove; - } + delete $delqueue{$conn}; # finally remove the $conn unless ($main::is_win) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; @@ -247,18 +363,22 @@ sub _send_stuff my $conn = shift; my $rq = $conn->{outqueue}; my $sock = $conn->{sock}; + return unless defined $sock; + return if $conn->{disconnecting}; + while (@$rq) { my $data = shift @$rq; my $lth = length $data; my $call = $conn->{call} || 'none'; if (isdbg('raw')) { - if (isdbg('raw')) { - dbgdump('raw', "$call send $lth: ", $lth); - } + dbgdump('raw', "$call send $lth:", $data); } - if (defined $sock && !$sock->destroyed) { + if (defined $sock) { $sock->write($data); - $total_out = $lth; + $total_out += $lth; + $conn->{dataout} += $lth; + ++$conn->{linesout}; + ++$total_lines_out; } else { dbg("_send_stuff $call ending data ignored: $data"); } @@ -275,6 +395,13 @@ sub send_later { goto &send_now; } +sub send_raw +{ + my ($conn, $msg) = @_; + push @{$conn->{outqueue}}, $msg; + _send_stuff($conn); +} + sub enqueue { my $conn = shift; push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : ''; @@ -300,9 +427,10 @@ sub new_server my ($pkg, $my_host, $my_port, $login_proc) = @_; my $conn = $pkg->new($login_proc); - $conn->{sock} = Mojo::IOLoop::Server->new; - $conn->{sock}->on(accept=>sub{$conn->new_client()}); - $conn->{sock}->listen(address=>$my_host, port=>$my_port); + my $sock = $conn->{sock} = Mojo::IOLoop::Server->new; + $sock->on(accept=>sub{$conn->new_client($_[1]);}); + $sock->listen(address=>$my_host, port=>$my_port); + $sock->start; die "Could not create socket: $! \n" unless $conn->{sock}; return $conn; @@ -326,6 +454,8 @@ sub dequeue } else { $conn->{msg} = pop @lines; } + $conn->{linesin} += @lines; + $total_lines_in += @lines; for (@lines) { last if $conn->{disconnecting}; &{$conn->{rproc}}($conn, defined $_ ? $_ : ''); @@ -335,73 +465,57 @@ sub dequeue sub _rcv { # Complement to _send my $conn = shift; # $rcv_now complement of $flush - # Find out how much has already been received, if at all - my ($msg, $offset, $bytes_to_read, $bytes_read); + my $msg = shift; my $sock = $conn->{sock}; return unless defined($sock); + return if $conn->{disonnecting}; - my @lines; -# if ($conn->{blocking}) { -# blocking($sock, 0); -# $conn->{blocking} = 0; -# } - $bytes_read = sysread ($sock, $msg, 1024, 0); - if (defined ($bytes_read)) { - if ($bytes_read > 0) { - $total_in += $bytes_read; - if (isdbg('raw')) { - my $call = $conn->{call} || 'none'; - dbgdump('raw', "$call read $bytes_read: ", $msg); - } - if ($conn->{echo}) { - my @ch = split //, $msg; - my $out; - for (@ch) { - if (/[\cH\x7f]/) { - $out .= "\cH \cH"; - $conn->{msg} =~ s/.$//; - } else { - $out .= $_; - $conn->{msg} .= $_; - } - } - if (defined $out) { - set_event_handler ($sock, write => sub{$conn->_send(0)}); - push @{$conn->{outqueue}}, $out; + $total_in += length $msg; + $conn->{datain} += length $msg; + + if (isdbg('raw')) { + my $call = $conn->{call} || 'none'; + my $lth = length $msg; + dbgdump('raw', "$call read $lth: ", $msg); + } + if ($conn->{echo}) { + my @ch = split //, $msg; + my $out; + for (@ch) { + if (/[\cH\x7f]/) { + $out .= "\cH \cH"; + $conn->{msg} =~ s/.$//; + } else { + $out .= $_; + $conn->{msg} .= $_; } - } else { - $conn->{msg} .= $msg; } - } + if (defined $out) { + $conn->send_raw($out); + } } else { - if (_err_will_block($!)) { - return ; - } else { - $bytes_read = 0; - } - } + $conn->{msg} .= $msg; + } -FINISH: - if (defined $bytes_read && $bytes_read == 0) { - &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc}; - $conn->disconnect; - } else { - unless ($conn->{disable_read}) { - $conn->dequeue if exists $conn->{msg}; - } + unless ($conn->{disable_read}) { + $conn->dequeue if exists $conn->{msg}; } } sub new_client { my $server_conn = shift; - my $client = shift; + my $handle = shift; my $conn = $server_conn->new($server_conn->{rproc}); - my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client); + my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle); $sock->on(read => sub {$conn->_rcv($_[1])}); - dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll'); - - my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport); + $sock->timeout(0); + $sock->start; + $conn->{peerhost} = $handle->peerhost || 'unknown'; + $conn->{peerport} = $handle->peerport || 0; + $conn->{sockhost} = $handle->sockhost || ''; + dbg((ref $conn) . " accept $conn->{cnum}:$conn->{sockhost} from $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg('connect'); + my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost}, $conn->{peerport}); $conn->{sort} = 'Incoming'; if ($eproc) { $conn->{eproc} = $eproc; @@ -412,6 +526,7 @@ sub new_client { &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; $conn->disconnect(); } + return $conn; } sub close_server @@ -463,9 +578,23 @@ sub sleep sub DESTROY { my $conn = shift; + my $call = $conn->{call} || 'unallocated'; + + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbgtrace((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line "); + } + my $call = $conn->{call} || 'unallocated'; my $host = $conn->{peerhost} || ''; my $port = $conn->{peerport} || ''; + my $sock = $conn->{sock}; + + if ($sock) { + $sock->close_gracefully if $sock->can('close_gracefully'); + delete $conn->{sock}; + } + $noconns--; dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll'); }