X-Git-Url: http://www.dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=e9097d9ca4f8fa159b2ecce5092e08689a8d8a3f;hb=b5b58db69484da5554b4f7e10b813d13e8cf16cb;hp=ea4a0791d8ca7ba6c121f59fec7e0fe5554d8305;hpb=84505457c5b3757715d97a63acd792b28fc1841a;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index ea4a0791..e9097d9c 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -23,11 +23,34 @@ use FileHandle; use Carp; use strict; -use vars qw($stream %work @msg $msgdir $msgnofn); +use vars qw(%work @msg $msgdir %valid %busy); %work = (); # outstanding jobs @msg = (); # messages we have -$msgdir = "$main::data/msg"; # directory contain the msgs +%busy = (); # station interlocks +$msgdir = "$main::root/msg"; # directory contain the msgs + +%valid = ( + fromnode => '9,From Node', + tonode => '9,To Node', + to => '0,To', + from => '0,From', + t => '0,Msg Time,cldatetime', + private => '9,Private', + subject => '0,Subject', + linesreq => '0,Lines per Gob', + rrreq => '9,Read Confirm', + origin => '0,Origin', + lines => '5,Data', + stream => '9,Stream No', + count => '9,Gob Linecnt', + file => '9,File?,yesno', + gotit => '9,Got it Nodes,parray', + lines => '9,Lines,parray', + read => '9,Times read', + size => '0,Size', + msgno => '0,Msgno', +); # allocate a new object # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper @@ -35,19 +58,17 @@ sub alloc { my $pkg = shift; my $self = bless {}, $pkg; - $self->{fromnode} = shift; - $self->{tonode} = shift; + $self->{msgno} = shift; $self->{to} = shift; $self->{from} = shift; $self->{t} = shift; $self->{private} = shift; $self->{subject} = shift; - $self->{linesreq} = shift; # this the number of lines to send or receive between PC31s - $self->{rrreq} = shift; # a read receipt is required $self->{origin} = shift; - $self->{stream} = shift; - $self->{lines} = []; - + $self->{read} = shift; + $self->{rrreq} = shift; + $self->{gotit} = []; + return $self; } @@ -57,7 +78,11 @@ sub workclean delete $ref->{lines}; delete $ref->{linesreq}; delete $ref->{tonode}; + delete $ref->{fromnode}; delete $ref->{stream}; + delete $ref->{lines}; + delete $ref->{file}; + delete $ref->{count}; } sub process @@ -70,16 +95,24 @@ sub process if ($pcno == 28) { # incoming message my $t = cltounix($f[5], $f[6]); my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($f[1], $f[2], $f[3], $f[4], $t, $f[7], $f[8], $f[10], $f[11], $f[13], $stream); - dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); - $work{"$f[1]$f[2]$stream"} = $ref; # store in work - $self->send(DXProt::pc30($f[2], $f[1], $stream)); + my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]); + + # fill in various forwarding state variables + $ref->{fromnode} = $f[2]; + $ref->{tonode} = $f[1]; + $ref->{rrreq} = $f[11]; + $ref->{linesreq} = $f[10]; + $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s + dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); + $work{"$f[2]$stream"} = $ref; # store in work + $busy{$f[2]} = $ref; # set interlock + $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } if ($pcno == 29) { # incoming text - my $ref = $work{"$f[1]$f[2]$f[3]"}; + my $ref = $work{"$f[2]$f[3]"}; if ($ref) { push @{$ref->{lines}}, $f[4]; $ref->{count}++; @@ -92,26 +125,72 @@ sub process last SWITCH; } - if ($pcno == 30) { + if ($pcno == 30) { # this is a incoming subject ack + my $ref = $work{$f[2]}; # note no stream at this stage + delete $work{$f[2]}; + $ref->{stream} = $f[3]; + $ref->{count} = 0; + $ref->{linesreq} = 5; + $work{"$f[2]$f[3]"} = $ref; # new ref + dbg('msg', "incoming subject ack stream $f[3]\n"); + $busy{$f[2]} = $ref; # interlock + $ref->{lines} = []; + push @{$ref->{lines}}, ($ref->read_msg_body); + $ref->send_tranche($self); last SWITCH; } - if ($pcno == 31) { + if ($pcno == 31) { # acknowledge a tranche of lines + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + dbg('msg', "tranche ack stream $f[3]\n"); + $ref->send_tranche($self); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + } last SWITCH; } if ($pcno == 32) { # incoming EOM dbg('msg', "stream $f[3]: EOM received\n"); - my $ref = $work{"$f[1]$f[2]$f[3]"}; + my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it - $ref->store(); # store it (whatever that may mean) - delete $work{"$f[1]$f[2]$f[3]"}; # remove the reference from the work vector + + # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol + # store the file or message + # remove extraneous rubbish from the hash + # remove it from the work in progress vector + # stuff it on the msg queue + if ($ref->{lines} && @{$ref->{lines}} > 0) { # ignore messages with 0 lines + $ref->{msgno} = next_transno("Msgno") if !$ref->{file}; + push @{$ref->{gotit}}, $f[2]; # mark this up as being received + $ref->store($ref->{lines}); + add_dir($ref); + } + $ref->stop_msg($self); + queue_msg(); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } + queue_msg(); last SWITCH; } - if ($pcno == 33) { + if ($pcno == 33) { # acknowledge the end of message + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + if ($ref->{private}) { # remove it if it private and gone off site# + $ref->del_msg; + } else { + push @{$ref->{gotit}}, $f[2]; # mark this up as being received + $ref->store($ref->{lines}); # re- store the file + } + $ref->stop_msg($self); + } else { + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + } + queue_msg(); last SWITCH; } @@ -120,7 +199,7 @@ sub process $f[3] =~ s/\.//og; # remove dots $f[3] = lc $f[3]; # to lower case; dbg('msg', "incoming file $f[3]\n"); - last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|data\/msg)\//; # prevent access to executables + last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//; # prevent access to executables # create any directories my @part = split /\//, $f[3]; @@ -134,11 +213,28 @@ sub process dbg('msg', "created directory $fn\n"); } my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($f[1], $f[2], "$main::root/$f[3]", undef, time, !$f[4], undef, $f[5], 0, ' ', $stream); - $ref->{file} = 1; - $work{"$f[1]$f[2]$stream"} = $ref; # store in work - $self->send(DXProt::pc30($f[2], $f[1], $stream)); + my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); + + # forwarding variables + $ref->{fromnode} = $f[1]; + $ref->{tonode} = $f[2]; + $ref->{linesreq} = $f[5]; + $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s + $ref->{file} = 1; + $work{"$f[2]$stream"} = $ref; # store in work + $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + + last SWITCH; + } + + if ($pcno == 42) { # abort transfer + dbg('msg', "stream $f[3]: abort received\n"); + my $ref = $work{"$f[2]$f[3]"}; + if ($ref) { + $ref->stop_msg($self); + $ref = undef; + } last SWITCH; } @@ -147,13 +243,15 @@ sub process # store a message away on disc or whatever +# +# NOTE the second arg is a REFERENCE not a list sub store { my $ref = shift; + my $lines = shift; # we only proceed if there are actually any lines in the file - if (@{$ref->{lines}} == 0) { - delete $ref->{lines}; + if (!$lines || @{$lines} == 0) { return; } @@ -163,7 +261,7 @@ sub store my $fh = new FileHandle "$ref->{to}", "w"; if (defined $fh) { my $line; - foreach $line (@{$ref->{lines}}) { + foreach $line (@{$lines}) { print $fh "$line\n"; } $fh->close; @@ -171,35 +269,230 @@ sub store } else { confess "can't open file $ref->{to} $!"; } +# push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode}; } else { # a normal message - # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol - my $msgno = next_transno("msgno"); - # attempt to open the message file - my $fn = sprintf "$msgdir/m%06d", $msgno; + my $fn = filename($ref->{msgno}); dbg('msg', "To be stored in $fn\n"); - + + # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem) my $fh = new FileHandle "$fn", "w"; if (defined $fh) { - print $fh "=== $ref->{to}^$ref->{from}^$ref->{private}^$ref->{subject}^$ref->{origin}\n"; - print $fh "=== $ref->{fromnode}\n"; + my $rr = $ref->{rrreq} ? '1' : '0'; + my $priv = $ref->{private} ? '1': '0'; + print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{read}^$rr\n"; + print $fh "=== ", join('^', @{$ref->{gotit}}), "\n"; my $line; - foreach $line (@{$ref->{lines}}) { - $ref->{size} += length $line + 1; + $ref->{size} = 0; + foreach $line (@{$lines}) { + $ref->{size} += (length $line) + 1; print $fh "$line\n"; } - $ref->workclean(); - push @msg, $ref; # add this message to the incore message list $fh->close; - dbg('msg', "msg $msgno stored\n"); + dbg('msg', "msg $ref->{msgno} stored\n"); } else { confess "can't open msg file $fn $!"; } } } +# delete a message +sub del_msg +{ + my $self = shift; + + # remove it from the active message list + @msg = map { $_ != $self ? $_ : () } @msg; + + # belt and braces (one day I will ask someone if this is REALLY necessary) + delete $self->{gotit}; + delete $self->{list}; + + # remove the file + unlink filename($self->{msgno}); + dbg('msg', "deleting $self->{msgno}\n"); +} + +# read in a message header +sub read_msg_header +{ + my $fn = shift; + my $file; + my $line; + my $ref; + my @f; + my $size; + + $file = new FileHandle; + if (!open($file, $fn)) { + print "Error reading $fn $!\n"; + return undef; + } + $size = -s $fn; + $line = <$file>; # first line + chomp $line; + $size -= length $line; + if (! $line =~ /^===/o) { + print "corrupt first line in $fn ($line)\n"; + return undef; + } + $line =~ s/^=== //o; + @f = split /\^/, $line; + $ref = DXMsg->alloc(@f); + + $line = <$file>; # second line + chomp $line; + $size -= length $line; + if (! $line =~ /^===/o) { + print "corrupt second line in $fn ($line)\n"; + return undef; + } + $line =~ s/^=== //o; + $ref->{gotit} = []; + @f = split /\^/, $line; + push @{$ref->{gotit}}, @f; + $ref->{size} = $size; + + close($file); + + return $ref; +} + +# read in a message header +sub read_msg_body +{ + my $self = shift; + my $msgno = $self->{msgno}; + my $file; + my $line; + my $fn = filename($msgno); + my @out; + + $file = new FileHandle; + if (!open($file, $fn)) { + print "Error reading $fn $!\n"; + return undef; + } + chomp (@out = <$file>); + close($file); + + shift @out if $out[0] =~ /^=== /; + shift @out if $out[0] =~ /^=== /; + return @out; +} + +# send a tranche of lines to the other end +sub send_tranche +{ + my ($self, $dxchan) = @_; + my @out; + my $to = $self->{tonode}; + my $from = $self->{fromnode}; + my $stream = $self->{stream}; + my $i; + + for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) { + push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]); + } + push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq}; + $dxchan->send(@out); +} + + +# find a message to send out and start the ball rolling +sub queue_msg +{ + my $sort = shift; + my @nodelist = DXProt::get_all_ak1a(); + my $ref; + my $clref; + my $dxchan; + + # bat down the message list looking for one that needs to go off site and whose + # nearest node is not busy. + + dbg('msg', "queue msg ($sort)\n"); + foreach $ref (@msg) { + # firstly, is it private and unread? if so can I find the recipient + # in my cluster node list offsite? + if ($ref->{private}) { + if ($ref->{read} == 0) { + $clref = DXCluster->get($ref->{to}); + if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) { + $dxchan = $clref->{dxchan}; + $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call); + } + } + } elsif ($sort == undef) { + # otherwise we are dealing with a bulletin, compare the gotit list with + # the nodelist up above, if there are sites that haven't got it yet + # then start sending it - what happens when we get loops is anyone's + # guess, use (to, from, time, subject) tuple? + my $noderef; + foreach $noderef (@nodelist) { + next if $noderef->call eq $main::mycall; + next if grep { $_ eq $noderef->call } @{$ref->{gotit}}; + + # if we are here we have a node that doesn't have this message + $ref->start_msg($noderef) if !get_busy($noderef->call); + last; + } + } + + # if all the available nodes are busy then stop + last if @nodelist == scalar grep { get_busy($_->call) } @nodelist; + } +} + +# start the message off on its travels with a PC28 +sub start_msg +{ + my ($self, $dxchan) = @_; + + dbg('msg', "start msg $self->{msgno}\n"); + $self->{linesreq} = 5; + $self->{count} = 0; + $self->{tonode} = $dxchan->call; + $self->{fromnode} = $main::mycall; + $busy{$dxchan->call} = $self; + $work{"$self->{tonode}"} = $self; + $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); +} + +# get the ref of a busy node +sub get_busy +{ + my $call = shift; + return $busy{$call}; +} + +# get the busy queue +sub get_all_busy +{ + return values %busy; +} + +# get the forwarding queue +sub get_fwq +{ + return values %work; +} + +# stop a message from continuing, clean it out, unlock interlocks etc +sub stop_msg +{ + my ($self, $dxchan) = @_; + my $node = $dxchan->call; + + dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n"); + delete $work{$node}; + delete $work{"$node$self->{stream}"}; + $self->workclean; + delete $busy{$node}; +} + # get a new transaction number from the file specified sub next_transno { @@ -224,10 +517,157 @@ sub next_transno return $msgno; } -# initialise the message 'system' +# initialise the message 'system', read in all the message headers sub init { + my $dir = new FileHandle; + my @dir; + my $ref; + + # read in the directory + opendir($dir, $msgdir) or confess "can't open $msgdir $!"; + @dir = readdir($dir); + closedir($dir); + + for (sort @dir) { + next if /^\./o; + next if ! /^m\d+/o; + + $ref = read_msg_header("$msgdir/$_"); + next if !$ref; + + # add the message to the available queue + add_dir($ref); + + } +} + +# add the message to the directory listing +sub add_dir +{ + my $ref = shift; + confess "tried to add a non-ref to the msg directory" if !ref $ref; + push @msg, $ref; +} +# return all the current messages +sub get_all +{ + return @msg; +} + +# get a particular message +sub get +{ + my $msgno = shift; + for (@msg) { + return $_ if $_->{msgno} == $msgno; + last if $_->{msgno} > $msgno; + } + return undef; +} + +# return the official filename for a message no +sub filename +{ + return sprintf "$msgdir/m%06d", shift; +} + +# +# return a list of valid elements +# + +sub fields +{ + return keys(%valid); +} + +# +# return a prompt for a field +# + +sub field_prompt +{ + my ($self, $ele) = @_; + return $valid{$ele}; +} + +# +# send a message state machine +sub do_send_stuff +{ + my $self = shift; + my $line = shift; + my @out; + + if ($self->state eq 'send1') { +# $DB::single = 1; + confess "local var gone missing" if !ref $self->{loc}; + my $loc = $self->{loc}; + $loc->{subject} = $line; + $loc->{lines} = []; + $self->state('sendbody'); + #push @out, $self->msg('sendbody'); + push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit"; + } elsif ($self->state eq 'sendbody') { + confess "local var gone missing" if !ref $self->{loc}; + my $loc = $self->{loc}; + if ($line eq "\032" || uc $line eq "/EX") { + my $to; + + if (@{$loc->{lines}} > 0) { + foreach $to (@{$loc->{to}}) { + my $ref; + my $systime = $main::systime; + my $mycall = $main::mycall; + $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'), + uc $to, + $self->call, + $systime, + $loc->{private}, + $loc->{subject}, + $mycall, + '0', + $loc->{rrreq}); + $ref->store($loc->{lines}); + $ref->add_dir(); + #push @out, $self->msg('sendsent', $to); + push @out, "msgno $ref->{msgno} sent to $to"; + } + } + delete $loc->{lines}; + delete $loc->{to}; + delete $self->{loc}; + $self->state('prompt'); + $self->func(undef); + DXMsg::queue_msg(); + } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { + #push @out, $self->msg('sendabort'); + push @out, "aborted"; + delete $loc->{lines}; + delete $loc->{to}; + delete $self->{loc}; + $self->func(undef); + $self->state('prompt'); + } else { + + # i.e. it ain't and end or abort, therefore store the line + push @{$loc->{lines}}, $line; + } + } + return (1, @out); +} + +no strict; +sub AUTOLOAD +{ + my $self = shift; + my $name = $AUTOLOAD; + return if $name =~ /::DESTROY$/; + $name =~ s/.*:://o; + + confess "Non-existant field '$AUTOLOAD'" if !$valid{$name}; + @_ ? $self->{$name} = shift : $self->{$name} ; } 1;