X-Git-Url: http://www.dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=6a6f104aa346073dc0a6eb5e979b7beee023ed72;hb=cef696652d16bbeec53aca45234ea0b64f3496d3;hp=e949c6827223b4b5f2b8c8079854f6051c0748ce;hpb=78ed3f6025103ec1c47c90725e37b417647d83c8;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index e949c682..6a6f104a 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -19,14 +19,16 @@ use DXCluster; use DXProtVars; use DXProtout; use DXDebug; +use DXLog; use FileHandle; use Carp; use strict; -use vars qw(%work @msg $msgdir %valid); +use vars qw(%work @msg $msgdir %valid %busy); %work = (); # outstanding jobs @msg = (); # messages we have +%busy = (); # station interlocks $msgdir = "$main::root/msg"; # directory contain the msgs %valid = ( @@ -65,6 +67,8 @@ sub alloc $self->{subject} = shift; $self->{origin} = shift; $self->{read} = shift; + $self->{rrreq} = shift; + $self->{gotit} = []; return $self; } @@ -92,7 +96,7 @@ sub process if ($pcno == 28) { # incoming message my $t = cltounix($f[5], $f[6]); my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0'); + my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]); # fill in various forwarding state variables $ref->{fromnode} = $f[2]; @@ -102,13 +106,14 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); - $work{"$f[1]$f[2]$stream"} = $ref; # store in work + $work{"$f[2]$stream"} = $ref; # store in work + $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } if ($pcno == 29) { # incoming text - my $ref = $work{"$f[1]$f[2]$f[3]"}; + my $ref = $work{"$f[2]$f[3]"}; if ($ref) { push @{$ref->{lines}}, $f[4]; $ref->{count}++; @@ -121,27 +126,74 @@ sub process last SWITCH; } - if ($pcno == 30) { + if ($pcno == 30) { # this is a incoming subject ack + my $ref = $work{$f[2]}; # note no stream at this stage + delete $work{$f[2]}; + $ref->{stream} = $f[3]; + $ref->{count} = 0; + $ref->{linesreq} = 5; + $work{"$f[2]$f[3]"} = $ref; # new ref + dbg('msg', "incoming subject ack stream $f[3]\n"); + $busy{$f[2]} = $ref; # interlock + $ref->{lines} = []; + push @{$ref->{lines}}, ($ref->read_msg_body); + $ref->send_tranche($self); last SWITCH; } - if ($pcno == 31) { + if ($pcno == 31) { # acknowledge a tranche of lines + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + dbg('msg', "tranche ack stream $f[3]\n"); + $ref->send_tranche($self); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + } last SWITCH; } if ($pcno == 32) { # incoming EOM dbg('msg', "stream $f[3]: EOM received\n"); - my $ref = $work{"$f[1]$f[2]$f[3]"}; + my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it - $ref->store($ref->{lines}); # store it (whatever that may mean) - $ref->workclean; - delete $work{"$f[1]$f[2]$f[3]"}; # remove the reference from the work vector + + # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol + # store the file or message + # remove extraneous rubbish from the hash + # remove it from the work in progress vector + # stuff it on the msg queue + if ($ref->{lines} && @{$ref->{lines}} > 0) { # ignore messages with 0 lines + $ref->{msgno} = next_transno("Msgno") if !$ref->{file}; + push @{$ref->{gotit}}, $f[2]; # mark this up as being received + $ref->store($ref->{lines}); + add_dir($ref); + my $dxchan = DXChannel->get($ref->{to}); + $dxchan->send("New mail has arrived for you") if $dxchan; + } + $ref->stop_msg($self); + queue_msg(); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } + queue_msg(); last SWITCH; } - if ($pcno == 33) { + if ($pcno == 33) { # acknowledge the end of message + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + if ($ref->{private}) { # remove it if it private and gone off site# + $ref->del_msg; + } else { + push @{$ref->{gotit}}, $f[2]; # mark this up as being received + $ref->store($ref->{lines}); # re- store the file + } + $ref->stop_msg($self); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + } + queue_msg(); last SWITCH; } @@ -164,7 +216,7 @@ sub process dbg('msg', "created directory $fn\n"); } my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0'); + my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); # forwarding variables $ref->{fromnode} = $f[1]; @@ -173,23 +225,36 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; - $work{"$f[1]$f[2]$stream"} = $ref; # store in work + $work{"$f[2]$stream"} = $ref; # store in work $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } + + if ($pcno == 42) { # abort transfer + dbg('msg', "stream $f[3]: abort received\n"); + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + $ref->stop_msg($self); + $ref = undef; + } + + last SWITCH; + } } } # store a message away on disc or whatever +# +# NOTE the second arg is a REFERENCE not a list sub store { my $ref = shift; my $lines = shift; # we only proceed if there are actually any lines in the file - if (@{$lines} == 0) { + if (!$lines || @{$lines} == 0) { return; } @@ -204,35 +269,34 @@ sub store } $fh->close; dbg('msg', "file $ref->{to} stored\n"); + Log('msg', "file $ref->{to} from $ref->{from} stored" ); } else { confess "can't open file $ref->{to} $!"; } # push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode}; } else { # a normal message - # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol - my $msgno = next_transno("Msgno"); - # attempt to open the message file - my $fn = filename($msgno); + my $fn = filename($ref->{msgno}); dbg('msg', "To be stored in $fn\n"); - + + # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem) my $fh = new FileHandle "$fn", "w"; if (defined $fh) { - print $fh "=== $msgno^$ref->{to}^$ref->{from}^$ref->{t}^$ref->{private}^$ref->{subject}^$ref->{origin}^$ref->{read}\n"; - print $fh "=== $ref->{fromnode}\n"; + my $rr = $ref->{rrreq} ? '1' : '0'; + my $priv = $ref->{private} ? '1': '0'; + print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{read}^$rr\n"; + print $fh "=== ", join('^', @{$ref->{gotit}}), "\n"; my $line; + $ref->{size} = 0; foreach $line (@{$lines}) { $ref->{size} += (length $line) + 1; print $fh "$line\n"; } - $ref->{gotit} = []; - $ref->{msgno} = $msgno; - push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode}; - push @msg, $ref; # add this message to the incore message list $fh->close; - dbg('msg', "msg $msgno stored\n"); + dbg('msg', "msg $ref->{msgno} stored\n"); + Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" ); } else { confess "can't open msg file $fn $!"; } @@ -247,8 +311,13 @@ sub del_msg # remove it from the active message list @msg = map { $_ != $self ? $_ : () } @msg; + # belt and braces (one day I will ask someone if this is REALLY necessary) + delete $self->{gotit}; + delete $self->{list}; + # remove the file unlink filename($self->{msgno}); + dbg('msg', "deleting $self->{msgno}\n"); } # read in a message header @@ -288,7 +357,7 @@ sub read_msg_header $line =~ s/^=== //o; $ref->{gotit} = []; @f = split /\^/, $line; - push @{$ref->{goit}}, @f; + push @{$ref->{gotit}}, @f; $ref->{size} = $size; close($file); @@ -314,11 +383,121 @@ sub read_msg_body chomp (@out = <$file>); close($file); - shift @out if $out[0] =~ /^=== \d+\^/; - shift @out if $out[0] =~ /^=== \d+\^/; + shift @out if $out[0] =~ /^=== /; + shift @out if $out[0] =~ /^=== /; return @out; } +# send a tranche of lines to the other end +sub send_tranche +{ + my ($self, $dxchan) = @_; + my @out; + my $to = $self->{tonode}; + my $from = $self->{fromnode}; + my $stream = $self->{stream}; + my $i; + + for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) { + push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]); + } + push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq}; + $dxchan->send(@out); +} + + +# find a message to send out and start the ball rolling +sub queue_msg +{ + my $sort = shift; + my @nodelist = DXProt::get_all_ak1a(); + my $ref; + my $clref; + my $dxchan; + + # bat down the message list looking for one that needs to go off site and whose + # nearest node is not busy. + + dbg('msg', "queue msg ($sort)\n"); + foreach $ref (@msg) { + # firstly, is it private and unread? if so can I find the recipient + # in my cluster node list offsite? + if ($ref->{private}) { + if ($ref->{read} == 0) { + $clref = DXCluster->get($ref->{to}); + if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) { + $dxchan = $clref->{dxchan}; + $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call); + } + } + } elsif ($sort == undef) { + # otherwise we are dealing with a bulletin, compare the gotit list with + # the nodelist up above, if there are sites that haven't got it yet + # then start sending it - what happens when we get loops is anyone's + # guess, use (to, from, time, subject) tuple? + my $noderef; + foreach $noderef (@nodelist) { + next if $noderef->call eq $main::mycall; + next if grep { $_ eq $noderef->call } @{$ref->{gotit}}; + + # if we are here we have a node that doesn't have this message + $ref->start_msg($noderef) if !get_busy($noderef->call); + last; + } + } + + # if all the available nodes are busy then stop + last if @nodelist == scalar grep { get_busy($_->call) } @nodelist; + } +} + +# start the message off on its travels with a PC28 +sub start_msg +{ + my ($self, $dxchan) = @_; + + dbg('msg', "start msg $self->{msgno}\n"); + $self->{linesreq} = 5; + $self->{count} = 0; + $self->{tonode} = $dxchan->call; + $self->{fromnode} = $main::mycall; + $busy{$dxchan->call} = $self; + $work{"$self->{tonode}"} = $self; + $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); +} + +# get the ref of a busy node +sub get_busy +{ + my $call = shift; + return $busy{$call}; +} + +# get the busy queue +sub get_all_busy +{ + return values %busy; +} + +# get the forwarding queue +sub get_fwq +{ + return values %work; +} + +# stop a message from continuing, clean it out, unlock interlocks etc +sub stop_msg +{ + my ($self, $dxchan) = @_; + my $node = $dxchan->call; + + dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n"); + delete $work{$node}; + delete $work{"$node$self->{stream}"}; + $self->workclean; + delete $busy{$node}; +} + # get a new transaction number from the file specified sub next_transno { @@ -362,18 +541,37 @@ sub init $ref = read_msg_header("$msgdir/$_"); next if !$ref; - # add the clusters that have this - push @msg, $ref; + # add the message to the available queue + add_dir($ref); } } +# add the message to the directory listing +sub add_dir +{ + my $ref = shift; + confess "tried to add a non-ref to the msg directory" if !ref $ref; + push @msg, $ref; +} + # return all the current messages sub get_all { return @msg; } +# get a particular message +sub get +{ + my $msgno = shift; + for (@msg) { + return $_ if $_->{msgno} == $msgno; + last if $_->{msgno} > $msgno; + } + return undef; +} + # return the official filename for a message no sub filename { @@ -399,6 +597,83 @@ sub field_prompt return $valid{$ele}; } +# +# send a message state machine +sub do_send_stuff +{ + my $self = shift; + my $line = shift; + my @out; + + if ($self->state eq 'send1') { +# $DB::single = 1; + confess "local var gone missing" if !ref $self->{loc}; + my $loc = $self->{loc}; + $loc->{subject} = $line; + $loc->{lines} = []; + $self->state('sendbody'); + #push @out, $self->msg('sendbody'); + push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit"; + } elsif ($self->state eq 'sendbody') { + confess "local var gone missing" if !ref $self->{loc}; + my $loc = $self->{loc}; + if ($line eq "\032" || uc $line eq "/EX") { + my $to; + + if (@{$loc->{lines}} > 0) { + foreach $to (@{$loc->{to}}) { + my $ref; + my $systime = $main::systime; + my $mycall = $main::mycall; + $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'), + uc $to, + $self->call, + $systime, + $loc->{private}, + $loc->{subject}, + $mycall, + '0', + $loc->{rrreq}); + $ref->store($loc->{lines}); + $ref->add_dir(); + #push @out, $self->msg('sendsent', $to); + push @out, "msgno $ref->{msgno} sent to $to"; + my $dxchan = DXChannel->get(uc $to); + $dxchan->send("New mail has arrived for you") if $dxchan; + } + } + delete $loc->{lines}; + delete $loc->{to}; + delete $self->{loc}; + $self->state('prompt'); + $self->func(undef); + DXMsg::queue_msg(); + } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { + #push @out, $self->msg('sendabort'); + push @out, "aborted"; + delete $loc->{lines}; + delete $loc->{to}; + delete $self->{loc}; + $self->func(undef); + $self->state('prompt'); + } else { + + # i.e. it ain't and end or abort, therefore store the line + push @{$loc->{lines}}, $line; + } + } + return (1, @out); +} + +# return the standard directory line for this ref +sub dir +{ + my $ref = shift; + return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", + $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size, + $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject; +} + no strict; sub AUTOLOAD { @@ -411,7 +686,6 @@ sub AUTOLOAD @_ ? $self->{$name} = shift : $self->{$name} ; } - 1; __END__