47a8e67466e64d9d9c6a3dbbd854669b775c0714
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8
9
10 package DXMsg;
11
12 @ISA = qw(DXProt DXChannel);
13
14 use DXUtil;
15 use DXChannel;
16 use DXUser;
17 use DXM;
18 use DXCluster;
19 use DXProtVars;
20 use DXProtout;
21 use DXDebug;
22 use FileHandle;
23 use Carp;
24
25 use strict;
26 use vars qw(%work @msg $msgdir %valid);
27
28 %work = ();                # outstanding jobs
29 @msg = ();                 # messages we have
30 $msgdir = "$main::root/msg";              # directory contain the msgs
31
32 %valid = (
33   fromnode => '9,From Node',
34   tonode => '9,To Node',
35   to => '0,To',
36   from => '0,From',
37   t => '0,Msg Time,cldatetime',
38   private => '9,Private',
39   subject => '0,Subject',
40   linesreq => '0,Lines per Gob',
41   rrreq => '9,Read Confirm',
42   origin => '0,Origin',
43   lines => '5,Data',
44   stream => '9,Stream No',
45   count => '9,Gob Linecnt',
46   file => '9,File?,yesno',
47   gotit => '9,Got it Nodes,parray',
48   lines => '9,Lines,parray',
49   read => '9,Times read',
50   size => '0,Size',
51   msgno => '0,Msgno',
52 );
53
54 # allocate a new object
55 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
56 sub alloc                  
57 {
58   my $pkg = shift;
59   my $self = bless {}, $pkg;
60   $self->{msgno} = shift;
61   $self->{to} = shift;
62   $self->{from} = shift;
63   $self->{t} = shift;
64   $self->{private} = shift;
65   $self->{subject} = shift;
66   $self->{origin} = shift;
67   $self->{read} = shift;
68     
69   return $self;
70 }
71
72 sub workclean
73 {
74   my $ref = shift;
75   delete $ref->{lines};
76   delete $ref->{linesreq};
77   delete $ref->{tonode};
78   delete $ref->{fromnode};
79   delete $ref->{stream};
80   delete $ref->{lines};
81   delete $ref->{file};
82   delete $ref->{count};
83 }
84
85 sub process
86 {
87   my ($self, $line) = @_;
88   my @f = split /[\^\~]/, $line;
89   my ($pcno) = $f[0] =~ /^PC(\d\d)/;          # just get the number
90   
91   SWITCH: {
92     if ($pcno == 28) {                        # incoming message
93           my $t = cltounix($f[5], $f[6]);
94           my $stream = next_transno($f[2]);
95           my $ref = DXMsg->alloc($stream, $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0');
96           
97           # fill in various forwarding state variables
98       $ref->{fromnode} = $f[2];
99       $ref->{tonode} = $f[1];
100           $ref->{rrreq} = $f[11];
101           $ref->{linesreq} = $f[10];
102           $ref->{stream} = $stream;
103           $ref->{count} = 0;                      # no of lines between PC31s
104           dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
105       $work{"$f[1]$f[2]$stream"} = $ref;         # store in work
106           $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
107           last SWITCH;
108         }
109         
110     if ($pcno == 29) {                        # incoming text
111           my $ref = $work{"$f[1]$f[2]$f[3]"};
112           if ($ref) {
113             push @{$ref->{lines}}, $f[4];
114                 $ref->{count}++;
115                 if ($ref->{count} >= $ref->{linesreq}) {
116                   $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
117                   dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
118                   $ref->{count} = 0;
119                 }
120           }
121           last SWITCH;
122         }
123         
124     if ($pcno == 30) {
125           last SWITCH;
126         }
127         
128     if ($pcno == 31) {
129           last SWITCH;
130         }
131         
132     if ($pcno == 32) {                         # incoming EOM
133           dbg('msg', "stream $f[3]: EOM received\n");
134           my $ref = $work{"$f[1]$f[2]$f[3]"};
135           if ($ref) {
136             $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
137                 
138                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
139                 my $msgno = next_transno("Msgno") if !$ref->{file};
140
141                 $ref->store($ref->{lines});                    # store it (whatever that may mean)
142                 $ref->workclean;
143                 delete $work{"$f[1]$f[2]$f[3]"};       # remove the reference from the work vector
144                 push @msg, $ref;           # add this message to the incore message list
145           }
146           last SWITCH;
147         }
148         
149     if ($pcno == 33) {
150           last SWITCH;
151         }
152         
153         if ($pcno == 40) {                         # this is a file request
154           $f[3] =~ s/\\/\//og;                     # change the slashes
155           $f[3] =~ s/\.//og;                       # remove dots
156           $f[3] = lc $f[3];                        # to lower case;
157           dbg('msg', "incoming file $f[3]\n");
158           last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|msg)\//;    # prevent access to executables
159           
160           # create any directories
161           my @part = split /\//, $f[3];
162           my $part;
163           my $fn = "$main::root";
164           pop @part;         # remove last part
165           foreach $part (@part) {
166             $fn .= "/$part";
167                 next if -e $fn;
168             last SWITCH if !mkdir $fn, 0777;
169         dbg('msg', "created directory $fn\n");
170           }
171           my $stream = next_transno($f[2]);
172           my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0');
173           
174           # forwarding variables
175       $ref->{fromnode} = $f[1];
176       $ref->{tonode} = $f[2];
177           $ref->{linesreq} = $f[5];
178           $ref->{stream} = $stream;
179           $ref->{count} = 0;                      # no of lines between PC31s
180           $ref->{file} = 1;
181       $work{"$f[1]$f[2]$stream"} = $ref;         # store in work
182           $self->send(DXProt::pc30($f[2], $f[1], $stream));  # send ack 
183           
184           last SWITCH;
185         }
186   }
187 }
188
189
190 # store a message away on disc or whatever
191 sub store
192 {
193   my $ref = shift;
194   my $lines = shift;
195   
196   # we only proceed if there are actually any lines in the file
197   if (@{$lines} == 0) {
198         return;
199   }
200   
201   if ($ref->{file}) {   # a file
202     dbg('msg', "To be stored in $ref->{to}\n");
203   
204     my $fh = new FileHandle "$ref->{to}", "w";
205         if (defined $fh) {
206           my $line;
207           foreach $line (@{$lines}) {
208                 print $fh "$line\n";
209           }
210           $fh->close;
211           dbg('msg', "file $ref->{to} stored\n");
212     } else {
213       confess "can't open file $ref->{to} $!";  
214     }
215 #       push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
216   } else {              # a normal message
217
218     # attempt to open the message file
219         my $fn = filename($ref->{msgno});
220
221     dbg('msg', "To be stored in $fn\n");
222   
223     my $fh = new FileHandle "$fn", "w";
224         if (defined $fh) {
225       print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$ref->{private}^$ref->{subject}^$ref->{origin}^$ref->{read}\n";
226           print $fh "=== $ref->{fromnode}\n";
227           my $line;
228           foreach $line (@{$lines}) {
229         $ref->{size} += (length $line) + 1;
230                 print $fh "$line\n";
231           }
232           $ref->{gotit} = [];
233           push @{$ref->{gotit}}, $ref->{fromnode} if $ref->{fromnode};
234           $fh->close;
235           dbg('msg', "msg $ref->{msgno} stored\n");
236     } else {
237       confess "can't open msg file $fn $!";  
238     }
239   }
240 }
241
242 # delete a message
243 sub del_msg
244 {
245   my $self = shift;
246
247   # remove it from the active message list
248   @msg = map { $_ != $self ? $_ : () } @msg;
249   
250   # remove the file
251   unlink filename($self->{msgno});
252 }
253
254 # read in a message header
255 sub read_msg_header
256
257   my $fn = shift;
258   my $file;
259   my $line;
260   my $ref;
261   my @f;
262   my $size;
263
264   $file = new FileHandle;
265   if (!open($file, $fn)) {
266     print "Error reading $fn $!\n";
267     return undef;
268   }
269   $size = -s $fn;
270   $line = <$file>;       # first line
271   chomp $line;
272   $size -= length $line;
273   if (! $line =~ /^===/o) {
274     print "corrupt first line in $fn ($line)\n";
275     return undef;
276   }
277   $line =~ s/^=== //o;
278   @f = split /\^/, $line;
279   $ref = DXMsg->alloc(@f);
280   
281   $line = <$file>;       # second line
282   chomp $line;
283   $size -= length $line;
284   if (! $line =~ /^===/o) {
285     print "corrupt second line in $fn ($line)\n";
286     return undef;
287   }
288   $line =~ s/^=== //o;
289   $ref->{gotit} = [];
290   @f = split /\^/, $line;
291   push @{$ref->{goit}}, @f;
292   $ref->{size} = $size;
293  
294   close($file);
295   
296   return $ref;
297 }
298
299 # read in a message header
300 sub read_msg_body
301 {
302   my $self = shift;
303   my $msgno = $self->{msgno};
304   my $file;
305   my $line;
306   my $fn = filename($msgno);
307   my @out;
308
309   $file = new FileHandle;
310   if (!open($file, $fn)) {
311     print "Error reading $fn $!\n";
312     return undef;
313   }
314   chomp (@out = <$file>);
315   close($file);
316   
317   shift @out if $out[0] =~ /^=== /;
318   shift @out if $out[0] =~ /^=== /;
319   return @out;
320 }
321
322 # get a new transaction number from the file specified
323 sub next_transno
324 {
325   my $name = shift;
326   $name =~ s/\W//og;      # remove non-word characters
327   my $fn = "$msgdir/$name";
328   my $msgno;
329   
330   my $fh = new FileHandle;
331   if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
332     $fh->autoflush(1);
333         $msgno = $fh->getline;
334         chomp $msgno;
335         $msgno++;
336         seek $fh, 0, 0;
337         $fh->print("$msgno\n");
338         dbg('msg', "msgno $msgno allocated for $name\n");
339         $fh->close;
340   } else {
341     confess "can't open $fn $!";
342   }
343   return $msgno;
344 }
345
346 # initialise the message 'system', read in all the message headers
347 sub init
348 {
349   my $dir = new FileHandle;
350   my @dir;
351   my $ref;
352
353   # read in the directory
354   opendir($dir, $msgdir) or confess "can't open $msgdir $!";
355   @dir = readdir($dir);
356   closedir($dir);
357   
358   for (sort @dir) {
359     next if /^\./o;
360         next if ! /^m\d+/o;
361
362     $ref = read_msg_header("$msgdir/$_");
363         next if !$ref;
364         
365         # add the clusters that have this
366         push @msg, $ref; 
367         
368   }
369 }
370
371 # return all the current messages
372 sub get_all
373 {
374   return @msg;
375 }
376
377 # get a particular message
378 sub get
379 {
380   my $msgno = shift;
381   for (@msg) {
382     return $_ if $_->{msgno} == $msgno;
383     last if $_->{msgno} > $msgno;
384   }
385   return undef;
386 }
387
388 # return the official filename for a message no
389 sub filename
390 {
391   return sprintf "$msgdir/m%06d", shift;
392 }
393
394 #
395 # return a list of valid elements 
396
397
398 sub fields
399 {
400   return keys(%valid);
401 }
402
403 #
404 # return a prompt for a field
405 #
406
407 sub field_prompt
408
409   my ($self, $ele) = @_;
410   return $valid{$ele};
411 }
412
413 no strict;
414 sub AUTOLOAD
415 {
416   my $self = shift;
417   my $name = $AUTOLOAD;
418   return if $name =~ /::DESTROY$/;
419   $name =~ s/.*:://o;
420   
421   confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
422   @_ ? $self->{$name} = shift : $self->{$name} ;
423 }
424
425
426 1;
427
428 __END__