X-Git-Url: http://www.dxcluster.org/gitweb/gitweb.cgi?p=spider.git;a=blobdiff_plain;f=perl%2FDXMsg.pm;h=9c032ba56f02caf41b682cd87731c6dcbd2c1c0b;hp=f916ff5405da91ca8d27ea4a5383426414fd3070;hb=57b5e464bc44ae8eee23ab94c1f499f527595dc9;hpb=23d995215379c4786c2cb1d930a09c734c2472aa diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index f916ff54..9c032ba5 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -23,10 +23,11 @@ use FileHandle; use Carp; use strict; -use vars qw(%work @msg $msgdir %valid); +use vars qw(%work @msg $msgdir %valid %busy); %work = (); # outstanding jobs @msg = (); # messages we have +%busy = (); # station interlocks $msgdir = "$main::root/msg"; # directory contain the msgs %valid = ( @@ -65,6 +66,7 @@ sub alloc $self->{subject} = shift; $self->{origin} = shift; $self->{read} = shift; + $self->{gotit} = []; return $self; } @@ -92,7 +94,7 @@ sub process if ($pcno == 28) { # incoming message my $t = cltounix($f[5], $f[6]); my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0'); + my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]); # fill in various forwarding state variables $ref->{fromnode} = $f[2]; @@ -102,13 +104,14 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); - $work{"$f[1]$f[2]$stream"} = $ref; # store in work + $work{"$f[2]$stream"} = $ref; # store in work + $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } if ($pcno == 29) { # incoming text - my $ref = $work{"$f[1]$f[2]$f[3]"}; + my $ref = $work{"$f[2]$f[3]"}; if ($ref) { push @{$ref->{lines}}, $f[4]; $ref->{count}++; @@ -121,35 +124,72 @@ sub process last SWITCH; } - if ($pcno == 30) { + if ($pcno == 30) { # this is a incoming subject ack + my $ref = $work{$f[2]}; # note no stream at this stage + delete $work{$f[2]}; + $ref->{stream} = $f[3]; + $ref->{count} = 0; + $ref->{linesreq} = 5; + $work{"$f[2]$f[3]"} = $ref; # new ref + dbg('msg', "incoming subject ack stream $[3]\n"); + $busy{$f[2]} = $ref; # interlock + $ref->{lines} = []; + push @{$ref->{lines}}, ($ref->read_msg_body); + $ref->send_tranche($self); last SWITCH; } - if ($pcno == 31) { + if ($pcno == 31) { # acknowledge a tranche of lines + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + dbg('msg', "tranche ack stream $f[3]\n"); + $ref->send_tranche($self); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + } last SWITCH; } if ($pcno == 32) { # incoming EOM dbg('msg', "stream $f[3]: EOM received\n"); - my $ref = $work{"$f[1]$f[2]$f[3]"}; + my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol - # store the file or message - # remove extraneous rubbish from the hash - # remove it from the work in progress vector - # stuff it on the msg queue - $ref->{msgno} = next_transno("Msgno") if !$ref->{file}; - $ref->store($ref->{lines}); - $ref->workclean; - delete $work{"$f[1]$f[2]$f[3]"}; - push @msg, $ref; + # store the file or message + # remove extraneous rubbish from the hash + # remove it from the work in progress vector + # stuff it on the msg queue + if ($ref->{lines} && @{$ref->{lines}} > 0) { # ignore messages with 0 lines + $ref->{msgno} = next_transno("Msgno") if !$ref->{file}; + push @{$ref->{gotit}}, $f[2]; # mark this up as being received + $ref->store($ref->{lines}); + add_dir($ref); + } + $ref->stop_msg($self); + queue_msg(); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } + queue_msg(); last SWITCH; } - if ($pcno == 33) { + if ($pcno == 33) { # acknowledge the end of message + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + if ($ref->{private}) { # remove it if it private and gone off site# + $ref->del_msg; + } else { + push @{$ref->{gotit}}, $f[2]; # mark this up as being received + $ref->store($ref->{lines}); # re- store the file + } + $ref->stop_msg($self); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + } + queue_msg(); last SWITCH; } @@ -172,7 +212,7 @@ sub process dbg('msg', "created directory $fn\n"); } my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0'); + my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); # forwarding variables $ref->{fromnode} = $f[1]; @@ -181,23 +221,36 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; - $work{"$f[1]$f[2]$stream"} = $ref; # store in work + $work{"$f[2]$stream"} = $ref; # store in work $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } + + if ($pcno == 42) { # abort transfer + dbg('msg', "stream $f[3]: abort received\n"); + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + $ref->stop_msg($self); + $ref = undef; + } + + last SWITCH; + } } } # store a message away on disc or whatever +# +# NOTE the second arg is a REFERENCE not a list sub store { my $ref = shift; my $lines = shift; # we only proceed if there are actually any lines in the file - if (@{$lines} == 0) { + if (!$lines || @{$lines} == 0) { return; } @@ -222,19 +275,20 @@ sub store my $fn = filename($ref->{msgno}); dbg('msg', "To be stored in $fn\n"); - + + # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem) my $fh = new FileHandle "$fn", "w"; if (defined $fh) { - print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$ref->{private}^$ref->{subject}^$ref->{origin}^$ref->{read}\n"; - print $fh "=== $ref->{fromnode}\n"; + my $rr = $ref->{rrreq} ? '1' : '0'; + my $priv = $ref->{private} ? '1': '0'; + print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{read}^$rr\n"; + print $fh "=== ", join('^', @{$ref->{gotit}}), "\n"; my $line; $ref->{size} = 0; foreach $line (@{$lines}) { $ref->{size} += (length $line) + 1; print $fh "$line\n"; } - $ref->{gotit} = []; - push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode}; $fh->close; dbg('msg', "msg $ref->{msgno} stored\n"); } else { @@ -251,8 +305,13 @@ sub del_msg # remove it from the active message list @msg = map { $_ != $self ? $_ : () } @msg; + # belt and braces (one day I will ask someone if this is REALLY necessary) + delete $self->{gotit}; + delete $self->{list}; + # remove the file unlink filename($self->{msgno}); + dbg('msg', "deleting $self->{msgno}\n"); } # read in a message header @@ -292,7 +351,7 @@ sub read_msg_header $line =~ s/^=== //o; $ref->{gotit} = []; @f = split /\^/, $line; - push @{$ref->{goit}}, @f; + push @{$ref->{gotit}}, @f; $ref->{size} = $size; close($file); @@ -323,6 +382,116 @@ sub read_msg_body return @out; } +# send a tranche of lines to the other end +sub send_tranche +{ + my ($self, $dxchan) = @_; + my @out; + my $to = $self->{tonode}; + my $from = $self->{fromnode}; + my $stream = $self->{stream}; + my $i; + + for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) { + push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]); + } + push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq}; + $dxchan->send(@out); +} + + +# find a message to send out and start the ball rolling +sub queue_msg +{ + my $sort = shift; + my @nodelist = DXProt::get_all_ak1a(); + my $ref; + my $clref; + my $dxchan; + + # bat down the message list looking for one that needs to go off site and whose + # nearest node is not busy. + + dbg('msg', "queue msg ($sort)\n"); + foreach $ref (@msg) { + # firstly, is it private and unread? if so can I find the recipient + # in my cluster node list offsite? + if ($ref->{private}) { + if ($ref->{read} == 0) { + $clref = DXCluster->get($ref->{to}); + if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) { + $dxchan = $clref->{dxchan}; + $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call); + } + } + } elsif ($sort == undef) { + # otherwise we are dealing with a bulletin, compare the gotit list with + # the nodelist up above, if there are sites that haven't got it yet + # then start sending it - what happens when we get loops is anyone's + # guess, use (to, from, time, subject) tuple? + my $noderef; + foreach $noderef (@nodelist) { + next if $noderef->call eq $main::mycall; + next if grep { $_ eq $noderef->call } @{$ref->{gotit}}; + + # if we are here we have a node that doesn't have this message + $ref->start_msg($noderef) if !get_busy($noderef->call); + last; + } + } + + # if all the available nodes are busy then stop + last if @nodelist == scalar grep { get_busy($_->call) } @nodelist; + } +} + +# start the message off on its travels with a PC28 +sub start_msg +{ + my ($self, $dxchan) = @_; + + dbg('msg', "start msg $self->{msgno}\n"); + $self->{linesreq} = 5; + $self->{count} = 0; + $self->{tonode} = $dxchan->call; + $self->{fromnode} = $main::mycall; + $busy{$dxchan->call} = $self; + $work{"$self->{tonode}"} = $self; + $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); +} + +# get the ref of a busy node +sub get_busy +{ + my $call = shift; + return $busy{$call}; +} + +# get the busy queue +sub get_all_busy +{ + return values %busy; +} + +# get the forwarding queue +sub get_fwq +{ + return values %work; +} + +# stop a message from continuing, clean it out, unlock interlocks etc +sub stop_msg +{ + my ($self, $dxchan) = @_; + my $node = $dxchan->call; + + dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n"); + delete $work{$node}; + delete $work{"$node$self->{stream}"}; + $self->workclean; + delete $busy{$node}; +} + # get a new transaction number from the file specified sub next_transno { @@ -366,12 +535,20 @@ sub init $ref = read_msg_header("$msgdir/$_"); next if !$ref; - # add the clusters that have this - push @msg, $ref; + # add the message to the available queue + add_dir($ref); } } +# add the message to the directory listing +sub add_dir +{ + my $ref = shift; + confess "tried to add a non-ref to the msg directory" if !ref $ref; + push @msg, $ref; +} + # return all the current messages sub get_all { @@ -426,6 +603,68 @@ sub AUTOLOAD @_ ? $self->{$name} = shift : $self->{$name} ; } +sub do_send_stuff +{ + my $self = shift; + my $line = shift; + my @out; + + if ($self->state eq 'send1') { +# $DB::single = 1; + confess "local var gone missing" if !ref $self->{loc}; + my $loc = $self->{loc}; + $loc->{subject} = $line; + $loc->{lines} = []; + $self->state('sendbody'); + #push @out, $self->msg('sendbody'); + push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit"; + } elsif ($self->state eq 'sendbody') { + confess "local var gone missing" if !ref $self->{loc}; + my $loc = $self->{loc}; + if ($line eq "\032" || uc $line eq "/EX") { + my $to; + + if (@{$loc->{lines}} > 0) { + foreach $to (@{$loc->{to}}) { + my $ref; + my $systime = $main::systime; + my $mycall = $main::mycall; + $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'), + uc $to, + $self->call, + $systime, + $loc->{private}, + $loc->{subject}, + $mycall, + $loc->{rrreq}); + $ref->store($loc->{lines}); + $ref->add_dir(); + #push @out, $self->msg('sendsent', $to); + push @out, "msgno $ref->{msgno} sent to $to"; + } + } + delete $loc->{lines}; + delete $loc->{to}; + delete $self->{loc}; + $self->state('prompt'); + $self->func(undef); + DXMsg::queue_msg(); + } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { + #push @out, $self->msg('sendabort'); + push @out, "aborted"; + delete $loc->{lines}; + delete $loc->{to}; + delete $self->{loc}; + $self->func(undef); + $self->state('prompt'); + } else { + + # i.e. it ain't and end or abort, therefore store the line + push @{$loc->{lines}}, $line; + } + } + return (1, @out); +} 1;