Started the new routing stuff which will run in parallel for a while.
[spider.git] / perl / DXMsg.pm
index 9e4893b23485c386f00f3cda79931247b0a5d2f0..d5631904971148fae3c246cbff338d4033b4cb2b 100644 (file)
@@ -15,8 +15,6 @@
 
 package DXMsg;
 
-@ISA = qw(DXProt DXChannel);
-
 use DXUtil;
 use DXChannel;
 use DXUser;
@@ -28,7 +26,6 @@ use DXDebug;
 use DXLog;
 use IO::File;
 use Fcntl;
-use Carp;
 
 use strict;
 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
@@ -83,13 +80,6 @@ $importfn = "$msgdir/import";       # import directory
                  waitt => '5,Wait until,cldatetime',
                 );
 
-sub DESTROY
-{
-       my $self = shift;
-       undef $self->{lines};
-       undef $self->{gotit};
-}
-
 # allocate a new object
 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
 sub alloc                  
@@ -111,6 +101,7 @@ sub alloc
        $self->{rrreq} = shift;
        $self->{gotit} = [];
        $self->{lastt} = $main::systime;
+       $self->{lines} = [];
     
        return $self;
 }
@@ -123,7 +114,6 @@ sub workclean
        delete $ref->{tonode};
        delete $ref->{fromnode};
        delete $ref->{stream};
-       delete $ref->{lines};
        delete $ref->{file};
        delete $ref->{count};
        delete $ref->{lastt} if exists $ref->{lastt};
@@ -137,20 +127,7 @@ sub process
        # this is periodic processing
        if (!$self || !$line) {
 
-               if ($main::systime > $lastq + $queueinterval) {
-
-                       # wander down the work queue stopping any messages that have timed out
-                       for (keys %busy) {
-                               my $node = $_;
-                               my $ref = $busy{$_};
-                               if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) {
-                                       dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
-                                       $ref->stop_msg($node);
-                                       
-                                       # delay any outgoing messages that fail
-                                       $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
-                               }
-                       }
+               if ($main::systime >= $lastq + $queueinterval) {
 
                        # queue some message if the interval timer has gone off
                        queue_msg(0);
@@ -184,6 +161,7 @@ sub process
 
                        my $t = cltounix($f[5], $f[6]);
                        my $stream = next_transno($f[2]);
+                       $f[13] = $self->call unless $f[13] && $f[13] gt ' ';
                        my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
                        
                        # fill in various forwarding state variables
@@ -194,6 +172,7 @@ sub process
                        $ref->{stream} = $stream;
                        $ref->{count} = 0;      # no of lines between PC31s
                        dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
+                       Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" );
                        $work{"$f[2]$stream"} = $ref; # store in work
                        $busy{$f[2]} = $ref; # set interlock
                        $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
@@ -201,7 +180,7 @@ sub process
 
                        # look to see whether this is a non private message sent to a known callsign
                        my $uref = DXUser->get_current($ref->{to});
-                       if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
+                       if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
                                $ref->{private} = 1;
                                dbg('msg', "set bull to $ref->{to} to private");
                        }
@@ -211,6 +190,7 @@ sub process
                if ($pcno == 29) {              # incoming text
                        my $ref = $work{"$f[2]$f[3]"};
                        if ($ref) {
+                               $f[4] =~ s/\%5E/^/g;
                                push @{$ref->{lines}}, $f[4];
                                $ref->{count}++;
                                if ($ref->{count} >= $ref->{linesreq}) {
@@ -236,7 +216,6 @@ sub process
                                $work{"$f[2]$f[3]"} = $ref;     # new ref
                                dbg('msg', "incoming subject ack stream $f[3]\n");
                                $busy{$f[2]} = $ref; # interlock
-                               $ref->{lines} = [];
                                push @{$ref->{lines}}, ($ref->read_msg_body);
                                $ref->send_tranche($self);
                                $ref->{lastt} = $main::systime;
@@ -282,8 +261,8 @@ sub process
                                                        if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
                                                                $ref->stop_msg($self->call);
                                                                my $msgno = $m->{msgno};
-                                                               dbg('msg', "duplicate message to $msgno\n");
-                                                               Log('msg', "duplicate message to $msgno");
+                                                               dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
+                                                               Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
                                                                return;
                                                        }
                                                }
@@ -292,8 +271,7 @@ sub process
                                                $ref->swop_it($self->call);
                                                
                                                # look for 'bad' to addresses 
-#                                              if (grep $ref->{to} eq $_, @badmsg) {
-                                               if ($ref->dump_it($self->call)) {
+                                               if ($ref->dump_it) {
                                                        $ref->stop_msg($self->call);
                                                        dbg('msg', "'Bad' message $ref->{to}");
                                                        Log('msg', "'Bad' message $ref->{to}");
@@ -406,12 +384,7 @@ sub store
 {
        my $ref = shift;
        my $lines = shift;
-       
-       # we only proceed if there are actually any lines in the file
-#      if (!$lines || @{$lines} == 0) {
-#              return;
-#      }
-       
+
        if ($ref->{file}) {                     # a file
                dbg('msg', "To be stored in $ref->{to}\n");
                
@@ -462,15 +435,13 @@ sub del_msg
        my $self = shift;
        
        # remove it from the active message list
-       @msg = map { $_ != $self ? $_ : () } @msg;
-       
-       # belt and braces (one day I will ask someone if this is REALLY necessary)
-       delete $self->{gotit};
-       delete $self->{list};
+       dbg('msg', "\@msg = " . scalar @msg . " before delete");
+       @msg = grep { $_ != $self } @msg;
        
        # remove the file
        unlink filename($self->{msgno});
        dbg('msg', "deleting $self->{msgno}\n");
+       dbg('msg', "\@msg = " . scalar @msg . " after delete");
 }
 
 # clean out old messages from the message queue
@@ -479,18 +450,18 @@ sub clean_old
        my $ref;
        
        # mark old messages for deletion
+       dbg('msg', "\@msg = " . scalar @msg . " before delete");
        foreach $ref (@msg) {
-               if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
+               if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
                        $ref->{deleteme} = 1;
-                       delete $ref->{gotit};
-                       delete $ref->{list};
                        unlink filename($ref->{msgno});
                        dbg('msg', "deleting old $ref->{msgno}\n");
                }
        }
        
        # remove them all from the active message list
-       @msg = map { $_->{deleteme} ? () : $_ } @msg;
+       @msg = grep { !$_->{deleteme} } @msg;
+       dbg('msg', "\@msg = " . scalar @msg . " after delete");
        $last_clean = $main::systime;
 }
 
@@ -504,17 +475,24 @@ sub read_msg_header
        my @f;
        my $size;
        
-       $file = new IO::File;
-       if (!open($file, $fn)) {
-               print "Error reading $fn $!\n";
+       $file = new IO::File "$fn";
+       if (!$file) {
+           dbg('err', "Error reading $fn $!");
+           Log('err', "Error reading $fn $!");
                return undef;
        }
        $size = -s $fn;
        $line = <$file>;                        # first line
+       if ($size == 0 || !$line) {
+           dbg('err', "Empty $fn $!");
+           Log('err', "Empty $fn $!");
+               return undef;
+       }
        chomp $line;
        $size -= length $line;
        if (! $line =~ /^===/o) {
-               print "corrupt first line in $fn ($line)\n";
+               dbg('err', "corrupt first line in $fn ($line)");
+               Log('err', "corrupt first line in $fn ($line)");
                return undef;
        }
        $line =~ s/^=== //o;
@@ -525,7 +503,8 @@ sub read_msg_header
        chomp $line;
        $size -= length $line;
        if (! $line =~ /^===/o) {
-               print "corrupt second line in $fn ($line)\n";
+           dbg('err', "corrupt second line in $fn ($line)");
+           Log('err', "corrupt second line in $fn ($line)");
                return undef;
        }
        $line =~ s/^=== //o;
@@ -551,7 +530,8 @@ sub read_msg_body
        
        $file = new IO::File;
        if (!open($file, $fn)) {
-               print "Error reading $fn $!\n";
+               dbg('err' ,"Error reading $fn $!");
+               Log('err' ,"Error reading $fn $!");
                return undef;
        }
        @out = map {chomp; $_} <$file>;
@@ -590,15 +570,13 @@ sub queue_msg
        my $call = shift;
        my $ref;
        my $clref;
-       my @nodelist = DXChannel::get_all_ak1a();
        
        # bat down the message list looking for one that needs to go off site and whose
        # nearest node is not busy.
 
        dbg('msg', "queue msg ($sort)\n");
+       my @nodelist = DXChannel::get_all_nodes;
        foreach $ref (@msg) {
-               # firstly, is it private and unread? if so can I find the recipient
-               # in my cluster node list offsite?
 
                # ignore 'delayed' messages until their waiting time has expired
                if (exists $ref->{waitt}) {
@@ -606,20 +584,36 @@ sub queue_msg
                        delete $ref->{waitt};
                } 
 
+               # any time outs?
+               if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) {
+                       my $node = $ref->{tonode};
+                       dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
+                       Log('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
+                       $ref->stop_msg($node);
+                       
+                       # delay any outgoing messages that fail
+                       $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
+                       delete $ref->{lastt};
+                       next;
+               }
+
+               # firstly, is it private and unread? if so can I find the recipient
+               # in my cluster node list offsite?
+
                # deal with routed private messages
-               my $noderef;
+               my $dxchan;
                if ($ref->{private}) {
                        next if $ref->{'read'};           # if it is read, it is stuck here
                        $clref = DXCluster->get_exact($ref->{to});
                        unless ($clref) {             # otherwise look for a homenode
-                               my $uref = DXUser->get($ref->{to});
+                               my $uref = DXUser->get_current($ref->{to});
                                my $hnode =  $uref->homenode if $uref;
                                $clref = DXCluster->get_exact($hnode) if $hnode;
                        }
-                       if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) {
+                       if ($clref && !grep { $clref->dxchan == $_ } DXCommandmode::get_all()) {
                                next if $clref->call eq $main::mycall;  # i.e. it lives here
-                               $noderef = $clref->{dxchan};
-                               $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
+                               $dxchan = $clref->dxchan;
+                               $ref->start_msg($dxchan) if $dxchan && !get_busy($dxchan->call)  && $dxchan->state eq 'normal';
                        }
                }
                
@@ -628,13 +622,15 @@ sub queue_msg
                # the nodelist up above, if there are sites that haven't got it yet
                # then start sending it - what happens when we get loops is anyone's
                # guess, use (to, from, time, subject) tuple?
-               foreach $noderef (@nodelist) {
-                       next if $noderef->call eq $main::mycall;
-                       next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
-                       next unless $ref->forward_it($noderef->call);           # check the forwarding file
+               foreach $dxchan (@nodelist) {
+                       my $call = $dxchan->call;
+                       next unless $call;
+                       next if $call eq $main::mycall;
+                       next if ref $ref->{gotit} && grep $_ eq $call, @{$ref->{gotit}};
+                       next unless $ref->forward_it($call);           # check the forwarding file
 
                        # if we are here we have a node that doesn't have this message
-                       $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
+                       $ref->start_msg($dxchan) if !get_busy($call)  && $dxchan->state eq 'normal';
                        last;
                }
 
@@ -664,7 +660,7 @@ sub start_msg
        my ($self, $dxchan) = @_;
        
        dbg('msg', "start msg $self->{msgno}\n");
-       $self->{linesreq} = 5;
+       $self->{linesreq} = 10;
        $self->{count} = 0;
        $self->{tonode} = $dxchan->call;
        $self->{fromnode} = $main::mycall;
@@ -740,9 +736,9 @@ sub init
        my $ref;
                
        # load various control files
-       print "load badmsg: ", (load_badmsg() or "Ok"), "\n";
-       print "load forward: ", (load_forward() or "Ok"), "\n";
-       print "load swop: ", (load_swop() or "Ok"), "\n";
+       dbg('err', "load badmsg: " . (load_badmsg() or "Ok"));
+       dbg('err', "load forward: " . (load_forward() or "Ok"));
+       dbg('err', "load swop: " . (load_swop() or "Ok"));
 
        # read in the directory
        opendir($dir, $msgdir) or confess "can't open $msgdir $!";
@@ -751,13 +747,18 @@ sub init
 
        @msg = ();
        for (sort @dir) {
-               next unless /^m\d+$/o;
+               next unless /^m\d\d\d\d\d\d$/;
                
                $ref = read_msg_header("$msgdir/$_");
-               next unless $ref;
+               unless ($ref) {
+                       dbg('err', "Deleting $_");
+                       Log('err', "Deleting $_");
+                       unlink "$msgdir/$_";
+                       next;
+               }
                
                # delete any messages to 'badmsg.pl' places
-               if (grep $ref->{to} eq $_, @badmsg) {
+               if ($ref->dump_it) {
                        dbg('msg', "'Bad' TO address $ref->{to}");
                        Log('msg', "'Bad' TO address $ref->{to}");
                        $ref->del_msg;
@@ -972,7 +973,6 @@ sub forward_it
 sub dump_it
 {
        my $ref = shift;
-       my $call = shift;
        my $i;
        
        for ($i = 0; $i < @badmsg; $i += 3) {
@@ -1231,6 +1231,9 @@ sub AUTOLOAD
        $name =~ s/.*:://o;
        
        confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
+       # this clever line of code creates a subroutine which takes over from autoload
+       # from OO Perl - Conway
+       *{$AUTOLOAD} = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}} ;
        @_ ? $self->{$name} = shift : $self->{$name} ;
 }