#
#
-require 5.10;
+require 5.10.1;
# make sure that modules are searched in the order local then perl
BEGIN {
$zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr
$clusterport $mycall $decease $is_win $routeroot $me $reqreg $bumpexisting
$allowdxby $dbh $dsn $dbuser $dbpass $do_xml $systime_days $systime_daystart
- $can_encode $maxconnect_user $maxconnect_node $idle_interval
+ $can_encode $maxconnect_user $maxconnect_node $idle_interval $log_flush_interval
);
@inqueue = (); # the main input queue, an array of hashes
$maxconnect_node = 0; # Ditto but for nodes. In either case if a new incoming connection
# takes the no of references in the routing table above these numbers
# then the connection is refused. This only affects INCOMING connections.
-$idle_interval = 0.100; # the wait between invocations of the main idle loop processing.
+$idle_interval = 0.500; # the wait between invocations of the main idle loop processing.
+$log_flush_interval = 2; # interval to wait between log flushes
+
+our $ending; # signal that we are ending;
+
# send a message to call on conn and disconnect
sub already_conn
$conn->disconnect;
}
-sub error_handler
-{
- my $dxchan = shift;
- $dxchan->{conn}->set_error(undef) if exists $dxchan->{conn};
- $dxchan->disconnect(1);
-}
-
# handle incoming messages
sub new_channel
{
my $dxchan = DXChannel::get($call);
if ($dxchan) {
if ($user && $user->is_node) {
- already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall));
+ already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall));
return;
}
if ($bumpexisting) {
$conn->conns($call) if $conn->isa('IntMsg');
# set callbacks
- $conn->set_error(sub {error_handler($dxchan)});
+ $conn->set_error(sub {my $err = shift; LogDbg('DXCommand', "Comms error '$err' received for call $dxchan->{call}"); $dxchan->disconnect(1);});
+ $conn->set_on_eof(sub {$dxchan->disconnect});
$conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);});
$dxchan->rec($msg);
}
return \&new_channel;
}
+our $ceasing;
+
# cease running this program, close down all the connections nicely
sub cease
{
my $dxchan;
+ cluck("ceasing") if $ceasing;
+
+ return if $ceasing++;
+
unless ($is_win) {
$SIG{'TERM'} = 'IGNORE';
$SIG{'INT'} = 'IGNORE';
dbg("Local::finish error $@") if $@;
}
- # disconnect nodes
- foreach $dxchan (DXChannel::get_all_nodes) {
- $dxchan->disconnect(2) unless $dxchan == $main::me;
- }
-
- # disconnect users
- foreach $dxchan (DXChannel::get_all_users) {
- $dxchan->disconnect;
- }
# disconnect AGW
AGWMsg::finish();
$dbh->finish if $dbh;
unlink $lockfn;
-# $SIG{__WARN__} = $SIG{__DIE__} = sub {my $a = shift; cluck($a); };
- exit(0);
}
# the reaper of children
AGWMsg::init(\&new_channel);
}
+our $io_disconnected;
+
sub idle_loop
{
my $timenow = time;
- DXChannel::process();
+ BPQMsg::process();
+# DXChannel::process();
# $DB::trace = 0;
DXDb::process();
DXUser::process();
DXDupe::process();
- $systime_days = $days;
- $systime_daystart = $days * 86400;
+ DXCron::process(); # do cron jobs
+ IsoTime::update($systime);
+ DXProt::process(); # process ongoing ak1a pcxx stuff
+ DXConnect::process();
+ DXUser::process();
+ AGWMsg::process();
+
+ Timer::handler();
+ DXLog::flushall();
}
- IsoTime::update($systime);
- DXCron::process(); # do cron jobs
- DXCommandmode::process(); # process ongoing command mode stuff
- DXXml::process();
- DXProt::process(); # process ongoing ak1a pcxx stuff
- DXConnect::process();
- DXMsg::process();
- DXDb::process();
- DXUser::process();
- DXDupe::process();
- AGWMsg::process();
- BPQMsg::process();
-
- Timer::handler();
if (defined &Local::process) {
eval {
};
dbg("Local::process error $@") if $@;
}
+
+ while ($ending) {
+ my $dxchan;
+
+ dbg("DXSpider Ending $ending");
+
+ unless ($io_disconnected++) {
+
+ # disconnect users
+ foreach $dxchan (DXChannel::get_all_users) {
+ $dxchan->disconnect;
+ }
+
+ # disconnect nodes
+ foreach $dxchan (DXChannel::get_all_nodes) {
+ next if $dxchan == $main::me;
+ $dxchan->disconnect(2);
+ }
+ $main::me->disconnect;
+ }
+
+ Mojo::IOLoop->stop if --$ending <= 0;
+ }
}
# prime some signals
unless ($DB::VERSION) {
- $SIG{INT} = $SIG{TERM} = sub { Mojo::IOLoop->stop; };
+ $SIG{INT} = $SIG{TERM} = sub { $ending = 10; };
}
unless ($is_win) {
#open(DB::OUT, "|tee /tmp/aa");
my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop);
+my $log_flush_loop = Mojo::IOLoop->recurring($log_flush_interval => \&DXLog::flushall);
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+dbg("After Mojo::IOLoop");
cease(0);
exit(0);
-