X-Git-Url: http://www.dxcluster.org/gitweb/gitweb.cgi?p=spider.git;a=blobdiff_plain;f=perl%2FDXMsg.pm;h=153a2c1b1dc998379fd3e69ea999d7bd3f6a47d9;hp=f136b16e7b23750192b817f27055caf0592e8fef;hb=48f0cb90d0cfbe3037f353fc25adfc33561634fa;hpb=77fd72a34c14013dcb9430408e83cb30581f7cc2 diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index f136b16e..153a2c1b 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -4,7 +4,7 @@ # # Copyright (c) 1998 Dirk Koopman G1TLH # -# $Id$ +# # # # Notes for implementors:- @@ -26,9 +26,15 @@ use DXLog; use IO::File; use Fcntl; +eval { + require Net::SMTP; +}; + use strict; -use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean + +use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean $residencetime @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime + $email_server $email_prog $email_from $queueinterval $lastq $importfn $minchunk $maxchunk $bulltopriv); %work = (); # outstanding jobs @@ -48,7 +54,10 @@ $lastq = 0; $minchunk = 4800; # minimum chunk size for a split message $maxchunk = 6000; # maximum chunk size $bulltopriv = 1; # convert msgs with callsigns to private if they are bulls - +$residencetime = 2*86400; # keep deleted messages for this amount of time +$email_server = undef; # DNS address of smtp server if 'smtp' +$email_prog = undef; # program name + args for sending mail +$email_from = undef; # the from address the email will appear to be from $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table @@ -57,30 +66,40 @@ $importfn = "$msgdir/import"; # import directory %valid = ( - fromnode => '5,From Node', - tonode => '5,To Node', - to => '0,To', - from => '0,From', - t => '0,Msg Time,cldatetime', - private => '5,Private', - subject => '0,Subject', - linesreq => '0,Lines per Gob', - rrreq => '5,Read Confirm', - origin => '0,Origin', - lines => '5,Data', - stream => '9,Stream No', + 'read' => '5,Times read', count => '5,Gob Linecnt', + delete => '5,Awaiting Delete,yesno', + deletetime => '5,Deletion Time,cldatetime', file => '5,File?,yesno', + from => '0,From', + fromnode => '5,From Node', gotit => '5,Got it Nodes,parray', - lines => '5,Lines,parray', - 'read' => '5,Times read', - size => '0,Size', - msgno => '0,Msgno', keep => '0,Keep this?,yesno', lastt => '5,Last processed,cldatetime', + lines => '5,Data', + lines => '5,Lines,parray', + linesreq => '0,Lines per Gob', + msgno => '0,Msgno', + origin => '0,Origin', + private => '5,Private,yesno', + rrreq => '5,Read Confirm,yesno', + size => '0,Size', + stream => '9,Stream No', + subject => '0,Subject', + t => '0,Msg Time,cldatetime', + to => '0,To', + tonode => '5,To Node', waitt => '5,Wait until,cldatetime', ); +# fix up the default sendmail if available +for (qw(/usr/sbin/sendmail /usr/lib/sendmail /usr/sbin/sendmail)) { + if (-e $_) { + $email_prog = $_; + last; + } +} + # allocate a new object # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper sub alloc @@ -92,7 +111,6 @@ sub alloc # $to =~ s/-\d+$//o; $self->{to} = ($to eq $main::mycall) ? $main::myalias : $to; my $from = shift; - $from =~ s/-\d+$//o; $self->{from} = uc $from; $self->{t} = shift; $self->{private} = shift; @@ -100,296 +118,405 @@ sub alloc $self->{origin} = shift; $self->{'read'} = shift; $self->{rrreq} = shift; + $self->{delete} = shift; + $self->{deletetime} = shift || ($self->{t} + $maxage); + $self->{keep} = shift; $self->{gotit} = []; # $self->{lastt} = $main::systime; $self->{lines} = []; - $self->{private} = 1 if $bulltopriv && DXUser->get_current($self->{to}); + $self->{private} = 1 if $bulltopriv && DXUser::get_current($self->{to}); return $self; } -sub workclean -{ - my $ref = shift; - delete $ref->{lines}; - delete $ref->{linesreq}; - delete $ref->{tonode}; - delete $ref->{fromnode}; - delete $ref->{stream}; - delete $ref->{file}; - delete $ref->{count}; - delete $ref->{lastt} if exists $ref->{lastt}; - delete $ref->{waitt} if exists $ref->{waitt}; -} sub process { - my ($self, $line) = @_; - # this is periodic processing - if (!$self || !$line) { + if ($main::systime >= $lastq + $queueinterval) { - if ($main::systime >= $lastq + $queueinterval) { - - # queue some message if the interval timer has gone off - queue_msg(0); - - # import any messages in the import directory - import_msgs(); - - $lastq = $main::systime; - } - - # clean the message queue - clean_old() if $main::systime - $last_clean > 3600 ; - return; + # queue some message if the interval timer has gone off + queue_msg(0); + + # import any messages in the import directory + import_msgs(); + + $lastq = $main::systime; } - my @f = split /\^/, $line; - my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number + # clean the message queue + if ($main::systime >= $last_clean+3600) { + clean_old(); + $last_clean = $main::systime; + } + + # actual remove all the 'deleted' messages in one hit. + # this has to be delayed until here otherwise it only does one at + # a time because @msg is rewritten everytime del_msg is called. + my @del = grep {!$_->{tonode} && $_->{delete} && !$_->{keep} && $_->{deletetime} < $main::systime} @msg; + for (@del) { + $_->del_msg; + } + +} - SWITCH: { - if ($pcno == 28) { # incoming message +# incoming message +sub handle_28 +{ + my $dxchan = shift; + my ($tonode, $fromnode) = @_[1..2]; - # sort out various extant protocol errors that occur - my ($fromnode, $origin); - if ($self->is_arcluster && $f[13] eq $self->call) { - $fromnode = $f[13]; - $origin = $f[2]; - } else { - $fromnode = $f[2]; - $origin = $f[13]; - } - $origin = $self->call unless $origin && $origin gt ' '; + # sort out various extant protocol errors that occur + my $origin = $_[13]; + $origin = $dxchan->call unless $origin && $origin gt ' '; - # first look for any messages in the busy queue - # and cancel them this should both resolve timed out incoming messages - # and crossing of message between nodes, incoming messages have priority + # first look for any messages in the busy queue + # and cancel them this should both resolve timed out incoming messages + # and crossing of message between nodes, incoming messages have priority - if (exists $busy{$fromnode}) { - my $ref = $busy{$fromnode}; - my $tonode = $ref->{tonode}; - dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$tonode") if isdbg('msg'); - $ref->stop_msg($self->call); - } + my $ref = get_busy($fromnode); + if ($ref) { + my $otonode = $ref->{tonode} || "unknown"; + dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg'); + $ref->stop_msg($fromnode); + } - my $t = cltounix($f[5], $f[6]); - my $stream = next_transno($fromnode); - my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]); + my $t = cltounix($_[5], $_[6]); + my $stream = next_transno($fromnode); + $ref = DXMsg->alloc($stream, uc $_[3], $_[4], $t, $_[7], $_[8], $origin, '0', $_[11]); - # fill in various forwarding state variables - $ref->{fromnode} = $fromnode; - $ref->{tonode} = $f[1]; - $ref->{rrreq} = $f[11]; - $ref->{linesreq} = $f[10]; - $ref->{stream} = $stream; - $ref->{count} = 0; # no of lines between PC31s - dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg'); - Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" ); - $work{"$fromnode$stream"} = $ref; # store in work - $busy{$fromnode} = $ref; # set interlock - $self->send(DXProt::pc30($fromnode, $f[1], $stream)); # send ack - $ref->{lastt} = $main::systime; - - # look to see whether this is a non private message sent to a known callsign - my $uref = DXUser->get_current($ref->{to}); - if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { - $ref->{private} = 1; - dbg("set bull to $ref->{to} to private") if isdbg('msg'); - } - last SWITCH; - } - - if ($pcno == 29) { # incoming text - my $ref = $work{"$f[2]$f[3]"}; - if ($ref) { - $f[4] =~ s/\%5E/^/g; - push @{$ref->{lines}}, $f[4]; - $ref->{count}++; - if ($ref->{count} >= $ref->{linesreq}) { - $self->send(DXProt::pc31($f[2], $f[1], $f[3])); - dbg("stream $f[3]: $ref->{count} lines received\n") if isdbg('msg'); - $ref->{count} = 0; - } - $ref->{lastt} = $main::systime; - } else { - dbg("PC29 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream - } - last SWITCH; - } + # fill in various forwarding state variables + $ref->{fromnode} = $fromnode; + $ref->{tonode} = $tonode; + $ref->{rrreq} = $_[11]; + $ref->{linesreq} = $_[10]; + $ref->{stream} = $stream; + $ref->{count} = 0; # no of lines between PC31s + dbg("new message from $_[4] to $_[3] '$_[8]' stream $fromnode/$stream\n") if isdbg('msg'); + Log('msg', "Incoming message $_[4] to $_[3] '$_[8]' origin: $origin" ); + set_fwq($fromnode, $stream, $ref); # store in work + set_busy($fromnode, $ref); # set interlock + $dxchan->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack + $ref->{lastt} = $main::systime; + + # look to see whether this is a non private message sent to a known callsign + my $uref = DXUser::get_current($ref->{to}); + if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { + $ref->{private} = 1; + dbg("set bull to $ref->{to} to private") if isdbg('msg'); + Log('msg', "set bull to $ref->{to} to private"); + } +} - if ($pcno == 30) { # this is a incoming subject ack - my $ref = $work{$f[2]}; # note no stream at this stage - if ($ref) { - delete $work{$f[2]}; - $ref->{stream} = $f[3]; - $ref->{count} = 0; - $ref->{linesreq} = 5; - $work{"$f[2]$f[3]"} = $ref; # new ref - dbg("incoming subject ack stream $f[3]\n") if isdbg('msg'); - $busy{$f[2]} = $ref; # interlock - push @{$ref->{lines}}, ($ref->read_msg_body); - $ref->send_tranche($self); - $ref->{lastt} = $main::systime; +# incoming text +sub handle_29 +{ + my $dxchan = shift; + my ($tonode, $fromnode, $stream) = @_[1..3]; + + my $ref = get_fwq($fromnode, $stream); + if ($ref) { + $_[4] =~ s/\%5E/^/g; + if (@{$ref->{lines}}) { + push @{$ref->{lines}}, $_[4]; + } else { + # temporarily store any R: lines so that we end up with + # only the first and last ones stored. + if ($_[4] =~ m|^R:\d{6}/\d{4}|) { + push @{$ref->{tempr}}, $_[4]; } else { - dbg("PC30 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + if (exists $ref->{tempr}) { + push @{$ref->{lines}}, shift @{$ref->{tempr}}; + push @{$ref->{lines}}, pop @{$ref->{tempr}} if @{$ref->{tempr}}; + delete $ref->{tempr}; + } + push @{$ref->{lines}}, $_[4]; } - last SWITCH; } - - if ($pcno == 31) { # acknowledge a tranche of lines - my $ref = $work{"$f[2]$f[3]"}; - if ($ref) { - dbg("tranche ack stream $f[3]\n") if isdbg('msg'); - $ref->send_tranche($self); - $ref->{lastt} = $main::systime; - } else { - dbg("PC31 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream - } - last SWITCH; + $ref->{count}++; + if ($ref->{count} >= $ref->{linesreq}) { + $dxchan->send(DXProt::pc31($fromnode, $tonode, $stream)); + dbg("stream $stream: $ref->{count} lines received\n") if isdbg('msg'); + $ref->{count} = 0; } + $ref->{lastt} = $main::systime; + } else { + dbg("PC29 from unknown stream $stream from $fromnode") if isdbg('msg'); + $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream + } +} - if ($pcno == 32) { # incoming EOM - dbg("stream $f[3]: EOM received\n") if isdbg('msg'); - my $ref = $work{"$f[2]$f[3]"}; - if ($ref) { - $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it +# this is a incoming subject ack +sub handle_30 +{ + my $dxchan = shift; + my ($tonode, $fromnode, $stream) = @_[1..3]; + + my $ref = get_fwq($fromnode); # note no stream at this stage + if ($ref) { + del_fwq($fromnode); + $ref->{stream} = $stream; + $ref->{count} = 0; + $ref->{linesreq} = 5; + set_fwq($fromnode, $stream, $ref); # new ref + set_busy($fromnode, $ref); # interlock + dbg("incoming subject ack stream $stream\n") if isdbg('msg'); + $ref->{lines} = [ $ref->read_msg_body ]; + $ref->send_tranche($dxchan); + $ref->{lastt} = $main::systime; + } else { + dbg("PC30 from unknown stream $stream from $fromnode") if isdbg('msg'); + $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream + } +} + +# acknowledge a tranche of lines +sub handle_31 +{ + my $dxchan = shift; + my ($tonode, $fromnode, $stream) = @_[1..3]; + + my $ref = get_fwq($fromnode, $stream); + if ($ref) { + dbg("tranche ack stream $stream\n") if isdbg('msg'); + $ref->send_tranche($dxchan); + $ref->{lastt} = $main::systime; + } else { + dbg("PC31 from unknown stream $stream from $fromnode") if isdbg('msg'); + $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream + } +} + +# incoming EOM +sub handle_32 +{ + my $dxchan = shift; + my ($tonode, $fromnode, $stream) = @_[1..3]; + + dbg("stream $stream: EOM received\n") if isdbg('msg'); + my $ref = get_fwq($fromnode, $stream); + if ($ref) { + $dxchan->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it - # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol - # store the file or message - # remove extraneous rubbish from the hash - # remove it from the work in progress vector - # stuff it on the msg queue - if ($ref->{lines}) { - if ($ref->{file}) { - $ref->store($ref->{lines}); - } else { - - # does an identical message already exist? - my $m; - for $m (@msg) { - if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { - $ref->stop_msg($self->call); - my $msgno = $m->{msgno}; - dbg("duplicate message from $ref->{from} -> $ref->{to} to $msgno") if isdbg('msg'); - Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); - return; - } - } - - # swop addresses - $ref->swop_it($self->call); + # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol + # store the file or message + # remove extraneous rubbish from the hash + # remove it from the work in progress vector + # stuff it on the msg queue + if ($ref->{lines}) { + if ($ref->{file}) { + $ref->store($ref->{lines}); + } else { + + # is it too old + if ($ref->{t}+$maxage < $main::systime ) { + $ref->stop_msg($fromnode); + dbg("old message from $ref->{from} -> $ref->{to} " . atime($ref->{t}) . " ignored") if isdbg('msg'); + Log('msg', "old message from $ref->{from} -> $ref->{to} " . cldatetime($ref->{t}) . " ignored"); + return; + } + + # does an identical message already exist? + my $m; + for $m (@msg) { + if (substr($ref->{subject},0,28) eq substr($m->{subject},0,28) && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { + $ref->stop_msg($fromnode); + my $msgno = $m->{msgno}; + dbg("duplicate message from $ref->{from} -> $ref->{to} to msg: $msgno") if isdbg('msg'); + Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to msg: $msgno"); + return; + } + } + + # swop addresses + $ref->swop_it($dxchan->call); - # look for 'bad' to addresses - if ($ref->dump_it) { - $ref->stop_msg($self->call); - dbg("'Bad' message $ref->{to}") if isdbg('msg'); - Log('msg', "'Bad' message $ref->{to}"); - return; - } - - $ref->{msgno} = next_transno("Msgno"); - push @{$ref->{gotit}}, $f[2]; # mark this up as being received - $ref->store($ref->{lines}); - add_dir($ref); - my $dxchan = DXChannel->get($ref->{to}); - $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user; - Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); + # look for 'bad' to addresses + if ($ref->dump_it($dxchan->call)) { + $ref->stop_msg($fromnode); + dbg("'Bad' message $ref->{to}") if isdbg('msg'); + Log('msg', "'Bad' message $ref->{to}"); + return; + } + + # check the message for bad words + my @bad; + my @words; + @bad = BadWords::check($ref->{subject}); + push @words, [$ref->{subject}, @bad] if @bad; + for (@{$ref->{lines}}) { + @bad = BadWords::check($_); + push @words, [$_, @bad] if @bad; + } + if (@words) { + LogDbg('msg',"$ref->{from} swore: $ref->{to} origin: $ref->{origin} via " . $dxchan->call); + LogDbg('msg',"subject: $ref->{subject}"); + for (@words) { + my $r = $_; + my $line = shift @$r; + LogDbg('msg', "line: $line (using words: ". join(',', @$r).")"); } + $ref->stop_msg($fromnode); + return; } - $ref->stop_msg($self->call); - } else { - dbg("PC32 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + + $ref->{msgno} = next_transno("Msgno"); + push @{$ref->{gotit}}, $fromnode; # mark this up as being received + $ref->store($ref->{lines}); + $ref->notify; + add_dir($ref); + Log('msg', "Message $ref->{msgno} from $ref->{from} received from $fromnode for $ref->{to}"); } - # queue_msg(0); - last SWITCH; } + $ref->stop_msg($fromnode); + } else { + dbg("PC32 from unknown stream $stream from $fromnode") if isdbg('msg'); + $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream + } + # queue_msg(0); +} - if ($pcno == 33) { # acknowledge the end of message - my $ref = $work{"$f[2]$f[3]"}; - if ($ref) { - if ($ref->{private}) { # remove it if it private and gone off site# - Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted"); - $ref->del_msg; - } else { - Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]"); - push @{$ref->{gotit}}, $f[2]; # mark this up as being received - $ref->store($ref->{lines}); # re- store the file - } - $ref->stop_msg($self->call); - } else { - dbg("PC33 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream - } - - # send next one if present - queue_msg(0); - last SWITCH; +# acknowledge the end of message +sub handle_33 +{ + my $dxchan = shift; + my ($tonode, $fromnode, $stream) = @_[1..3]; + + my $ref = get_fwq($fromnode, $stream); + if ($ref) { + if ($ref->{private}) { # remove it if it private and gone off site# + Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted"); + $ref->mark_delete; + } else { + Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode"); + push @{$ref->{gotit}}, $fromnode; # mark this up as being received + $ref->store($ref->{lines}); # re- store the file } + $ref->stop_msg($fromnode); + } else { + dbg("PC33 from unknown stream $stream from $fromnode") if isdbg('msg'); + $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream + } + + # send next one if present + queue_msg(0); +} - if ($pcno == 40) { # this is a file request - $f[3] =~ s/\\/\//og; # change the slashes - $f[3] =~ s/\.//og; # remove dots - $f[3] =~ s/^\///o; # remove the leading / - $f[3] = lc $f[3]; # to lower case; - dbg("incoming file $f[3]\n") if isdbg('msg'); - $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o; - - # create any directories - my @part = split /\//, $f[3]; - my $part; - my $fn = "$main::root"; - pop @part; # remove last part - foreach $part (@part) { - $fn .= "/$part"; - next if -e $fn; - last SWITCH if !mkdir $fn, 0777; - dbg("created directory $fn\n") if isdbg('msg'); - } - my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); +# this is a file request +sub handle_40 +{ + my $dxchan = shift; + my ($tonode, $fromnode) = @_[1..2]; + + $_[3] =~ s/\\/\//og; # change the slashes + $_[3] =~ s/\.//og; # remove dots + $_[3] =~ s/^\///o; # remove the leading / + $_[3] = lc $_[3]; # to lower case; + dbg("incoming file $_[3]\n") if isdbg('msg'); + $_[3] = 'packclus/' . $_[3] unless $_[3] =~ /^packclus\//o; - # forwarding variables - $ref->{fromnode} = $f[1]; - $ref->{tonode} = $f[2]; - $ref->{linesreq} = $f[5]; - $ref->{stream} = $stream; - $ref->{count} = 0; # no of lines between PC31s - $ref->{file} = 1; - $ref->{lastt} = $main::systime; - $work{"$f[2]$stream"} = $ref; # store in work - $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + # create any directories + my @part = split /\//, $_[3]; + my $part; + my $fn = "$main::root"; + pop @part; # remove last part + foreach $part (@part) { + $fn .= "/$part"; + next if -e $fn; + last SWITCH if !mkdir $fn, 0777; + dbg("created directory $fn\n") if isdbg('msg'); + } + my $stream = next_transno($fromnode); + my $ref = DXMsg->alloc($stream, "$main::root/$_[3]", $dxchan->call, time, !$_[4], $_[3], ' ', '0', '0'); - last SWITCH; - } + # forwarding variables + $ref->{fromnode} = $tonode; + $ref->{tonode} = $fromnode; + $ref->{linesreq} = $_[5]; + $ref->{stream} = $stream; + $ref->{count} = 0; # no of lines between PC31s + $ref->{file} = 1; + $ref->{lastt} = $main::systime; + set_fwq($fromnode, $stream, $ref); # store in work + $dxchan->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack +} - if ($pcno == 42) { # abort transfer - dbg("stream $f[3]: abort received\n") if isdbg('msg'); - my $ref = $work{"$f[2]$f[3]"}; - if ($ref) { - $ref->stop_msg($self->call); - $ref = undef; - } - last SWITCH; +# abort transfer +sub handle_42 +{ + my $dxchan = shift; + my ($tonode, $fromnode, $stream) = @_[1..3]; + + dbg("stream $stream: abort received\n") if isdbg('msg'); + my $ref = get_fwq($fromnode, $stream); + if ($ref) { + $ref->stop_msg($fromnode); + $ref = undef; + } +} + +# global delete on subject +sub handle_49 +{ + my $dxchan = shift; + my $line = shift; + + for (@msg) { + if ($_->{from} eq $_[1] && $_->{subject} eq $_[2]) { + $_->mark_delete; + Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); + DXChannel::broadcast_nodes($line, $dxchan); } + } +} - if ($pcno == 49) { # global delete on subject - for (@msg) { - if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { - $_->del_msg(); - Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); - DXProt::broadcast_ak1a($line, $self); + + +sub notify +{ + my $ref = shift; + my $to = $ref->{to}; + my $uref = DXUser::get_current($to); + my $dxchan = DXChannel::get($to); + if (((*Net::SMTP && $email_server) || $email_prog) && $uref && $uref->wantemail) { + my $email = $uref->email; + if ($email) { + my @rcpt = ref $email ? @{$email} : $email; + my $fromaddr = $email_from || $main::myemail; + my @headers = ("To: $ref->{to}", + "From: $fromaddr", + "Subject: [DXSpider: $ref->{from}] $ref->{subject}", + "X-DXSpider-To: $ref->{to}", + "X-DXSpider-From: $ref->{from}\@$ref->{origin}", + "X-DXSpider-Gateway: $main::mycall" + ); + my @data = ("Msgno: $ref->{msgno} To: $to From: $ref->{from}\@$ref->{origin} Gateway: $main::mycall", + "", + $ref->read_msg_body + ); + my $msg; + undef $!; + if (*Net::SMTP && $email_server) { + $msg = Net::SMTP->new($email_server); + if ($msg) { + $msg->mail($fromaddr); + $msg->to(@rcpt); + $msg->data(map {"$_\n"} @headers, '', @data); + $msg->quit; + } + } elsif ($email_prog) { + $msg = new IO::File "|$email_prog " . join(' ', @rcpt); + if ($msg) { + print $msg map {"$_\r\n"} @headers, '', @data, '.'; + $msg->close; } } + dbg("email forwarding error $!") if isdbg('msg') && !$msg && defined $!; } } + $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user; } - # store a message away on disc or whatever # # NOTE the second arg is a REFERENCE not a list @@ -425,11 +552,15 @@ sub store if (defined $fh) { my $rr = $ref->{rrreq} ? '1' : '0'; my $priv = $ref->{private} ? '1': '0'; - print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr\n"; + my $del = $ref->{delete} ? '1' : '0'; + my $delt = $ref->{deletetime} || ($ref->{t} + $maxage); + my $keep = $ref->{keep} || '0'; + print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr^$del^$delt^$keep\n"; print $fh "=== ", join('^', @{$ref->{gotit}}), "\n"; my $line; $ref->{size} = 0; foreach $line (@{$lines}) { + $line =~ s/[\x00-\x08\x0a-\x1f\x80-\x9f]/./g; $ref->{size} += (length $line) + 1; print $fh "$line\n"; } @@ -440,21 +571,53 @@ sub store confess "can't open msg file $fn $!"; } } + } # delete a message sub del_msg { my $self = shift; + my $dxchan = shift; + my $call = ''; + $call = ' by ' . $dxchan->call if $dxchan; - # remove it from the active message list - dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); - @msg = grep { $_ != $self } @msg; + if ($self->{tonode}) { + $self->{delete}++; + $self->{deletetime} = 0; + dbg("Msgno $self->{msgno} but marked as expunged$call") if isdbg('msg'); + } else { + # remove it from the active message list + @msg = grep { $_ != $self } @msg; + + Log('msg', "Msgno $self->{msgno} expunged$call"); + dbg("Msgno $self->{msgno} expunged$call") if isdbg('msg'); + + # remove the file + unlink filename($self->{msgno}); + } +} + +sub mark_delete +{ + my $ref = shift; + my $t = shift; + + return if $ref->{keep}; - # remove the file - unlink filename($self->{msgno}); - dbg("deleting $self->{msgno}\n") if isdbg('msg'); - dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); + $t = $main::systime + $residencetime unless defined $t; + + $ref->{delete}++; + $ref->{deletetime} = $t; + $ref->store( [$ref->read_msg_body] ); +} + +sub unmark_delete +{ + my $ref = shift; + my $t = shift; + $ref->{delete} = 0; + $ref->{deletetime} = 0; } # clean out old messages from the message queue @@ -463,19 +626,14 @@ sub clean_old my $ref; # mark old messages for deletion - dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); foreach $ref (@msg) { - if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) { - $ref->{deleteme} = 1; - unlink filename($ref->{msgno}); - dbg("deleting old $ref->{msgno}\n") if isdbg('msg'); + if (ref($ref) && !$ref->{keep} && $ref->{deletetime} < $main::systime) { + + # this is for IMMEDIATE destruction + $ref->{delete}++; + $ref->{deletetime} = 0; } } - - # remove them all from the active message list - @msg = grep { !$_->{deleteme} } @msg; - dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); - $last_clean = $main::systime; } # read in a message header @@ -545,7 +703,7 @@ sub read_msg_body if (!open($file, $fn)) { dbg("Error reading $fn $!"); Log('err' ,"Error reading $fn $!"); - return undef; + return (); } @out = map {chomp; $_} <$file>; close($file); @@ -580,7 +738,6 @@ sub send_tranche sub queue_msg { my $sort = shift; - my $call = shift; my $ref; my $clref; @@ -605,11 +762,17 @@ sub queue_msg $ref->stop_msg($node); # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; + $ref->{waitt} = $main::systime + $waittime + int rand(120) if $node ne $main::mycall; delete $ref->{lastt}; next; } + # is it being sent anywhere currently? + next if $ref->{tonode}; # ignore it if it already being processed + + # is it awaiting deletion? + next if $ref->{delete}; + # firstly, is it private and unread? if so can I find the recipient # in my cluster node list offsite? @@ -618,11 +781,6 @@ sub queue_msg if ($ref->{private}) { next if $ref->{'read'}; # if it is read, it is stuck here $clref = Route::get($ref->{to}); -# unless ($clref) { # otherwise look for a homenode -# my $uref = DXUser->get_current($ref->{to}); -# my $hnode = $uref->homenode if $uref; -# $clref = Route::Node::get($hnode) if $hnode; -# } if ($clref) { $dxchan = $clref->dxchan; if ($dxchan) { @@ -634,28 +792,34 @@ sub queue_msg dbg("Route: No dxchan for $ref->{to} " . ref($clref) ) if isdbg('msg'); } } - } - - # otherwise we are dealing with a bulletin or forwarded private message - # compare the gotit list with - # the nodelist up above, if there are sites that haven't got it yet - # then start sending it - what happens when we get loops is anyone's - # guess, use (to, from, time, subject) tuple? - foreach $dxchan (@nodelist) { - my $call = $dxchan->call; - next unless $call; - next if $call eq $main::mycall; - next if ref $ref->{gotit} && grep $_ eq $call, @{$ref->{gotit}}; - next unless $ref->forward_it($call); # check the forwarding file - - # if we are here we have a node that doesn't have this message - $ref->start_msg($dxchan) if !get_busy($call) && $dxchan->state eq 'normal'; - last; + } else { + + # otherwise we are dealing with a bulletin or forwarded private message + # compare the gotit list with + # the nodelist up above, if there are sites that haven't got it yet + # then start sending it - what happens when we get loops is anyone's + # guess, use (to, from, time, subject) tuple? + foreach $dxchan (@nodelist) { + my $call = $dxchan->call; + next unless $call; + next if $call eq $main::mycall; + next if ref $ref->{gotit} && grep $_ eq $call, @{$ref->{gotit}}; + next unless $ref->forward_it($call); # check the forwarding file + next if $ref->{tonode}; # ignore it if it already being processed + + # if we are here we have a node that doesn't have this message + if (!get_busy($call) && $dxchan->state eq 'normal') { + $ref->start_msg($dxchan); + last; + } + } } # if all the available nodes are busy then stop last if @nodelist == scalar grep { get_busy($_->call) } @nodelist; } + + } # is there a message for me? @@ -663,14 +827,15 @@ sub for_me { my $call = uc shift; my $ref; + my $count; foreach $ref (@msg) { # is it for me, private and unread? if ($ref->{to} eq $call && $ref->{private}) { - return 1 if !$ref->{'read'}; + $count++ unless $ref->{'read'} || $ref->{delete}; } } - return 0; + return $count; } # start the message off on its travels with a PC28 @@ -678,15 +843,19 @@ sub start_msg { my ($self, $dxchan) = @_; + confess("trying to start started msg $self->{msgno} nodes: $self->{fromnode} -> $self->{tonode}") if $self->{tonode}; dbg("start msg $self->{msgno}\n") if isdbg('msg'); $self->{linesreq} = 10; $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; - $busy{$self->{tonode}} = $self; - $work{$self->{tonode}} = $self; + set_busy($self->{tonode}, $self); + set_fwq($self->{tonode}, undef, $self); $self->{lastt} = $main::systime; - $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); + my ($fromnode, $origin); + $fromnode = $self->{fromnode}; + $origin = $self->{origin}; + $dxchan->send(DXProt::pc28($self->{tonode}, $fromnode, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $origin, $self->{rrreq})); } # get the ref of a busy node @@ -696,16 +865,52 @@ sub get_busy return $busy{$call}; } -# get the busy queue +sub set_busy +{ + my $call = shift; + return $busy{$call} = shift; +} + +sub del_busy +{ + my $call = shift; + return delete $busy{$call}; +} + +# get the whole busy queue sub get_all_busy { - return values %busy; + return keys %busy; } -# get the forwarding queue +# get a forwarding queue entry sub get_fwq { - return values %work; + my $call = shift; + my $stream = shift || '0'; + return $work{"$call,$stream"}; +} + +# delete a forwarding queue entry +sub del_fwq +{ + my $call = shift; + my $stream = shift || '0'; + return delete $work{"$call,$stream"}; +} + +# set a fwq entry +sub set_fwq +{ + my $call = shift; + my $stream = shift || '0'; + return $work{"$call,$stream"} = shift; +} + +# get the whole forwarding queue +sub get_all_fwq +{ + return keys %work; } # stop a message from continuing, clean it out, unlock interlocks etc @@ -713,14 +918,28 @@ sub stop_msg { my $self = shift; my $node = shift; - my $stream = $self->{stream} if exists $self->{stream}; + my $stream = $self->{stream}; dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg'); - delete $work{$node}; - delete $work{"$node$stream"} if $stream; + del_fwq($node, $stream); $self->workclean; - delete $busy{$node}; + del_busy($node); +} + +sub workclean +{ + my $ref = shift; + delete $ref->{lines}; + delete $ref->{linesreq}; + delete $ref->{tonode}; + delete $ref->{fromnode}; + delete $ref->{stream}; + delete $ref->{file}; + delete $ref->{count}; + delete $ref->{tempr}; + delete $ref->{lastt}; + delete $ref->{waitt}; } # get a new transaction number from the file specified @@ -777,7 +996,7 @@ sub init } # delete any messages to 'badmsg.pl' places - if ($ref->dump_it) { + if ($ref->dump_it('')) { dbg("'Bad' TO address $ref->{to}") if isdbg('msg'); Log('msg', "'Bad' TO address $ref->{to}"); $ref->del_msg; @@ -851,6 +1070,11 @@ sub do_send_stuff # $DB::single = 1; confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc}; + if (my @ans = BadWords::check($line)) { + $self->{badcount} += @ans; + Log('msg', $self->call . " used badwords: @ans to @{$loc->{to}} in msg"); + $loc->{reject}++; + } $loc->{subject} = $line; $loc->{lines} = []; $self->state('sendbody'); @@ -861,33 +1085,31 @@ sub do_send_stuff my $loc = $self->{loc}; if ($line eq "\032" || $line eq '%1A' || uc $line eq "/EX") { my $to; - - foreach $to (@{$loc->{to}}) { - my $ref; - my $systime = $main::systime; - my $mycall = $main::mycall; - $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'), - uc $to, - exists $loc->{from} ? $loc->{from} : $self->call, - $systime, - $loc->{private}, - $loc->{subject}, - exists $loc->{origin} ? $loc->{origin} : $mycall, - '0', - $loc->{rrreq}); - $ref->swop_it($self->call); - $ref->store($loc->{lines}); - $ref->add_dir(); - push @out, $self->msg('m11', $ref->{msgno}, $to); - #push @out, "msgno $ref->{msgno} sent to $to"; - my $dxchan = DXChannel->get(uc $to); - if ($dxchan) { - if ($dxchan->is_user()) { - $dxchan->send($dxchan->msg('m9')); - } + unless ($loc->{reject}) { + foreach $to (@{$loc->{to}}) { + my $ref; + my $systime = $main::systime; + my $mycall = $main::mycall; + $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'), + uc $to, + exists $loc->{from} ? $loc->{from} : $self->call, + $systime, + $loc->{private}, + $loc->{subject}, + exists $loc->{origin} ? $loc->{origin} : $mycall, + '0', + $loc->{rrreq}); + $ref->swop_it($self->call); + $ref->store($loc->{lines}); + $ref->add_dir(); + push @out, $self->msg('m11', $ref->{msgno}, $to); + #push @out, "msgno $ref->{msgno} sent to $to"; + $ref->notify; } + } else { + LogDbg('msg', $self->call . " swore to @{$loc->{to}} subject: '$loc->{subject}' in msg, REJECTED"); } - + delete $loc->{lines}; delete $loc->{to}; delete $self->{loc}; @@ -902,22 +1124,69 @@ sub do_send_stuff delete $self->{loc}; $self->func(undef); $self->state('prompt'); + } elsif ($line =~ m|^/+\w+|) { + # this is a command that you want display for your own reference + # or if it has TWO slashes is a command + $line =~ s|^/||; + my $store = $line =~ s|^/+||; + my @in = $self->run_cmd($line); + push @out, @in; + if ($store) { + foreach my $l (@in) { + if (my @ans = BadWords::check($l)) { + $self->{badcount} += @ans; + Log('msg', $self->call . " used badwords: @ans to @{$loc->{to}} subject: '$loc->{subject}' in msg") unless $loc->{reject}; + Log('msg', "line: $l"); + $loc->{reject}++; + } + push @{$loc->{lines}}, length($l) > 0 ? $l : " "; + } + } } else { + if (my @ans = BadWords::check($line)) { + $self->{badcount} += @ans; + Log('msg', $self->call . " used badwords: @ans to @{$loc->{to}} subject: '$loc->{subject}' in msg") unless $loc->{reject}; + Log('msg', "line: $line"); + $loc->{reject}++; + } + + if ($loc->{lines} && @{$loc->{lines}}) { + push @{$loc->{lines}}, length($line) > 0 ? $line : " "; + } else { + # temporarily store any R: lines so that we end up with + # only the first and last ones stored. + if ($line =~ m|^R:\d{6}/\d{4}|) { + push @{$loc->{tempr}}, $line; + } else { + if (exists $loc->{tempr}) { + push @{$loc->{lines}}, shift @{$loc->{tempr}}; + push @{$loc->{lines}}, pop @{$loc->{tempr}} if @{$loc->{tempr}}; + delete $loc->{tempr}; + } + push @{$loc->{lines}}, length($line) > 0 ? $line : " "; + } + } # i.e. it ain't and end or abort, therefore store the line - push @{$loc->{lines}}, length($line) > 0 ? $line : " "; } } - return (1, @out); + return @out; } # return the standard directory line for this ref sub dir { my $ref = shift; - return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", - $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size, - $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject; + my $flag = $ref->{private} && $ref->{read} ? '-' : ' '; + if ($ref->{keep}) { + $flag = '!'; + } elsif ($ref->{delete}) { + $flag = $ref->{deletetime} > $main::systime ? 'D' : 'E'; + } + return sprintf("%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", + $ref->{msgno}, $flag, $ref->{private} ? 'p' : ' ', + $ref->{size}, $ref->{to}, $ref->{from}, cldate($ref->{t}), + ztime($ref->{t}), $ref->{subject}); } # load the forward table @@ -989,9 +1258,36 @@ sub forward_it return 0; } +# +# look down the forward table to see whether this is a valid bull +# or not (ie it will forward somewhere even if it is only here) +# +sub valid_bull_addr +{ + my $call = shift; + my $i; + + unless (@forward) { + return 1 if $call =~ /^ALL/; + return 1 if $call =~ /^DX/; + return 0; + } + + for ($i = 0; $i < @forward; $i += 5) { + my ($sort, $field, $pattern, $action, $bbs) = @forward[$i..($i+4)]; + if ($field eq 'T') { + if (!$pattern || $call =~ m{$pattern}i) { + return 1; + } + } + } + return 0; +} + sub dump_it { my $ref = shift; + my $call = shift; my $i; for ($i = 0; $i < @badmsg; $i += 3) { @@ -1007,6 +1303,7 @@ sub dump_it $tested = $ref->{from} if $field eq 'F'; $tested = $ref->{origin} if $field eq 'O'; $tested = $ref->{subject} if $field eq 'S'; + $tested = $call if $field eq 'I'; if (!$pattern || $tested =~ m{$pattern}i) { return 1; @@ -1092,7 +1389,7 @@ sub import_msgs my @msg = map { chomp; $_ } ; close(MSG); unlink($fn); - my @out = import_one($DXProt::me, \@msg, $splitit); + my @out = import_one($main::me, \@msg, $splitit); Log('msg', @out); } } @@ -1114,7 +1411,9 @@ sub import_one # first line; my $line = shift @$ref; - my @f = split /\s+/, $line; + my @f = split /([\s\@\$])/, $line; + @f = map {s/\s+//g; length $_ ? $_ : ()} @f; + unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) { my $m = "invalid first line in import '$line'"; dbg($m) if isdbg('msg'); @@ -1131,16 +1430,16 @@ sub import_one ; } elsif ($notincalls && ($f eq 'RR')) { $rr = '1'; - } elsif ($f eq '@' && @f) { # this is bbs syntax, for origin - $origin = uc shift @f; + } elsif (($f =~ /^[\@\.\#\$]$/ || $f eq '.#') && @f) { # this is bbs syntax, for AT + shift @f; } elsif ($f eq '<' && @f) { # this is bbs syntax for from call $from = uc shift @f; } elsif ($f =~ /^\$/) { # this is bbs syntax for a bid next; - } elsif ($f =~ /^<\S+/) { # this is bbs syntax for from call - ($from) = $f =~ /^<(\S+)$/; - } elsif ($f =~ /^\@\S+/) { # this is bbs syntax for origin - ($origin) = $f =~ /^\@(\S+)$/; + } elsif ($f =~ /^<(\S+)/) { # this is bbs syntax for from call + $from = $1; + } elsif ($f =~ /^\$\S+/) { # this is bbs syntax for bid + ; } else { # callsign ? @@ -1200,7 +1499,18 @@ sub import_one } else { push @chunk, $ref; } - + + # does an identical message already exist? + my $m; + for $m (@msg) { + if (substr($subject,0,28) eq substr($m->{subject},0,28) && $from eq $m->{from} && grep $m->{to} eq $_, @to) { + my $msgno = $m->{msgno}; + dbg("duplicate message from $from -> $m->{to} to msg: $msgno") if isdbg('msg'); + Log('msg', "duplicate message from $from -> $m->{to} to msg: $msgno"); + return; + } + } + # write all the messages away my $i; for ( $i = 0; $i < @chunk; $i++) { @@ -1230,30 +1540,25 @@ sub import_one $mref->add_dir(); push @out, $dxchan->msg('m11', $mref->{msgno}, $to); #push @out, "msgno $ref->{msgno} sent to $to"; - my $todxchan = DXChannel->get(uc $to); - if ($todxchan) { - if ($todxchan->is_user()) { - $todxchan->send($todxchan->msg('m9')); - } - } + $mref->notify; } } return @out; } -no strict; +#no strict; sub AUTOLOAD { - my $self = shift; + no strict; my $name = $AUTOLOAD; return if $name =~ /::DESTROY$/; - $name =~ s/.*:://o; + $name =~ s/^.*:://o; confess "Non-existant field '$AUTOLOAD'" if !$valid{$name}; # this clever line of code creates a subroutine which takes over from autoload # from OO Perl - Conway - *{$AUTOLOAD} = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}} ; - @_ ? $self->{$name} = shift : $self->{$name} ; + *$AUTOLOAD = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}}; + goto &$AUTOLOAD; } 1;