X-Git-Url: http://www.dxcluster.org/gitweb/gitweb.cgi?p=spider.git;a=blobdiff_plain;f=perl%2FDXMsg.pm;h=e949c6827223b4b5f2b8c8079854f6051c0748ce;hp=ea4a0791d8ca7ba6c121f59fec7e0fe5554d8305;hb=78ed3f6025103ec1c47c90725e37b417647d83c8;hpb=2663e17b1d546b5b6068825f5964bc684e6131cb diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index ea4a0791..e949c682 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -23,11 +23,33 @@ use FileHandle; use Carp; use strict; -use vars qw($stream %work @msg $msgdir $msgnofn); +use vars qw(%work @msg $msgdir %valid); %work = (); # outstanding jobs @msg = (); # messages we have -$msgdir = "$main::data/msg"; # directory contain the msgs +$msgdir = "$main::root/msg"; # directory contain the msgs + +%valid = ( + fromnode => '9,From Node', + tonode => '9,To Node', + to => '0,To', + from => '0,From', + t => '0,Msg Time,cldatetime', + private => '9,Private', + subject => '0,Subject', + linesreq => '0,Lines per Gob', + rrreq => '9,Read Confirm', + origin => '0,Origin', + lines => '5,Data', + stream => '9,Stream No', + count => '9,Gob Linecnt', + file => '9,File?,yesno', + gotit => '9,Got it Nodes,parray', + lines => '9,Lines,parray', + read => '9,Times read', + size => '0,Size', + msgno => '0,Msgno', +); # allocate a new object # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper @@ -35,19 +57,15 @@ sub alloc { my $pkg = shift; my $self = bless {}, $pkg; - $self->{fromnode} = shift; - $self->{tonode} = shift; + $self->{msgno} = shift; $self->{to} = shift; $self->{from} = shift; $self->{t} = shift; $self->{private} = shift; $self->{subject} = shift; - $self->{linesreq} = shift; # this the number of lines to send or receive between PC31s - $self->{rrreq} = shift; # a read receipt is required $self->{origin} = shift; - $self->{stream} = shift; - $self->{lines} = []; - + $self->{read} = shift; + return $self; } @@ -57,7 +75,11 @@ sub workclean delete $ref->{lines}; delete $ref->{linesreq}; delete $ref->{tonode}; + delete $ref->{fromnode}; delete $ref->{stream}; + delete $ref->{lines}; + delete $ref->{file}; + delete $ref->{count}; } sub process @@ -70,11 +92,18 @@ sub process if ($pcno == 28) { # incoming message my $t = cltounix($f[5], $f[6]); my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($f[1], $f[2], $f[3], $f[4], $t, $f[7], $f[8], $f[10], $f[11], $f[13], $stream); + my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0'); + + # fill in various forwarding state variables + $ref->{fromnode} = $f[2]; + $ref->{tonode} = $f[1]; + $ref->{rrreq} = $f[11]; + $ref->{linesreq} = $f[10]; + $ref->{stream} = $stream; + $ref->{count} = 0; # no of lines between PC31s dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); $work{"$f[1]$f[2]$stream"} = $ref; # store in work - $self->send(DXProt::pc30($f[2], $f[1], $stream)); - $ref->{count} = 0; # no of lines between PC31s + $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } @@ -105,7 +134,8 @@ sub process my $ref = $work{"$f[1]$f[2]$f[3]"}; if ($ref) { $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it - $ref->store(); # store it (whatever that may mean) + $ref->store($ref->{lines}); # store it (whatever that may mean) + $ref->workclean; delete $work{"$f[1]$f[2]$f[3]"}; # remove the reference from the work vector } last SWITCH; @@ -120,7 +150,7 @@ sub process $f[3] =~ s/\.//og; # remove dots $f[3] = lc $f[3]; # to lower case; dbg('msg', "incoming file $f[3]\n"); - last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|data\/msg)\//; # prevent access to executables + last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//; # prevent access to executables # create any directories my @part = split /\//, $f[3]; @@ -134,11 +164,17 @@ sub process dbg('msg', "created directory $fn\n"); } my $stream = next_transno($f[2]); - my $ref = DXMsg->alloc($f[1], $f[2], "$main::root/$f[3]", undef, time, !$f[4], undef, $f[5], 0, ' ', $stream); + my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0'); + + # forwarding variables + $ref->{fromnode} = $f[1]; + $ref->{tonode} = $f[2]; + $ref->{linesreq} = $f[5]; + $ref->{stream} = $stream; + $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; $work{"$f[1]$f[2]$stream"} = $ref; # store in work - $self->send(DXProt::pc30($f[2], $f[1], $stream)); - $ref->{count} = 0; # no of lines between PC31s + $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack last SWITCH; } @@ -150,10 +186,10 @@ sub process sub store { my $ref = shift; + my $lines = shift; # we only proceed if there are actually any lines in the file - if (@{$ref->{lines}} == 0) { - delete $ref->{lines}; + if (@{$lines} == 0) { return; } @@ -163,7 +199,7 @@ sub store my $fh = new FileHandle "$ref->{to}", "w"; if (defined $fh) { my $line; - foreach $line (@{$ref->{lines}}) { + foreach $line (@{$lines}) { print $fh "$line\n"; } $fh->close; @@ -171,26 +207,29 @@ sub store } else { confess "can't open file $ref->{to} $!"; } +# push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode}; } else { # a normal message # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol - my $msgno = next_transno("msgno"); + my $msgno = next_transno("Msgno"); # attempt to open the message file - my $fn = sprintf "$msgdir/m%06d", $msgno; + my $fn = filename($msgno); dbg('msg', "To be stored in $fn\n"); my $fh = new FileHandle "$fn", "w"; if (defined $fh) { - print $fh "=== $ref->{to}^$ref->{from}^$ref->{private}^$ref->{subject}^$ref->{origin}\n"; + print $fh "=== $msgno^$ref->{to}^$ref->{from}^$ref->{t}^$ref->{private}^$ref->{subject}^$ref->{origin}^$ref->{read}\n"; print $fh "=== $ref->{fromnode}\n"; my $line; - foreach $line (@{$ref->{lines}}) { - $ref->{size} += length $line + 1; + foreach $line (@{$lines}) { + $ref->{size} += (length $line) + 1; print $fh "$line\n"; } - $ref->workclean(); + $ref->{gotit} = []; + $ref->{msgno} = $msgno; + push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode}; push @msg, $ref; # add this message to the incore message list $fh->close; dbg('msg', "msg $msgno stored\n"); @@ -200,6 +239,86 @@ sub store } } +# delete a message +sub del_msg +{ + my $self = shift; + + # remove it from the active message list + @msg = map { $_ != $self ? $_ : () } @msg; + + # remove the file + unlink filename($self->{msgno}); +} + +# read in a message header +sub read_msg_header +{ + my $fn = shift; + my $file; + my $line; + my $ref; + my @f; + my $size; + + $file = new FileHandle; + if (!open($file, $fn)) { + print "Error reading $fn $!\n"; + return undef; + } + $size = -s $fn; + $line = <$file>; # first line + chomp $line; + $size -= length $line; + if (! $line =~ /^===/o) { + print "corrupt first line in $fn ($line)\n"; + return undef; + } + $line =~ s/^=== //o; + @f = split /\^/, $line; + $ref = DXMsg->alloc(@f); + + $line = <$file>; # second line + chomp $line; + $size -= length $line; + if (! $line =~ /^===/o) { + print "corrupt second line in $fn ($line)\n"; + return undef; + } + $line =~ s/^=== //o; + $ref->{gotit} = []; + @f = split /\^/, $line; + push @{$ref->{goit}}, @f; + $ref->{size} = $size; + + close($file); + + return $ref; +} + +# read in a message header +sub read_msg_body +{ + my $self = shift; + my $msgno = $self->{msgno}; + my $file; + my $line; + my $fn = filename($msgno); + my @out; + + $file = new FileHandle; + if (!open($file, $fn)) { + print "Error reading $fn $!\n"; + return undef; + } + chomp (@out = <$file>); + close($file); + + shift @out if $out[0] =~ /^=== \d+\^/; + shift @out if $out[0] =~ /^=== \d+\^/; + return @out; +} + # get a new transaction number from the file specified sub next_transno { @@ -224,12 +343,75 @@ sub next_transno return $msgno; } -# initialise the message 'system' +# initialise the message 'system', read in all the message headers sub init { + my $dir = new FileHandle; + my @dir; + my $ref; + # read in the directory + opendir($dir, $msgdir) or confess "can't open $msgdir $!"; + @dir = readdir($dir); + closedir($dir); + + for (sort @dir) { + next if /^\./o; + next if ! /^m\d+/o; + + $ref = read_msg_header("$msgdir/$_"); + next if !$ref; + + # add the clusters that have this + push @msg, $ref; + + } +} + +# return all the current messages +sub get_all +{ + return @msg; +} + +# return the official filename for a message no +sub filename +{ + return sprintf "$msgdir/m%06d", shift; } +# +# return a list of valid elements +# + +sub fields +{ + return keys(%valid); +} + +# +# return a prompt for a field +# + +sub field_prompt +{ + my ($self, $ele) = @_; + return $valid{$ele}; +} + +no strict; +sub AUTOLOAD +{ + my $self = shift; + my $name = $AUTOLOAD; + return if $name =~ /::DESTROY$/; + $name =~ s/.*:://o; + + confess "Non-existant field '$AUTOLOAD'" if !$valid{$name}; + @_ ? $self->{$name} = shift : $self->{$name} ; +} + + 1; __END__