1. added WWV filtering
authordjk <djk>
Sun, 27 Jun 1999 11:01:31 +0000 (11:01 +0000)
committerdjk <djk>
Sun, 27 Jun 1999 11:01:31 +0000 (11:01 +0000)
2. added timeouts to for forwarding Messages, also added a waiting time for
failed (stalled) outgoing messages.
3. Incoming messages will now have priority over outgoing messages to the
same node.
4. Added 'catchup' command which will 'catchup' messages to date for a node.
This means that when you start forwarding to a node, it doesn't get all the
messages queued up that are probably old.

Changes
cmd/catchup.pl [new file with mode: 0644]
cmd/reply.pl
cmd/send.pl
cmd/uncatchup.pl [new file with mode: 0644]
filter/wwv/GB7MBC.pl.issue [new file with mode: 0644]
perl/DXMsg.pm
perl/DXProt.pm
perl/Messages
perl/cluster.pl

diff --git a/Changes b/Changes
index 7955dafe3923168ab1da978e122d1ac05d73040e..d154f217945d91eab303bc7a6eb45995cdeb5b87 100644 (file)
--- a/Changes
+++ b/Changes
@@ -1,3 +1,12 @@
+27Jun99=======================================================================
+1. added WWV filtering
+2. added timeouts to for forwarding Messages, also added a waiting time for
+failed (stalled) outgoing messages.
+3. Incoming messages will now have priority over outgoing messages to the
+same node.
+4. Added 'catchup' command which will 'catchup' messages to date for a node. 
+This means that when you start forwarding to a node, it doesn't get all the
+messages queued up that are probably old.
 21Jun99=======================================================================
 1. changed regex for cluster->client msgs so that strings like |---| are no
 longer ignored.
diff --git a/cmd/catchup.pl b/cmd/catchup.pl
new file mode 100644 (file)
index 0000000..4cda120
--- /dev/null
@@ -0,0 +1,18 @@
+#
+# catchup some or all of the non-private messages for a node.
+#
+# in other words mark all messages as being already received
+# by this node.
+#
+# $Id$
+#
+# Copyright (c) 1999 Dirk Koopman G1TLH
+#
+
+my ($self, $line) = @_;
+my @f = split /\s+/, $line;
+my $call = uc shift @f;
+my @out;
+
+
+return (1, @out);
index c5ceaddf9a26c08bd746ff44f4428ebfe42fe54f..c8ef615d12a7c3776450ad5432ae9e2f0bc66b88 100644 (file)
@@ -57,13 +57,13 @@ if ($self->state eq "prompt") {
                $oref = DXMsg::get($f[$i]);
                if (!$oref) {
                        delete $self->{loc};
-                       return (0, "can't access message $i");
+                       return (1, $self->msg('m4', $i));
                }
        } else {
                if (!($oref = DXMsg::get($self->lastread))) {
                        delete $self->{loc};
-                       #return (0, $self->msg('esend2'));
-                       return (0, "need a message number");
+                       return (1, $self->msg('m5'));
+                       #return (1, "need a message number");
                }
        }
        
@@ -78,9 +78,12 @@ if ($self->state eq "prompt") {
        $self->func("DXMsg::do_send_stuff");
        $self->state('sendbody');
        #push @out, $self->msg('sendsubj');
-       push @out, "Reply to: $to";
-       push @out, "Subject : $loc->{subject}";
-       push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+#      push @out, "Reply to: $to";
+#      push @out, "Subject : $loc->{subject}";
+#      push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+       push @out, $self->msg('m6', $to);
+       push @out, $self->msg('m7', $loc->{subject});
+       push @out, $self->msg('m8');
 }
 
 return (1, @out);
index 2ebca2a93f9a5aaeb7816360d3aa955382ab80d3..ade1dce0aca7e3789e8f3d1daf4eb88ba7addb92 100644 (file)
@@ -68,8 +68,8 @@ if ($self->state eq "prompt") {
                        push @list, $oref->read_msg_body();
                        $nref->store(\@list);
                        $nref->add_dir();
-                       #push @out, $self->msg('sendcc', $oref->msgno, $f[$i]);
-                       push @out, "copy of msg $oref->{msgno} sent to $to";
+                       push @out, $self->msg('m2', $oref->msgno, $to);
+#                      push @out, "copy of msg $oref->{msgno} sent to $to";
                }
                DXMsg::queue_msg();
                return (1, @out);
@@ -108,7 +108,8 @@ if ($self->state eq "prompt") {
        foreach  $t (@f[ $i..$#f ]) {
                $t = uc $t;
                if (grep $_ eq $t, @DXMsg::badmsg) {
-                       push @out, "Sorry, $t is an unacceptable TO address";
+#                      push @out, "Sorry, $t is an unacceptable TO address";
+                       push @out, $self->msg('m3', $t);
                } else {
                        push @to, $t;
                }
@@ -123,8 +124,8 @@ if ($self->state eq "prompt") {
        # keep calling me for every line until I relinquish control
        $self->func("DXMsg::do_send_stuff");
        $self->state('send1');
-       #push @out, $self->msg('sendsubj');
-       push @out, "Enter Subject (30 characters) >";
+       push @out, $self->msg('m1');
+       #push @out, "Enter Subject (30 characters) >";
 }
 
 return (1, @out);
diff --git a/cmd/uncatchup.pl b/cmd/uncatchup.pl
new file mode 100644 (file)
index 0000000..15edb1a
--- /dev/null
@@ -0,0 +1,18 @@
+#
+# uncatchup some or all of the non-private messages for a node.
+#
+# in other words mark  messages as NOT being already received
+# by this node.
+#
+# $Id$
+#
+# Copyright (c) 1999 Dirk Koopman G1TLH
+#
+
+my ($self, $line) = @_;
+my @f = split /\s+/, $line;
+my $call = uc shift @f;
+my @out;
+
+
+return (1, @out);
diff --git a/filter/wwv/GB7MBC.pl.issue b/filter/wwv/GB7MBC.pl.issue
new file mode 100644 (file)
index 0000000..7f3dfc1
--- /dev/null
@@ -0,0 +1,22 @@
+#
+# This is an example WWV filter
+# 
+# The element list is:-
+# 0 - nominal unix date of spot (ie the day + hour:13)
+# 1 - the hour
+# 2 - SFI
+# 3 - K
+# 4 - I
+# 5 - text
+# 6 - spotter
+# 7 - origin
+# 8 - incoming interface callsign
+#
+# this one doesn't filter, it just sets the hop count to 6 and is
+# used mainly just to override any isolation from WWV coming from
+# the internet.
+
+$in = [
+        [ 1, 0, 'd', 0, 6 ]
+];
+
index 13af2cc02eac81e6bb85369add0fa39b3ce4a75e..c1f0ae7a2ac8ef3ef970edda16f733485721f90c 100644 (file)
@@ -32,7 +32,7 @@ use Carp;
 
 use strict;
 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
-                       @badmsg $badmsgfn $forwardfn @forward);
+                       @badmsg $badmsgfn $forwardfn @forward $timeout $waittime);
 
 %work = ();                                            # outstanding jobs
 @msg = ();                                             # messages we have
@@ -41,6 +41,8 @@ $msgdir = "$main::root/msg";  # directory contain the msgs
 $maxage = 30 * 86400;                  # the maximum age that a message shall live for if not marked 
 $last_clean = 0;                               # last time we did a clean
 @forward = ();                  # msg forward table
+$timeout = 30*60;               # forwarding timeout
+$waittime = 60*60;              # time an aborted outgoing message waits before trying again
 
 $badmsgfn = "$msgdir/badmsg.pl";  # list of TO address we wont store
 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
@@ -66,6 +68,8 @@ $forwardfn = "$msgdir/forward.pl";  # the forwarding table
                  size => '0,Size',
                  msgno => '0,Msgno',
                  keep => '0,Keep this?,yesno',
+                 lastt => '9,Last processed,cldatetime',
+                 waitt => '9,Wait until,cldatetime',
                 );
 
 sub DESTROY
@@ -95,6 +99,7 @@ sub alloc
        $self->{'read'} = shift;
        $self->{rrreq} = shift;
        $self->{gotit} = [];
+       $self->{lastt} = $main::systime;
     
        return $self;
 }
@@ -110,16 +115,49 @@ sub workclean
        delete $ref->{lines};
        delete $ref->{file};
        delete $ref->{count};
+       delete $ref->{lastt} if exists $ref->{lastt};
+       delete $ref->{waitt} if exists $ref->{waitt};
 }
 
 sub process
 {
        my ($self, $line) = @_;
+
+       # this is periodic processing
+       if (undef $self || undef $line) {
+
+               # wander down the work queue stopping any messages that have timed out
+               for (keys %work) {
+                       my $ref = $work{$_};
+                       if ($main::systime > $ref->{lastt} + $timeout) {
+                               my $tonode = $ref->{tonode};
+                               $ref->stop_msg();
+
+                               # delay any outgoing messages that fail
+                               $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall;
+                       }
+               }
+               
+               # clean the message queue
+               clean_old() if $main::systime - $last_clean > 3600 ;
+               return;
+       }
+
        my @f = split /\^/, $line;
        my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
-       
+
  SWITCH: {
                if ($pcno == 28) {              # incoming message
+
+                       # first look for any messages in the busy queue 
+                       # and cancel them this should both resolve timed out incoming messages
+                       # and crossing of message between nodes, incoming messages have priority
+                       if (exists $busy{$f[2]}) {
+                               my $ref = $busy{$f[2]};
+                               my $tonode = $ref->{tonode};
+                               $ref->stop_msg();
+                       }
+
                        my $t = cltounix($f[5], $f[6]);
                        my $stream = next_transno($f[2]);
                        my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
@@ -148,6 +186,7 @@ sub process
                                        dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
                                        $ref->{count} = 0;
                                }
+                               $ref->{lastt} = $main::systime;
                        }
                        last SWITCH;
                }
@@ -165,6 +204,7 @@ sub process
                                $ref->{lines} = [];
                                push @{$ref->{lines}}, ($ref->read_msg_body);
                                $ref->send_tranche($self);
+                               $ref->{lastt} = $main::systime;
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
@@ -176,6 +216,7 @@ sub process
                        if ($ref) {
                                dbg('msg', "tranche ack stream $f[3]\n");
                                $ref->send_tranche($self);
+                               $ref->{lastt} = $main::systime;
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
@@ -202,7 +243,7 @@ sub process
                                                my $m;
                                                for $m (@msg) {
                                                        if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) {
-                                                               $ref->stop_msg($self);
+                                                               $ref->stop_msg();
                                                                my $msgno = $m->{msgno};
                                                                dbg('msg', "duplicate message to $msgno\n");
                                                                Log('msg', "duplicate message to $msgno");
@@ -212,7 +253,7 @@ sub process
                                                        
                                                # look for 'bad' to addresses 
                                                if (grep $ref->{to} eq $_, @badmsg) {
-                                                       $ref->stop_msg($self);
+                                                       $ref->stop_msg();
                                                        dbg('msg', "'Bad' TO address $ref->{to}");
                                                        Log('msg', "'Bad' TO address $ref->{to}");
                                                        return;
@@ -223,11 +264,11 @@ sub process
                                                $ref->store($ref->{lines});
                                                add_dir($ref);
                                                my $dxchan = DXChannel->get($ref->{to});
-                                               $dxchan->send($dxchan->msg('msgnew')) if $dxchan;
+                                               $dxchan->send($dxchan->msg('m9')) if $dxchan;
                                                Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
                                        }
                                }
-                               $ref->stop_msg($self);
+                               $ref->stop_msg();
                                queue_msg(0);
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
@@ -247,7 +288,7 @@ sub process
                                        push @{$ref->{gotit}}, $f[2]; # mark this up as being received
                                        $ref->store($ref->{lines});     # re- store the file
                                }
-                               $ref->stop_msg($self);
+                               $ref->stop_msg();
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
@@ -294,7 +335,7 @@ sub process
                        dbg('msg', "stream $f[3]: abort received\n");
                        my $ref = $work{"$f[2]$f[3]"};
                        if ($ref) {
-                               $ref->stop_msg($self);
+                               $ref->stop_msg();
                                $ref = undef;
                        }
                        
@@ -310,8 +351,6 @@ sub process
                        }
                }
        }
-       
-       clean_old() if $main::systime - $last_clean > 3600 ; # clean the message queue
 }
 
 
@@ -516,6 +555,13 @@ sub queue_msg
        foreach $ref (@msg) {
                # firstly, is it private and unread? if so can I find the recipient
                # in my cluster node list offsite?
+
+               # ignore 'delayed' messages until their waiting time has expired
+               if (exists $ref->{waitt}) {
+                       next if $ref->{waitt} < $main::systime;
+                       delete $ref->{waitt};
+               } 
+               
                if ($ref->{private}) {
                        if ($ref->{'read'} == 0) {
                                $clref = DXCluster->get_exact($ref->{to});
@@ -606,11 +652,13 @@ sub get_fwq
 sub stop_msg
 {
        my ($self, $dxchan) = @_;
-       my $node = $dxchan->call;
+       my $node = $self->{tonode}
+       my $stream = $self->{stream} if exists $self->{stream};
        
-       dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
+       
+       dbg('msg', "stop msg $self->{msgno} -> node $node\n");
        delete $work{$node};
-       delete $work{"$node$self->{stream}"};
+       delete $work{"$node$stream"} if $stream;
        $self->workclean;
        delete $busy{$node};
 }
@@ -743,7 +791,7 @@ sub do_send_stuff
                $loc->{lines} = [];
                $self->state('sendbody');
                #push @out, $self->msg('sendbody');
-               push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+               push @out, $self->msg('m8');)
        } elsif ($self->state eq 'sendbody') {
                confess "local var gone missing" if !ref $self->{loc};
                my $loc = $self->{loc};
@@ -766,12 +814,12 @@ sub do_send_stuff
                                                                                $loc->{rrreq});
                                        $ref->store($loc->{lines});
                                        $ref->add_dir();
-                                       #push @out, $self->msg('sendsent', $to);
-                                       push @out, "msgno $ref->{msgno} sent to $to";
+                                       push @out, $self->msg('m11', $ref->{msgno}, $to);
+                                       #push @out, "msgno $ref->{msgno} sent to $to";
                                        my $dxchan = DXChannel->get(uc $to);
                                        if ($dxchan) {
                                                if ($dxchan->is_user()) {
-                                                       $dxchan->send("New mail has arrived for you");
+                                                       $dxchan->send($dxchan->msg('m9'));
                                                }
                                        }
                                }
@@ -780,11 +828,12 @@ sub do_send_stuff
                        delete $loc->{to};
                        delete $self->{loc};
                        $self->func(undef);
+                       
                        DXMsg::queue_msg(0);
                        $self->state('prompt');
                } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
                        #push @out, $self->msg('sendabort');
-                       push @out, "aborted";
+                       push @out, $self->msg('m10');
                        delete $loc->{lines};
                        delete $loc->{to};
                        delete $self->{loc};
index 4349b87c506e4fb5a058a1b6874a803e34de92c3..0d137e529bd3710f7dd391eadf9ee126f4fe0083 100644 (file)
@@ -516,11 +516,11 @@ sub normal
                        $wwvdup{$dupkey} = $d;
                        $field[6] =~ s/-\d+$//o;            # remove spotter's ssid
                
-                       my $wwv = Geomag::update($d, $field[2], $sfi, $k, $i, @field[6..$#field]);
+                       my $wwv = Geomag::update($d, $field[2], $sfi, $k, $i, @field[6..8]);
 
                        my $r;
                        eval {
-                               $r = Local::wwv($self, $field[1], $field[2], $sfi, $k, $i, @field[6..$#field]);
+                               $r = Local::wwv($self, $field[1], $field[2], $sfi, $k, $i, @field[6..8]);
                        };
 #                      dbg('local', "Local::wwv2 error $@") if $@;
                        return if $r;
@@ -528,9 +528,9 @@ sub normal
                        # DON'T be silly and send on PC27s!
                        return if $pcno == 27;
 
-                       # broadcast to the eager users
-                       broadcast_users("WWV de $field[7] <$field[2]>:   SFI=$sfi, A=$k, K=$i, $field[6]", 'wwv', $wwv );
-                       last SWITCH;
+                       # broadcast to the eager world
+                       send_wwv_spot($self, $line, $d, $field[2], $sfi, $k, $i, @field[6..8]);
+                       return;
                }
                
                if ($pcno == 24) {              # set here status
@@ -836,6 +836,45 @@ sub send_dx_spot
        }
 }
 
+sub send_wwv_spot
+{
+       my $self = shift;
+       my $line = shift;
+       my @dxchan = DXChannel->get_all();
+       my $dxchan;
+       
+       # send it if it isn't the except list and isn't isolated and still has a hop count
+       # taking into account filtering and so on
+       foreach $dxchan (@dxchan) {
+               my $routeit;
+               my ($filter, $hops) = Filter::it($dxchan->{wwvfilter}, @_, $self->{call} ) if $dxchan->{wwvfilter};
+               if ($dxchan->is_ak1a) {
+                       next if $dxchan == $self;
+                       if ($hops) {
+                               $routeit = $line;
+                               $routeit =~ s/\^H\d+\^\~$/\^H$hops\^\~/;
+                       } else {
+                               $routeit = adjust_hops($dxchan, $line);  # adjust its hop count by node name
+                               next unless $routeit;
+                       }
+                       if ($filter) {
+                               $dxchan->send($routeit) if $routeit;
+                       } else {
+                               $dxchan->send($routeit) unless $dxchan->{isolate} || $self->{isolate};
+                               
+                       }
+               } elsif ($dxchan->is_user && $dxchan->{wwv}) {
+                       my $buf = "WWV de $_[6] <$_[1]>:   SFI=$_[2], A=$_[3], K=$_[4], $_[5]";
+                       $buf .= "\a\a" if $dxchan->{beep};
+                       if ($dxchan->{state} eq 'prompt' || $dxchan->{state} eq 'convers') {
+                               $dxchan->send($buf) if !$hops || ($hops && $filter);
+                       } else {
+                               $dxchan->delay($buf) if !$hops || ($hops && $filter);
+                       }
+               }                                       
+       }
+}
+
 sub send_local_config
 {
        my $self = shift;
index f1e7a954f7bbb3dd31f023836d1e162ceb27ee5a..b1ef57ad351c3d1034cf2a1a8d8bc0e108857e9c 100644 (file)
@@ -71,9 +71,18 @@ package DXM;
                                lockout => '$_[0] Locked out',
                                lockoutc => '$_[0] Created and Locked out',
                                lockoutun => '$_[0] Unlocked',
-                               m2 => '$_[0] Information: $_[1]',
+                               m1 => 'Enter Subject (30 characters) >',
+                               m2 => 'Copy of msg $_[0] sent to $_[1]',
+                               m3 => 'Sorry, $_[0] is an unacceptable TO address',
+                               m4 => 'Sorry, can\'t access message $_[0]',
+                               m5 => 'Sorry, I need a message number', 
+                               m6 => 'Reply to: $_[0]',
+                               m7 => 'Subject : $_[0]',
+                               m8 => 'Enter Message /EX to send or /ABORT to exit',
+                               m9 => 'New mail has arrived for you',
+                               m10 => 'Message Aborted',
+                               m11 => 'Message no $_[0] saved and directed to $_[1]',
                                merge1 => 'Merge request for $_[1] spots and $_[2] WWV sent to $_[0]',
-                               msgnew => 'New mail has arrived for you',
                                namee1 => 'Please enter your name, set/name <your name>',
                                namee2 => 'Can\'t find user $_[0]!',
                                name => 'Your name is now \"$_[0]\"',
index d1bc62573a9cd336f1fd33a63228508446ff91d3..1af0388a3449ed82a9bfdeb7a370f4579505e3a8 100755 (executable)
@@ -374,6 +374,7 @@ for (;;) {
                DXCommandmode::process(); # process ongoing command mode stuff
                DXProt::process();              # process ongoing ak1a pcxx stuff
                DXConnect::process();
+               DXMsg::process();
                eval { 
                        Local::process();       # do any localised processing
                };