fixed the syntax error
[spider.git] / perl / DXMsg.pm
index 168a978280e56337e8b17db7de48e894b92347c1..800fb5e74b26ce25ee86c1f24b20d006e4e6b31d 100644 (file)
@@ -28,12 +28,11 @@ use DXDebug;
 use DXLog;
 use IO::File;
 use Fcntl;
-use Carp;
 
 use strict;
 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
                        @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime
-                   $queueinterval $lastq $importfn);
+                   $queueinterval $lastq $importfn $minchunk $maxchunk);
 
 %work = ();                                            # outstanding jobs
 @msg = ();                                             # messages we have
@@ -49,6 +48,8 @@ $waittime = 30*60;              # time an aborted outgoing message waits before
 $queueinterval = 1*60;          # run the queue every 1 minute
 $lastq = 0;
 
+$minchunk = 4800;               # minimum chunk size for a split message
+$maxchunk = 6000;               # maximum chunk size
 
 $badmsgfn = "$msgdir/badmsg.pl";    # list of TO address we wont store
 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
@@ -109,6 +110,7 @@ sub alloc
        $self->{rrreq} = shift;
        $self->{gotit} = [];
        $self->{lastt} = $main::systime;
+       $self->{lines} = [];
     
        return $self;
 }
@@ -121,7 +123,6 @@ sub workclean
        delete $ref->{tonode};
        delete $ref->{fromnode};
        delete $ref->{stream};
-       delete $ref->{lines};
        delete $ref->{file};
        delete $ref->{count};
        delete $ref->{lastt} if exists $ref->{lastt};
@@ -135,7 +136,7 @@ sub process
        # this is periodic processing
        if (!$self || !$line) {
 
-               if ($main::systime > $lastq + $queueinterval) {
+               if ($main::systime >= $lastq + $queueinterval) {
 
                        # wander down the work queue stopping any messages that have timed out
                        for (keys %busy) {
@@ -182,6 +183,7 @@ sub process
 
                        my $t = cltounix($f[5], $f[6]);
                        my $stream = next_transno($f[2]);
+                       $f[13] = $self->call unless $f[13] && $f[13] gt ' ';
                        my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
                        
                        # fill in various forwarding state variables
@@ -192,6 +194,7 @@ sub process
                        $ref->{stream} = $stream;
                        $ref->{count} = 0;      # no of lines between PC31s
                        dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
+                       Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" );
                        $work{"$f[2]$stream"} = $ref; # store in work
                        $busy{$f[2]} = $ref; # set interlock
                        $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
@@ -209,6 +212,7 @@ sub process
                if ($pcno == 29) {              # incoming text
                        my $ref = $work{"$f[2]$f[3]"};
                        if ($ref) {
+                               $f[4] =~ s/\%5E/^/g;
                                push @{$ref->{lines}}, $f[4];
                                $ref->{count}++;
                                if ($ref->{count} >= $ref->{linesreq}) {
@@ -234,7 +238,6 @@ sub process
                                $work{"$f[2]$f[3]"} = $ref;     # new ref
                                dbg('msg', "incoming subject ack stream $f[3]\n");
                                $busy{$f[2]} = $ref; # interlock
-                               $ref->{lines} = [];
                                push @{$ref->{lines}}, ($ref->read_msg_body);
                                $ref->send_tranche($self);
                                $ref->{lastt} = $main::systime;
@@ -280,8 +283,8 @@ sub process
                                                        if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
                                                                $ref->stop_msg($self->call);
                                                                my $msgno = $m->{msgno};
-                                                               dbg('msg', "duplicate message to $msgno\n");
-                                                               Log('msg', "duplicate message to $msgno");
+                                                               dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
+                                                               Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
                                                                return;
                                                        }
                                                }
@@ -404,12 +407,7 @@ sub store
 {
        my $ref = shift;
        my $lines = shift;
-       
-       # we only proceed if there are actually any lines in the file
-#      if (!$lines || @{$lines} == 0) {
-#              return;
-#      }
-       
+
        if ($ref->{file}) {                     # a file
                dbg('msg', "To be stored in $ref->{to}\n");
                
@@ -460,7 +458,7 @@ sub del_msg
        my $self = shift;
        
        # remove it from the active message list
-       @msg = map { $_ != $self ? $_ : () } @msg;
+       @msg = grep { ref($_) && $_ != $self } @msg;
        
        # belt and braces (one day I will ask someone if this is REALLY necessary)
        delete $self->{gotit};
@@ -478,7 +476,7 @@ sub clean_old
        
        # mark old messages for deletion
        foreach $ref (@msg) {
-               if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
+               if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
                        $ref->{deleteme} = 1;
                        delete $ref->{gotit};
                        delete $ref->{list};
@@ -488,7 +486,7 @@ sub clean_old
        }
        
        # remove them all from the active message list
-       @msg = map { $_->{deleteme} ? () : $_ } @msg;
+       @msg = grep { ref($_) && !$_->{deleteme} } @msg;
        $last_clean = $main::systime;
 }
 
@@ -502,17 +500,24 @@ sub read_msg_header
        my @f;
        my $size;
        
-       $file = new IO::File;
-       if (!open($file, $fn)) {
-               print "Error reading $fn $!\n";
+       $file = new IO::File "$fn";
+       if (!$file) {
+           dbg('err', "Error reading $fn $!");
+           Log('err', "Error reading $fn $!");
                return undef;
        }
        $size = -s $fn;
        $line = <$file>;                        # first line
+       if ($size == 0 || !$line) {
+           dbg('err', "Empty $fn $!");
+           Log('err', "Empty $fn $!");
+               return undef;
+       }
        chomp $line;
        $size -= length $line;
        if (! $line =~ /^===/o) {
-               print "corrupt first line in $fn ($line)\n";
+               dbg('err', "corrupt first line in $fn ($line)");
+               Log('err', "corrupt first line in $fn ($line)");
                return undef;
        }
        $line =~ s/^=== //o;
@@ -523,7 +528,8 @@ sub read_msg_header
        chomp $line;
        $size -= length $line;
        if (! $line =~ /^===/o) {
-               print "corrupt second line in $fn ($line)\n";
+           dbg('err', "corrupt second line in $fn ($line)");
+           Log('err', "corrupt second line in $fn ($line)");
                return undef;
        }
        $line =~ s/^=== //o;
@@ -549,10 +555,11 @@ sub read_msg_body
        
        $file = new IO::File;
        if (!open($file, $fn)) {
-               print "Error reading $fn $!\n";
+               dbg('err' ,"Error reading $fn $!");
+               Log('err' ,"Error reading $fn $!");
                return undef;
        }
-       chomp (@out = <$file>);
+       @out = map {chomp; $_} <$file>;
        close($file);
        
        shift @out if $out[0] =~ /^=== /;
@@ -588,12 +595,12 @@ sub queue_msg
        my $call = shift;
        my $ref;
        my $clref;
-       my @nodelist = DXChannel::get_all_ak1a();
        
        # bat down the message list looking for one that needs to go off site and whose
        # nearest node is not busy.
 
        dbg('msg', "queue msg ($sort)\n");
+       my @nodelist = DXChannel::get_all_nodes;
        foreach $ref (@msg) {
                # firstly, is it private and unread? if so can I find the recipient
                # in my cluster node list offsite?
@@ -610,11 +617,11 @@ sub queue_msg
                        next if $ref->{'read'};           # if it is read, it is stuck here
                        $clref = DXCluster->get_exact($ref->{to});
                        unless ($clref) {             # otherwise look for a homenode
-                               my $uref = DXUser->get($ref->{to});
+                               my $uref = DXUser->get_current($ref->{to});
                                my $hnode =  $uref->homenode if $uref;
                                $clref = DXCluster->get_exact($hnode) if $hnode;
                        }
-                       if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) {
+                       if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) {
                                next if $clref->call eq $main::mycall;  # i.e. it lives here
                                $noderef = $clref->{dxchan};
                                $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
@@ -738,9 +745,9 @@ sub init
        my $ref;
                
        # load various control files
-       print "load badmsg: ", (load_badmsg() or "Ok"), "\n";
-       print "load forward: ", (load_forward() or "Ok"), "\n";
-       print "load swop: ", (load_swop() or "Ok"), "\n";
+       dbg('err', "load badmsg: " . (load_badmsg() or "Ok"));
+       dbg('err', "load forward: " . (load_forward() or "Ok"));
+       dbg('err', "load swop: " . (load_swop() or "Ok"));
 
        # read in the directory
        opendir($dir, $msgdir) or confess "can't open $msgdir $!";
@@ -749,10 +756,15 @@ sub init
 
        @msg = ();
        for (sort @dir) {
-               next unless /^m\d+$/o;
+               next unless /^m\d\d\d\d\d\d$/;
                
                $ref = read_msg_header("$msgdir/$_");
-               next unless $ref;
+               unless ($ref) {
+                       dbg('err', "Deleting $_");
+                       Log('err', "Deleting $_");
+                       unlink "$msgdir/$_";
+                       next;
+               }
                
                # delete any messages to 'badmsg.pl' places
                if (grep $ref->{to} eq $_, @badmsg) {
@@ -1049,8 +1061,8 @@ sub import_msgs
        # are there any to do in this directory?
        return unless -d $importfn;
        unless (opendir(DIR, $importfn)) {
-               dbg('msg', "can't open $importfn $!");
-               Log('msg', "can't open $importfn $!");
+               dbg('msg', "can\'t open $importfn $!");
+               Log('msg', "can\'t open $importfn $!");
                return;
        } 
 
@@ -1059,18 +1071,19 @@ sub import_msgs
        my $name;
        foreach $name (@names) {
                next if $name =~ /^\./;
+               my $splitit = $name =~ /^split/;
                my $fn = "$importfn/$name";
                next unless -f $fn;
                unless (open(MSG, $fn)) {
-                       dbg('msg', "can't open import file $fn $!");
-                       Log('msg', "can't open import file $fn $!");
+                       dbg('msg', "can\'t open import file $fn $!");
+                       Log('msg', "can\'t open import file $fn $!");
                        unlink($fn);
                        next;
                }
                my @msg = map { chomp; $_ } <MSG>;
                close(MSG);
                unlink($fn);
-               my @out = import_one($DXProt::me, \@msg);
+               my @out = import_one($DXProt::me, \@msg, $splitit);
                Log('msg', @out);
        }
 }
@@ -1081,6 +1094,7 @@ sub import_one
 {
        my $dxchan = shift;
        my $ref = shift;
+       my $splitit = shift;
        my $private = '1';
        my $rr = '0';
        my $notincalls = 1;
@@ -1092,7 +1106,7 @@ sub import_one
        # first line;
        my $line = shift @$ref;
        my @f = split /\s+/, $line;
-       unless ($f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
+       unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
                my $m = "invalid first line in import '$line'";
                dbg('MSG', $m );
                return (1, $m);
@@ -1140,52 +1154,81 @@ sub import_one
                                        }
                                }
                        }
-
+                       
                        if (grep $_ eq $f, @DXMsg::badmsg) {
                                push @out, $dxchan->msg('m3', $f);
                        } else {
-                               push @to, $f;
+                               push @to, $f;
                        }
                }
        }
-
+       
        # subject is the next line
        my $subject = shift @$ref;
        
        # strip off trailing lines 
-       pop @$ref while (@$ref && ($$ref[-1] eq '' || $$ref[-1] =~ /^\s+$/));
-
+       pop @$ref while (@$ref && $$ref[-1] =~ /^\s*$/);
+       
        # strip off /EX or /ABORT
-       return ("aborted") if (@$ref && $$ref[-1] =~ m{^/ABORT$}i)
+       return ("aborted") if @$ref && $$ref[-1] =~ m{^/ABORT$}i
        pop @$ref if (@$ref && $$ref[-1] =~ m{^/EX$}i);                                                                  
 
+       # sort out any splitting that needs to be done
+       my @chunk;
+       if ($splitit) {
+               my $lth = 0;
+               my $lines = [];
+               for (@$ref) {
+                       if ($lth >= $maxchunk || ($lth > $minchunk && /^\s*$/)) {
+                               push @chunk, $lines;
+                               $lines = [];
+                               $lth = 0;
+                       } 
+                       push @$lines, $_;
+                       $lth += length; 
+               }
+               push @chunk, $lines if @$lines;
+       } else {
+               push @chunk, $ref;
+       }
+                                 
     # write all the messages away
-       my $to;
-       foreach $to (@to) {
-               my $systime = $main::systime;
-               my $mycall = $main::mycall;
-               my $mref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
-                                                       $to,
-                                                       $from, 
-                                                       $systime,
-                                                       $private, 
-                                                       $subject, 
-                                                       $origin,
-                                                       '0',
-                                                       $rr);
-               $mref->swop_it($main::mycall);
-               $mref->store($ref);
-               $mref->add_dir();
-               push @out, $dxchan->msg('m11', $mref->{msgno}, $to);
-               #push @out, "msgno $ref->{msgno} sent to $to";
-               my $todxchan = DXChannel->get(uc $to);
-               if ($todxchan) {
-                       if ($todxchan->is_user()) {
-                               $todxchan->send($todxchan->msg('m9'));
+       my $i;
+       for ( $i = 0;  $i < @chunk; $i++) {
+               my $chunk = $chunk[$i];
+               my $ch_subject;
+               if (@chunk > 1) {
+                       my $num = " [" . ($i+1) . "/" . scalar @chunk . "]";
+                       $ch_subject = substr($subject, 0, 27 - length $num) .  $num;
+               } else {
+                       $ch_subject = $subject;
+               }
+               my $to;
+               foreach $to (@to) {
+                       my $systime = $main::systime;
+                       my $mycall = $main::mycall;
+                       my $mref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
+                                                                       $to,
+                                                                       $from, 
+                                                                       $systime,
+                                                                       $private, 
+                                                                       $ch_subject, 
+                                                                       $origin,
+                                                                       '0',
+                                                                       $rr);
+                       $mref->swop_it($main::mycall);
+                       $mref->store($chunk);
+                       $mref->add_dir();
+                       push @out, $dxchan->msg('m11', $mref->{msgno}, $to);
+                       #push @out, "msgno $ref->{msgno} sent to $to";
+                       my $todxchan = DXChannel->get(uc $to);
+                       if ($todxchan) {
+                               if ($todxchan->is_user()) {
+                                       $todxchan->send($todxchan->msg('m9'));
+                               }
                        }
                }
        }
-
        return @out;
 }
 
@@ -1198,6 +1241,9 @@ sub AUTOLOAD
        $name =~ s/.*:://o;
        
        confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
+       # this clever line of code creates a subroutine which takes over from autoload
+       # from OO Perl - Conway
+       *{$AUTOLOAD} = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}} ;
        @_ ? $self->{$name} = shift : $self->{$name} ;
 }